kabu_node_player/
worker.rs1use crate::mempool::replayer_mempool_task;
2use alloy_eips::BlockId;
3use alloy_network::Ethereum;
4use alloy_primitives::BlockNumber;
5use alloy_provider::Provider;
6use alloy_rpc_types::{BlockTransactions, Filter};
7use kabu_core_actors::{Broadcaster, SharedState, WorkerResult};
8use kabu_evm_db::{DatabaseKabuExt, KabuDBError};
9use kabu_node_debug_provider::DebugProviderExt;
10use kabu_types_blockchain::{debug_trace_block, KabuDataTypesEthereum, Mempool};
11use kabu_types_entities::MarketState;
12use kabu_types_events::{
13 BlockHeaderEventData, BlockLogs, BlockStateUpdate, BlockUpdate, Message, MessageBlock, MessageBlockHeader, MessageBlockLogs,
14 MessageBlockStateUpdate,
15};
16use revm::{Database, DatabaseCommit, DatabaseRef};
17use std::ops::RangeInclusive;
18use std::time::Duration;
19use tracing::{debug, error};
20
21#[allow(clippy::too_many_arguments)]
22pub async fn node_player_worker<P, DB>(
23 provider: P,
24 start_block: BlockNumber,
25 end_block: BlockNumber,
26 mempool: Option<SharedState<Mempool>>,
27 market_state: Option<SharedState<MarketState<DB>>>,
28 new_block_headers_channel: Option<Broadcaster<MessageBlockHeader>>,
29 new_block_with_tx_channel: Option<Broadcaster<MessageBlock>>,
30 new_block_logs_channel: Option<Broadcaster<MessageBlockLogs>>,
31 new_block_state_update_channel: Option<Broadcaster<MessageBlockStateUpdate>>,
32) -> WorkerResult
33where
34 P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
35 DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + DatabaseKabuExt + 'static,
36{
37 for curblock_number in RangeInclusive::new(start_block, end_block) {
38 let block = provider.get_block_by_number(curblock_number.into()).await?;
40
41 if let Some(block) = block {
42 let block_header = block.header.clone();
43 let curblock_hash = block.header.hash;
44
45 if let Some(mempool) = mempool.clone() {
46 let mut mempool_guard = mempool.write().await;
47 for tx_hash in mempool_guard.txs.clone().keys() {
48 if mempool_guard.is_mined(tx_hash) {
49 } else {
51 mempool_guard.set_mined(*tx_hash, curblock_number);
52 }
53 }
54
55 debug!("Mempool cleaned");
57 }
58
59 if let Some(mempool) = mempool.clone() {
61 if let Some(market_state) = market_state.clone() {
62 if let Err(e) = replayer_mempool_task(mempool, market_state, block.header.clone()).await {
63 error!("process_mempool_task : {e}");
64 }
65 };
66 };
67
68 if let Some(block_headers_channel) = &new_block_headers_channel {
69 if let Err(e) =
70 block_headers_channel.send(Message::new_with_time(BlockHeaderEventData::<KabuDataTypesEthereum>::new(block.header)))
71 {
72 error!("new_block_headers_channel.send error: {e}");
73 }
74 }
75 if let Some(block_with_tx_channel) = &new_block_with_tx_channel {
76 match provider.get_block_by_hash(curblock_hash).full().await {
77 Ok(block) => {
78 if let Some(block) = block {
79 let mut txs = if let Some(mempool) = mempool.clone() {
80 let guard = mempool.read().await;
81
82 if !guard.is_empty() {
83 guard.filter_on_block(curblock_number).into_iter().flat_map(|x| x.tx.clone()).collect()
84 } else {
85 vec![]
86 }
87 } else {
88 vec![]
89 };
90
91 if txs.is_empty() {
92 let block_update = BlockUpdate { block };
93 if let Err(e) = block_with_tx_channel.send(Message::new_with_time(block_update)) {
94 error!("new_block_with_tx_channel.send error: {e}");
95 }
96 } else if let Some(block_txs) = block.transactions.as_transactions() {
97 txs.extend(block_txs.iter().cloned());
98 let mut block = block;
99
100 block.transactions = BlockTransactions::Full(txs);
101 let block_update = BlockUpdate { block };
102 if let Err(e) = block_with_tx_channel.send(Message::new_with_time(block_update)) {
103 error!("new_block_with_tx_channel.send updated block error: {e}");
104 }
105 }
106 } else {
107 error!("Block is empty")
108 }
109 }
110 Err(e) => {
111 error!("get_logs error: {e}")
112 }
113 }
114 }
115
116 if let Some(block_logs_channel) = &new_block_logs_channel {
117 let filter = Filter::new().at_block_hash(curblock_hash);
118
119 let mut logs = if let Some(mempool) = mempool.clone() {
120 let guard = mempool.read().await;
121
122 if !guard.is_empty() {
123 guard.filter_on_block(curblock_number).into_iter().flat_map(|x| x.logs.clone().unwrap_or_default()).collect()
124 } else {
125 vec![]
126 }
127 } else {
128 vec![]
129 };
130
131 match provider.get_logs(&filter).await {
132 Ok(block_logs) => {
133 debug!("Mempool logs : {}", logs.len());
134 logs.extend(block_logs);
135 let logs_update = BlockLogs { block_header: block_header.clone(), logs };
136 if let Err(e) = block_logs_channel.send(Message::new_with_time(logs_update)) {
137 error!("new_block_logs_channel.send error: {e}");
138 }
139 }
140 Err(e) => {
141 error!("get_logs error: {e}")
142 }
143 }
144 }
145
146 if let Some(block_state_update_channel) = &new_block_state_update_channel {
147 if let Some(mempool) = mempool.clone() {
148 if let Some(market_state) = market_state.clone() {
149 let mempool_guard = mempool.read().await;
150 let txes = mempool_guard.filter_on_block(curblock_number);
151
152 if !txes.is_empty() {
153 let mut marker_state_guard = market_state.write().await;
154 for mempool_tx in txes {
155 if let Some(state_update) = &mempool_tx.state_update {
156 marker_state_guard.apply_geth_update(state_update.clone());
157 }
158 }
159 marker_state_guard.state_db = marker_state_guard.state_db.clone().maintain();
160 }
161 }
162 }
163
164 match debug_trace_block(provider.clone(), BlockId::Hash(curblock_hash.into()), true).await {
165 Ok((_, post)) => {
166 if let Err(e) =
167 block_state_update_channel.send(Message::new_with_time(BlockStateUpdate { block_header, state_update: post }))
168 {
169 error!("new_block_state_update_channel error: {e}");
170 }
171 }
172 Err(e) => {
173 error!("debug_trace_block error : {e}")
174 }
175 }
176 }
177 }
178
179 tokio::time::sleep(Duration::from_millis(1000)).await;
180 }
181
182 Ok("Node block player worker finished".to_string())
183}