kabu_strategy_backrun/
affected_pools_code.rs

1use alloy_eips::BlockNumberOrTag;
2use alloy_network::Network;
3use alloy_provider::Provider;
4use eyre::eyre;
5use kabu_core_actors::SharedState;
6use kabu_defi_pools::protocols::{UniswapV2Protocol, UniswapV3Protocol};
7use kabu_defi_pools::state_readers::UniswapV3EvmStateReader;
8use kabu_defi_pools::{MaverickPool, PancakeV3Pool, UniswapV2Pool, UniswapV3Pool};
9use kabu_evm_db::{AlloyDB, KabuDB};
10use kabu_types_blockchain::GethStateUpdateVec;
11use kabu_types_entities::{get_protocol_by_factory, Market, MarketState, Pool, PoolId, PoolProtocol, PoolWrapper, SwapDirection};
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use tracing::{debug, error};
15
16pub async fn get_affected_pools_from_code<P, N>(
17    client: P,
18    market: SharedState<Market>,
19    state_update: &GethStateUpdateVec,
20) -> eyre::Result<BTreeMap<PoolWrapper, Vec<SwapDirection>>>
21where
22    N: Network,
23    P: Provider<N> + Send + Sync + Clone + 'static,
24{
25    let mut market_state = MarketState::new(KabuDB::new());
26
27    market_state.state_db.apply_geth_state_update(state_update, true, false);
28
29    let mut ret: BTreeMap<PoolWrapper, Vec<SwapDirection>> = BTreeMap::new();
30
31    for state_update_record in state_update.iter() {
32        for (address, state_update_entry) in state_update_record.iter() {
33            if let Some(code) = &state_update_entry.code {
34                if UniswapV2Protocol::is_code(code) {
35                    match market.read().await.get_pool(&PoolId::Address(*address)) {
36                        None => {
37                            debug!(?address, "Loading UniswapV2 class pool");
38
39                            let ext_db = AlloyDB::new(client.clone(), BlockNumberOrTag::Latest.into());
40
41                            let Some(ext_db) = ext_db else {
42                                error!("Cannot create AlloyDB");
43                                continue;
44                            };
45
46                            let state_db = market_state.state_db.clone().with_ext_db(ext_db);
47
48                            match UniswapV3EvmStateReader::factory(&state_db, *address) {
49                                Ok(_factory_address) => match UniswapV2Pool::fetch_pool_data_evm(&state_db, *address) {
50                                    Ok(pool) => {
51                                        let pool = PoolWrapper::new(Arc::new(pool));
52                                        let protocol = pool.get_protocol();
53                                        let swap_directions = pool.get_swap_directions();
54
55                                        debug!(%address, %protocol, ?swap_directions, "UniswapV2 pool loaded");
56                                        ret.insert(pool, swap_directions);
57                                    }
58                                    Err(err) => {
59                                        error!(?address, %err, "Error loading UniswapV2 pool");
60                                    }
61                                },
62                                Err(err) => {
63                                    error!(?address, %err, "Error loading UniswapV2 factory for pool")
64                                }
65                            }
66                        }
67                        Some(pool) => {
68                            debug!(?address, protocol = ?pool.get_protocol(), "Pool already exists");
69                        }
70                    }
71                }
72
73                if UniswapV3Protocol::is_code(code) {
74                    match market.read().await.get_pool(&PoolId::Address(*address)) {
75                        None => {
76                            debug!(%address, "Loading UniswapV3 class pool");
77
78                            let ext_db = AlloyDB::new(client.clone(), BlockNumberOrTag::Latest.into());
79
80                            let Some(ext_db) = ext_db else {
81                                error!("Cannot create AlloyDB");
82                                continue;
83                            };
84
85                            let state_db = market_state.state_db.clone().with_ext_db(ext_db);
86
87                            match UniswapV3EvmStateReader::factory(&state_db, *address) {
88                                Ok(factory_address) => {
89                                    match get_protocol_by_factory(factory_address) {
90                                        PoolProtocol::PancakeV3 => {
91                                            let pool = PancakeV3Pool::fetch_pool_data_evm(&state_db, *address);
92                                            match pool {
93                                                Ok(pool) => {
94                                                    let swap_directions = pool.get_swap_directions();
95                                                    let protocol = pool.get_protocol();
96                                                    debug!(?address, %protocol, ?swap_directions, "PancakeV3 Pool loaded");
97                                                    ret.insert(PoolWrapper::new(Arc::new(pool)), swap_directions);
98                                                }
99                                                Err(err) => {
100                                                    error!(?address, %err, "Error loading PancakeV3 pool");
101                                                }
102                                            }
103                                        }
104                                        PoolProtocol::Maverick => {
105                                            let pool = MaverickPool::fetch_pool_data_evm(&state_db, *address);
106                                            match pool {
107                                                Ok(pool) => {
108                                                    let pool = PoolWrapper::new(Arc::new(pool));
109                                                    let swap_directions = pool.get_swap_directions();
110                                                    let protocol = pool.get_protocol();
111                                                    debug!(?address, %protocol, ?swap_directions, "Maverick Pool loaded");
112
113                                                    ret.insert(pool, swap_directions);
114                                                }
115                                                Err(err) => {
116                                                    error!(?address, %err, "Error loading Maverick pool");
117                                                }
118                                            }
119                                        }
120                                        _ => match UniswapV3Pool::fetch_pool_data_evm(&state_db, *address) {
121                                            Ok(pool) => {
122                                                let pool = PoolWrapper::new(Arc::new(pool));
123                                                let swap_directions = pool.get_swap_directions();
124                                                let protocol = pool.get_protocol();
125                                                debug!(
126                                                    %address,
127                                                    %protocol,
128                                                    ?swap_directions,
129                                                    "UniswapV3 Pool loaded"
130                                                );
131                                                ret.insert(pool, swap_directions);
132                                            }
133                                            Err(err) => {
134                                                error!(%address, %err, "Error loading UniswapV3 pool");
135                                            }
136                                        },
137                                    };
138                                }
139                                Err(err) => {
140                                    error!(?address, %err, "Error loading UniswapV3 factory for pool")
141                                }
142                            }
143                        }
144                        Some(pool) => {
145                            debug!(?address, protocol = ?pool.get_protocol(), "Pool already exists")
146                        }
147                    }
148                }
149            }
150        }
151    }
152    if !ret.is_empty() {
153        Ok(ret)
154    } else {
155        Err(eyre!("NO_POOLS_LOADED"))
156    }
157}
158
159/// Check if the state update code contains code for a UniswapV2 pair or UniswapV3 pool by looking for method signatures.
160pub fn is_pool_code(state_update: &GethStateUpdateVec) -> bool {
161    for state_update_record in state_update.iter() {
162        for (_address, state_update_entry) in state_update_record.iter() {
163            if let Some(code) = &state_update_entry.code {
164                if UniswapV3Protocol::is_code(code) {
165                    return true;
166                }
167                if UniswapV2Protocol::is_code(code) {
168                    return true;
169                }
170            }
171        }
172    }
173    false
174}