kabu_node_json_rpc/eth/
eth_node_block_starter.rs1use crate::eth::node_block_header_worker::new_node_block_header_worker;
2use crate::eth::node_block_logs_worker::new_node_block_logs_worker;
3use crate::eth::node_block_state_worker::new_node_block_state_worker;
4use crate::eth::node_block_with_tx_worker::new_block_with_tx_worker;
5use alloy_json_rpc::RpcRecv;
6use alloy_network::{BlockResponse, Network};
7use alloy_provider::Provider;
8use alloy_rpc_types::Header;
9use kabu_core_actors::{ActorResult, Broadcaster, WorkerResult};
10use kabu_node_debug_provider::DebugProviderExt;
11use kabu_types_blockchain::KabuDataTypesEVM;
12use kabu_types_events::{MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate};
13use tokio::task::JoinHandle;
14
15pub fn new_eth_node_block_workers_starter<P, N, LDT>(
16 client: P,
17 new_block_headers_channel: Option<Broadcaster<MessageBlockHeader<LDT>>>,
18 new_block_with_tx_channel: Option<Broadcaster<MessageBlock<LDT>>>,
19 new_block_logs_channel: Option<Broadcaster<MessageBlockLogs<LDT>>>,
20 new_block_state_update_channel: Option<Broadcaster<MessageBlockStateUpdate<LDT>>>,
21) -> ActorResult
22where
23 N: Network<HeaderResponse = LDT::Header, BlockResponse = LDT::Block>,
24 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
25 LDT: KabuDataTypesEVM,
26 LDT::Block: RpcRecv + BlockResponse,
27{
28 let new_header_internal_channel: Broadcaster<Header> = Broadcaster::new(10);
29 let mut tasks: Vec<JoinHandle<WorkerResult>> = Vec::new();
30
31 if let Some(channel) = new_block_with_tx_channel {
32 tasks.push(tokio::task::spawn(new_block_with_tx_worker(client.clone(), new_header_internal_channel.clone(), channel)));
33 }
34
35 if let Some(channel) = new_block_headers_channel {
36 tasks.push(tokio::task::spawn(new_node_block_header_worker(client.clone(), new_header_internal_channel.clone(), channel)));
37 }
38
39 if let Some(channel) = new_block_logs_channel {
40 tasks.push(tokio::task::spawn(new_node_block_logs_worker(client.clone(), new_header_internal_channel.clone(), channel)));
41 }
42
43 if let Some(channel) = new_block_state_update_channel {
44 tasks.push(tokio::task::spawn(new_node_block_state_worker(client.clone(), new_header_internal_channel.clone(), channel)));
45 }
46
47 Ok(tasks)
48}