kabu_defi_market/
history_pool_loader_actor.rs

1use alloy_network::Network;
2use alloy_provider::Provider;
3use alloy_rpc_types::Filter;
4use std::marker::PhantomData;
5use std::sync::Arc;
6use tracing::{debug, error, info};
7
8use crate::logs_parser::process_log_entries;
9use kabu_core_actors::{Actor, ActorResult, Broadcaster, Producer, WorkerResult};
10use kabu_core_actors_macros::Producer;
11use kabu_core_blockchain::Blockchain;
12use kabu_types_blockchain::KabuDataTypesEthereum;
13use kabu_types_entities::PoolLoaders;
14use kabu_types_events::LoomTask;
15
16async fn history_pool_loader_one_shot_worker<P, PL, N>(
17    client: P,
18    pool_loaders: Arc<PoolLoaders<PL, N, KabuDataTypesEthereum>>,
19    tasks_tx: Broadcaster<LoomTask>,
20) -> WorkerResult
21where
22    N: Network,
23    P: Provider<N> + Send + Sync + Clone + 'static,
24    PL: Provider<N> + Send + Sync + Clone + 'static,
25{
26    let mut current_block = client.get_block_number().await?;
27
28    let block_size: u64 = 5;
29
30    for _ in 1..10000 {
31        if current_block < block_size + 1 {
32            break;
33        }
34        current_block -= block_size;
35        debug!("Loading blocks {} {}", current_block, current_block + block_size);
36        let filter = Filter::new().from_block(current_block).to_block(current_block + block_size - 1);
37        match client.get_logs(&filter).await {
38            Ok(logs) => {
39                process_log_entries(logs, pool_loaders.as_ref(), tasks_tx.clone()).await?;
40            }
41            Err(e) => {
42                error!("{}", e)
43            }
44        }
45    }
46    info!("history_pool_loader_worker finished");
47
48    Ok("history_pool_loader_worker".to_string())
49}
50
51#[derive(Producer)]
52pub struct HistoryPoolLoaderOneShotActor<P, PL, N>
53where
54    N: Network,
55    P: Provider<N> + Send + Sync + Clone + 'static,
56    PL: Provider<N> + Send + Sync + Clone + 'static,
57{
58    client: P,
59    pool_loaders: Arc<PoolLoaders<PL, N>>,
60    #[producer]
61    tasks_tx: Option<Broadcaster<LoomTask>>,
62    _n: PhantomData<N>,
63}
64
65impl<P, PL, N> HistoryPoolLoaderOneShotActor<P, PL, N>
66where
67    N: Network,
68    P: Provider<N> + Send + Sync + Clone + 'static,
69    PL: Provider<N> + Send + Sync + Clone + 'static,
70{
71    pub fn new(client: P, pool_loaders: Arc<PoolLoaders<PL, N>>) -> Self {
72        Self { client, pool_loaders, tasks_tx: None, _n: PhantomData }
73    }
74
75    pub fn on_bc(self, bc: &Blockchain) -> Self {
76        Self { tasks_tx: Some(bc.tasks_channel()), ..self }
77    }
78}
79
80impl<P, PL, N> Actor for HistoryPoolLoaderOneShotActor<P, PL, N>
81where
82    N: Network,
83    P: Provider<N> + Send + Sync + Clone + 'static,
84    PL: Provider<N> + Send + Sync + Clone + 'static,
85{
86    fn start(&self) -> ActorResult {
87        let task = tokio::task::spawn(history_pool_loader_one_shot_worker(
88            self.client.clone(),
89            self.pool_loaders.clone(),
90            self.tasks_tx.clone().unwrap(),
91        ));
92        Ok(vec![task])
93    }
94
95    fn name(&self) -> &'static str {
96        "HistoryPoolLoaderOneShotActor"
97    }
98}