kabu_defi_market/
new_pool_actor.rs1use alloy_network::Network;
2use alloy_provider::Provider;
3use eyre::Result;
4use std::sync::Arc;
5use tokio::sync::broadcast::error::RecvError;
6use tracing::{debug, error};
7
8use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
9use kabu_core_actors_macros::{Consumer, Producer};
10use kabu_core_blockchain::Blockchain;
11use kabu_types_entities::PoolLoaders;
12use kabu_types_events::{LoomTask, MessageBlockLogs};
13
14use crate::logs_parser::process_log_entries;
15
16pub async fn new_pool_worker<P, N>(
17 log_update_rx: Broadcaster<MessageBlockLogs>,
18 pools_loaders: Arc<PoolLoaders<P, N>>,
19 tasks_tx: Broadcaster<LoomTask>,
20) -> WorkerResult
21where
22 N: Network,
23 P: Provider<N> + Send + Sync + Clone + 'static,
24{
25 subscribe!(log_update_rx);
26
27 loop {
28 tokio::select! {
29 msg = log_update_rx.recv() => {
30 debug!("Log update");
31
32 let log_update : Result<MessageBlockLogs, RecvError> = msg;
33 match log_update {
34 Ok(log_update_msg)=>{
35 process_log_entries(
36 log_update_msg.inner.logs,
37 &pools_loaders,
38 tasks_tx.clone(),
39 ).await?
40 }
41 Err(e)=>{
42 error!("block_update error {}", e)
43 }
44 }
45
46 }
47 }
48 }
49}
50
51#[derive(Consumer, Producer)]
52pub struct NewPoolLoaderActor<P, N>
53where
54 N: Network,
55 P: Provider<N> + Send + Sync + Clone + 'static,
56{
57 pool_loaders: Arc<PoolLoaders<P, N>>,
58 #[consumer]
59 log_update_rx: Option<Broadcaster<MessageBlockLogs>>,
60 #[producer]
61 tasks_tx: Option<Broadcaster<LoomTask>>,
62}
63
64impl<P, N> NewPoolLoaderActor<P, N>
65where
66 N: Network,
67 P: Provider<N> + Send + Sync + Clone + 'static,
68{
69 pub fn new(pool_loaders: Arc<PoolLoaders<P, N>>) -> Self {
70 NewPoolLoaderActor { log_update_rx: None, pool_loaders, tasks_tx: None }
71 }
72
73 pub fn on_bc(self, bc: &Blockchain) -> Self {
74 Self { log_update_rx: Some(bc.new_block_logs_channel()), tasks_tx: Some(bc.tasks_channel()), ..self }
75 }
76}
77
78impl<P, N> Actor for NewPoolLoaderActor<P, N>
79where
80 N: Network,
81 P: Provider<N> + Send + Sync + Clone + 'static,
82{
83 fn start(&self) -> ActorResult {
84 let task = tokio::task::spawn(new_pool_worker(
85 self.log_update_rx.clone().unwrap(),
86 self.pool_loaders.clone(),
87 self.tasks_tx.clone().unwrap(),
88 ));
89 Ok(vec![task])
90 }
91
92 fn name(&self) -> &'static str {
93 "NewPoolLoaderActor"
94 }
95}