kabu_node_json_rpc/eth/
node_block_with_tx_worker.rs

1use alloy_json_rpc::RpcRecv;
2use alloy_network::{BlockResponse, Network};
3use alloy_provider::Provider;
4use alloy_rpc_types::Header;
5use kabu_core_actors::{subscribe, Broadcaster, WorkerResult};
6use kabu_types_blockchain::KabuDataTypesEVM;
7use kabu_types_events::{BlockUpdate, Message, MessageBlock};
8use tracing::{debug, error};
9
10pub async fn new_block_with_tx_worker<P, N, LDT>(
11    client: P,
12    block_header_receiver: Broadcaster<Header>,
13    sender: Broadcaster<MessageBlock<LDT>>,
14) -> WorkerResult
15where
16    N: Network<BlockResponse = LDT::Block>,
17    P: Provider<N> + Send + Sync + 'static,
18    LDT: KabuDataTypesEVM,
19    LDT::Block: RpcRecv + BlockResponse,
20{
21    subscribe!(block_header_receiver);
22
23    loop {
24        if let Ok(block_header) = block_header_receiver.recv().await {
25            let (block_number, block_hash) = (block_header.inner.number, block_header.hash);
26            debug!("BlockWithTx header received {} {}", block_number, block_hash);
27
28            let mut err_counter = 0;
29
30            while err_counter < 3 {
31                match client.get_block_by_hash(block_hash).full().await {
32                    Ok(block_with_tx) => {
33                        if let Some(block_with_txes) = block_with_tx {
34                            if let Err(e) = sender.send(Message::new_with_time(BlockUpdate { block: block_with_txes })) {
35                                error!("Broadcaster error {}", e);
36                            }
37                        } else {
38                            error!("BlockWithTx is empty");
39                        }
40                        break;
41                    }
42                    Err(e) => {
43                        error!("client.get_block_by_hash {e}");
44                        err_counter += 1;
45                    }
46                }
47            }
48
49            debug!("BlockWithTx processing finished {} {}", block_number, block_hash);
50        }
51    }
52}