1use alloy_network::Network;
2use alloy_primitives::Address;
3use alloy_provider::Provider;
4use revm::DatabaseRef;
5use revm::{Database, DatabaseCommit};
6use std::marker::PhantomData;
7use std::sync::Arc;
8use tracing::{debug, error, info};
9
10use crate::pool_loader_actor::fetch_and_add_pool_by_pool_id;
11use kabu_core_actors::{Accessor, Actor, ActorResult, SharedState, WorkerResult};
12use kabu_core_actors_macros::{Accessor, Consumer};
13use kabu_core_blockchain::{Blockchain, BlockchainState};
14use kabu_evm_db::KabuDBError;
15use kabu_node_debug_provider::DebugProviderExt;
16use kabu_types_blockchain::KabuDataTypesEVM;
17use kabu_types_entities::required_state::{RequiredState, RequiredStateReader};
18use kabu_types_entities::{Market, MarketState, PoolClass, PoolId, PoolLoaders};
19
20async fn required_pools_loader_worker<P, N, DB, LDT>(
21 client: P,
22 pool_loaders: Arc<PoolLoaders<P, N, LDT>>,
23 pools: Vec<(PoolId, PoolClass)>,
24 required_state: Option<RequiredState>,
25 market: SharedState<Market>,
26 market_state: SharedState<MarketState<DB>>,
27) -> WorkerResult
28where
29 N: Network<TransactionRequest = LDT::TransactionRequest>,
30 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
31 DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
32 LDT: KabuDataTypesEVM + 'static,
33{
34 for (pool_id, pool_class) in pools {
35 debug!(class=%pool_class, %pool_id, "Loading pool");
36 match fetch_and_add_pool_by_pool_id(client.clone(), market.clone(), market_state.clone(), pool_loaders.clone(), pool_id, pool_class)
37 .await
38 {
39 Ok(_) => {
40 info!(class=%pool_class, %pool_id, "pool loaded")
41 }
42 Err(error) => {
43 error!(%error, "load_pool_with_provider")
44 }
45 }
46 }
47 if let Some(required_state) = required_state {
73 let update = RequiredStateReader::<LDT>::fetch_calls_and_slots(client.clone(), required_state, None).await?;
74 market_state.write().await.apply_geth_update(update);
75 }
76
77 Ok("required_pools_loader_worker".to_string())
78}
79
80#[derive(Accessor, Consumer)]
81pub struct RequiredPoolLoaderActor<P, N, DB, LDT>
82where
83 N: Network,
84 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
85 DB: Database + DatabaseRef + DatabaseCommit + Clone + Send + Sync + 'static,
86 LDT: KabuDataTypesEVM + 'static,
87{
88 client: P,
89 pool_loaders: Arc<PoolLoaders<P, N, LDT>>,
90 pools: Vec<(PoolId, PoolClass)>,
91 required_state: Option<RequiredState>,
92 #[accessor]
93 market: Option<SharedState<Market>>,
94 #[accessor]
95 market_state: Option<SharedState<MarketState<DB>>>,
96 _n: PhantomData<N>,
97}
98
99impl<P, N, DB, LDT> RequiredPoolLoaderActor<P, N, DB, LDT>
100where
101 N: Network,
102 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
103 DB: Database + DatabaseRef + DatabaseCommit + Clone + Send + Sync + 'static,
104 LDT: KabuDataTypesEVM + 'static,
105{
106 pub fn new(client: P, pool_loaders: Arc<PoolLoaders<P, N, LDT>>) -> Self {
107 Self { client, pools: Vec::new(), pool_loaders, required_state: None, market: None, market_state: None, _n: PhantomData }
108 }
109
110 pub fn with_pool_address(self, address: Address, pool_class: PoolClass) -> Self {
111 let mut pools = self.pools;
112 pools.push((PoolId::Address(address), pool_class));
113 Self { pools, ..self }
114 }
115
116 pub fn on_bc(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>) -> Self {
117 Self { market: Some(bc.market()), market_state: Some(state.market_state_commit()), ..self }
118 }
119
120 pub fn with_required_state(self, required_state: RequiredState) -> Self {
121 Self { required_state: Some(required_state), ..self }
122 }
123}
124
125impl<P, N, DB, LDT> Actor for RequiredPoolLoaderActor<P, N, DB, LDT>
126where
127 N: Network<TransactionRequest = LDT::TransactionRequest>,
128 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
129 DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
130 LDT: KabuDataTypesEVM + 'static,
131{
132 fn start(&self) -> ActorResult {
133 let task = tokio::task::spawn(required_pools_loader_worker(
134 self.client.clone(),
135 self.pool_loaders.clone(),
136 self.pools.clone(),
137 self.required_state.clone(),
138 self.market.clone().unwrap(),
139 self.market_state.clone().unwrap(),
140 ));
141
142 Ok(vec![task])
143 }
144
145 fn name(&self) -> &'static str {
146 "RequiredPoolLoaderActor"
147 }
148}