kraken_sdk/lib.rs
1//! # Kraken SDK
2//!
3//! A high-performance, asynchronous Rust SDK for the Kraken WebSocket API.
4//!
5//! ## ✨ Features
6//!
7//! - **Strictly Typed**: Custom Serde deserializers for Kraken's complex JSON arrays.
8//! - **Resilient**: Automatic reconnection and state restoration.
9//! - **Verified**: CRC32 Checksum implementation for Order Book integrity.
10//! - **Fast**: Benchmarked at ~648k msg/sec.
11//!
12//! ## 🚀 Quick Start
13//!
14//! ```rust,no_run
15//! use kraken_sdk::KrakenClient;
16//!
17//! #[tokio::main]
18//! async fn main() {
19//! let client = KrakenClient::new();
20//! let mut rx = client.subscribe_events();
21//!
22//! client.connect().await.unwrap();
23//! client.subscribe(vec!["XBT/USD".to_string()], "trade", None).await.unwrap();
24//!
25//! while let Ok(event) = rx.recv().await {
26//! println!("Received: {:?}", event);
27//! }
28//! }
29//! ```
30
31use eyre::Result;
32use futures_util::{SinkExt, StreamExt};
33use serde::Serialize;
34use tokio::sync::{broadcast, mpsc};
35use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
36use tracing::{error, info, warn};
37
38pub mod aggregator;
39pub mod auth;
40pub mod models;
41use models::KrakenEvent;
42
43#[derive(Debug, Clone)]
44pub enum Command {
45 Subscribe {
46 pairs: Vec<String>,
47 subscription: SubscriptionArgs,
48 },
49}
50
51#[derive(Debug, Clone, Serialize)]
52pub struct SubscriptionArgs {
53 pub name: String,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub token: Option<String>,
56}
57
58pub struct KrakenClient {
59 ws_url: String,
60 event_sender: broadcast::Sender<KrakenEvent>,
61 command_sender: mpsc::Sender<Command>,
62 // We store the receiver in an Option so we can take it out once when connecting
63 command_receiver: std::sync::Mutex<Option<mpsc::Receiver<Command>>>,
64}
65
66impl Default for KrakenClient {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72impl KrakenClient {
73 /// Creates a new `KrakenClient` instance.
74 ///
75 /// This initializes the internal channels but does not connect to the WebSocket yet.
76 /// Call `connect()` to establish the connection.
77 pub fn new() -> Self {
78 let (event_sender, _) = broadcast::channel(100);
79 let (command_sender, command_receiver) = mpsc::channel(100);
80 Self {
81 ws_url: "wss://ws.kraken.com".to_string(),
82 event_sender,
83 command_sender,
84 command_receiver: std::sync::Mutex::new(Some(command_receiver)),
85 }
86 }
87
88 /// Returns a broadcast receiver for Kraken events.
89 ///
90 /// You can call this multiple times to create multiple subscribers (e.g., one for logging, one for trading).
91 pub fn subscribe_events(&self) -> broadcast::Receiver<KrakenEvent> {
92 self.event_sender.subscribe()
93 }
94
95 /// Subscribes to a list of pairs on a specific channel.
96 ///
97 /// # Arguments
98 ///
99 /// * `pairs` - A list of trading pairs (e.g., `vec!["XBT/USD".to_string()]`).
100 /// * `name` - The channel name (e.g., `"trade"`, `"book"`, `"ticker"`).
101 ///
102 /// # Example
103 ///
104 /// ```rust,no_run
105 /// # use kraken_sdk::KrakenClient;
106 /// # async fn example() {
107 /// let client = KrakenClient::new();
108 /// client.subscribe(vec!["XBT/USD".to_string()], "trade", None).await.unwrap();
109 /// # }
110 /// ```
111 pub async fn subscribe(
112 &self,
113 pairs: Vec<String>,
114 name: &str,
115 token: Option<String>,
116 ) -> Result<()> {
117 let cmd = Command::Subscribe {
118 pairs,
119 subscription: SubscriptionArgs {
120 name: name.to_string(),
121 token,
122 },
123 };
124 self.command_sender
125 .send(cmd)
126 .await
127 .map_err(|e| eyre::eyre!("Failed to send command: {}", e))?;
128 Ok(())
129 }
130
131 /// Connects to the Kraken WebSocket API and starts the event loop.
132 ///
133 /// This spawns a background task that handles:
134 /// - WebSocket connection and reconnection.
135 /// - Parsing incoming messages.
136 /// - Sending outgoing commands.
137 /// - Broadcasting events to subscribers.
138 ///
139 /// # Errors
140 ///
141 /// Returns an error if the client has already connected (the command receiver is taken).
142 pub async fn connect(&self) -> Result<()> {
143 info!("Starting Kraken Client...");
144
145 // Take the command receiver
146 let mut command_receiver = self
147 .command_receiver
148 .lock()
149 .unwrap()
150 .take()
151 .ok_or_else(|| eyre::eyre!("Client already connected (receiver taken)"))?;
152
153 let ws_url = self.ws_url.clone();
154 let event_sender = self.event_sender.clone();
155
156 // State to track active subscriptions for re-subscribing
157 // We use a simple list of commands that we've sent.
158 // In a real app, we might want to be smarter (e.g. remove unsubscribes),
159 // but for now, replaying the "Subscribe" commands is sufficient.
160 let mut active_subscriptions: Vec<Command> = Vec::new();
161
162 // Spawn the driver task
163 tokio::spawn(async move {
164 loop {
165 info!("Connecting to {}...", ws_url);
166 let ws_stream = match connect_async(&ws_url).await {
167 Ok((stream, _)) => {
168 info!("Connected to Kraken WebSocket API");
169 stream
170 }
171 Err(e) => {
172 error!("Connection failed: {}. Retrying in 5s...", e);
173 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
174 continue;
175 }
176 };
177
178 let (mut write, mut read) = ws_stream.split();
179
180 // Re-send active subscriptions
181 for cmd in &active_subscriptions {
182 let Command::Subscribe {
183 pairs,
184 subscription,
185 } = cmd;
186 let msg = serde_json::json!({
187 "event": "subscribe",
188 "pair": pairs,
189 "subscription": subscription
190 });
191 if let Err(e) = write.send(Message::Text(msg.to_string())).await {
192 error!("Failed to resubscribe: {}", e);
193 // If we can't send, the connection is likely dead, break to outer loop
194 break;
195 }
196 info!("Resubscribed to {:?}", pairs);
197 }
198
199 loop {
200 tokio::select! {
201 // 1. Handle incoming WS messages
202 msg_opt = read.next() => {
203 match msg_opt {
204 Some(Ok(Message::Text(text))) => {
205 match serde_json::from_str::<KrakenEvent>(&text) {
206 Ok(event) => {
207 let _ = event_sender.send(event);
208 }
209 Err(e) => error!("Parse error: {}", e),
210 }
211 }
212 Some(Ok(Message::Ping(_))) => {}
213 Some(Err(e)) => {
214 error!("WS Error: {}. Reconnecting...", e);
215 break; // Break inner loop to reconnect
216 }
217 None => {
218 warn!("WS Stream ended. Reconnecting...");
219 break; // Break inner loop to reconnect
220 }
221 _ => {}
222 }
223 }
224 // 2. Handle outgoing commands
225 cmd_opt = command_receiver.recv() => {
226 match cmd_opt {
227 Some(cmd) => {
228 match &cmd {
229 Command::Subscribe { pairs, subscription } => {
230 let msg = serde_json::json!({
231 "event": "subscribe",
232 "pair": pairs,
233 "subscription": subscription
234 });
235 if let Err(e) = write.send(Message::Text(msg.to_string())).await {
236 error!("Failed to send subscription: {}", e);
237 break; // Connection likely dead
238 }
239 info!("Sent subscription for {:?}", pairs);
240
241 // Add to active subscriptions
242 active_subscriptions.push(cmd);
243 }
244 }
245 }
246 None => {
247 warn!("Command channel closed. Shutting down client.");
248 return; // Exit the task entirely
249 }
250 }
251 }
252 }
253 }
254
255 // If we broke the inner loop, wait a bit before reconnecting
256 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
257 }
258 });
259
260 Ok(())
261 }
262}