// kabu_defi_market/pool_loader_actor.rs

use std::collections::{BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::Arc;

use alloy_network::Network;
use alloy_primitives::Address;
use alloy_provider::Provider;
use alloy_rpc_types::TransactionRequest;
use eyre::Result;
use revm::{Database, DatabaseCommit, DatabaseRef};
use tokio::sync::Semaphore;
use tracing::{debug, error, info};

use kabu_core_actors::{run_sync, subscribe, Actor, ActorResult, Broadcaster, Producer, SharedState, WorkerResult};
use kabu_core_actors::{Accessor, Consumer};
use kabu_core_actors_macros::{Accessor, Consumer, Producer};
use kabu_core_blockchain::{Blockchain, BlockchainState};
use kabu_node_debug_provider::DebugProviderExt;
use kabu_types_blockchain::{get_touched_addresses, KabuDataTypes, KabuDataTypesEVM};
use kabu_types_entities::pool_config::PoolsLoadingConfig;
use kabu_types_entities::required_state::RequiredStateReader;
use kabu_types_entities::{Market, MarketState, PoolClass, PoolId, PoolLoaders, PoolWrapper, SwapDirection};
use kabu_types_events::{LoomTask, MarketEvents};
25
/// Default cap on concurrently running pool-fetch tasks, used when the
/// pools config does not specify a thread count (see `pool_loader_worker`).
const MAX_CONCURRENT_TASKS: usize = 20;
27
28pub async fn pool_loader_worker<P, PL, N, DB>(
29    client: P,
30    pool_loaders: Arc<PoolLoaders<PL, N>>,
31    pools_config: PoolsLoadingConfig,
32    market: SharedState<Market>,
33    market_state: SharedState<MarketState<DB>>,
34    tasks_rx: Broadcaster<LoomTask>,
35    market_events_tx: Broadcaster<MarketEvents>,
36) -> WorkerResult
37where
38    N: Network<TransactionRequest = TransactionRequest>,
39    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
40    PL: Provider<N> + Send + Sync + Clone + 'static,
41    DB: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
42{
43    let mut processed_pools = HashMap::new();
44    let semaphore = std::sync::Arc::new(Semaphore::new(pools_config.threads().unwrap_or(MAX_CONCURRENT_TASKS)));
45
46    subscribe!(tasks_rx);
47    loop {
48        if let Ok(task) = tasks_rx.recv().await {
49            let LoomTask::FetchAndAddPools(pools) = task;
50
51            for (pool_id, pool_class) in pools {
52                // Check if pool already exists
53                if processed_pools.insert(pool_id, true).is_some() {
54                    continue;
55                }
56
57                let sema_clone = semaphore.clone();
58                let client_clone = client.clone();
59                let market_clone = market.clone();
60                let market_state = market_state.clone();
61                let pool_loaders_clone = pool_loaders.clone();
62                let market_events_tx_clone = market_events_tx.clone();
63
64                tokio::task::spawn(async move {
65                    match sema_clone.acquire().await {
66                        Ok(permit) => {
67                            match fetch_and_add_pool_by_pool_id(
68                                client_clone,
69                                market_clone,
70                                market_state,
71                                pool_loaders_clone,
72                                pool_id,
73                                pool_class,
74                            )
75                            .await
76                            {
77                                Ok((pool_id, swap_path_idx_vec)) => {
78                                    info!(%pool_id, %pool_class, "Pool loaded successfully");
79                                    run_sync!(market_events_tx_clone.send(MarketEvents::NewPoolLoaded { pool_id, swap_path_idx_vec }))
80                                }
81                                Err(error) => {
82                                    error!(%error, %pool_id, %pool_class, "failed fetch_and_add_pool_by_address");
83                                }
84                            }
85
86                            drop(permit);
87                        }
88                        Err(error) => {
89                            error!(%error, "failed acquire semaphore");
90                        }
91                    }
92                });
93            }
94        }
95    }
96}
97
98/// Fetch pool data, add it to the market and fetch the required state
99pub async fn fetch_and_add_pool_by_pool_id<P, PL, N, DB, LDT>(
100    client: P,
101    market: SharedState<Market>,
102    market_state: SharedState<MarketState<DB>>,
103    pool_loaders: Arc<PoolLoaders<PL, N, LDT>>,
104    pool_id: PoolId,
105    pool_class: PoolClass,
106) -> Result<(PoolId, Vec<usize>)>
107where
108    N: Network<TransactionRequest = LDT::TransactionRequest>,
109    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
110    PL: Provider<N> + Send + Sync + Clone + 'static,
111    DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
112    LDT: KabuDataTypesEVM + 'static,
113{
114    debug!(%pool_id, %pool_class, "Fetching pool");
115
116    let pool = pool_loaders.load_pool_without_provider(pool_id, &pool_class).await?;
117    fetch_state_and_add_pool::<P, N, DB, LDT>(client, market.clone(), market_state.clone(), pool).await
118}
119
120pub async fn fetch_state_and_add_pool<P, N, DB, LDT>(
121    client: P,
122    market: SharedState<Market>,
123    market_state: SharedState<MarketState<DB>>,
124    pool_wrapped: PoolWrapper,
125) -> Result<(PoolId, Vec<usize>)>
126where
127    N: Network<TransactionRequest = LDT::TransactionRequest>,
128    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
129    DB: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
130    LDT: KabuDataTypesEVM,
131{
132    match pool_wrapped.get_state_required() {
133        Ok(required_state) => match RequiredStateReader::<LDT>::fetch_calls_and_slots::<N, P>(client, required_state, None).await {
134            Ok(state) => {
135                let pool_address = pool_wrapped.get_address();
136                {
137                    let updated_addresses = get_touched_addresses(&state);
138
139                    let mut market_state_write_guard = market_state.write().await;
140                    market_state_write_guard.apply_geth_update(state);
141                    // TODO : Fix disable cells
142                    let address = match pool_address {
143                        PoolId::Address(addr) => addr,
144                        PoolId::B256(_) => Address::ZERO,
145                    };
146                    market_state_write_guard.config.disable_cell_vec(address, pool_wrapped.get_read_only_cell_vec());
147
148                    let pool_tokens = pool_wrapped.get_tokens();
149
150                    for updated_address in updated_addresses {
151                        if !pool_tokens.contains(&updated_address) {
152                            market_state_write_guard.config.add_force_insert(updated_address);
153                        }
154                    }
155
156                    drop(market_state_write_guard);
157                }
158
159                let directions_vec = pool_wrapped.get_swap_directions();
160                let pool_manager_cells = pool_wrapped.get_pool_manager_cells();
161                let pool_id = pool_wrapped.get_pool_id();
162
163                let mut directions_tree: BTreeMap<PoolWrapper, Vec<SwapDirection>> = BTreeMap::new();
164                directions_tree.insert(pool_wrapped.clone(), directions_vec);
165
166                let start_time = std::time::Instant::now();
167                let mut market_write_guard = market.write().await;
168                debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write acquired");
169                // Ignore error if pool already exists because it was maybe already added by e.g. db pool loader
170                let _ = market_write_guard.add_pool(pool_wrapped);
171
172                let swap_paths = market_write_guard.build_swap_path_vec(&directions_tree)?;
173                let swap_paths_added = market_write_guard.add_paths(swap_paths);
174
175                for (pool_manager_address, cells_vec) in pool_manager_cells {
176                    for cell in cells_vec {
177                        market_write_guard.add_pool_manager_cell(pool_manager_address, pool_id, cell)
178                    }
179                }
180
181                debug!(elapsed = start_time.elapsed().as_micros(),  market = %market_write_guard, "market_guard path added");
182
183                drop(market_write_guard);
184                debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write releases");
185
186                Ok((pool_id, swap_paths_added))
187            }
188            Err(e) => {
189                error!("{}", e);
190                Err(e)
191            }
192        },
193        Err(e) => {
194            error!("{}", e);
195            Err(e)
196        }
197    }
198}
199
/// Actor that loads pools on demand: consumes [`LoomTask`] messages from
/// `tasks_rx`, fetches pool data and required state via `pool_loaders`,
/// registers pools in the shared [`Market`]/[`MarketState`], and emits
/// [`MarketEvents::NewPoolLoaded`] on the market events channel.
#[derive(Accessor, Consumer, Producer)]
pub struct PoolLoaderActor<P, PL, N, DB>
where
    N: Network,
    P: Provider<N> + Send + Sync + Clone + 'static,
    PL: Provider<N> + Send + Sync + Clone + 'static,
    DB: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + Default + 'static,
{
    /// Provider used to fetch the pools' required on-chain state.
    client: P,
    /// Per-pool-class loaders used to resolve a `(PoolId, PoolClass)` into a pool.
    pool_loaders: Arc<PoolLoaders<PL, N>>,
    /// Loading configuration; `threads()` bounds concurrent fetch tasks.
    pools_config: PoolsLoadingConfig,
    /// Shared market; `None` until wired via `on_bc`.
    #[accessor]
    market: Option<SharedState<Market>>,
    /// Shared market state db; `None` until wired via `on_bc`.
    #[accessor]
    market_state: Option<SharedState<MarketState<DB>>>,
    /// Incoming pool-loading tasks; `None` until wired via `on_bc`.
    #[consumer]
    tasks_rx: Option<Broadcaster<LoomTask>>,
    /// Outgoing market events; `None` until wired via `on_bc`.
    #[producer]
    market_events_channel_tx: Option<Broadcaster<MarketEvents>>,
    /// Marker for the network type parameter; not stored at runtime.
    _n: PhantomData<N>,
}
221
222impl<P, PL, N, DB> PoolLoaderActor<P, PL, N, DB>
223where
224    N: Network,
225    P: Provider<N> + Send + Sync + Clone + 'static,
226    PL: Provider<N> + Send + Sync + Clone + 'static,
227    DB: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + Default + 'static,
228{
229    pub fn new(client: P, pool_loaders: Arc<PoolLoaders<PL, N>>, pools_config: PoolsLoadingConfig) -> Self {
230        Self {
231            client,
232            pool_loaders,
233            pools_config,
234            market: None,
235            market_state: None,
236            tasks_rx: None,
237            market_events_channel_tx: None,
238            _n: PhantomData,
239        }
240    }
241
242    pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>) -> Self {
243        Self {
244            market: Some(bc.market()),
245            market_state: Some(state.market_state_commit()),
246            tasks_rx: Some(bc.tasks_channel()),
247            market_events_channel_tx: Some(bc.market_events_channel()),
248            ..self
249        }
250    }
251}
252
253impl<P, PL, N, DB> Actor for PoolLoaderActor<P, PL, N, DB>
254where
255    N: Network<TransactionRequest = TransactionRequest>,
256    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
257    PL: Provider<N> + Send + Sync + Clone + 'static,
258    DB: Database + DatabaseRef + DatabaseCommit + Default + Send + Sync + Clone + 'static,
259{
260    fn start(&self) -> ActorResult {
261        let task = tokio::task::spawn(pool_loader_worker(
262            self.client.clone(),
263            self.pool_loaders.clone(),
264            self.pools_config.clone(),
265            self.market.clone().unwrap(),
266            self.market_state.clone().unwrap(),
267            self.tasks_rx.clone().unwrap(),
268            self.market_events_channel_tx.clone().unwrap(),
269        ));
270        Ok(vec![task])
271    }
272
273    fn name(&self) -> &'static str {
274        "PoolLoaderActor"
275    }
276}