kabu_defi_market/
history_pool_loader_actor.rs1use alloy_network::Network;
2use alloy_provider::Provider;
3use alloy_rpc_types::Filter;
4use std::marker::PhantomData;
5use std::sync::Arc;
6use tracing::{debug, error, info};
7
8use crate::logs_parser::process_log_entries;
9use kabu_core_actors::{Actor, ActorResult, Broadcaster, Producer, WorkerResult};
10use kabu_core_actors_macros::Producer;
11use kabu_core_blockchain::Blockchain;
12use kabu_types_blockchain::KabuDataTypesEthereum;
13use kabu_types_entities::PoolLoaders;
14use kabu_types_events::LoomTask;
15
16async fn history_pool_loader_one_shot_worker<P, PL, N>(
17 client: P,
18 pool_loaders: Arc<PoolLoaders<PL, N, KabuDataTypesEthereum>>,
19 tasks_tx: Broadcaster<LoomTask>,
20) -> WorkerResult
21where
22 N: Network,
23 P: Provider<N> + Send + Sync + Clone + 'static,
24 PL: Provider<N> + Send + Sync + Clone + 'static,
25{
26 let mut current_block = client.get_block_number().await?;
27
28 let block_size: u64 = 5;
29
30 for _ in 1..10000 {
31 if current_block < block_size + 1 {
32 break;
33 }
34 current_block -= block_size;
35 debug!("Loading blocks {} {}", current_block, current_block + block_size);
36 let filter = Filter::new().from_block(current_block).to_block(current_block + block_size - 1);
37 match client.get_logs(&filter).await {
38 Ok(logs) => {
39 process_log_entries(logs, pool_loaders.as_ref(), tasks_tx.clone()).await?;
40 }
41 Err(e) => {
42 error!("{}", e)
43 }
44 }
45 }
46 info!("history_pool_loader_worker finished");
47
48 Ok("history_pool_loader_worker".to_string())
49}
50
51#[derive(Producer)]
52pub struct HistoryPoolLoaderOneShotActor<P, PL, N>
53where
54 N: Network,
55 P: Provider<N> + Send + Sync + Clone + 'static,
56 PL: Provider<N> + Send + Sync + Clone + 'static,
57{
58 client: P,
59 pool_loaders: Arc<PoolLoaders<PL, N>>,
60 #[producer]
61 tasks_tx: Option<Broadcaster<LoomTask>>,
62 _n: PhantomData<N>,
63}
64
65impl<P, PL, N> HistoryPoolLoaderOneShotActor<P, PL, N>
66where
67 N: Network,
68 P: Provider<N> + Send + Sync + Clone + 'static,
69 PL: Provider<N> + Send + Sync + Clone + 'static,
70{
71 pub fn new(client: P, pool_loaders: Arc<PoolLoaders<PL, N>>) -> Self {
72 Self { client, pool_loaders, tasks_tx: None, _n: PhantomData }
73 }
74
75 pub fn on_bc(self, bc: &Blockchain) -> Self {
76 Self { tasks_tx: Some(bc.tasks_channel()), ..self }
77 }
78}
79
80impl<P, PL, N> Actor for HistoryPoolLoaderOneShotActor<P, PL, N>
81where
82 N: Network,
83 P: Provider<N> + Send + Sync + Clone + 'static,
84 PL: Provider<N> + Send + Sync + Clone + 'static,
85{
86 fn start(&self) -> ActorResult {
87 let task = tokio::task::spawn(history_pool_loader_one_shot_worker(
88 self.client.clone(),
89 self.pool_loaders.clone(),
90 self.tasks_tx.clone().unwrap(),
91 ));
92 Ok(vec![task])
93 }
94
95 fn name(&self) -> &'static str {
96 "HistoryPoolLoaderOneShotActor"
97 }
98}