kabu_node_json_rpc/eth/
node_block_logs_worker.rs1use alloy_network::{primitives::HeaderResponse, Network};
2use std::time::Duration;
3
4use alloy_provider::Provider;
5use alloy_rpc_types::{Filter, Header};
6use tokio::sync::broadcast::Receiver;
7use tracing::{debug, error};
8
9use kabu_core_actors::{subscribe, Broadcaster, WorkerResult};
10use kabu_types_blockchain::KabuDataTypesEVM;
11use kabu_types_events::{BlockLogs, Message, MessageBlockLogs};
12
13pub async fn new_node_block_logs_worker<N, P, LDT>(
14 client: P,
15 block_header_receiver: Broadcaster<Header>,
16 sender: Broadcaster<MessageBlockLogs<LDT>>,
17) -> WorkerResult
18where
19 N: Network,
20 P: Provider<N> + Send + Sync + 'static,
21 LDT: KabuDataTypesEVM,
22{
23 subscribe!(block_header_receiver);
24
25 loop {
26 if let Ok(block_header) = block_header_receiver.recv().await {
27 let (block_number, block_hash) = (block_header.number, block_header.hash);
28 debug!("BlockLogs header received {} {}", block_number, block_hash);
29 let filter = Filter::new().at_block_hash(block_header.hash());
30
31 let mut err_counter = 0;
32
33 while err_counter < 3 {
34 match client.get_logs(&filter).await {
35 Ok(logs) => {
36 if let Err(e) = sender.send(Message::new_with_time(BlockLogs { block_header, logs })) {
37 error!("Broadcaster error {}", e);
38 }
39 break;
40 }
41 Err(e) => {
42 error!("client.get_logs error: {}", e);
43 err_counter += 1;
44 tokio::time::sleep(Duration::from_millis(100)).await;
45 }
46 }
47 }
48
49 debug!("BlockLogs processing finished {} {}", block_number, block_hash);
50 }
51 }
52}
53
54#[allow(dead_code)]
55pub async fn new_node_block_logs_worker_reth<N: Network, P: Provider<N> + Send + Sync + 'static>(
56 client: P,
57 mut block_header_receiver: Receiver<Header>,
58 sender: Broadcaster<MessageBlockLogs>,
59) -> WorkerResult {
60 loop {
61 if let Ok(block_header) = block_header_receiver.recv().await {
62 let filter = Filter::new().at_block_hash(block_header.hash());
63
64 let logs = client.get_logs(&filter).await?;
65 if let Err(e) = sender.send(Message::new_with_time(BlockLogs { block_header, logs })) {
66 error!("Broadcaster error {}", e);
67 }
68 }
69 }
70}