1use crc32fast::Hasher;
2use serde::{Deserialize, Deserializer};
3use serde_json::Value;
4use std::collections::BTreeMap;
5
6#[derive(Debug, Clone, Deserialize)]
7#[serde(untagged)]
8pub enum KrakenEvent {
9 Heartbeat(Heartbeat),
10 SystemStatus(SystemStatus),
11 SubscriptionStatus(SubscriptionStatus),
12 Data(Vec<Value>), }
14
15#[derive(Debug, Clone, Deserialize)]
16pub struct Heartbeat {
17 pub event: String, }
19
20#[derive(Debug, Clone, Deserialize)]
21pub struct SystemStatus {
22 pub event: String, pub connection_id: Option<u64>,
24 pub status: String, pub version: String,
26}
27
28#[derive(Debug, Clone, Deserialize)]
29pub struct SubscriptionStatus {
30 pub event: String, pub status: Option<String>, pub pair: Option<String>,
33 pub channel_name: Option<String>,
34 pub subscription: Option<SubscriptionInfo>,
35 pub error_message: Option<String>,
36}
37
38#[derive(Debug, Clone, Deserialize)]
39pub struct SubscriptionInfo {
40 pub name: String,
41}
42
43#[derive(Debug, Clone)]
46pub struct TradeData {
47 pub channel_id: u64,
48 pub data: Vec<Trade>,
49 pub channel_name: String,
50 pub pair: String,
51}
52
/// A single trade, with every field kept as the string received on the wire.
///
/// The `Deserialize` impl in this file fills the fields positionally from a JSON array of
/// strings, in the order they are declared here.
#[derive(Debug, Clone)]
pub struct Trade {
    /// Array element 0.
    pub price: String,
    /// Array element 1.
    pub volume: String,
    /// Array element 2.
    pub time: String,
    /// Array element 3 (e.g. `"b"` in this file's test fixture).
    pub side: String,
    /// Array element 4 (e.g. `"m"` in this file's test fixture).
    pub order_type: String,
    /// Array element 5; may be empty.
    pub misc: String,
}
62
63#[derive(Debug, Clone)]
64pub struct OrderBookData {
65 pub channel_id: u64,
66 pub asks: Vec<OrderBookEntry>,
67 pub bids: Vec<OrderBookEntry>,
68 pub is_snapshot: bool,
69 pub channel_name: String,
70 pub pair: String,
71 pub checksum: Option<String>,
72}
73
/// One price level of an order book, with every field kept as the string received.
///
/// The `Deserialize` impl in this file fills the fields positionally from a JSON array of
/// strings.
#[derive(Debug, Clone)]
pub struct OrderBookEntry {
    /// Array element 0.
    pub price: String,
    /// Array element 1.
    pub volume: String,
    /// Array element 2.
    pub timestamp: String,
}
80
81impl<'de> Deserialize<'de> for OrderBookEntry {
83 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
84 where
85 D: Deserializer<'de>,
86 {
87 let v: Vec<String> = Deserialize::deserialize(deserializer)?;
88 Ok(OrderBookEntry {
89 price: v.first().cloned().unwrap_or_default(),
90 volume: v.get(1).cloned().unwrap_or_default(),
91 timestamp: v.get(2).cloned().unwrap_or_default(),
92 })
93 }
94}
95
96#[derive(Debug, Clone, Deserialize)]
97pub struct OrderBookRegion {
98 #[serde(default)]
99 pub a: Vec<OrderBookEntry>, #[serde(default)]
101 pub b: Vec<OrderBookEntry>, #[serde(default)]
103 pub as_: Vec<OrderBookEntry>, #[serde(default)]
105 pub bs: Vec<OrderBookEntry>, }
107
108impl KrakenEvent {
109 pub fn try_into_trade_data(self) -> Option<TradeData> {
110 if let KrakenEvent::Data(mut vec) = self {
111 if vec.len() >= 4 && vec[2].as_str() == Some("trade") {
114 let pair = vec.pop()?.as_str()?.to_string();
115 let _channel_name = vec.pop()?.as_str()?.to_string();
116 let trades_value = vec.pop()?;
117 let channel_id = vec.pop()?.as_u64()?;
118
119 let trades: Vec<Trade> = serde_json::from_value(trades_value).ok()?;
120
121 return Some(TradeData {
122 channel_id,
123 data: trades,
124 channel_name: "trade".to_string(),
125 pair,
126 });
127 }
128 }
129 None
130 }
131
132 pub fn try_into_orderbook_data(self) -> Option<OrderBookData> {
133 if let KrakenEvent::Data(mut vec) = self {
134 let pair = vec.pop()?.as_str()?.to_string();
142 let channel_name = vec.pop()?.as_str()?.to_string();
143
144 if !channel_name.starts_with("book") {
145 return None;
146 }
147
148 let channel_id = vec.remove(0).as_u64()?;
149
150 let mut asks = Vec::new();
152 let mut bids = Vec::new();
153 let mut is_snapshot = false;
154 let mut checksum: Option<String> = None;
155
156 for value in vec {
157 if let Ok(obj) =
163 serde_json::from_value::<serde_json::Map<String, Value>>(value.clone())
164 {
165 if let Some(a_val) = obj.get("a") {
166 if let Ok(mut list) =
167 serde_json::from_value::<Vec<OrderBookEntry>>(a_val.clone())
168 {
169 asks.append(&mut list);
170 }
171 }
172 if let Some(b_val) = obj.get("b") {
173 if let Ok(mut list) =
174 serde_json::from_value::<Vec<OrderBookEntry>>(b_val.clone())
175 {
176 bids.append(&mut list);
177 }
178 }
179 if let Some(c_val) = obj.get("c") {
180 if let Some(s) = c_val.as_str() {
182 checksum = Some(s.to_string());
183 }
184 }
185 if let Some(as_val) = obj.get("as") {
186 is_snapshot = true;
187 if let Ok(mut list) =
188 serde_json::from_value::<Vec<OrderBookEntry>>(as_val.clone())
189 {
190 asks.append(&mut list);
191 }
192 }
193 if let Some(bs_val) = obj.get("bs") {
194 is_snapshot = true;
195 if let Ok(mut list) =
196 serde_json::from_value::<Vec<OrderBookEntry>>(bs_val.clone())
197 {
198 bids.append(&mut list);
199 }
200 }
201 }
202 }
203
204 return Some(OrderBookData {
205 channel_id,
206 asks,
207 bids,
208 is_snapshot,
209 channel_name,
210 pair,
211 checksum,
212 });
213 }
214 None
215 }
216}
217
/// Locally maintained order book.
///
/// Both sides map a price string to a volume string. The maps are ordered by the *string*
/// value of the price, which is not the numeric order; `calculate_checksum` re-sorts
/// numerically before use.
#[derive(Debug, Default)]
pub struct LocalOrderBook {
    /// Ask side: price string -> volume string.
    pub asks: BTreeMap<String, String>,
    /// Bid side: price string -> volume string.
    pub bids: BTreeMap<String, String>,
}
233
234impl LocalOrderBook {
235 pub fn new() -> Self {
236 Self::default()
237 }
238
239 pub fn update(&mut self, data: &OrderBookData) {
240 if data.is_snapshot {
241 self.asks.clear();
242 self.bids.clear();
243 }
244
245 for entry in &data.asks {
246 let price = &entry.price;
247 let volume = &entry.volume;
248 if volume == "0.00000000" || volume == "0.0" || volume == "0" {
249 self.asks.remove(price);
250 } else {
251 self.asks.insert(price.clone(), volume.clone());
252 }
253 }
254
255 for entry in &data.bids {
256 let price = &entry.price;
257 let volume = &entry.volume;
258 if volume == "0.00000000" || volume == "0.0" || volume == "0" {
259 self.bids.remove(price);
260 } else {
261 self.bids.insert(price.clone(), volume.clone());
262 }
263 }
264 }
265
266 pub fn calculate_checksum(&self) -> u32 {
272 let mut hasher = Hasher::new();
273
274 let mut asks: Vec<(&String, &String)> = self.asks.iter().collect();
281 asks.sort_by(|a, b| {
282 let p1 = a.0.parse::<f64>().unwrap_or(0.0);
283 let p2 = b.0.parse::<f64>().unwrap_or(0.0);
284 p1.partial_cmp(&p2).unwrap()
285 });
286
287 let mut bids: Vec<(&String, &String)> = self.bids.iter().collect();
288 bids.sort_by(|a, b| {
289 let p1 = a.0.parse::<f64>().unwrap_or(0.0);
290 let p2 = b.0.parse::<f64>().unwrap_or(0.0);
291 p2.partial_cmp(&p1).unwrap() });
293
294 for (price, volume) in asks.iter().take(10) {
295 let p = price.replace(".", "");
296 let p = p.trim_start_matches('0');
297 let v = volume.replace(".", "");
298 let v = v.trim_start_matches('0');
299 hasher.update(p.as_bytes());
300 hasher.update(v.as_bytes());
301 }
302
303 for (price, volume) in bids.iter().take(10) {
304 let p = price.replace(".", "");
305 let p = p.trim_start_matches('0');
306 let v = volume.replace(".", "");
307 let v = v.trim_start_matches('0');
308 hasher.update(p.as_bytes());
309 hasher.update(v.as_bytes());
310 }
311
312 hasher.finalize()
313 }
314
315 pub fn validate_checksum(&self, remote_checksum: &str) -> bool {
316 if let Ok(remote_val) = remote_checksum.parse::<u32>() {
319 let local_val = self.calculate_checksum();
320 return local_val == remote_val;
321 }
322 false
323 }
324}
325
326impl<'de> Deserialize<'de> for Trade {
328 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
329 where
330 D: Deserializer<'de>,
331 {
332 let v: Vec<String> = Deserialize::deserialize(deserializer)?;
333 Ok(Trade {
334 price: v.first().cloned().unwrap_or_default(),
335 volume: v.get(1).cloned().unwrap_or_default(),
336 time: v.get(2).cloned().unwrap_or_default(),
337 side: v.get(3).cloned().unwrap_or_default(),
338 order_type: v.get(4).cloned().unwrap_or_default(),
339 misc: v.get(5).cloned().unwrap_or_default(),
340 })
341 }
342}
343
/// A single OHLCV candle.
#[derive(Debug, Clone, Copy)]
pub struct Candle {
    /// Opening price.
    pub open: f64,
    /// Highest price.
    pub high: f64,
    /// Lowest price.
    pub low: f64,
    /// Closing price.
    pub close: f64,
    /// Volume accumulated in the candle.
    pub volume: f64,
    /// Start of the candle — presumably a Unix timestamp in seconds; confirm with the producer.
    pub start_time: u64,
    /// Candle length in seconds.
    pub interval_seconds: u64,
}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare heartbeat object must decode to the `Heartbeat` variant.
    #[test]
    fn test_parse_heartbeat() {
        let raw = r#"{"event":"heartbeat"}"#;
        let parsed: KrakenEvent = serde_json::from_str(raw).unwrap();
        assert!(
            matches!(parsed, KrakenEvent::Heartbeat(_)),
            "Expected Heartbeat"
        );
    }

    /// An array-shaped trade frame must decode to `Data` with all four elements kept.
    #[test]
    fn test_parse_trade_data() {
        let raw = r#"[123, [["50000.0", "1.0", "123456.789", "b", "m", ""]], "trade", "XBT/USD"]"#;
        let parsed: KrakenEvent = serde_json::from_str(raw).unwrap();
        if let KrakenEvent::Data(fields) = parsed {
            assert_eq!(fields.len(), 4);
        } else {
            panic!("Expected Data");
        }
    }
}