kabu_node_json_rpc/eth/
node_block_header_worker.rs

1use std::collections::HashMap;
2
3use alloy_network::Network;
4use alloy_primitives::BlockHash;
5use alloy_provider::Provider;
6use alloy_rpc_types::Header;
7use chrono::Utc;
8use futures::StreamExt;
9use kabu_core_actors::{Broadcaster, WorkerResult};
10use kabu_types_blockchain::KabuDataTypesEVM;
11use kabu_types_events::{BlockHeaderEventData, MessageBlockHeader};
12use tracing::{error, info};
13
14pub async fn new_node_block_header_worker<P, N, LDT>(
15    client: P,
16    new_block_header_channel: Broadcaster<Header>,
17    block_header_channel: Broadcaster<MessageBlockHeader<LDT>>,
18) -> WorkerResult
19where
20    N: Network<HeaderResponse = LDT::Header>,
21    P: Provider<N> + Send + Sync + Clone + 'static,
22    LDT: KabuDataTypesEVM,
23{
24    info!("Starting node block header worker");
25    let sub = client.subscribe_blocks().await?;
26    let mut stream = sub.into_stream();
27
28    let mut block_processed: HashMap<BlockHash, chrono::DateTime<Utc>> = HashMap::new();
29
30    loop {
31        tokio::select! {
32            block_msg = stream.next() => {
33                if let Some(block_header) = block_msg {
34                    let block_hash = block_header.hash_slow();
35                    info!("Block hash received: {:?}" , block_hash);
36                    if let std::collections::hash_map::Entry::Vacant(e) = block_processed.entry(block_hash) {
37                        e.insert(Utc::now());
38                        if let Err(e) =  new_block_header_channel.send(block_header.clone()) {
39                            error!("Block hash broadcaster error  {}", e);
40                        }
41                        if let Err(e) = block_header_channel.send(MessageBlockHeader::new_with_time(BlockHeaderEventData::<LDT>::new(block_header))) {
42                            error!("Block header broadcaster error {}", e);
43                        }
44                    }
45                }
46            }
47        }
48    }
49}