kabu_node_json_rpc/eth/
node_block_logs_worker.rs

1use alloy_network::{primitives::HeaderResponse, Network};
2use std::time::Duration;
3
4use alloy_provider::Provider;
5use alloy_rpc_types::{Filter, Header};
6use tokio::sync::broadcast::Receiver;
7use tracing::{debug, error};
8
9use kabu_core_actors::{subscribe, Broadcaster, WorkerResult};
10use kabu_types_blockchain::KabuDataTypesEVM;
11use kabu_types_events::{BlockLogs, Message, MessageBlockLogs};
12
13pub async fn new_node_block_logs_worker<N, P, LDT>(
14    client: P,
15    block_header_receiver: Broadcaster<Header>,
16    sender: Broadcaster<MessageBlockLogs<LDT>>,
17) -> WorkerResult
18where
19    N: Network,
20    P: Provider<N> + Send + Sync + 'static,
21    LDT: KabuDataTypesEVM,
22{
23    subscribe!(block_header_receiver);
24
25    loop {
26        if let Ok(block_header) = block_header_receiver.recv().await {
27            let (block_number, block_hash) = (block_header.number, block_header.hash);
28            debug!("BlockLogs header received {} {}", block_number, block_hash);
29            let filter = Filter::new().at_block_hash(block_header.hash());
30
31            let mut err_counter = 0;
32
33            while err_counter < 3 {
34                match client.get_logs(&filter).await {
35                    Ok(logs) => {
36                        if let Err(e) = sender.send(Message::new_with_time(BlockLogs { block_header, logs })) {
37                            error!("Broadcaster error {}", e);
38                        }
39                        break;
40                    }
41                    Err(e) => {
42                        error!("client.get_logs error: {}", e);
43                        err_counter += 1;
44                        tokio::time::sleep(Duration::from_millis(100)).await;
45                    }
46                }
47            }
48
49            debug!("BlockLogs processing finished {} {}", block_number, block_hash);
50        }
51    }
52}
53
54#[allow(dead_code)]
55pub async fn new_node_block_logs_worker_reth<N: Network, P: Provider<N> + Send + Sync + 'static>(
56    client: P,
57    mut block_header_receiver: Receiver<Header>,
58    sender: Broadcaster<MessageBlockLogs>,
59) -> WorkerResult {
60    loop {
61        if let Ok(block_header) = block_header_receiver.recv().await {
62            let filter = Filter::new().at_block_hash(block_header.hash());
63
64            let logs = client.get_logs(&filter).await?;
65            if let Err(e) = sender.send(Message::new_with_time(BlockLogs { block_header, logs })) {
66                error!("Broadcaster error {}", e);
67            }
68        }
69    }
70}