kabu_types_entities/
pool_loader.rs1use crate::pool_config::PoolsLoadingConfig;
2use crate::{PoolClass, PoolId, PoolWrapper};
3use alloy_network::{Ethereum, Network};
4use alloy_primitives::Bytes;
5use alloy_provider::Provider;
6use eyre::{eyre, Result};
7use kabu_evm_db::KabuDBError;
8use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEVM, KabuDataTypesEthereum};
9use revm::DatabaseRef;
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio_stream::Stream;
15
/// Loader for one kind of pool.
///
/// Type parameters:
/// * `P`   - provider type, bound to the network `N`.
/// * `N`   - network the provider talks to.
/// * `LDT` - bundle of chain data types; defaults to [`KabuDataTypesEthereum`].
///
/// The trait is used as a trait object (`PoolLoaders` stores
/// `Arc<dyn PoolLoader<P, N, LDT>>`), which is why the asynchronous methods
/// return boxed futures/streams instead of being `async fn`s.
// The boxed future/stream return types below are long enough to trigger
// clippy's `type_complexity` lint, hence the allow.
#[allow(clippy::type_complexity)]
pub trait PoolLoader<P, N, LDT = KabuDataTypesEthereum>: Send + Sync + 'static
where
    N: Network,
    P: Provider<N>,
    LDT: Send + Sync + KabuDataTypes,
{
    /// Maps a log entry to a `(PoolId, PoolClass)` pair, or `None`.
    ///
    /// NOTE(review): presumably `None` means the log does not belong to a pool
    /// this loader handles - confirm against the implementations.
    fn get_pool_class_by_log(&self, log_entry: &LDT::Log) -> Option<(PoolId, PoolClass)>;

    /// Asynchronously resolves `pool_id` to a [`PoolWrapper`].
    ///
    /// No provider is passed in, so the implementation has to bring its own
    /// data source. The returned future borrows `self` for `'a`.
    fn fetch_pool_by_id<'a>(&'a self, pool_id: PoolId) -> Pin<Box<dyn Future<Output = Result<PoolWrapper>> + Send + 'a>>;

    /// Asynchronously resolves `pool_id` to a [`PoolWrapper`] using the
    /// supplied `provider`, which is taken by value.
    ///
    /// The returned future borrows `self` for `'a`.
    fn fetch_pool_by_id_from_provider<'a>(
        &'a self,
        pool_id: PoolId,
        provider: P,
    ) -> Pin<Box<dyn Future<Output = Result<PoolWrapper>> + Send + 'a>>;

    /// Synchronously resolves `pool_id` to a [`PoolWrapper`] by reading from
    /// the given EVM database reference `db`.
    fn fetch_pool_by_id_from_evm(&self, pool_id: PoolId, db: &dyn DatabaseRef<Error = KabuDBError>) -> Result<PoolWrapper>;

    /// Classifies a blob of bytes.
    ///
    /// NOTE(review): presumably returns `true` when `code` is contract
    /// bytecode recognised by this loader - confirm against the implementations.
    fn is_code(&self, code: &Bytes) -> bool;

    /// Returns a stream of `(PoolId, PoolClass)` pairs, or an error if the
    /// stream cannot be produced.
    ///
    /// NOTE(review): presumably enumerates the pools of the protocol this
    /// loader serves - confirm against the implementations.
    fn protocol_loader(&self) -> Result<Pin<Box<dyn Stream<Item = (PoolId, PoolClass)> + Send>>>;
}
34
/// Registry of [`PoolLoader`] implementations keyed by [`PoolClass`].
///
/// Type parameters mirror those of [`PoolLoader`]: `P` is the provider type,
/// `N` the network (defaults to [`Ethereum`]) and `LDT` the chain data type
/// bundle (defaults to [`KabuDataTypesEthereum`]).
pub struct PoolLoaders<P, N = Ethereum, LDT = KabuDataTypesEthereum>
where
    N: Network,
    P: Provider<N> + 'static,
    LDT: KabuDataTypes,
{
    // Optional provider; set through `with_provider`.
    provider: Option<P>,
    // Optional loading configuration; set through `with_config`.
    config: Option<PoolsLoadingConfig>,
    /// Registered loaders, one per pool class. Public, so callers may read or
    /// modify it directly.
    pub map: HashMap<PoolClass, Arc<dyn PoolLoader<P, N, LDT>>>,
}
45
46impl<P, N, LDT> PoolLoaders<P, N, LDT>
47where
48 N: Network,
49 P: Provider<N> + 'static,
50 LDT: KabuDataTypes,
51{
52 pub fn new() -> Self {
53 Self::default()
54 }
55
56 pub fn with_config(self, config: PoolsLoadingConfig) -> Self {
57 Self { config: Some(config), ..self }
58 }
59
60 pub fn with_provider<NP: Provider<N>>(self, provider: NP) -> PoolLoaders<NP, N, LDT> {
61 PoolLoaders { provider: Some(provider), map: HashMap::new(), config: self.config }
62 }
63
64 pub fn add_loader<L: PoolLoader<P, N, LDT> + Send + Sync + Clone + 'static>(self, pool_class: PoolClass, loader: L) -> Self {
65 let mut map = self.map;
66 map.insert(pool_class, Arc::new(loader));
67 Self { map, ..self }
68 }
69}
70
71impl<P, N, LDT> Default for PoolLoaders<P, N, LDT>
72where
73 N: Network,
74 P: Provider<N> + 'static,
75 LDT: KabuDataTypes,
76{
77 fn default() -> Self {
78 Self { provider: None, map: Default::default(), config: None }
79 }
80}
81
82impl<P, N, LDT> PoolLoaders<P, N, LDT>
83where
84 N: Network,
85 P: Provider<N> + 'static,
86 LDT: KabuDataTypesEVM + 'static,
87{
88 pub fn determine_pool_class(&self, log_entry: &<KabuDataTypesEthereum as KabuDataTypes>::Log) -> Option<(PoolId, PoolClass)> {
89 for (pool_class, pool_loader) in self.map.iter() {
90 if let Some((pool_id, pool_class)) = pool_loader.get_pool_class_by_log(log_entry) {
91 return Some((pool_id, pool_class));
92 }
93 }
94 None
95 }
96
97 pub fn load_pool_without_provider<'a>(
117 &'a self,
118 pool_id: PoolId,
119 pool_class: &'a PoolClass,
120 ) -> Pin<Box<dyn Future<Output = Result<PoolWrapper>> + Send + 'a>>
121 where
122 P: Provider<N>,
123 {
124 Box::pin(async move {
125 if let Some(pool_loader) = self.map.get(pool_class).cloned() {
126 pool_loader.fetch_pool_by_id(pool_id).await
127 } else {
128 Err(eyre!("POOL_CLASS_NOT_FOUND"))
129 }
130 })
131 }
132}