kabu_defi_market/
required_pools_actor.rs

1use alloy_network::Network;
2use alloy_primitives::Address;
3use alloy_provider::Provider;
4use revm::DatabaseRef;
5use revm::{Database, DatabaseCommit};
6use std::marker::PhantomData;
7use std::sync::Arc;
8use tracing::{debug, error, info};
9
10use crate::pool_loader_actor::fetch_and_add_pool_by_pool_id;
11use kabu_core_actors::{Accessor, Actor, ActorResult, SharedState, WorkerResult};
12use kabu_core_actors_macros::{Accessor, Consumer};
13use kabu_core_blockchain::{Blockchain, BlockchainState};
14use kabu_evm_db::KabuDBError;
15use kabu_node_debug_provider::DebugProviderExt;
16use kabu_types_blockchain::KabuDataTypesEVM;
17use kabu_types_entities::required_state::{RequiredState, RequiredStateReader};
18use kabu_types_entities::{Market, MarketState, PoolClass, PoolId, PoolLoaders};
19
20async fn required_pools_loader_worker<P, N, DB, LDT>(
21    client: P,
22    pool_loaders: Arc<PoolLoaders<P, N, LDT>>,
23    pools: Vec<(PoolId, PoolClass)>,
24    required_state: Option<RequiredState>,
25    market: SharedState<Market>,
26    market_state: SharedState<MarketState<DB>>,
27) -> WorkerResult
28where
29    N: Network<TransactionRequest = LDT::TransactionRequest>,
30    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
31    DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
32    LDT: KabuDataTypesEVM + 'static,
33{
34    for (pool_id, pool_class) in pools {
35        debug!(class=%pool_class, %pool_id, "Loading pool");
36        match fetch_and_add_pool_by_pool_id(client.clone(), market.clone(), market_state.clone(), pool_loaders.clone(), pool_id, pool_class)
37            .await
38        {
39            Ok(_) => {
40                info!(class=%pool_class, %pool_id, "pool loaded")
41            }
42            Err(error) => {
43                error!(%error, "load_pool_with_provider")
44            }
45        }
46    }
47    //
48    //
49    //     match pool_class {
50    //         PoolClass::UniswapV2 | PoolClass::UniswapV3 => {
51    //             if let Err(error) =
52    //                 fetch_and_add_pool_by_pool_id(client.clone(), market.clone(), market_state.clone(), pool_address, pool_class).await
53    //             {
54    //                 error!(%error, address = %pool_address, "fetch_and_add_pool_by_address")
55    //             }
56    //         }
57    //         PoolClass::Curve => {
58    //             if let Ok(curve_contract) = CurveProtocol::get_contract_from_code(client.clone(), pool_address).await {
59    //                 let curve_pool = CurvePool::<P, T, N>::fetch_pool_data_with_default_encoder(client.clone(), curve_contract).await?;
60    //                 fetch_state_and_add_pool(client.clone(), market.clone(), market_state.clone(), curve_pool.into()).await?
61    //             } else {
62    //                 error!("CURVE_POOL_NOT_LOADED");
63    //             }
64    //         }
65    //         _ => {
66    //             error!("Unknown pool class")
67    //         }
68    //     }
69    //     debug!(class=%pool_class, address=%pool_address, "Loaded pool");
70    // }
71
72    if let Some(required_state) = required_state {
73        let update = RequiredStateReader::<LDT>::fetch_calls_and_slots(client.clone(), required_state, None).await?;
74        market_state.write().await.apply_geth_update(update);
75    }
76
77    Ok("required_pools_loader_worker".to_string())
78}
79
80#[derive(Accessor, Consumer)]
81pub struct RequiredPoolLoaderActor<P, N, DB, LDT>
82where
83    N: Network,
84    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
85    DB: Database + DatabaseRef + DatabaseCommit + Clone + Send + Sync + 'static,
86    LDT: KabuDataTypesEVM + 'static,
87{
88    client: P,
89    pool_loaders: Arc<PoolLoaders<P, N, LDT>>,
90    pools: Vec<(PoolId, PoolClass)>,
91    required_state: Option<RequiredState>,
92    #[accessor]
93    market: Option<SharedState<Market>>,
94    #[accessor]
95    market_state: Option<SharedState<MarketState<DB>>>,
96    _n: PhantomData<N>,
97}
98
99impl<P, N, DB, LDT> RequiredPoolLoaderActor<P, N, DB, LDT>
100where
101    N: Network,
102    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
103    DB: Database + DatabaseRef + DatabaseCommit + Clone + Send + Sync + 'static,
104    LDT: KabuDataTypesEVM + 'static,
105{
106    pub fn new(client: P, pool_loaders: Arc<PoolLoaders<P, N, LDT>>) -> Self {
107        Self { client, pools: Vec::new(), pool_loaders, required_state: None, market: None, market_state: None, _n: PhantomData }
108    }
109
110    pub fn with_pool_address(self, address: Address, pool_class: PoolClass) -> Self {
111        let mut pools = self.pools;
112        pools.push((PoolId::Address(address), pool_class));
113        Self { pools, ..self }
114    }
115
116    pub fn on_bc(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>) -> Self {
117        Self { market: Some(bc.market()), market_state: Some(state.market_state_commit()), ..self }
118    }
119
120    pub fn with_required_state(self, required_state: RequiredState) -> Self {
121        Self { required_state: Some(required_state), ..self }
122    }
123}
124
125impl<P, N, DB, LDT> Actor for RequiredPoolLoaderActor<P, N, DB, LDT>
126where
127    N: Network<TransactionRequest = LDT::TransactionRequest>,
128    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
129    DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
130    LDT: KabuDataTypesEVM + 'static,
131{
132    fn start(&self) -> ActorResult {
133        let task = tokio::task::spawn(required_pools_loader_worker(
134            self.client.clone(),
135            self.pool_loaders.clone(),
136            self.pools.clone(),
137            self.required_state.clone(),
138            self.market.clone().unwrap(),
139            self.market_state.clone().unwrap(),
140        ));
141
142        Ok(vec![task])
143    }
144
145    fn name(&self) -> &'static str {
146        "RequiredPoolLoaderActor"
147    }
148}