kabu_defi_pools/loaders/
curve.rs

1use crate::protocols::CurveProtocol;
2use crate::{pool_loader, CurvePool};
3use alloy::primitives::Bytes;
4use async_stream::stream;
5use eyre::eyre;
6use futures::Stream;
7use kabu_evm_db::KabuDBError;
8use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEVM, KabuDataTypesEthereum};
9use kabu_types_entities::{PoolClass, PoolId, PoolLoader, PoolWrapper};
10use revm::DatabaseRef;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use tracing::error;
15
// Invokes the crate's `pool_loader!` macro with the name `CurvePoolLoader`.
// NOTE(review): presumably the expansion declares the generic
// `CurvePoolLoader<P, N, LDT>` type (holding an optional `provider`) that the
// `PoolLoader` impl in this file is written for — confirm against the macro definition.
16pool_loader!(CurvePoolLoader);
17
18impl<P, N, LDT> PoolLoader<P, N, LDT> for CurvePoolLoader<P, N, LDT>
19where
20    N: Network,
21    P: Provider<N> + Clone + 'static,
22    LDT: KabuDataTypesEVM + 'static,
23{
24    fn get_pool_class_by_log(&self, _log_entry: &LDT::Log) -> Option<(PoolId, PoolClass)> {
25        None
26    }
27
28    fn fetch_pool_by_id<'a>(&'a self, pool_id: PoolId) -> Pin<Box<dyn Future<Output = eyre::Result<PoolWrapper>> + Send + 'a>> {
29        Box::pin(async move {
30            if let Some(provider) = &self.provider {
31                self.fetch_pool_by_id_from_provider(pool_id, provider.clone()).await
32            } else {
33                Err(eyre!("NO_PROVIDER"))
34            }
35        })
36    }
37
38    fn fetch_pool_by_id_from_provider<'a>(
39        &'a self,
40        pool_id: PoolId,
41        provider: P,
42    ) -> Pin<Box<dyn Future<Output = eyre::Result<PoolWrapper>> + Send + 'a>> {
43        Box::pin(async move {
44            let pool_address = match pool_id {
45                PoolId::Address(addr) => addr,
46                _ => return Err(eyre!("Pool ID must be an address for Curve pools")),
47            };
48            match CurveProtocol::get_contract_from_code(provider.clone(), pool_address).await {
49                Ok(curve_contract) => {
50                    let curve_pool = CurvePool::<P, N>::fetch_pool_data_with_default_encoder(provider.clone(), curve_contract).await?;
51
52                    Ok(PoolWrapper::new(Arc::new(curve_pool)))
53                }
54                Err(e) => {
55                    error!("Error getting curve contract from code {} : {} ", pool_address, e);
56                    Err(e)
57                }
58            }
59        })
60    }
61
62    fn fetch_pool_by_id_from_evm(&self, _pool_id: PoolId, _db: &dyn DatabaseRef<Error = KabuDBError>) -> eyre::Result<PoolWrapper> {
63        Err(eyre!("NOT_IMPLEMENTED"))
64    }
65
66    fn is_code(&self, _code: &Bytes) -> bool {
67        false
68    }
69
70    fn protocol_loader(&self) -> eyre::Result<Pin<Box<dyn Stream<Item = (PoolId, PoolClass)> + Send>>> {
71        let provider_clone = self.provider.clone();
72
73        if let Some(client) = provider_clone {
74            Ok(Box::pin(stream! {
75                let curve_contracts = CurveProtocol::get_contracts_vec(client.clone());
76                for curve_contract in curve_contracts.iter() {
77                    yield (PoolId::Address(curve_contract.get_address()), PoolClass::Curve)
78                }
79
80                for factory_idx in 0..10 {
81                    if let Ok(factory_address) = CurveProtocol::get_factory_address(client.clone(), factory_idx).await {
82                        if let Ok(pool_count) = CurveProtocol::get_pool_count(client.clone(), factory_address).await {
83                            for pool_id in 0..pool_count {
84                                if let Ok(addr) = CurveProtocol::get_pool_address(client.clone(), factory_address, pool_id).await {
85                                    yield (PoolId::Address(addr), PoolClass::Curve)
86                                }
87                            }
88                        }
89                    }
90                }
91            }))
92        } else {
93            Err(eyre!("NO_PROVIDER"))
94        }
95    }
96}