kabu_strategy_backrun/
affected_pools_code.rs1use alloy_eips::BlockNumberOrTag;
2use alloy_network::Network;
3use alloy_provider::Provider;
4use eyre::eyre;
5use kabu_core_actors::SharedState;
6use kabu_defi_pools::protocols::{UniswapV2Protocol, UniswapV3Protocol};
7use kabu_defi_pools::state_readers::UniswapV3EvmStateReader;
8use kabu_defi_pools::{MaverickPool, PancakeV3Pool, UniswapV2Pool, UniswapV3Pool};
9use kabu_evm_db::{AlloyDB, KabuDB};
10use kabu_types_blockchain::GethStateUpdateVec;
11use kabu_types_entities::{get_protocol_by_factory, Market, MarketState, Pool, PoolId, PoolProtocol, PoolWrapper, SwapDirection};
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use tracing::{debug, error};
15
16pub async fn get_affected_pools_from_code<P, N>(
17 client: P,
18 market: SharedState<Market>,
19 state_update: &GethStateUpdateVec,
20) -> eyre::Result<BTreeMap<PoolWrapper, Vec<SwapDirection>>>
21where
22 N: Network,
23 P: Provider<N> + Send + Sync + Clone + 'static,
24{
25 let mut market_state = MarketState::new(KabuDB::new());
26
27 market_state.state_db.apply_geth_state_update(state_update, true, false);
28
29 let mut ret: BTreeMap<PoolWrapper, Vec<SwapDirection>> = BTreeMap::new();
30
31 for state_update_record in state_update.iter() {
32 for (address, state_update_entry) in state_update_record.iter() {
33 if let Some(code) = &state_update_entry.code {
34 if UniswapV2Protocol::is_code(code) {
35 match market.read().await.get_pool(&PoolId::Address(*address)) {
36 None => {
37 debug!(?address, "Loading UniswapV2 class pool");
38
39 let ext_db = AlloyDB::new(client.clone(), BlockNumberOrTag::Latest.into());
40
41 let Some(ext_db) = ext_db else {
42 error!("Cannot create AlloyDB");
43 continue;
44 };
45
46 let state_db = market_state.state_db.clone().with_ext_db(ext_db);
47
48 match UniswapV3EvmStateReader::factory(&state_db, *address) {
49 Ok(_factory_address) => match UniswapV2Pool::fetch_pool_data_evm(&state_db, *address) {
50 Ok(pool) => {
51 let pool = PoolWrapper::new(Arc::new(pool));
52 let protocol = pool.get_protocol();
53 let swap_directions = pool.get_swap_directions();
54
55 debug!(%address, %protocol, ?swap_directions, "UniswapV2 pool loaded");
56 ret.insert(pool, swap_directions);
57 }
58 Err(err) => {
59 error!(?address, %err, "Error loading UniswapV2 pool");
60 }
61 },
62 Err(err) => {
63 error!(?address, %err, "Error loading UniswapV2 factory for pool")
64 }
65 }
66 }
67 Some(pool) => {
68 debug!(?address, protocol = ?pool.get_protocol(), "Pool already exists");
69 }
70 }
71 }
72
73 if UniswapV3Protocol::is_code(code) {
74 match market.read().await.get_pool(&PoolId::Address(*address)) {
75 None => {
76 debug!(%address, "Loading UniswapV3 class pool");
77
78 let ext_db = AlloyDB::new(client.clone(), BlockNumberOrTag::Latest.into());
79
80 let Some(ext_db) = ext_db else {
81 error!("Cannot create AlloyDB");
82 continue;
83 };
84
85 let state_db = market_state.state_db.clone().with_ext_db(ext_db);
86
87 match UniswapV3EvmStateReader::factory(&state_db, *address) {
88 Ok(factory_address) => {
89 match get_protocol_by_factory(factory_address) {
90 PoolProtocol::PancakeV3 => {
91 let pool = PancakeV3Pool::fetch_pool_data_evm(&state_db, *address);
92 match pool {
93 Ok(pool) => {
94 let swap_directions = pool.get_swap_directions();
95 let protocol = pool.get_protocol();
96 debug!(?address, %protocol, ?swap_directions, "PancakeV3 Pool loaded");
97 ret.insert(PoolWrapper::new(Arc::new(pool)), swap_directions);
98 }
99 Err(err) => {
100 error!(?address, %err, "Error loading PancakeV3 pool");
101 }
102 }
103 }
104 PoolProtocol::Maverick => {
105 let pool = MaverickPool::fetch_pool_data_evm(&state_db, *address);
106 match pool {
107 Ok(pool) => {
108 let pool = PoolWrapper::new(Arc::new(pool));
109 let swap_directions = pool.get_swap_directions();
110 let protocol = pool.get_protocol();
111 debug!(?address, %protocol, ?swap_directions, "Maverick Pool loaded");
112
113 ret.insert(pool, swap_directions);
114 }
115 Err(err) => {
116 error!(?address, %err, "Error loading Maverick pool");
117 }
118 }
119 }
120 _ => match UniswapV3Pool::fetch_pool_data_evm(&state_db, *address) {
121 Ok(pool) => {
122 let pool = PoolWrapper::new(Arc::new(pool));
123 let swap_directions = pool.get_swap_directions();
124 let protocol = pool.get_protocol();
125 debug!(
126 %address,
127 %protocol,
128 ?swap_directions,
129 "UniswapV3 Pool loaded"
130 );
131 ret.insert(pool, swap_directions);
132 }
133 Err(err) => {
134 error!(%address, %err, "Error loading UniswapV3 pool");
135 }
136 },
137 };
138 }
139 Err(err) => {
140 error!(?address, %err, "Error loading UniswapV3 factory for pool")
141 }
142 }
143 }
144 Some(pool) => {
145 debug!(?address, protocol = ?pool.get_protocol(), "Pool already exists")
146 }
147 }
148 }
149 }
150 }
151 }
152 if !ret.is_empty() {
153 Ok(ret)
154 } else {
155 Err(eyre!("NO_POOLS_LOADED"))
156 }
157}
158
159pub fn is_pool_code(state_update: &GethStateUpdateVec) -> bool {
161 for state_update_record in state_update.iter() {
162 for (_address, state_update_entry) in state_update_record.iter() {
163 if let Some(code) = &state_update_entry.code {
164 if UniswapV3Protocol::is_code(code) {
165 return true;
166 }
167 if UniswapV2Protocol::is_code(code) {
168 return true;
169 }
170 }
171 }
172 }
173 false
174}