kabu_node_player/
worker.rs

1use crate::mempool::replayer_mempool_task;
2use alloy_eips::BlockId;
3use alloy_network::Ethereum;
4use alloy_primitives::BlockNumber;
5use alloy_provider::Provider;
6use alloy_rpc_types::{BlockTransactions, Filter};
7use kabu_core_actors::{Broadcaster, SharedState, WorkerResult};
8use kabu_evm_db::{DatabaseKabuExt, KabuDBError};
9use kabu_node_debug_provider::DebugProviderExt;
10use kabu_types_blockchain::{debug_trace_block, KabuDataTypesEthereum, Mempool};
11use kabu_types_entities::MarketState;
12use kabu_types_events::{
13    BlockHeaderEventData, BlockLogs, BlockStateUpdate, BlockUpdate, Message, MessageBlock, MessageBlockHeader, MessageBlockLogs,
14    MessageBlockStateUpdate,
15};
16use revm::{Database, DatabaseCommit, DatabaseRef};
17use std::ops::RangeInclusive;
18use std::time::Duration;
19use tracing::{debug, error};
20
21#[allow(clippy::too_many_arguments)]
22pub async fn node_player_worker<P, DB>(
23    provider: P,
24    start_block: BlockNumber,
25    end_block: BlockNumber,
26    mempool: Option<SharedState<Mempool>>,
27    market_state: Option<SharedState<MarketState<DB>>>,
28    new_block_headers_channel: Option<Broadcaster<MessageBlockHeader>>,
29    new_block_with_tx_channel: Option<Broadcaster<MessageBlock>>,
30    new_block_logs_channel: Option<Broadcaster<MessageBlockLogs>>,
31    new_block_state_update_channel: Option<Broadcaster<MessageBlockStateUpdate>>,
32) -> WorkerResult
33where
34    P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
35    DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + DatabaseKabuExt + 'static,
36{
37    for curblock_number in RangeInclusive::new(start_block, end_block) {
38        //let curblock_number = provider.client().transport().fetch_next_block().await?;
39        let block = provider.get_block_by_number(curblock_number.into()).await?;
40
41        if let Some(block) = block {
42            let block_header = block.header.clone();
43            let curblock_hash = block.header.hash;
44
45            if let Some(mempool) = mempool.clone() {
46                let mut mempool_guard = mempool.write().await;
47                for tx_hash in mempool_guard.txs.clone().keys() {
48                    if mempool_guard.is_mined(tx_hash) {
49                        //mempool_guard.remove_tx(tx_hash);
50                    } else {
51                        mempool_guard.set_mined(*tx_hash, curblock_number);
52                    }
53                }
54
55                //mempool_guard.clean_txs(curblock_number - 1, DateTime::<Utc>::MIN_UTC);
56                debug!("Mempool cleaned");
57            }
58
59            // Processing mempool tx to update state
60            if let Some(mempool) = mempool.clone() {
61                if let Some(market_state) = market_state.clone() {
62                    if let Err(e) = replayer_mempool_task(mempool, market_state, block.header.clone()).await {
63                        error!("process_mempool_task : {e}");
64                    }
65                };
66            };
67
68            if let Some(block_headers_channel) = &new_block_headers_channel {
69                if let Err(e) =
70                    block_headers_channel.send(Message::new_with_time(BlockHeaderEventData::<KabuDataTypesEthereum>::new(block.header)))
71                {
72                    error!("new_block_headers_channel.send error: {e}");
73                }
74            }
75            if let Some(block_with_tx_channel) = &new_block_with_tx_channel {
76                match provider.get_block_by_hash(curblock_hash).full().await {
77                    Ok(block) => {
78                        if let Some(block) = block {
79                            let mut txs = if let Some(mempool) = mempool.clone() {
80                                let guard = mempool.read().await;
81
82                                if !guard.is_empty() {
83                                    guard.filter_on_block(curblock_number).into_iter().flat_map(|x| x.tx.clone()).collect()
84                                } else {
85                                    vec![]
86                                }
87                            } else {
88                                vec![]
89                            };
90
91                            if txs.is_empty() {
92                                let block_update = BlockUpdate { block };
93                                if let Err(e) = block_with_tx_channel.send(Message::new_with_time(block_update)) {
94                                    error!("new_block_with_tx_channel.send error: {e}");
95                                }
96                            } else if let Some(block_txs) = block.transactions.as_transactions() {
97                                txs.extend(block_txs.iter().cloned());
98                                let mut block = block;
99
100                                block.transactions = BlockTransactions::Full(txs);
101                                let block_update = BlockUpdate { block };
102                                if let Err(e) = block_with_tx_channel.send(Message::new_with_time(block_update)) {
103                                    error!("new_block_with_tx_channel.send updated block error: {e}");
104                                }
105                            }
106                        } else {
107                            error!("Block is empty")
108                        }
109                    }
110                    Err(e) => {
111                        error!("get_logs error: {e}")
112                    }
113                }
114            }
115
116            if let Some(block_logs_channel) = &new_block_logs_channel {
117                let filter = Filter::new().at_block_hash(curblock_hash);
118
119                let mut logs = if let Some(mempool) = mempool.clone() {
120                    let guard = mempool.read().await;
121
122                    if !guard.is_empty() {
123                        guard.filter_on_block(curblock_number).into_iter().flat_map(|x| x.logs.clone().unwrap_or_default()).collect()
124                    } else {
125                        vec![]
126                    }
127                } else {
128                    vec![]
129                };
130
131                match provider.get_logs(&filter).await {
132                    Ok(block_logs) => {
133                        debug!("Mempool logs : {}", logs.len());
134                        logs.extend(block_logs);
135                        let logs_update = BlockLogs { block_header: block_header.clone(), logs };
136                        if let Err(e) = block_logs_channel.send(Message::new_with_time(logs_update)) {
137                            error!("new_block_logs_channel.send error: {e}");
138                        }
139                    }
140                    Err(e) => {
141                        error!("get_logs error: {e}")
142                    }
143                }
144            }
145
146            if let Some(block_state_update_channel) = &new_block_state_update_channel {
147                if let Some(mempool) = mempool.clone() {
148                    if let Some(market_state) = market_state.clone() {
149                        let mempool_guard = mempool.read().await;
150                        let txes = mempool_guard.filter_on_block(curblock_number);
151
152                        if !txes.is_empty() {
153                            let mut marker_state_guard = market_state.write().await;
154                            for mempool_tx in txes {
155                                if let Some(state_update) = &mempool_tx.state_update {
156                                    marker_state_guard.apply_geth_update(state_update.clone());
157                                }
158                            }
159                            marker_state_guard.state_db = marker_state_guard.state_db.clone().maintain();
160                        }
161                    }
162                }
163
164                match debug_trace_block(provider.clone(), BlockId::Hash(curblock_hash.into()), true).await {
165                    Ok((_, post)) => {
166                        if let Err(e) =
167                            block_state_update_channel.send(Message::new_with_time(BlockStateUpdate { block_header, state_update: post }))
168                        {
169                            error!("new_block_state_update_channel error: {e}");
170                        }
171                    }
172                    Err(e) => {
173                        error!("debug_trace_block error : {e}")
174                    }
175                }
176            }
177        }
178
179        tokio::time::sleep(Duration::from_millis(1000)).await;
180    }
181
182    Ok("Node block player worker finished".to_string())
183}