kabu_node_json_rpc/eth/
node_block_header_worker.rs1use std::collections::HashMap;
2
3use alloy_network::Network;
4use alloy_primitives::BlockHash;
5use alloy_provider::Provider;
6use alloy_rpc_types::Header;
7use chrono::Utc;
8use futures::StreamExt;
9use kabu_core_actors::{Broadcaster, WorkerResult};
10use kabu_types_blockchain::KabuDataTypesEVM;
11use kabu_types_events::{BlockHeaderEventData, MessageBlockHeader};
12use tracing::{error, info};
13
14pub async fn new_node_block_header_worker<P, N, LDT>(
15 client: P,
16 new_block_header_channel: Broadcaster<Header>,
17 block_header_channel: Broadcaster<MessageBlockHeader<LDT>>,
18) -> WorkerResult
19where
20 N: Network<HeaderResponse = LDT::Header>,
21 P: Provider<N> + Send + Sync + Clone + 'static,
22 LDT: KabuDataTypesEVM,
23{
24 info!("Starting node block header worker");
25 let sub = client.subscribe_blocks().await?;
26 let mut stream = sub.into_stream();
27
28 let mut block_processed: HashMap<BlockHash, chrono::DateTime<Utc>> = HashMap::new();
29
30 loop {
31 tokio::select! {
32 block_msg = stream.next() => {
33 if let Some(block_header) = block_msg {
34 let block_hash = block_header.hash_slow();
35 info!("Block hash received: {:?}" , block_hash);
36 if let std::collections::hash_map::Entry::Vacant(e) = block_processed.entry(block_hash) {
37 e.insert(Utc::now());
38 if let Err(e) = new_block_header_channel.send(block_header.clone()) {
39 error!("Block hash broadcaster error {}", e);
40 }
41 if let Err(e) = block_header_channel.send(MessageBlockHeader::new_with_time(BlockHeaderEventData::<LDT>::new(block_header))) {
42 error!("Block header broadcaster error {}", e);
43 }
44 }
45 }
46 }
47 }
48 }
49}