kabu_node_exex/
reth_exex_worker.rs

1use alloy_eips::BlockNumHash;
2use alloy_network::primitives::BlockTransactions;
3use alloy_primitives::map::HashMap;
4use alloy_primitives::{Address, U256};
5use alloy_rpc_types::Block;
6use futures::TryStreamExt;
7use kabu_core_actors::Broadcaster;
8use kabu_core_blockchain::Blockchain;
9use kabu_evm_utils::reth_types::append_all_matching_block_logs_sealed;
10use kabu_node_actor_config::NodeBlockActorConfig;
11use kabu_types_blockchain::{GethStateUpdate, KabuDataTypesEthereum, MempoolTx};
12use kabu_types_events::{
13    BlockHeaderEventData, BlockLogs, BlockStateUpdate, BlockUpdate, Message, MessageBlock, MessageBlockHeader, MessageBlockLogs,
14    MessageBlockStateUpdate, MessageMempoolDataUpdate, NodeMempoolDataUpdate,
15};
16use reth_exex::{ExExContext, ExExEvent, ExExNotification};
17use reth_node_api::{FullNodeComponents, NodeTypes};
18use reth_primitives::EthPrimitives;
19use reth_provider::Chain;
20// RpcNodeCore and TransactionCompat removed in new reth version
21use reth_transaction_pool::{EthPooledTransaction, TransactionPool};
22use revm::database::states::StorageSlot;
23use revm::database::{BundleAccount, StorageWithOriginalValues};
24use std::sync::Arc;
25use tokio::select;
26use tracing::{debug, error, info};
27
28async fn process_chain(
29    chain: Arc<Chain<EthPrimitives>>,
30    block_header_channel: Broadcaster<MessageBlockHeader>,
31    block_with_tx_channel: Broadcaster<MessageBlock>,
32    logs_channel: Broadcaster<MessageBlockLogs>,
33    state_update_channel: Broadcaster<MessageBlockStateUpdate>,
34    config: &NodeBlockActorConfig,
35) -> eyre::Result<()> {
36    if config.block_header {
37        for sealed_header in chain.headers() {
38            //let header = TryInto::<alloy_SealedHeader>::reth_rpc_types_compat::block::from_primitive_with_hash(sealed_header);
39            let header = alloy_rpc_types::Header {
40                hash: sealed_header.hash(),
41                inner: sealed_header.header().clone(),
42                total_difficulty: None,
43                size: None,
44            };
45            if let Err(e) =
46                block_header_channel.send(MessageBlockHeader::new_with_time(BlockHeaderEventData::<KabuDataTypesEthereum>::new(header)))
47            {
48                error!(error=?e.to_string(), "block_header_channel.send")
49            }
50        }
51    }
52
53    for (sealed_block, receipts) in chain.blocks_and_receipts() {
54        let number = sealed_block.number;
55        let hash = sealed_block.hash();
56
57        let block_hash_num = BlockNumHash { number, hash };
58
59        // Block with tx
60        if config.block_with_tx {
61            info!(block_number=?block_hash_num.number, block_hash=?block_hash_num.hash, "Processing block");
62            // Convert block - reth API changed
63            let block = Block {
64                header: alloy_rpc_types::Header {
65                    hash: sealed_block.hash(),
66                    inner: sealed_block.header().clone(),
67                    total_difficulty: None,
68                    size: Some(U256::from(0)),
69                },
70                uncles: vec![],
71                transactions: BlockTransactions::Full(vec![]),
72                withdrawals: None,
73            };
74            match Ok::<Block, eyre::Error>(block) {
75                Ok(block) => {
76                    let block: Block = Block {
77                        transactions: BlockTransactions::Full(block.transactions.into_transactions().collect()),
78                        header: block.header,
79                        uncles: block.uncles,
80                        withdrawals: block.withdrawals,
81                    };
82
83                    if let Err(e) = block_with_tx_channel.send(Message::new_with_time(BlockUpdate { block })) {
84                        error!(error=?e.to_string(), "block_with_tx_channel.send")
85                    }
86                }
87                Err(e) => {
88                    error!(error = ?e, "from_block")
89                }
90            }
91        }
92
93        // Block logs
94        if config.block_logs {
95            let mut logs: Vec<alloy_rpc_types::Log> = Vec::new();
96
97            let receipts = receipts.clone();
98
99            append_all_matching_block_logs_sealed(&mut logs, block_hash_num, receipts, false, sealed_block)?;
100
101            let reth_header = sealed_block.header().clone();
102            let block_header = alloy_rpc_types::Header {
103                hash: sealed_block.hash(),
104                inner: reth_header.clone(),
105                total_difficulty: Some(reth_header.difficulty),
106                size: Some(U256::from(reth_header.size())),
107            };
108
109            let log_update = BlockLogs { block_header: block_header.clone(), logs };
110
111            if let Err(e) = logs_channel.send(Message::new_with_time(log_update)) {
112                error!(error=?e.to_string(), "logs_channel.send")
113            }
114        }
115
116        // Block state update
117        if config.block_state_update {
118            if let Some(execution_outcome) = chain.execution_outcome_at_block(block_hash_num.number) {
119                let mut state_update = GethStateUpdate::new();
120
121                let state_ref: &HashMap<Address, BundleAccount> = execution_outcome.bundle.state();
122
123                for (address, accounts) in state_ref.iter() {
124                    let account_state = state_update.entry(*address).or_default();
125                    if let Some(account_info) = accounts.info.clone() {
126                        account_state.code = account_info.code.map(|c| c.bytecode().clone());
127                        account_state.balance = Some(account_info.balance);
128                        account_state.nonce = Some(account_info.nonce);
129                    }
130
131                    let storage: &StorageWithOriginalValues = &accounts.storage;
132
133                    for (key, storage_slot) in storage.iter() {
134                        let (key, storage_slot): (&U256, &StorageSlot) = (key, storage_slot);
135                        account_state.storage.insert((*key).into(), storage_slot.present_value.into());
136                    }
137                }
138                let reth_header = sealed_block.header().clone();
139                let block_header = alloy_rpc_types::Header {
140                    hash: sealed_block.hash(),
141                    inner: reth_header.clone(),
142                    total_difficulty: Some(reth_header.difficulty),
143                    size: Some(U256::from(reth_header.size())),
144                };
145
146                let block_state_update = BlockStateUpdate { block_header: block_header.clone(), state_update: vec![state_update] };
147
148                if let Err(e) = state_update_channel.send(Message::new_with_time(block_state_update)) {
149                    error!(error=?e.to_string(), "block_with_tx_channel.send")
150                }
151            }
152        }
153    }
154
155    Ok(())
156}
157
158pub async fn kabu_exex<Node>(mut ctx: ExExContext<Node>, bc: Blockchain, config: NodeBlockActorConfig) -> eyre::Result<()>
159where
160    Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>,
161{
162    info!("Kabu ExEx is started");
163
164    while let Some(exex_notification) = ctx.notifications.try_next().await? {
165        match &exex_notification {
166            ExExNotification::ChainCommitted { new } => {
167                info!(committed_chain = ?new.range(), "Received commit");
168                if let Err(e) = process_chain(
169                    new.clone(),
170                    bc.new_block_headers_channel(),
171                    bc.new_block_with_tx_channel(),
172                    bc.new_block_logs_channel(),
173                    bc.new_block_state_update_channel(),
174                    &config,
175                )
176                .await
177                {
178                    error!(error=?e, "process_chain");
179                }
180            }
181            ExExNotification::ChainReorged { old, new } => {
182                // revert to block before the reorg
183                info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
184                if let Err(e) = process_chain(
185                    new.clone(),
186                    bc.new_block_headers_channel(),
187                    bc.new_block_with_tx_channel(),
188                    bc.new_block_logs_channel(),
189                    bc.new_block_state_update_channel(),
190                    &config,
191                )
192                .await
193                {
194                    error!(error=?e, "process_chain");
195                }
196            }
197            ExExNotification::ChainReverted { old } => {
198                info!(reverted_chain = ?old.range(), "Received revert");
199            }
200        };
201        if let Some(committed_chain) = exex_notification.committed_chain() {
202            ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
203        }
204    }
205
206    info!("Kabu ExEx is finished");
207    Ok(())
208}
209
210pub async fn mempool_worker<Pool>(mempool: Pool, bc: Blockchain) -> eyre::Result<()>
211where
212    Pool: TransactionPool<Transaction = EthPooledTransaction> + Clone + 'static,
213{
214    info!("Mempool worker started");
215    let mut tx_listener = mempool.new_transactions_listener();
216
217    let mempool_tx = bc.new_mempool_tx_channel();
218
219    // EthTxBuilder removed in new version
220
221    loop {
222        select! {
223            tx_notification = tx_listener.recv() => {
224                if let Some(tx_notification) = tx_notification {
225                    let tx_hash = *tx_notification.transaction.hash();
226                    // TODO: Implement proper transaction conversion from EthPooledTransaction to alloy RPC Transaction
227                    // For now, we're skipping the transaction details in the mempool update
228                    // This allows the system to track transaction hashes without full transaction data
229                    let update_msg: MessageMempoolDataUpdate = MessageMempoolDataUpdate::new_with_source(NodeMempoolDataUpdate { tx_hash, mempool_tx: MempoolTx { tx: None, ..MempoolTx::default() } }, "exex".to_string());
230                    if let Err(e) =  mempool_tx.send(update_msg) {
231                        error!(error=?e.to_string(), "mempool_tx.send");
232                    }else{
233                        debug!(hash = ?tx_notification.transaction.hash(), "Received pool tx");
234                    }
235                }
236            }
237        }
238    }
239}