kabu_node_json_rpc/
node_block_actor.rs1use alloy_json_rpc::RpcRecv;
2use alloy_network::{BlockResponse, Network};
3use alloy_provider::Provider;
4use alloy_rpc_types::Block;
5use std::marker::PhantomData;
6
7use crate::eth::new_eth_node_block_workers_starter;
8use kabu_core_actors::{Actor, ActorResult, Broadcaster, Producer};
9use kabu_core_actors_macros::Producer;
10use kabu_core_blockchain::Blockchain;
11use kabu_node_actor_config::NodeBlockActorConfig;
12use kabu_node_debug_provider::DebugProviderExt;
13use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEVM};
14use kabu_types_events::{MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate};
15
/// Actor state for publishing block data obtained through a node client.
///
/// Holds the client `P`, the actor configuration, and one optional
/// broadcaster per kind of block message (header, block with transactions,
/// logs, state update). A channel that is `None` is simply not wired up.
///
/// `N` is not stored; it is only carried at the type level via `PhantomData`.
#[derive(Producer)]
pub struct NodeBlockActor<P, N, LDT: KabuDataTypes + 'static> {
    /// Node client handle; cloned when the actor is started.
    client: P,
    /// Flags deciding which of the channels below get attached in `on_bc`.
    config: NodeBlockActorConfig,
    /// Outgoing channel for `MessageBlockHeader` messages, if attached.
    #[producer]
    block_header_channel: Option<Broadcaster<MessageBlockHeader<LDT>>>,
    /// Outgoing channel for `MessageBlock` messages, if attached.
    #[producer]
    block_with_tx_channel: Option<Broadcaster<MessageBlock<LDT>>>,
    /// Outgoing channel for `MessageBlockLogs` messages, if attached.
    #[producer]
    block_logs_channel: Option<Broadcaster<MessageBlockLogs<LDT>>>,
    /// Outgoing channel for `MessageBlockStateUpdate` messages, if attached.
    #[producer]
    block_state_update_channel: Option<Broadcaster<MessageBlockStateUpdate<LDT>>>,
    /// Zero-sized marker tying the otherwise unused type parameter `N` to the struct.
    _n: PhantomData<N>,
}
30
31impl<P, N, LDT> NodeBlockActor<P, N, LDT>
32where
33 N: Network<HeaderResponse = LDT::Header, BlockResponse = LDT::Block>,
34 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
35 LDT: KabuDataTypesEVM<Block = Block>,
36 LDT::Block: BlockResponse + RpcRecv,
37{
38 pub fn new(client: P, config: NodeBlockActorConfig) -> NodeBlockActor<P, N, LDT> {
39 NodeBlockActor {
40 client,
41 config,
42 block_header_channel: None,
43 block_with_tx_channel: None,
44 block_logs_channel: None,
45 block_state_update_channel: None,
46 _n: PhantomData,
47 }
48 }
49
50 pub fn on_bc(self, bc: &Blockchain<LDT>) -> Self {
51 Self {
52 block_header_channel: if self.config.block_header { Some(bc.new_block_headers_channel()) } else { None },
53 block_with_tx_channel: if self.config.block_with_tx { Some(bc.new_block_with_tx_channel()) } else { None },
54 block_logs_channel: if self.config.block_logs { Some(bc.new_block_logs_channel()) } else { None },
55 block_state_update_channel: if self.config.block_state_update { Some(bc.new_block_state_update_channel()) } else { None },
56 ..self
57 }
58 }
59}
60
61impl<P, N, LDT> Actor for NodeBlockActor<P, N, LDT>
62where
63 N: Network<HeaderResponse = LDT::Header, BlockResponse = LDT::Block>,
64 P: Provider + DebugProviderExt + Send + Sync + Clone + 'static,
65 LDT: KabuDataTypesEVM<Block = Block>,
66 LDT::Block: BlockResponse + RpcRecv,
67{
68 fn start(&self) -> ActorResult {
69 new_eth_node_block_workers_starter(
70 self.client.clone(),
71 self.block_header_channel.clone(),
72 self.block_with_tx_channel.clone(),
73 self.block_logs_channel.clone(),
74 self.block_state_update_channel.clone(),
75 )
76 }
77 fn name(&self) -> &'static str {
78 "NodeBlockActor"
79 }
80}
81
82