kraken_sdk/
lib.rs

//! # Kraken SDK
//!
//! A high-performance, asynchronous Rust SDK for the Kraken WebSocket API.
//!
//! ## ✨ Features
//!
//! - **Strictly Typed**: Custom Serde deserializers for Kraken's complex JSON arrays.
//! - **Resilient**: Automatic reconnection and state restoration.
//! - **Verified**: CRC32 Checksum implementation for Order Book integrity.
//! - **Fast**: Benchmarked at ~648k msg/sec.
//!
//! ## 🚀 Quick Start
//!
//! ```rust,no_run
//! use kraken_sdk::KrakenClient;
//!
//! #[tokio::main]
//! async fn main() {
//!     let client = KrakenClient::new();
//!     let mut rx = client.subscribe_events();
//!
//!     client.connect().await.unwrap();
//!     client.subscribe(vec!["XBT/USD".to_string()], "trade", None).await.unwrap();
//!
//!     while let Ok(event) = rx.recv().await {
//!         println!("Received: {:?}", event);
//!     }
//! }
//! ```
30
31use eyre::Result;
32use futures_util::{SinkExt, StreamExt};
33use serde::Serialize;
34use tokio::sync::{broadcast, mpsc};
35use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
36use tracing::{error, info, warn};
37
38pub mod aggregator;
39pub mod auth;
40pub mod models;
41use models::KrakenEvent;
42
43#[derive(Debug, Clone)]
44pub enum Command {
45    Subscribe {
46        pairs: Vec<String>,
47        subscription: SubscriptionArgs,
48    },
49}
50
51#[derive(Debug, Clone, Serialize)]
52pub struct SubscriptionArgs {
53    pub name: String,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub token: Option<String>,
56}
57
58pub struct KrakenClient {
59    ws_url: String,
60    event_sender: broadcast::Sender<KrakenEvent>,
61    command_sender: mpsc::Sender<Command>,
62    // We store the receiver in an Option so we can take it out once when connecting
63    command_receiver: std::sync::Mutex<Option<mpsc::Receiver<Command>>>,
64}
65
66impl Default for KrakenClient {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72impl KrakenClient {
73    /// Creates a new `KrakenClient` instance.
74    ///
75    /// This initializes the internal channels but does not connect to the WebSocket yet.
76    /// Call `connect()` to establish the connection.
77    pub fn new() -> Self {
78        let (event_sender, _) = broadcast::channel(100);
79        let (command_sender, command_receiver) = mpsc::channel(100);
80        Self {
81            ws_url: "wss://ws.kraken.com".to_string(),
82            event_sender,
83            command_sender,
84            command_receiver: std::sync::Mutex::new(Some(command_receiver)),
85        }
86    }
87
88    /// Returns a broadcast receiver for Kraken events.
89    ///
90    /// You can call this multiple times to create multiple subscribers (e.g., one for logging, one for trading).
91    pub fn subscribe_events(&self) -> broadcast::Receiver<KrakenEvent> {
92        self.event_sender.subscribe()
93    }
94
95    /// Subscribes to a list of pairs on a specific channel.
96    ///
97    /// # Arguments
98    ///
99    /// * `pairs` - A list of trading pairs (e.g., `vec!["XBT/USD".to_string()]`).
100    /// * `name` - The channel name (e.g., `"trade"`, `"book"`, `"ticker"`).
101    ///
102    /// # Example
103    ///
104    /// ```rust,no_run
105    /// # use kraken_sdk::KrakenClient;
106    /// # async fn example() {
107    /// let client = KrakenClient::new();
108    /// client.subscribe(vec!["XBT/USD".to_string()], "trade", None).await.unwrap();
109    /// # }
110    /// ```
111    pub async fn subscribe(
112        &self,
113        pairs: Vec<String>,
114        name: &str,
115        token: Option<String>,
116    ) -> Result<()> {
117        let cmd = Command::Subscribe {
118            pairs,
119            subscription: SubscriptionArgs {
120                name: name.to_string(),
121                token,
122            },
123        };
124        self.command_sender
125            .send(cmd)
126            .await
127            .map_err(|e| eyre::eyre!("Failed to send command: {}", e))?;
128        Ok(())
129    }
130
131    /// Connects to the Kraken WebSocket API and starts the event loop.
132    ///
133    /// This spawns a background task that handles:
134    /// - WebSocket connection and reconnection.
135    /// - Parsing incoming messages.
136    /// - Sending outgoing commands.
137    /// - Broadcasting events to subscribers.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if the client has already connected (the command receiver is taken).
142    pub async fn connect(&self) -> Result<()> {
143        info!("Starting Kraken Client...");
144
145        // Take the command receiver
146        let mut command_receiver = self
147            .command_receiver
148            .lock()
149            .unwrap()
150            .take()
151            .ok_or_else(|| eyre::eyre!("Client already connected (receiver taken)"))?;
152
153        let ws_url = self.ws_url.clone();
154        let event_sender = self.event_sender.clone();
155
156        // State to track active subscriptions for re-subscribing
157        // We use a simple list of commands that we've sent.
158        // In a real app, we might want to be smarter (e.g. remove unsubscribes),
159        // but for now, replaying the "Subscribe" commands is sufficient.
160        let mut active_subscriptions: Vec<Command> = Vec::new();
161
162        // Spawn the driver task
163        tokio::spawn(async move {
164            loop {
165                info!("Connecting to {}...", ws_url);
166                let ws_stream = match connect_async(&ws_url).await {
167                    Ok((stream, _)) => {
168                        info!("Connected to Kraken WebSocket API");
169                        stream
170                    }
171                    Err(e) => {
172                        error!("Connection failed: {}. Retrying in 5s...", e);
173                        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
174                        continue;
175                    }
176                };
177
178                let (mut write, mut read) = ws_stream.split();
179
180                // Re-send active subscriptions
181                for cmd in &active_subscriptions {
182                    let Command::Subscribe {
183                        pairs,
184                        subscription,
185                    } = cmd;
186                    let msg = serde_json::json!({
187                        "event": "subscribe",
188                        "pair": pairs,
189                        "subscription": subscription
190                    });
191                    if let Err(e) = write.send(Message::Text(msg.to_string())).await {
192                        error!("Failed to resubscribe: {}", e);
193                        // If we can't send, the connection is likely dead, break to outer loop
194                        break;
195                    }
196                    info!("Resubscribed to {:?}", pairs);
197                }
198
199                loop {
200                    tokio::select! {
201                        // 1. Handle incoming WS messages
202                        msg_opt = read.next() => {
203                            match msg_opt {
204                                Some(Ok(Message::Text(text))) => {
205                                    match serde_json::from_str::<KrakenEvent>(&text) {
206                                        Ok(event) => {
207                                            let _ = event_sender.send(event);
208                                        }
209                                        Err(e) => error!("Parse error: {}", e),
210                                    }
211                                }
212                                Some(Ok(Message::Ping(_))) => {}
213                                Some(Err(e)) => {
214                                    error!("WS Error: {}. Reconnecting...", e);
215                                    break; // Break inner loop to reconnect
216                                }
217                                None => {
218                                    warn!("WS Stream ended. Reconnecting...");
219                                    break; // Break inner loop to reconnect
220                                }
221                                _ => {}
222                            }
223                        }
224                        // 2. Handle outgoing commands
225                        cmd_opt = command_receiver.recv() => {
226                            match cmd_opt {
227                                Some(cmd) => {
228                                    match &cmd {
229                                        Command::Subscribe { pairs, subscription } => {
230                                            let msg = serde_json::json!({
231                                                "event": "subscribe",
232                                                "pair": pairs,
233                                                "subscription": subscription
234                                            });
235                                            if let Err(e) = write.send(Message::Text(msg.to_string())).await {
236                                                error!("Failed to send subscription: {}", e);
237                                                break; // Connection likely dead
238                                            }
239                                            info!("Sent subscription for {:?}", pairs);
240
241                                            // Add to active subscriptions
242                                            active_subscriptions.push(cmd);
243                                        }
244                                    }
245                                }
246                                None => {
247                                    warn!("Command channel closed. Shutting down client.");
248                                    return; // Exit the task entirely
249                                }
250                            }
251                        }
252                    }
253                }
254
255                // If we broke the inner loop, wait a bit before reconnecting
256                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
257            }
258        });
259
260        Ok(())
261    }
262}