1use alloy_eips::BlockNumHash;
2use alloy_network::primitives::BlockTransactions;
3use alloy_primitives::map::HashMap;
4use alloy_primitives::{Address, U256};
5use alloy_rpc_types::Block;
6use futures::TryStreamExt;
7use kabu_core_actors::Broadcaster;
8use kabu_core_blockchain::Blockchain;
9use kabu_evm_utils::reth_types::append_all_matching_block_logs_sealed;
10use kabu_node_actor_config::NodeBlockActorConfig;
11use kabu_types_blockchain::{GethStateUpdate, KabuDataTypesEthereum, MempoolTx};
12use kabu_types_events::{
13 BlockHeaderEventData, BlockLogs, BlockStateUpdate, BlockUpdate, Message, MessageBlock, MessageBlockHeader, MessageBlockLogs,
14 MessageBlockStateUpdate, MessageMempoolDataUpdate, NodeMempoolDataUpdate,
15};
16use reth_exex::{ExExContext, ExExEvent, ExExNotification};
17use reth_node_api::{FullNodeComponents, NodeTypes};
18use reth_primitives::EthPrimitives;
19use reth_provider::Chain;
20use reth_transaction_pool::{EthPooledTransaction, TransactionPool};
22use revm::database::states::StorageSlot;
23use revm::database::{BundleAccount, StorageWithOriginalValues};
24use std::sync::Arc;
25use tokio::select;
26use tracing::{debug, error, info};
27
28async fn process_chain(
29 chain: Arc<Chain<EthPrimitives>>,
30 block_header_channel: Broadcaster<MessageBlockHeader>,
31 block_with_tx_channel: Broadcaster<MessageBlock>,
32 logs_channel: Broadcaster<MessageBlockLogs>,
33 state_update_channel: Broadcaster<MessageBlockStateUpdate>,
34 config: &NodeBlockActorConfig,
35) -> eyre::Result<()> {
36 if config.block_header {
37 for sealed_header in chain.headers() {
38 let header = alloy_rpc_types::Header {
40 hash: sealed_header.hash(),
41 inner: sealed_header.header().clone(),
42 total_difficulty: None,
43 size: None,
44 };
45 if let Err(e) =
46 block_header_channel.send(MessageBlockHeader::new_with_time(BlockHeaderEventData::<KabuDataTypesEthereum>::new(header)))
47 {
48 error!(error=?e.to_string(), "block_header_channel.send")
49 }
50 }
51 }
52
53 for (sealed_block, receipts) in chain.blocks_and_receipts() {
54 let number = sealed_block.number;
55 let hash = sealed_block.hash();
56
57 let block_hash_num = BlockNumHash { number, hash };
58
59 if config.block_with_tx {
61 info!(block_number=?block_hash_num.number, block_hash=?block_hash_num.hash, "Processing block");
62 let block = Block {
64 header: alloy_rpc_types::Header {
65 hash: sealed_block.hash(),
66 inner: sealed_block.header().clone(),
67 total_difficulty: None,
68 size: Some(U256::from(0)),
69 },
70 uncles: vec![],
71 transactions: BlockTransactions::Full(vec![]),
72 withdrawals: None,
73 };
74 match Ok::<Block, eyre::Error>(block) {
75 Ok(block) => {
76 let block: Block = Block {
77 transactions: BlockTransactions::Full(block.transactions.into_transactions().collect()),
78 header: block.header,
79 uncles: block.uncles,
80 withdrawals: block.withdrawals,
81 };
82
83 if let Err(e) = block_with_tx_channel.send(Message::new_with_time(BlockUpdate { block })) {
84 error!(error=?e.to_string(), "block_with_tx_channel.send")
85 }
86 }
87 Err(e) => {
88 error!(error = ?e, "from_block")
89 }
90 }
91 }
92
93 if config.block_logs {
95 let mut logs: Vec<alloy_rpc_types::Log> = Vec::new();
96
97 let receipts = receipts.clone();
98
99 append_all_matching_block_logs_sealed(&mut logs, block_hash_num, receipts, false, sealed_block)?;
100
101 let reth_header = sealed_block.header().clone();
102 let block_header = alloy_rpc_types::Header {
103 hash: sealed_block.hash(),
104 inner: reth_header.clone(),
105 total_difficulty: Some(reth_header.difficulty),
106 size: Some(U256::from(reth_header.size())),
107 };
108
109 let log_update = BlockLogs { block_header: block_header.clone(), logs };
110
111 if let Err(e) = logs_channel.send(Message::new_with_time(log_update)) {
112 error!(error=?e.to_string(), "logs_channel.send")
113 }
114 }
115
116 if config.block_state_update {
118 if let Some(execution_outcome) = chain.execution_outcome_at_block(block_hash_num.number) {
119 let mut state_update = GethStateUpdate::new();
120
121 let state_ref: &HashMap<Address, BundleAccount> = execution_outcome.bundle.state();
122
123 for (address, accounts) in state_ref.iter() {
124 let account_state = state_update.entry(*address).or_default();
125 if let Some(account_info) = accounts.info.clone() {
126 account_state.code = account_info.code.map(|c| c.bytecode().clone());
127 account_state.balance = Some(account_info.balance);
128 account_state.nonce = Some(account_info.nonce);
129 }
130
131 let storage: &StorageWithOriginalValues = &accounts.storage;
132
133 for (key, storage_slot) in storage.iter() {
134 let (key, storage_slot): (&U256, &StorageSlot) = (key, storage_slot);
135 account_state.storage.insert((*key).into(), storage_slot.present_value.into());
136 }
137 }
138 let reth_header = sealed_block.header().clone();
139 let block_header = alloy_rpc_types::Header {
140 hash: sealed_block.hash(),
141 inner: reth_header.clone(),
142 total_difficulty: Some(reth_header.difficulty),
143 size: Some(U256::from(reth_header.size())),
144 };
145
146 let block_state_update = BlockStateUpdate { block_header: block_header.clone(), state_update: vec![state_update] };
147
148 if let Err(e) = state_update_channel.send(Message::new_with_time(block_state_update)) {
149 error!(error=?e.to_string(), "block_with_tx_channel.send")
150 }
151 }
152 }
153 }
154
155 Ok(())
156}
157
158pub async fn kabu_exex<Node>(mut ctx: ExExContext<Node>, bc: Blockchain, config: NodeBlockActorConfig) -> eyre::Result<()>
159where
160 Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>,
161{
162 info!("Kabu ExEx is started");
163
164 while let Some(exex_notification) = ctx.notifications.try_next().await? {
165 match &exex_notification {
166 ExExNotification::ChainCommitted { new } => {
167 info!(committed_chain = ?new.range(), "Received commit");
168 if let Err(e) = process_chain(
169 new.clone(),
170 bc.new_block_headers_channel(),
171 bc.new_block_with_tx_channel(),
172 bc.new_block_logs_channel(),
173 bc.new_block_state_update_channel(),
174 &config,
175 )
176 .await
177 {
178 error!(error=?e, "process_chain");
179 }
180 }
181 ExExNotification::ChainReorged { old, new } => {
182 info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
184 if let Err(e) = process_chain(
185 new.clone(),
186 bc.new_block_headers_channel(),
187 bc.new_block_with_tx_channel(),
188 bc.new_block_logs_channel(),
189 bc.new_block_state_update_channel(),
190 &config,
191 )
192 .await
193 {
194 error!(error=?e, "process_chain");
195 }
196 }
197 ExExNotification::ChainReverted { old } => {
198 info!(reverted_chain = ?old.range(), "Received revert");
199 }
200 };
201 if let Some(committed_chain) = exex_notification.committed_chain() {
202 ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
203 }
204 }
205
206 info!("Kabu ExEx is finished");
207 Ok(())
208}
209
210pub async fn mempool_worker<Pool>(mempool: Pool, bc: Blockchain) -> eyre::Result<()>
211where
212 Pool: TransactionPool<Transaction = EthPooledTransaction> + Clone + 'static,
213{
214 info!("Mempool worker started");
215 let mut tx_listener = mempool.new_transactions_listener();
216
217 let mempool_tx = bc.new_mempool_tx_channel();
218
219 loop {
222 select! {
223 tx_notification = tx_listener.recv() => {
224 if let Some(tx_notification) = tx_notification {
225 let tx_hash = *tx_notification.transaction.hash();
226 let update_msg: MessageMempoolDataUpdate = MessageMempoolDataUpdate::new_with_source(NodeMempoolDataUpdate { tx_hash, mempool_tx: MempoolTx { tx: None, ..MempoolTx::default() } }, "exex".to_string());
230 if let Err(e) = mempool_tx.send(update_msg) {
231 error!(error=?e.to_string(), "mempool_tx.send");
232 }else{
233 debug!(hash = ?tx_notification.transaction.hash(), "Received pool tx");
234 }
235 }
236 }
237 }
238 }
239}