kabu_node_player/
mempool.rs

1use alloy_rpc_types::Header;
2use eyre::Result;
3use kabu_core_actors::SharedState;
4use kabu_evm_db::KabuDBError;
5use kabu_evm_utils::evm_env::header_to_block_env;
6use kabu_types_blockchain::Mempool;
7use kabu_types_entities::MarketState;
8use revm::{Database, DatabaseCommit, DatabaseRef};
9use tracing::debug;
10
11pub(crate) async fn replayer_mempool_task<DB>(
12    mempool: SharedState<Mempool>,
13    market_state: SharedState<MarketState<DB>>,
14    header: Header,
15) -> Result<()>
16where
17    DB: DatabaseRef<Error = KabuDBError> + DatabaseCommit + Database<Error = KabuDBError> + Send + Sync + Clone + 'static,
18{
19    let mut mempool_guard = mempool.write().await;
20    debug!("process_mempool_task");
21
22    if !mempool_guard.is_empty() {
23        debug!("Mempool is not empty : {}", mempool_guard.len());
24        let _market_state_guard = market_state.write().await;
25
26        for (_tx_hash, mempool_tx) in mempool_guard.txs.iter_mut() {
27            if mempool_tx.mined == Some(header.number) {
28                let _block_env = header_to_block_env(&header);
29
30                // TODO: Fix me
31                /*
32                let result_and_state = evm_call(&market_state_guard.state_db, mempool_tx.tx.clone().unwrap())?;
33                let (logs, state_update) = convert_evm_result_to_rpc(
34                    result_and_state,
35                    *tx_hash,
36                    BlockNumHash { number: header.number, hash: header.hash },
37                    header.timestamp,
38                )?;
39                info!("Updating state for mempool tx {} logs: {} state_updates : {}", tx_hash, logs.len(), state_update.len());
40
41                mempool_tx.logs = Some(logs);
42                mempool_tx.state_update = Some(state_update);
43
44                 */
45            }
46        }
47    } else {
48        debug!("Mempool is empty");
49    }
50    Ok(())
51}