kabu_node_json_rpc/eth/
node_block_with_tx_worker.rs1use alloy_json_rpc::RpcRecv;
2use alloy_network::{BlockResponse, Network};
3use alloy_provider::Provider;
4use alloy_rpc_types::Header;
5use kabu_core_actors::{subscribe, Broadcaster, WorkerResult};
6use kabu_types_blockchain::KabuDataTypesEVM;
7use kabu_types_events::{BlockUpdate, Message, MessageBlock};
8use tracing::{debug, error};
9
10pub async fn new_block_with_tx_worker<P, N, LDT>(
11 client: P,
12 block_header_receiver: Broadcaster<Header>,
13 sender: Broadcaster<MessageBlock<LDT>>,
14) -> WorkerResult
15where
16 N: Network<BlockResponse = LDT::Block>,
17 P: Provider<N> + Send + Sync + 'static,
18 LDT: KabuDataTypesEVM,
19 LDT::Block: RpcRecv + BlockResponse,
20{
21 subscribe!(block_header_receiver);
22
23 loop {
24 if let Ok(block_header) = block_header_receiver.recv().await {
25 let (block_number, block_hash) = (block_header.inner.number, block_header.hash);
26 debug!("BlockWithTx header received {} {}", block_number, block_hash);
27
28 let mut err_counter = 0;
29
30 while err_counter < 3 {
31 match client.get_block_by_hash(block_hash).full().await {
32 Ok(block_with_tx) => {
33 if let Some(block_with_txes) = block_with_tx {
34 if let Err(e) = sender.send(Message::new_with_time(BlockUpdate { block: block_with_txes })) {
35 error!("Broadcaster error {}", e);
36 }
37 } else {
38 error!("BlockWithTx is empty");
39 }
40 break;
41 }
42 Err(e) => {
43 error!("client.get_block_by_hash {e}");
44 err_counter += 1;
45 }
46 }
47 }
48
49 debug!("BlockWithTx processing finished {} {}", block_number, block_hash);
50 }
51 }
52}