kabu_defi_pools/loaders/
curve.rs1use crate::protocols::CurveProtocol;
2use crate::{pool_loader, CurvePool};
3use alloy::primitives::Bytes;
4use async_stream::stream;
5use eyre::eyre;
6use futures::Stream;
7use kabu_evm_db::KabuDBError;
8use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEVM, KabuDataTypesEthereum};
9use kabu_types_entities::{PoolClass, PoolId, PoolLoader, PoolWrapper};
10use revm::DatabaseRef;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use tracing::error;
15
// Declares the `CurvePoolLoader` type through the crate's `pool_loader!` macro.
// NOTE(review): the macro body is not visible here; judging by the impl below it
// presumably generates a struct generic over `<P, N, LDT>` holding an optional
// `provider` — confirm against the macro definition.
pool_loader!(CurvePoolLoader);
17
18impl<P, N, LDT> PoolLoader<P, N, LDT> for CurvePoolLoader<P, N, LDT>
19where
20 N: Network,
21 P: Provider<N> + Clone + 'static,
22 LDT: KabuDataTypesEVM + 'static,
23{
24 fn get_pool_class_by_log(&self, _log_entry: &LDT::Log) -> Option<(PoolId, PoolClass)> {
25 None
26 }
27
28 fn fetch_pool_by_id<'a>(&'a self, pool_id: PoolId) -> Pin<Box<dyn Future<Output = eyre::Result<PoolWrapper>> + Send + 'a>> {
29 Box::pin(async move {
30 if let Some(provider) = &self.provider {
31 self.fetch_pool_by_id_from_provider(pool_id, provider.clone()).await
32 } else {
33 Err(eyre!("NO_PROVIDER"))
34 }
35 })
36 }
37
38 fn fetch_pool_by_id_from_provider<'a>(
39 &'a self,
40 pool_id: PoolId,
41 provider: P,
42 ) -> Pin<Box<dyn Future<Output = eyre::Result<PoolWrapper>> + Send + 'a>> {
43 Box::pin(async move {
44 let pool_address = match pool_id {
45 PoolId::Address(addr) => addr,
46 _ => return Err(eyre!("Pool ID must be an address for Curve pools")),
47 };
48 match CurveProtocol::get_contract_from_code(provider.clone(), pool_address).await {
49 Ok(curve_contract) => {
50 let curve_pool = CurvePool::<P, N>::fetch_pool_data_with_default_encoder(provider.clone(), curve_contract).await?;
51
52 Ok(PoolWrapper::new(Arc::new(curve_pool)))
53 }
54 Err(e) => {
55 error!("Error getting curve contract from code {} : {} ", pool_address, e);
56 Err(e)
57 }
58 }
59 })
60 }
61
62 fn fetch_pool_by_id_from_evm(&self, _pool_id: PoolId, _db: &dyn DatabaseRef<Error = KabuDBError>) -> eyre::Result<PoolWrapper> {
63 Err(eyre!("NOT_IMPLEMENTED"))
64 }
65
66 fn is_code(&self, _code: &Bytes) -> bool {
67 false
68 }
69
70 fn protocol_loader(&self) -> eyre::Result<Pin<Box<dyn Stream<Item = (PoolId, PoolClass)> + Send>>> {
71 let provider_clone = self.provider.clone();
72
73 if let Some(client) = provider_clone {
74 Ok(Box::pin(stream! {
75 let curve_contracts = CurveProtocol::get_contracts_vec(client.clone());
76 for curve_contract in curve_contracts.iter() {
77 yield (PoolId::Address(curve_contract.get_address()), PoolClass::Curve)
78 }
79
80 for factory_idx in 0..10 {
81 if let Ok(factory_address) = CurveProtocol::get_factory_address(client.clone(), factory_idx).await {
82 if let Ok(pool_count) = CurveProtocol::get_pool_count(client.clone(), factory_address).await {
83 for pool_id in 0..pool_count {
84 if let Ok(addr) = CurveProtocol::get_pool_address(client.clone(), factory_address, pool_id).await {
85 yield (PoolId::Address(addr), PoolClass::Curve)
86 }
87 }
88 }
89 }
90 }
91 }))
92 } else {
93 Err(eyre!("NO_PROVIDER"))
94 }
95 }
96}