kabu_node_json_rpc/eth/
eth_node_block_starter.rs

1use crate::eth::node_block_header_worker::new_node_block_header_worker;
2use crate::eth::node_block_logs_worker::new_node_block_logs_worker;
3use crate::eth::node_block_state_worker::new_node_block_state_worker;
4use crate::eth::node_block_with_tx_worker::new_block_with_tx_worker;
5use alloy_json_rpc::RpcRecv;
6use alloy_network::{BlockResponse, Network};
7use alloy_provider::Provider;
8use alloy_rpc_types::Header;
9use kabu_core_actors::{ActorResult, Broadcaster, WorkerResult};
10use kabu_node_debug_provider::DebugProviderExt;
11use kabu_types_blockchain::KabuDataTypesEVM;
12use kabu_types_events::{MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate};
13use tokio::task::JoinHandle;
14
15pub fn new_eth_node_block_workers_starter<P, N, LDT>(
16    client: P,
17    new_block_headers_channel: Option<Broadcaster<MessageBlockHeader<LDT>>>,
18    new_block_with_tx_channel: Option<Broadcaster<MessageBlock<LDT>>>,
19    new_block_logs_channel: Option<Broadcaster<MessageBlockLogs<LDT>>>,
20    new_block_state_update_channel: Option<Broadcaster<MessageBlockStateUpdate<LDT>>>,
21) -> ActorResult
22where
23    N: Network<HeaderResponse = LDT::Header, BlockResponse = LDT::Block>,
24    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
25    LDT: KabuDataTypesEVM,
26    LDT::Block: RpcRecv + BlockResponse,
27{
28    let new_header_internal_channel: Broadcaster<Header> = Broadcaster::new(10);
29    let mut tasks: Vec<JoinHandle<WorkerResult>> = Vec::new();
30
31    if let Some(channel) = new_block_with_tx_channel {
32        tasks.push(tokio::task::spawn(new_block_with_tx_worker(client.clone(), new_header_internal_channel.clone(), channel)));
33    }
34
35    if let Some(channel) = new_block_headers_channel {
36        tasks.push(tokio::task::spawn(new_node_block_header_worker(client.clone(), new_header_internal_channel.clone(), channel)));
37    }
38
39    if let Some(channel) = new_block_logs_channel {
40        tasks.push(tokio::task::spawn(new_node_block_logs_worker(client.clone(), new_header_internal_channel.clone(), channel)));
41    }
42
43    if let Some(channel) = new_block_state_update_channel {
44        tasks.push(tokio::task::spawn(new_node_block_state_worker(client.clone(), new_header_internal_channel.clone(), channel)));
45    }
46
47    Ok(tasks)
48}