kabu_node_player/
actor.rs1use revm::{Database, DatabaseCommit, DatabaseRef};
2use std::any::type_name;
3use std::marker::PhantomData;
4
5use crate::compose::replayer_compose_worker;
6use crate::worker::node_player_worker;
7use alloy_network::{Ethereum, Network};
8use alloy_primitives::BlockNumber;
9use alloy_provider::Provider;
10use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
11use kabu_core_actors_macros::{Accessor, Consumer, Producer};
12use kabu_core_blockchain::{Blockchain, BlockchainState};
13use kabu_evm_db::{DatabaseKabuExt, KabuDBError};
14use kabu_node_debug_provider::DebugProviderExt;
15use kabu_types_blockchain::Mempool;
16use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum};
17use kabu_types_entities::MarketState;
18use kabu_types_events::{MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate, MessageTxCompose};
19use tokio::task::JoinHandle;
20
21#[derive(Producer, Consumer, Accessor)]
22pub struct NodeBlockPlayerActor<P, N, DB: Send + Sync + Clone + 'static> {
23 client: P,
24 start_block: BlockNumber,
25 end_block: BlockNumber,
26 #[accessor]
27 mempool: Option<SharedState<Mempool>>,
28 #[accessor]
29 market_state: Option<SharedState<MarketState<DB>>>,
30 #[consumer]
31 compose_channel: Option<Broadcaster<MessageTxCompose<KabuDataTypesEthereum>>>,
32 #[producer]
33 block_header_channel: Option<Broadcaster<MessageBlockHeader>>,
34 #[producer]
35 block_with_tx_channel: Option<Broadcaster<MessageBlock>>,
36 #[producer]
37 block_logs_channel: Option<Broadcaster<MessageBlockLogs>>,
38 #[producer]
39 block_state_update_channel: Option<Broadcaster<MessageBlockStateUpdate>>,
40 _n: PhantomData<N>,
41}
42
43impl<P, N, DB> NodeBlockPlayerActor<P, N, DB>
44where
45 N: Network,
46 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
47 DB: Database<Error = kabu_evm_db::KabuDBError>
48 + DatabaseRef<Error = kabu_evm_db::KabuDBError>
49 + DatabaseCommit
50 + DatabaseKabuExt
51 + Send
52 + Sync
53 + Clone
54 + 'static,
55{
56 pub fn new(client: P, start_block: BlockNumber, end_block: BlockNumber) -> NodeBlockPlayerActor<P, N, DB> {
57 NodeBlockPlayerActor {
58 client,
59 start_block,
60 end_block,
61 mempool: None,
62 market_state: None,
63 compose_channel: None,
64 block_header_channel: None,
65 block_with_tx_channel: None,
66 block_logs_channel: None,
67 block_state_update_channel: None,
68 _n: PhantomData,
69 }
70 }
71
72 pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>) -> Self {
73 Self {
74 mempool: Some(bc.mempool()),
75 block_header_channel: Some(bc.new_block_headers_channel()),
76 block_with_tx_channel: Some(bc.new_block_with_tx_channel()),
77 block_logs_channel: Some(bc.new_block_logs_channel()),
78 block_state_update_channel: Some(bc.new_block_state_update_channel()),
79 market_state: Some(state.market_state_commit()),
80 compose_channel: Some(bc.tx_compose_channel()),
81 ..self
82 }
83 }
84}
85
86impl<P, N, DB> Actor for NodeBlockPlayerActor<P, N, DB>
87where
88 P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
89 N: Send + Sync,
90 DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + DatabaseKabuExt + Send + Sync + Clone + 'static,
91{
92 fn start(&self) -> ActorResult {
93 let mut handles: Vec<JoinHandle<WorkerResult>> = Vec::new();
94 if let Some(mempool) = self.mempool.clone() {
95 if let Some(compose_channel) = self.compose_channel.clone() {
96 let handle = tokio::task::spawn(replayer_compose_worker(mempool, compose_channel));
97 handles.push(handle);
98 }
99 }
100
101 let handle = tokio::task::spawn(node_player_worker(
102 self.client.clone(),
103 self.start_block,
104 self.end_block,
105 self.mempool.clone(),
106 self.market_state.clone(),
107 self.block_header_channel.clone(),
108 self.block_with_tx_channel.clone(),
109 self.block_logs_channel.clone(),
110 self.block_state_update_channel.clone(),
111 ));
112 handles.push(handle);
113 Ok(handles)
114 }
115
116 fn name(&self) -> &'static str {
117 type_name::<Self>().rsplit("::").next().unwrap_or(type_name::<Self>())
118 }
119}