kabu_defi_market/
protocol_pool_loader_actor.rs

1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use alloy_network::Network;
5use alloy_provider::Provider;
6use tracing::{error, info};
7
8use kabu_core_actors::{Actor, ActorResult, Broadcaster, Producer, WorkerResult};
9use kabu_core_actors_macros::Producer;
10use kabu_core_blockchain::Blockchain;
11use kabu_types_entities::PoolLoaders;
12use kabu_types_events::LoomTask;
13use tokio_stream::StreamExt;
14
15async fn protocol_pool_loader_worker<P, PL, N>(
16    _client: P,
17    pool_loaders: Arc<PoolLoaders<PL, N>>,
18    tasks_tx: Broadcaster<LoomTask>,
19) -> WorkerResult
20where
21    N: Network,
22    P: Provider<N> + Send + Sync + Clone + 'static,
23    PL: Provider<N> + Send + Sync + Clone + 'static,
24{
25    for (pool_class, pool_loader) in pool_loaders.map.iter() {
26        let tasks_tx_clone = tasks_tx.clone();
27        if let Ok(mut proto_loader) = pool_loader.clone().protocol_loader() {
28            info!("Protocol loader started for {}", pool_class);
29            tokio::task::spawn(async move {
30                while let Some((pool_id, pool_class)) = proto_loader.next().await {
31                    if let Err(error) = tasks_tx_clone.send(LoomTask::FetchAndAddPools(vec![(pool_id, pool_class)])) {
32                        error!(%error, "tasks_tx.send");
33                    }
34                }
35            });
36        } else {
37            error!("Protocol loader unavailable for {}", pool_class);
38        }
39    }
40
41    Ok("curve_protocol_loader_worker".to_string())
42}
43
44#[derive(Producer)]
45pub struct ProtocolPoolLoaderOneShotActor<P, PL, N>
46where
47    N: Network,
48    P: Provider<N> + Send + Sync + Clone + 'static,
49    PL: Provider<N> + Send + Sync + Clone + 'static,
50{
51    client: P,
52    pool_loaders: Arc<PoolLoaders<PL, N>>,
53    #[producer]
54    tasks_tx: Option<Broadcaster<LoomTask>>,
55    _n: PhantomData<N>,
56}
57
58impl<P, PL, N> ProtocolPoolLoaderOneShotActor<P, PL, N>
59where
60    N: Network,
61    P: Provider<N> + Send + Sync + Clone + 'static,
62    PL: Provider<N> + Send + Sync + Clone + 'static,
63{
64    pub fn new(client: P, pool_loaders: Arc<PoolLoaders<PL, N>>) -> Self {
65        Self { client, pool_loaders, tasks_tx: None, _n: PhantomData }
66    }
67
68    pub fn on_bc(self, bc: &Blockchain) -> Self {
69        Self { tasks_tx: Some(bc.tasks_channel()), ..self }
70    }
71}
72
73impl<P, PL, N> Actor for ProtocolPoolLoaderOneShotActor<P, PL, N>
74where
75    N: Network,
76    P: Provider<N> + Send + Sync + Clone + 'static,
77    PL: Provider<N> + Send + Sync + Clone + 'static,
78{
79    fn start(&self) -> ActorResult {
80        let task =
81            tokio::task::spawn(protocol_pool_loader_worker(self.client.clone(), self.pool_loaders.clone(), self.tasks_tx.clone().unwrap()));
82
83        Ok(vec![task])
84    }
85
86    fn name(&self) -> &'static str {
87        "CurvePoolLoaderOneShotActor"
88    }
89}