kabu_defi_market/
new_pool_actor.rs

1use alloy_network::Network;
2use alloy_provider::Provider;
3use eyre::Result;
4use std::sync::Arc;
5use tokio::sync::broadcast::error::RecvError;
6use tracing::{debug, error};
7
8use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
9use kabu_core_actors_macros::{Consumer, Producer};
10use kabu_core_blockchain::Blockchain;
11use kabu_types_entities::PoolLoaders;
12use kabu_types_events::{LoomTask, MessageBlockLogs};
13
14use crate::logs_parser::process_log_entries;
15
16pub async fn new_pool_worker<P, N>(
17    log_update_rx: Broadcaster<MessageBlockLogs>,
18    pools_loaders: Arc<PoolLoaders<P, N>>,
19    tasks_tx: Broadcaster<LoomTask>,
20) -> WorkerResult
21where
22    N: Network,
23    P: Provider<N> + Send + Sync + Clone + 'static,
24{
25    subscribe!(log_update_rx);
26
27    loop {
28        tokio::select! {
29            msg = log_update_rx.recv() => {
30                debug!("Log update");
31
32                let log_update : Result<MessageBlockLogs, RecvError>  = msg;
33                match log_update {
34                    Ok(log_update_msg)=>{
35                        process_log_entries(
36                                log_update_msg.inner.logs,
37                                &pools_loaders,
38                                tasks_tx.clone(),
39                        ).await?
40                    }
41                    Err(e)=>{
42                        error!("block_update error {}", e)
43                    }
44                }
45
46            }
47        }
48    }
49}
50
51#[derive(Consumer, Producer)]
52pub struct NewPoolLoaderActor<P, N>
53where
54    N: Network,
55    P: Provider<N> + Send + Sync + Clone + 'static,
56{
57    pool_loaders: Arc<PoolLoaders<P, N>>,
58    #[consumer]
59    log_update_rx: Option<Broadcaster<MessageBlockLogs>>,
60    #[producer]
61    tasks_tx: Option<Broadcaster<LoomTask>>,
62}
63
64impl<P, N> NewPoolLoaderActor<P, N>
65where
66    N: Network,
67    P: Provider<N> + Send + Sync + Clone + 'static,
68{
69    pub fn new(pool_loaders: Arc<PoolLoaders<P, N>>) -> Self {
70        NewPoolLoaderActor { log_update_rx: None, pool_loaders, tasks_tx: None }
71    }
72
73    pub fn on_bc(self, bc: &Blockchain) -> Self {
74        Self { log_update_rx: Some(bc.new_block_logs_channel()), tasks_tx: Some(bc.tasks_channel()), ..self }
75    }
76}
77
78impl<P, N> Actor for NewPoolLoaderActor<P, N>
79where
80    N: Network,
81    P: Provider<N> + Send + Sync + Clone + 'static,
82{
83    fn start(&self) -> ActorResult {
84        let task = tokio::task::spawn(new_pool_worker(
85            self.log_update_rx.clone().unwrap(),
86            self.pool_loaders.clone(),
87            self.tasks_tx.clone().unwrap(),
88        ));
89        Ok(vec![task])
90    }
91
92    fn name(&self) -> &'static str {
93        "NewPoolLoaderActor"
94    }
95}