1use std::collections::{BTreeMap, HashMap};
2use std::marker::PhantomData;
3use std::sync::Arc;
4
5use alloy_network::Network;
6use alloy_primitives::Address;
7use alloy_provider::Provider;
8use alloy_rpc_types::TransactionRequest;
9use eyre::Result;
10use tracing::{debug, error, info};
11
12use kabu_core_actors::{run_sync, subscribe, Actor, ActorResult, Broadcaster, Producer, SharedState, WorkerResult};
13use kabu_core_actors::{Accessor, Consumer};
14use kabu_core_actors_macros::{Accessor, Consumer, Producer};
15use kabu_core_blockchain::{Blockchain, BlockchainState};
16use kabu_node_debug_provider::DebugProviderExt;
17use kabu_types_entities::required_state::RequiredStateReader;
18use kabu_types_entities::{Market, MarketState, PoolClass, PoolId, PoolLoaders, PoolWrapper, SwapDirection};
19use kabu_types_events::{LoomTask, MarketEvents};
20
21use kabu_types_blockchain::{get_touched_addresses, KabuDataTypes, KabuDataTypesEVM};
22use kabu_types_entities::pool_config::PoolsLoadingConfig;
23use revm::{Database, DatabaseCommit, DatabaseRef};
24use tokio::sync::Semaphore;
25
26const MAX_CONCURRENT_TASKS: usize = 20;
27
28pub async fn pool_loader_worker<P, PL, N, DB>(
29 client: P,
30 pool_loaders: Arc<PoolLoaders<PL, N>>,
31 pools_config: PoolsLoadingConfig,
32 market: SharedState<Market>,
33 market_state: SharedState<MarketState<DB>>,
34 tasks_rx: Broadcaster<LoomTask>,
35 market_events_tx: Broadcaster<MarketEvents>,
36) -> WorkerResult
37where
38 N: Network<TransactionRequest = TransactionRequest>,
39 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
40 PL: Provider<N> + Send + Sync + Clone + 'static,
41 DB: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
42{
43 let mut processed_pools = HashMap::new();
44 let semaphore = std::sync::Arc::new(Semaphore::new(pools_config.threads().unwrap_or(MAX_CONCURRENT_TASKS)));
45
46 subscribe!(tasks_rx);
47 loop {
48 if let Ok(task) = tasks_rx.recv().await {
49 let LoomTask::FetchAndAddPools(pools) = task;
50
51 for (pool_id, pool_class) in pools {
52 if processed_pools.insert(pool_id, true).is_some() {
54 continue;
55 }
56
57 let sema_clone = semaphore.clone();
58 let client_clone = client.clone();
59 let market_clone = market.clone();
60 let market_state = market_state.clone();
61 let pool_loaders_clone = pool_loaders.clone();
62 let market_events_tx_clone = market_events_tx.clone();
63
64 tokio::task::spawn(async move {
65 match sema_clone.acquire().await {
66 Ok(permit) => {
67 match fetch_and_add_pool_by_pool_id(
68 client_clone,
69 market_clone,
70 market_state,
71 pool_loaders_clone,
72 pool_id,
73 pool_class,
74 )
75 .await
76 {
77 Ok((pool_id, swap_path_idx_vec)) => {
78 info!(%pool_id, %pool_class, "Pool loaded successfully");
79 run_sync!(market_events_tx_clone.send(MarketEvents::NewPoolLoaded { pool_id, swap_path_idx_vec }))
80 }
81 Err(error) => {
82 error!(%error, %pool_id, %pool_class, "failed fetch_and_add_pool_by_address");
83 }
84 }
85
86 drop(permit);
87 }
88 Err(error) => {
89 error!(%error, "failed acquire semaphore");
90 }
91 }
92 });
93 }
94 }
95 }
96}
97
98pub async fn fetch_and_add_pool_by_pool_id<P, PL, N, DB, LDT>(
100 client: P,
101 market: SharedState<Market>,
102 market_state: SharedState<MarketState<DB>>,
103 pool_loaders: Arc<PoolLoaders<PL, N, LDT>>,
104 pool_id: PoolId,
105 pool_class: PoolClass,
106) -> Result<(PoolId, Vec<usize>)>
107where
108 N: Network<TransactionRequest = LDT::TransactionRequest>,
109 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
110 PL: Provider<N> + Send + Sync + Clone + 'static,
111 DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
112 LDT: KabuDataTypesEVM + 'static,
113{
114 debug!(%pool_id, %pool_class, "Fetching pool");
115
116 let pool = pool_loaders.load_pool_without_provider(pool_id, &pool_class).await?;
117 fetch_state_and_add_pool::<P, N, DB, LDT>(client, market.clone(), market_state.clone(), pool).await
118}
119
120pub async fn fetch_state_and_add_pool<P, N, DB, LDT>(
121 client: P,
122 market: SharedState<Market>,
123 market_state: SharedState<MarketState<DB>>,
124 pool_wrapped: PoolWrapper,
125) -> Result<(PoolId, Vec<usize>)>
126where
127 N: Network<TransactionRequest = LDT::TransactionRequest>,
128 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
129 DB: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
130 LDT: KabuDataTypesEVM,
131{
132 match pool_wrapped.get_state_required() {
133 Ok(required_state) => match RequiredStateReader::<LDT>::fetch_calls_and_slots::<N, P>(client, required_state, None).await {
134 Ok(state) => {
135 let pool_address = pool_wrapped.get_address();
136 {
137 let updated_addresses = get_touched_addresses(&state);
138
139 let mut market_state_write_guard = market_state.write().await;
140 market_state_write_guard.apply_geth_update(state);
141 let address = match pool_address {
143 PoolId::Address(addr) => addr,
144 PoolId::B256(_) => Address::ZERO,
145 };
146 market_state_write_guard.config.disable_cell_vec(address, pool_wrapped.get_read_only_cell_vec());
147
148 let pool_tokens = pool_wrapped.get_tokens();
149
150 for updated_address in updated_addresses {
151 if !pool_tokens.contains(&updated_address) {
152 market_state_write_guard.config.add_force_insert(updated_address);
153 }
154 }
155
156 drop(market_state_write_guard);
157 }
158
159 let directions_vec = pool_wrapped.get_swap_directions();
160 let pool_manager_cells = pool_wrapped.get_pool_manager_cells();
161 let pool_id = pool_wrapped.get_pool_id();
162
163 let mut directions_tree: BTreeMap<PoolWrapper, Vec<SwapDirection>> = BTreeMap::new();
164 directions_tree.insert(pool_wrapped.clone(), directions_vec);
165
166 let start_time = std::time::Instant::now();
167 let mut market_write_guard = market.write().await;
168 debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write acquired");
169 let _ = market_write_guard.add_pool(pool_wrapped);
171
172 let swap_paths = market_write_guard.build_swap_path_vec(&directions_tree)?;
173 let swap_paths_added = market_write_guard.add_paths(swap_paths);
174
175 for (pool_manager_address, cells_vec) in pool_manager_cells {
176 for cell in cells_vec {
177 market_write_guard.add_pool_manager_cell(pool_manager_address, pool_id, cell)
178 }
179 }
180
181 debug!(elapsed = start_time.elapsed().as_micros(), market = %market_write_guard, "market_guard path added");
182
183 drop(market_write_guard);
184 debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write releases");
185
186 Ok((pool_id, swap_paths_added))
187 }
188 Err(e) => {
189 error!("{}", e);
190 Err(e)
191 }
192 },
193 Err(e) => {
194 error!("{}", e);
195 Err(e)
196 }
197 }
198}
199
200#[derive(Accessor, Consumer, Producer)]
201pub struct PoolLoaderActor<P, PL, N, DB>
202where
203 N: Network,
204 P: Provider<N> + Send + Sync + Clone + 'static,
205 PL: Provider<N> + Send + Sync + Clone + 'static,
206 DB: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + Default + 'static,
207{
208 client: P,
209 pool_loaders: Arc<PoolLoaders<PL, N>>,
210 pools_config: PoolsLoadingConfig,
211 #[accessor]
212 market: Option<SharedState<Market>>,
213 #[accessor]
214 market_state: Option<SharedState<MarketState<DB>>>,
215 #[consumer]
216 tasks_rx: Option<Broadcaster<LoomTask>>,
217 #[producer]
218 market_events_channel_tx: Option<Broadcaster<MarketEvents>>,
219 _n: PhantomData<N>,
220}
221
222impl<P, PL, N, DB> PoolLoaderActor<P, PL, N, DB>
223where
224 N: Network,
225 P: Provider<N> + Send + Sync + Clone + 'static,
226 PL: Provider<N> + Send + Sync + Clone + 'static,
227 DB: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + Default + 'static,
228{
229 pub fn new(client: P, pool_loaders: Arc<PoolLoaders<PL, N>>, pools_config: PoolsLoadingConfig) -> Self {
230 Self {
231 client,
232 pool_loaders,
233 pools_config,
234 market: None,
235 market_state: None,
236 tasks_rx: None,
237 market_events_channel_tx: None,
238 _n: PhantomData,
239 }
240 }
241
242 pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>) -> Self {
243 Self {
244 market: Some(bc.market()),
245 market_state: Some(state.market_state_commit()),
246 tasks_rx: Some(bc.tasks_channel()),
247 market_events_channel_tx: Some(bc.market_events_channel()),
248 ..self
249 }
250 }
251}
252
253impl<P, PL, N, DB> Actor for PoolLoaderActor<P, PL, N, DB>
254where
255 N: Network<TransactionRequest = TransactionRequest>,
256 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
257 PL: Provider<N> + Send + Sync + Clone + 'static,
258 DB: Database + DatabaseRef + DatabaseCommit + Default + Send + Sync + Clone + 'static,
259{
260 fn start(&self) -> ActorResult {
261 let task = tokio::task::spawn(pool_loader_worker(
262 self.client.clone(),
263 self.pool_loaders.clone(),
264 self.pools_config.clone(),
265 self.market.clone().unwrap(),
266 self.market_state.clone().unwrap(),
267 self.tasks_rx.clone().unwrap(),
268 self.market_events_channel_tx.clone().unwrap(),
269 ));
270 Ok(vec![task])
271 }
272
273 fn name(&self) -> &'static str {
274 "PoolLoaderActor"
275 }
276}