kraken_sdk/
aggregator.rs

1use crate::models::{Candle, Trade};
2
3pub struct TradeAggregator {
4    interval_seconds: u64,
5    current_candle: Option<Candle>,
6}
7
8impl TradeAggregator {
9    pub fn new(interval_seconds: u64) -> Self {
10        Self {
11            interval_seconds,
12            current_candle: None,
13        }
14    }
15
16    pub fn update(&mut self, trade: &Trade) {
17        let price = trade.price.parse::<f64>().unwrap_or(0.0);
18        let volume = trade.volume.parse::<f64>().unwrap_or(0.0);
19        let time = trade.time.parse::<f64>().unwrap_or(0.0) as u64;
20
21        // Determine the start time of the candle this trade belongs to
22        let candle_start = (time / self.interval_seconds) * self.interval_seconds;
23
24        if let Some(candle) = &mut self.current_candle {
25            if candle.start_time == candle_start {
26                // Update existing candle
27                candle.high = candle.high.max(price);
28                candle.low = candle.low.min(price);
29                candle.close = price;
30                candle.volume += volume;
31                return;
32            } else {
33                // This trade belongs to a new candle (or we missed some, but we assume stream is roughly ordered)
34                // In a real system, we might buffer or handle out-of-order trades.
35                // For this SDK, we'll just start a new one.
36                // The caller should have checked `check_flush` before calling update if they wanted the closed candle.
37            }
38        }
39
40        // Start new candle
41        self.current_candle = Some(Candle {
42            open: price,
43            high: price,
44            low: price,
45            close: price,
46            volume,
47            start_time: candle_start,
48            interval_seconds: self.interval_seconds,
49        });
50    }
51
52    /// Checks if the current candle is "done" based on the new time, returning it if so.
53    /// This is a simplified logic: we return the *previous* candle if the *new* time belongs to a later interval.
54    pub fn check_flush(&mut self, new_trade_time: f64) -> Option<Candle> {
55        let time = new_trade_time as u64;
56        let new_candle_start = (time / self.interval_seconds) * self.interval_seconds;
57
58        if let Some(candle) = &self.current_candle {
59            if new_candle_start > candle.start_time {
60                // The time has moved to the next interval. The current candle is closed.
61                return self.current_candle.take();
62            }
63        }
64        None
65    }
66}