// kabu_node_player/actor.rs

1use revm::{Database, DatabaseCommit, DatabaseRef};
2use std::any::type_name;
3use std::marker::PhantomData;
4
5use crate::compose::replayer_compose_worker;
6use crate::worker::node_player_worker;
7use alloy_network::{Ethereum, Network};
8use alloy_primitives::BlockNumber;
9use alloy_provider::Provider;
10use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
11use kabu_core_actors_macros::{Accessor, Consumer, Producer};
12use kabu_core_blockchain::{Blockchain, BlockchainState};
13use kabu_evm_db::{DatabaseKabuExt, KabuDBError};
14use kabu_node_debug_provider::DebugProviderExt;
15use kabu_types_blockchain::Mempool;
16use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum};
17use kabu_types_entities::MarketState;
18use kabu_types_events::{MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate, MessageTxCompose};
19use tokio::task::JoinHandle;
20
21#[derive(Producer, Consumer, Accessor)]
22pub struct NodeBlockPlayerActor<P, N, DB: Send + Sync + Clone + 'static> {
23    client: P,
24    start_block: BlockNumber,
25    end_block: BlockNumber,
26    #[accessor]
27    mempool: Option<SharedState<Mempool>>,
28    #[accessor]
29    market_state: Option<SharedState<MarketState<DB>>>,
30    #[consumer]
31    compose_channel: Option<Broadcaster<MessageTxCompose<KabuDataTypesEthereum>>>,
32    #[producer]
33    block_header_channel: Option<Broadcaster<MessageBlockHeader>>,
34    #[producer]
35    block_with_tx_channel: Option<Broadcaster<MessageBlock>>,
36    #[producer]
37    block_logs_channel: Option<Broadcaster<MessageBlockLogs>>,
38    #[producer]
39    block_state_update_channel: Option<Broadcaster<MessageBlockStateUpdate>>,
40    _n: PhantomData<N>,
41}
42
43impl<P, N, DB> NodeBlockPlayerActor<P, N, DB>
44where
45    N: Network,
46    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
47    DB: Database<Error = kabu_evm_db::KabuDBError>
48        + DatabaseRef<Error = kabu_evm_db::KabuDBError>
49        + DatabaseCommit
50        + DatabaseKabuExt
51        + Send
52        + Sync
53        + Clone
54        + 'static,
55{
56    pub fn new(client: P, start_block: BlockNumber, end_block: BlockNumber) -> NodeBlockPlayerActor<P, N, DB> {
57        NodeBlockPlayerActor {
58            client,
59            start_block,
60            end_block,
61            mempool: None,
62            market_state: None,
63            compose_channel: None,
64            block_header_channel: None,
65            block_with_tx_channel: None,
66            block_logs_channel: None,
67            block_state_update_channel: None,
68            _n: PhantomData,
69        }
70    }
71
72    pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>) -> Self {
73        Self {
74            mempool: Some(bc.mempool()),
75            block_header_channel: Some(bc.new_block_headers_channel()),
76            block_with_tx_channel: Some(bc.new_block_with_tx_channel()),
77            block_logs_channel: Some(bc.new_block_logs_channel()),
78            block_state_update_channel: Some(bc.new_block_state_update_channel()),
79            market_state: Some(state.market_state_commit()),
80            compose_channel: Some(bc.tx_compose_channel()),
81            ..self
82        }
83    }
84}
85
86impl<P, N, DB> Actor for NodeBlockPlayerActor<P, N, DB>
87where
88    P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
89    N: Send + Sync,
90    DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + DatabaseKabuExt + Send + Sync + Clone + 'static,
91{
92    fn start(&self) -> ActorResult {
93        let mut handles: Vec<JoinHandle<WorkerResult>> = Vec::new();
94        if let Some(mempool) = self.mempool.clone() {
95            if let Some(compose_channel) = self.compose_channel.clone() {
96                let handle = tokio::task::spawn(replayer_compose_worker(mempool, compose_channel));
97                handles.push(handle);
98            }
99        }
100
101        let handle = tokio::task::spawn(node_player_worker(
102            self.client.clone(),
103            self.start_block,
104            self.end_block,
105            self.mempool.clone(),
106            self.market_state.clone(),
107            self.block_header_channel.clone(),
108            self.block_with_tx_channel.clone(),
109            self.block_logs_channel.clone(),
110            self.block_state_update_channel.clone(),
111        ));
112        handles.push(handle);
113        Ok(handles)
114    }
115
116    fn name(&self) -> &'static str {
117        type_name::<Self>().rsplit("::").next().unwrap_or(type_name::<Self>())
118    }
119}