kabu_defi_market/
protocol_pool_loader_actor.rs1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use alloy_network::Network;
5use alloy_provider::Provider;
6use tracing::{error, info};
7
8use kabu_core_actors::{Actor, ActorResult, Broadcaster, Producer, WorkerResult};
9use kabu_core_actors_macros::Producer;
10use kabu_core_blockchain::Blockchain;
11use kabu_types_entities::PoolLoaders;
12use kabu_types_events::LoomTask;
13use tokio_stream::StreamExt;
14
15async fn protocol_pool_loader_worker<P, PL, N>(
16 _client: P,
17 pool_loaders: Arc<PoolLoaders<PL, N>>,
18 tasks_tx: Broadcaster<LoomTask>,
19) -> WorkerResult
20where
21 N: Network,
22 P: Provider<N> + Send + Sync + Clone + 'static,
23 PL: Provider<N> + Send + Sync + Clone + 'static,
24{
25 for (pool_class, pool_loader) in pool_loaders.map.iter() {
26 let tasks_tx_clone = tasks_tx.clone();
27 if let Ok(mut proto_loader) = pool_loader.clone().protocol_loader() {
28 info!("Protocol loader started for {}", pool_class);
29 tokio::task::spawn(async move {
30 while let Some((pool_id, pool_class)) = proto_loader.next().await {
31 if let Err(error) = tasks_tx_clone.send(LoomTask::FetchAndAddPools(vec![(pool_id, pool_class)])) {
32 error!(%error, "tasks_tx.send");
33 }
34 }
35 });
36 } else {
37 error!("Protocol loader unavailable for {}", pool_class);
38 }
39 }
40
41 Ok("curve_protocol_loader_worker".to_string())
42}
43
44#[derive(Producer)]
45pub struct ProtocolPoolLoaderOneShotActor<P, PL, N>
46where
47 N: Network,
48 P: Provider<N> + Send + Sync + Clone + 'static,
49 PL: Provider<N> + Send + Sync + Clone + 'static,
50{
51 client: P,
52 pool_loaders: Arc<PoolLoaders<PL, N>>,
53 #[producer]
54 tasks_tx: Option<Broadcaster<LoomTask>>,
55 _n: PhantomData<N>,
56}
57
58impl<P, PL, N> ProtocolPoolLoaderOneShotActor<P, PL, N>
59where
60 N: Network,
61 P: Provider<N> + Send + Sync + Clone + 'static,
62 PL: Provider<N> + Send + Sync + Clone + 'static,
63{
64 pub fn new(client: P, pool_loaders: Arc<PoolLoaders<PL, N>>) -> Self {
65 Self { client, pool_loaders, tasks_tx: None, _n: PhantomData }
66 }
67
68 pub fn on_bc(self, bc: &Blockchain) -> Self {
69 Self { tasks_tx: Some(bc.tasks_channel()), ..self }
70 }
71}
72
73impl<P, PL, N> Actor for ProtocolPoolLoaderOneShotActor<P, PL, N>
74where
75 N: Network,
76 P: Provider<N> + Send + Sync + Clone + 'static,
77 PL: Provider<N> + Send + Sync + Clone + 'static,
78{
79 fn start(&self) -> ActorResult {
80 let task =
81 tokio::task::spawn(protocol_pool_loader_worker(self.client.clone(), self.pool_loaders.clone(), self.tasks_tx.clone().unwrap()));
82
83 Ok(vec![task])
84 }
85
86 fn name(&self) -> &'static str {
87 "CurvePoolLoaderOneShotActor"
88 }
89}