kabu_strategy_backrun/
pending_tx_state_change_processor.rs

1use alloy_eips::BlockNumberOrTag;
2use alloy_network::Network;
3use alloy_primitives::{Address, BlockNumber, TxHash, U256};
4use alloy_provider::Provider;
5use alloy_rpc_types::state::StateOverride;
6use alloy_rpc_types::BlockOverrides;
7use alloy_rpc_types_trace::geth::GethDebugTracingCallOptions;
8use eyre::{eyre, Result};
9use lazy_static::lazy_static;
10use revm::{Database, DatabaseCommit, DatabaseRef};
11use std::collections::hash_map::Entry;
12use std::collections::HashMap;
13use std::marker::PhantomData;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16use tracing::{debug, error, warn};
17
18use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
19use kabu_core_actors_macros::{Accessor, Consumer, Producer};
20use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
21use kabu_node_debug_provider::DebugProviderExt;
22use kabu_types_blockchain::{debug_trace_call_diff, GethStateUpdateVec, KabuDataTypesEVM, KabuTx, Mempool, TRACING_CALL_OPTS};
23use kabu_types_entities::required_state::{accounts_vec_len, storage_vec_len};
24use kabu_types_entities::{LatestBlock, Market, MarketState};
25use kabu_types_events::{MarketEvents, MempoolEvents, StateUpdateEvent};
26
27use super::affected_pools_code::{get_affected_pools_from_code, is_pool_code};
28use super::affected_pools_state::get_affected_pools_from_state_update;
29
// Coinbase address placed into the block overrides of every trace call made by
// `pending_tx_state_change_task`.
// NOTE(review): hard-coded; presumably a mainnet block-builder address — confirm it is still the
// intended value. The `unwrap` only fails if this literal is not a valid address.
lazy_static! {
    static ref COINBASE: Address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326".parse().unwrap();
}
33
34/// Process a pending tx from the mempool
35#[allow(clippy::too_many_arguments)]
36pub async fn pending_tx_state_change_task<P, N, DB, LDT>(
37    client: P,
38    tx_hash: TxHash,
39    market: SharedState<Market>,
40    mempool: SharedState<Mempool<LDT>>,
41    latest_block: SharedState<LatestBlock<LDT>>,
42    market_state: SharedState<MarketState<DB>>,
43    affecting_tx: Arc<RwLock<HashMap<TxHash, bool>>>,
44    cur_block_number: BlockNumber,
45    cur_block_time: u64,
46    cur_next_base_fee: u64,
47    cur_state_override: StateOverride,
48    state_updates_broadcaster: Broadcaster<StateUpdateEvent<DB, LDT>>,
49) -> Result<()>
50where
51    N: Network<TransactionRequest = LDT::TransactionRequest>,
52    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
53    DB: DatabaseRef + Database + DatabaseCommit + Clone + Send + Sync + 'static,
54    LDT: KabuDataTypesEVM,
55{
56    let mut state_update_vec: GethStateUpdateVec = Vec::new();
57    let mut state_required_vec: GethStateUpdateVec = Vec::new();
58
59    let mut merged_state_update_vec: GethStateUpdateVec = Vec::new();
60
61    let mempool_tx = match mempool.read().await.get_tx_by_hash(&tx_hash).cloned() {
62        Some(tx) => tx,
63        None => return Err(eyre!("MEMPOOL_TX_NOT_FOUND")),
64    };
65
66    let tx = match mempool_tx.tx.clone() {
67        Some(tx) => tx,
68        None => return Err(eyre!("NO_TX_IN_MEMPOOL")),
69    };
70
71    let source = mempool_tx.source.clone();
72
73    let transaction_request: LDT::TransactionRequest = tx.to_transaction_request();
74
75    // let transaction_type = transaction_request.transaction_type.unwrap_or_default();
76    // if transaction_type == LEGACY_TX_TYPE_ID || transaction_type == EIP2930_TX_TYPE_ID {
77    //     match transaction_request.gas_price {
78    //         Some(g) => {
79    //             if g < cur_next_base_fee as u128 {
80    //                 transaction_request.set_gas_price(cur_next_base_fee as u128);
81    //             }
82    //         }
83    //         None => {
84    //             error!(
85    //                 "No gas price for gas_price={:?}, max_fee_per_gas={:?}, max_priority_fee_per_gas={:?}, hash={:?}",
86    //                 transaction_request.gas_price,
87    //                 transaction_request.max_fee_per_gas,
88    //                 transaction_request.max_priority_fee_per_gas,
89    //                 mempool_tx.tx_hash
90    //             );
91    //             return Err(eyre!("NO_GAS_PRICE"));
92    //         }
93    //     }
94    // } else if transaction_type == EIP1559_TX_TYPE_ID {
95    //     match transaction_request.max_fee_per_gas {
96    //         Some(g) => {
97    //             if g < cur_next_base_fee as u128 {
98    //                 transaction_request.set_max_fee_per_gas(cur_next_base_fee as u128);
99    //             }
100    //         }
101    //         None => {
102    //             error!(
103    //                 "No base fee for gas_price={:?}, max_fee_per_gas={:?}, max_priority_fee_per_gas={:?}, hash={:?}",
104    //                 transaction_request.gas_price,
105    //                 transaction_request.max_fee_per_gas,
106    //                 transaction_request.max_priority_fee_per_gas,
107    //                 mempool_tx.tx_hash
108    //             );
109    //             return Err(eyre!("NO_BASE_FEE"));
110    //         }
111    //     }
112    // } else if transaction_type == EIP4844_TX_TYPE_ID {
113    //     // ignore blob tx
114    //     debug!("Ignore EIP4844 transaction: hash={:?}", mempool_tx.tx_hash);
115    //     return Ok(());
116    // } else {
117    //     warn!("Unknown transaction type: type={}, hash={:?}", transaction_type, mempool_tx.tx_hash);
118    //     return Err(eyre!("UNKNOWN_TX_TYPE"));
119    // }
120
121    let call_opts: GethDebugTracingCallOptions = GethDebugTracingCallOptions {
122        block_overrides: Some(BlockOverrides {
123            number: Some(U256::from(cur_block_number)),
124            time: Some(cur_block_time),
125            coinbase: Some(*COINBASE),
126            base_fee: Some(U256::from(cur_next_base_fee)),
127            ..Default::default()
128        }),
129        state_overrides: Some(cur_state_override.clone()),
130        ..TRACING_CALL_OPTS.clone()
131    };
132
133    if !(*affecting_tx.read().await.get(&tx_hash).unwrap_or(&true)) {
134        return Err(eyre!("NON_AFFECTING_TX"));
135    }
136
137    let diff_trace_result =
138        debug_trace_call_diff(client.clone(), transaction_request, BlockNumberOrTag::Latest.into(), Some(call_opts)).await;
139    match diff_trace_result {
140        Ok((pre, post)) => {
141            state_required_vec.push(pre.clone());
142            state_update_vec.push(post.clone());
143
144            merged_state_update_vec.push(pre);
145            merged_state_update_vec.push(post);
146        }
147        Err(error) => {
148            let tx_hash = tx.get_tx_hash();
149            mempool.write().await.set_failed(tx_hash);
150            debug!(block=cur_block_number, %tx_hash, %error, "debug_trace_call error for");
151        }
152    }
153
154    let affected_pools = get_affected_pools_from_state_update(market.clone(), &state_update_vec).await;
155
156    let accounts_len = accounts_vec_len(&state_update_vec);
157    let storage_len = storage_vec_len(&state_update_vec);
158
159    debug!(%tx_hash, %source, pools = affected_pools.len(), accounts = accounts_len, storage = storage_len, "Mempool affected pools");
160
161    affecting_tx.write().await.insert(tx_hash, !affected_pools.is_empty());
162
163    //TODO : Fix Latest header is empty
164    if let Some(latest_header) = latest_block.read().await.block_header.clone() {
165        let next_block_number = latest_header.number + 1;
166        let next_block_timestamp = latest_header.timestamp + 12;
167
168        if !affected_pools.is_empty() {
169            let cur_state_db = market_state.read().await.state_db.clone();
170            let request = StateUpdateEvent::new(
171                next_block_number,
172                next_block_timestamp,
173                cur_next_base_fee,
174                cur_state_db,
175                state_update_vec,
176                Some(state_required_vec.clone()),
177                affected_pools,
178                vec![tx_hash],
179                vec![mempool_tx.tx.clone().unwrap()],
180                "pending_tx_searcher".to_string(),
181                9000,
182            );
183            if let Err(e) = state_updates_broadcaster.send(request) {
184                error!("state_updates_broadcaster : {}", e)
185            }
186        }
187    } else {
188        error!("Latest header is empty")
189    }
190
191    if is_pool_code(&merged_state_update_vec) {
192        match get_affected_pools_from_code(client, market.clone(), &merged_state_update_vec).await {
193            Ok(affected_pools) => {
194                match affecting_tx.write().await.entry(tx_hash) {
195                    Entry::Occupied(mut v) => {
196                        if !v.get() {
197                            v.insert(!affected_pools.is_empty());
198                        }
199                    }
200                    Entry::Vacant(v) => {
201                        v.insert(!affected_pools.is_empty());
202                    }
203                };
204
205                debug!("Mempool code pools {} {} update len : {}", tx_hash, source, affected_pools.len());
206
207                if let Some(latest_header) = latest_block.read().await.block_header.clone() {
208                    let block_number = latest_header.number + 1;
209                    let block_timestamp = latest_header.timestamp + 12;
210
211                    if !affected_pools.is_empty() {
212                        let cur_state_db = market_state.read().await.state_db.clone();
213
214                        let request = StateUpdateEvent::new(
215                            block_number,
216                            block_timestamp,
217                            cur_next_base_fee,
218                            cur_state_db,
219                            merged_state_update_vec,
220                            None,
221                            affected_pools,
222                            vec![tx_hash],
223                            vec![mempool_tx.tx.unwrap()],
224                            "poolcode_searcher".to_string(),
225                            3000,
226                        );
227                        if let Err(e) = state_updates_broadcaster.send(request) {
228                            error!("state_updates_broadcaster : {}", e)
229                        }
230                    }
231                } else {
232                    error!("Latest header is empty")
233                }
234            }
235            Err(e) => {
236                debug!("code affected pools error : {e}")
237            }
238        }
239    }
240    Ok(())
241}
242
243#[allow(clippy::too_many_arguments)]
244pub async fn pending_tx_state_change_worker<P, N, DB, LDT>(
245    client: P,
246    market: SharedState<Market>,
247    mempool: SharedState<Mempool<LDT>>,
248    latest_block: SharedState<LatestBlock<LDT>>,
249    market_state: SharedState<MarketState<DB>>,
250    mempool_events_rx: Broadcaster<MempoolEvents>,
251    market_events_rx: Broadcaster<MarketEvents>,
252    state_updates_broadcaster: Broadcaster<StateUpdateEvent<DB, LDT>>,
253) -> WorkerResult
254where
255    N: Network<TransactionRequest = LDT::TransactionRequest>,
256    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
257    DB: DatabaseRef + Database + DatabaseCommit + Clone + Send + Sync + 'static,
258    LDT: KabuDataTypesEVM,
259{
260    subscribe!(mempool_events_rx);
261    subscribe!(market_events_rx);
262
263    let affecting_tx: Arc<RwLock<HashMap<TxHash, bool>>> = Arc::new(RwLock::new(HashMap::new()));
264    let mut cur_next_base_fee = 0;
265    let mut cur_block_number: Option<BlockNumber> = None;
266    let mut cur_block_time: Option<u64> = None;
267    let mut cur_state_override: StateOverride = StateOverride::default();
268
269    loop {
270        tokio::select! {
271            msg = market_events_rx.recv() => {
272                if let Ok(msg) = msg {
273                    let market_event_msg : MarketEvents = msg;
274                    if let MarketEvents::BlockHeaderUpdate{ block_number, block_hash, timestamp, base_fee, next_base_fee } = market_event_msg {
275                        debug!("Block header update {} {} base_fee {} ", block_number, block_hash, base_fee);
276                        cur_block_number = Some( block_number + 1);
277                        cur_block_time = Some(timestamp + 12 );
278                        cur_next_base_fee = next_base_fee;
279
280                        for _counter in 0..5  {
281                            if let Ok(msg) = market_events_rx.recv().await {
282                                if matches!(msg, MarketEvents::BlockStateUpdate{..} ) {
283                                    cur_state_override = latest_block.read().await.node_state_override();
284                                    debug!("Block state update received {} {}", block_number, block_hash);
285                                    break;
286                                }
287                            }
288                        }
289                    }
290                }
291            }
292            msg = mempool_events_rx.recv() => {
293                if let Ok(msg) = msg {
294                    let mempool_event_msg : MempoolEvents = msg;
295                    if let MempoolEvents::MempoolActualTxUpdate{ tx_hash }  = mempool_event_msg {
296                        if cur_block_number.is_none() {
297                            warn!("Did not received block header update yet!");
298                            continue;
299                        }
300
301                        tokio::task::spawn(
302                            pending_tx_state_change_task(
303                                client.clone(),
304                                tx_hash,
305                                market.clone(),
306                                mempool.clone(),
307                                latest_block.clone(),
308                                market_state.clone(),
309                                affecting_tx.clone(),
310                                cur_block_number.unwrap_or_default(),
311                                cur_block_time.unwrap_or_default(),
312                                cur_next_base_fee,
313                                cur_state_override.clone(),
314                                state_updates_broadcaster.clone(),
315                            )
316                        );
317                    }
318                }
319            }
320        }
321    }
322}
323
/// Actor that runs `pending_tx_state_change_worker` as a tokio task.
///
/// All optional fields start as `None` in `new`; `on_bc` fills every one of them from a
/// `Blockchain`, `BlockchainState` and `Strategy`. `start` requires all of them to be set.
#[derive(Accessor, Consumer, Producer)]
pub struct PendingTxStateChangeProcessorActor<P, N, DB: Clone + Send + Sync + 'static, LDT: KabuDataTypesEVM + 'static> {
    // Provider handle; cloned into the worker when the actor starts.
    client: P,
    // Shared market state handed to the worker.
    #[accessor]
    market: Option<SharedState<Market>>,
    // Shared mempool handed to the worker.
    #[accessor]
    mempool: Option<SharedState<Mempool<LDT>>>,
    // Shared market state database handed to the worker.
    #[accessor]
    market_state: Option<SharedState<MarketState<DB>>>,
    // Shared latest-block state handed to the worker.
    #[accessor]
    latest_block: Option<SharedState<LatestBlock<LDT>>>,
    // Channel the worker subscribes to for market events.
    #[consumer]
    market_events_rx: Option<Broadcaster<MarketEvents>>,
    // Channel the worker subscribes to for mempool events.
    #[consumer]
    mempool_events_rx: Option<Broadcaster<MempoolEvents>>,
    // Channel the worker's tasks broadcast state update events on.
    #[producer]
    state_updates_tx: Option<Broadcaster<StateUpdateEvent<DB, LDT>>>,
    // Ties the network type parameter `N` to the struct without storing a value of it.
    _n: PhantomData<N>,
}
343
344impl<P, N, DB, LDT> PendingTxStateChangeProcessorActor<P, N, DB, LDT>
345where
346    N: Network,
347    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
348    DB: DatabaseRef + Send + Sync + Clone + 'static,
349    LDT: KabuDataTypesEVM + 'static,
350{
351    pub fn new(client: P) -> PendingTxStateChangeProcessorActor<P, N, DB, LDT> {
352        PendingTxStateChangeProcessorActor {
353            client,
354            market: None,
355            mempool: None,
356            market_state: None,
357            latest_block: None,
358            market_events_rx: None,
359            mempool_events_rx: None,
360            state_updates_tx: None,
361            _n: PhantomData,
362        }
363    }
364
365    pub fn on_bc(self, bc: &Blockchain<LDT>, state: &BlockchainState<DB, LDT>, strategy: &Strategy<DB, LDT>) -> Self {
366        Self {
367            market: Some(bc.market()),
368            mempool: Some(bc.mempool()),
369            market_state: Some(state.market_state()),
370            latest_block: Some(bc.latest_block()),
371            market_events_rx: Some(bc.market_events_channel()),
372            mempool_events_rx: Some(bc.mempool_events_channel()),
373            state_updates_tx: Some(strategy.state_update_channel()),
374            ..self
375        }
376    }
377}
378
379impl<P, N, DB, LDT> Actor for PendingTxStateChangeProcessorActor<P, N, DB, LDT>
380where
381    N: Network<TransactionRequest = LDT::TransactionRequest>,
382    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
383    DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
384    LDT: KabuDataTypesEVM + 'static,
385{
386    fn start(&self) -> ActorResult {
387        let task = tokio::task::spawn(pending_tx_state_change_worker(
388            self.client.clone(),
389            self.market.clone().unwrap(),
390            self.mempool.clone().unwrap(),
391            self.latest_block.clone().unwrap(),
392            self.market_state.clone().unwrap(),
393            self.mempool_events_rx.clone().unwrap(),
394            self.market_events_rx.clone().unwrap(),
395            self.state_updates_tx.clone().unwrap(),
396        ));
397        Ok(vec![task])
398    }
399
400    fn name(&self) -> &'static str {
401        "PendingTxStateChangeProcessorActor"
402    }
403}