kabu_node_json_rpc/
node_block_actor.rs

1use alloy_json_rpc::RpcRecv;
2use alloy_network::{BlockResponse, Network};
3use alloy_provider::Provider;
4use alloy_rpc_types::Block;
5use std::marker::PhantomData;
6
7use crate::eth::new_eth_node_block_workers_starter;
8use kabu_core_actors::{Actor, ActorResult, Broadcaster, Producer};
9use kabu_core_actors_macros::Producer;
10use kabu_core_blockchain::Blockchain;
11use kabu_node_actor_config::NodeBlockActorConfig;
12use kabu_node_debug_provider::DebugProviderExt;
13use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEVM};
14use kabu_types_events::{MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate};
15
16#[derive(Producer)]
17pub struct NodeBlockActor<P, N, LDT: KabuDataTypes + 'static> {
18    client: P,
19    config: NodeBlockActorConfig,
20    #[producer]
21    block_header_channel: Option<Broadcaster<MessageBlockHeader<LDT>>>,
22    #[producer]
23    block_with_tx_channel: Option<Broadcaster<MessageBlock<LDT>>>,
24    #[producer]
25    block_logs_channel: Option<Broadcaster<MessageBlockLogs<LDT>>>,
26    #[producer]
27    block_state_update_channel: Option<Broadcaster<MessageBlockStateUpdate<LDT>>>,
28    _n: PhantomData<N>,
29}
30
31impl<P, N, LDT> NodeBlockActor<P, N, LDT>
32where
33    N: Network<HeaderResponse = LDT::Header, BlockResponse = LDT::Block>,
34    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
35    LDT: KabuDataTypesEVM<Block = Block>,
36    LDT::Block: BlockResponse + RpcRecv,
37{
38    pub fn new(client: P, config: NodeBlockActorConfig) -> NodeBlockActor<P, N, LDT> {
39        NodeBlockActor {
40            client,
41            config,
42            block_header_channel: None,
43            block_with_tx_channel: None,
44            block_logs_channel: None,
45            block_state_update_channel: None,
46            _n: PhantomData,
47        }
48    }
49
50    pub fn on_bc(self, bc: &Blockchain<LDT>) -> Self {
51        Self {
52            block_header_channel: if self.config.block_header { Some(bc.new_block_headers_channel()) } else { None },
53            block_with_tx_channel: if self.config.block_with_tx { Some(bc.new_block_with_tx_channel()) } else { None },
54            block_logs_channel: if self.config.block_logs { Some(bc.new_block_logs_channel()) } else { None },
55            block_state_update_channel: if self.config.block_state_update { Some(bc.new_block_state_update_channel()) } else { None },
56            ..self
57        }
58    }
59}
60
61impl<P, N, LDT> Actor for NodeBlockActor<P, N, LDT>
62where
63    N: Network<HeaderResponse = LDT::Header, BlockResponse = LDT::Block>,
64    P: Provider + DebugProviderExt + Send + Sync + Clone + 'static,
65    LDT: KabuDataTypesEVM<Block = Block>,
66    LDT::Block: BlockResponse + RpcRecv,
67{
68    fn start(&self) -> ActorResult {
69        new_eth_node_block_workers_starter(
70            self.client.clone(),
71            self.block_header_channel.clone(),
72            self.block_with_tx_channel.clone(),
73            self.block_logs_channel.clone(),
74            self.block_state_update_channel.clone(),
75        )
76    }
77    fn name(&self) -> &'static str {
78        "NodeBlockActor"
79    }
80}
81
82/*
83impl<P> Actor for NodeBlockActor<P, N, KabuDataTypesOptimism>
84where
85    P: Provider<Optimism> + DebugProviderExt + Send + Sync + Clone + 'static,
86{
87    fn start(&self) -> ActorResult {
88        new_op_node_block_workers_starter(
89            self.client.clone(),
90            self.block_header_channel.clone(),
91            self.block_with_tx_channel.clone(),
92            self.block_logs_channel.clone(),
93            self.block_state_update_channel.clone(),
94        )
95    }
96    fn name(&self) -> &'static str {
97        "NodeBlockActor"
98    }
99}
100
101 */