kabu_types_entities/
pool_loader.rs

1use crate::pool_config::PoolsLoadingConfig;
2use crate::{PoolClass, PoolId, PoolWrapper};
3use alloy_network::{Ethereum, Network};
4use alloy_primitives::Bytes;
5use alloy_provider::Provider;
6use eyre::{eyre, Result};
7use kabu_evm_db::KabuDBError;
8use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEVM, KabuDataTypesEthereum};
9use revm::DatabaseRef;
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio_stream::Stream;
15
16#[allow(clippy::type_complexity)]
17pub trait PoolLoader<P, N, LDT = KabuDataTypesEthereum>: Send + Sync + 'static
18where
19    N: Network,
20    P: Provider<N>,
21    LDT: Send + Sync + KabuDataTypes,
22{
23    fn get_pool_class_by_log(&self, log_entry: &LDT::Log) -> Option<(PoolId, PoolClass)>;
24    fn fetch_pool_by_id<'a>(&'a self, pool_id: PoolId) -> Pin<Box<dyn Future<Output = Result<PoolWrapper>> + Send + 'a>>;
25    fn fetch_pool_by_id_from_provider<'a>(
26        &'a self,
27        pool_id: PoolId,
28        provider: P,
29    ) -> Pin<Box<dyn Future<Output = Result<PoolWrapper>> + Send + 'a>>;
30    fn fetch_pool_by_id_from_evm(&self, pool_id: PoolId, db: &dyn DatabaseRef<Error = KabuDBError>) -> Result<PoolWrapper>;
31    fn is_code(&self, code: &Bytes) -> bool;
32    fn protocol_loader(&self) -> Result<Pin<Box<dyn Stream<Item = (PoolId, PoolClass)> + Send>>>;
33}
34
35pub struct PoolLoaders<P, N = Ethereum, LDT = KabuDataTypesEthereum>
36where
37    N: Network,
38    P: Provider<N> + 'static,
39    LDT: KabuDataTypes,
40{
41    provider: Option<P>,
42    config: Option<PoolsLoadingConfig>,
43    pub map: HashMap<PoolClass, Arc<dyn PoolLoader<P, N, LDT>>>,
44}
45
46impl<P, N, LDT> PoolLoaders<P, N, LDT>
47where
48    N: Network,
49    P: Provider<N> + 'static,
50    LDT: KabuDataTypes,
51{
52    pub fn new() -> Self {
53        Self::default()
54    }
55
56    pub fn with_config(self, config: PoolsLoadingConfig) -> Self {
57        Self { config: Some(config), ..self }
58    }
59
60    pub fn with_provider<NP: Provider<N>>(self, provider: NP) -> PoolLoaders<NP, N, LDT> {
61        PoolLoaders { provider: Some(provider), map: HashMap::new(), config: self.config }
62    }
63
64    pub fn add_loader<L: PoolLoader<P, N, LDT> + Send + Sync + Clone + 'static>(self, pool_class: PoolClass, loader: L) -> Self {
65        let mut map = self.map;
66        map.insert(pool_class, Arc::new(loader));
67        Self { map, ..self }
68    }
69}
70
71impl<P, N, LDT> Default for PoolLoaders<P, N, LDT>
72where
73    N: Network,
74    P: Provider<N> + 'static,
75    LDT: KabuDataTypes,
76{
77    fn default() -> Self {
78        Self { provider: None, map: Default::default(), config: None }
79    }
80}
81
82impl<P, N, LDT> PoolLoaders<P, N, LDT>
83where
84    N: Network,
85    P: Provider<N> + 'static,
86    LDT: KabuDataTypesEVM + 'static,
87{
88    pub fn determine_pool_class(&self, log_entry: &<KabuDataTypesEthereum as KabuDataTypes>::Log) -> Option<(PoolId, PoolClass)> {
89        for (pool_class, pool_loader) in self.map.iter() {
90            if let Some((pool_id, pool_class)) = pool_loader.get_pool_class_by_log(log_entry) {
91                return Some((pool_id, pool_class));
92            }
93        }
94        None
95    }
96
97    /*pub fn load_pool_with_provider<'a>(
98        &'a self,
99        provider: P,
100        pool_id: PoolId<KabuDataTypesEthereum>,
101        pool_class: &'a PoolClass,
102    ) -> Pin<Box<dyn Future<Output = Result<PoolWrapper>> + Send + 'a>>
103    where
104        P: Provider<N>,
105    {
106        Box::pin(async move {
107            if let Some(pool_loader) = self.map.get(pool_class).cloned() {
108                pool_loader.fetch_pool_by_id_from_provider(provider, pool_id).await
109            } else {
110                Err(eyre!("POOL_CLASS_NOT_FOUND"))
111            }
112        })
113    }
114     */
115
116    pub fn load_pool_without_provider<'a>(
117        &'a self,
118        pool_id: PoolId,
119        pool_class: &'a PoolClass,
120    ) -> Pin<Box<dyn Future<Output = Result<PoolWrapper>> + Send + 'a>>
121    where
122        P: Provider<N>,
123    {
124        Box::pin(async move {
125            if let Some(pool_loader) = self.map.get(pool_class).cloned() {
126                pool_loader.fetch_pool_by_id(pool_id).await
127            } else {
128                Err(eyre!("POOL_CLASS_NOT_FOUND"))
129            }
130        })
131    }
132}