kabu_strategy_backrun/
block_state_change_processor.rs

1use super::affected_pools_state::get_affected_pools_from_state_update;
2use eyre::eyre;
3use kabu_core_actors::{run_sync, subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
4use kabu_core_actors_macros::{Accessor, Consumer, Producer};
5use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
6use kabu_types_blockchain::ChainParameters;
7use kabu_types_blockchain::KabuDataTypesEVM;
8use kabu_types_entities::{BlockHistory, Market};
9use kabu_types_events::{MarketEvents, StateUpdateEvent};
10use revm::DatabaseRef;
11use tokio::sync::broadcast::error::RecvError;
12use tracing::error;
13
14pub async fn block_state_change_worker<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypesEVM>(
15    chain_parameters: ChainParameters,
16    market: SharedState<Market>,
17    block_history: SharedState<BlockHistory<DB, LDT>>,
18    market_events_rx: Broadcaster<MarketEvents>,
19    state_updates_broadcaster: Broadcaster<StateUpdateEvent<DB, LDT>>,
20) -> WorkerResult {
21    subscribe!(market_events_rx);
22
23    loop {
24        let market_event = match market_events_rx.recv().await {
25            Ok(market_event) => market_event,
26            Err(e) => match e {
27                RecvError::Closed => {
28                    error!("Market events txs channel closed");
29                    break Err(eyre!("MARKET_EVENTS_RX_CLOSED"));
30                }
31                RecvError::Lagged(lag) => {
32                    error!("Market events txs channel lagged by {} messages", lag);
33                    continue;
34                }
35            },
36        };
37        let block_hash = match market_event {
38            MarketEvents::BlockStateUpdate { block_hash } => block_hash,
39            _ => continue,
40        };
41
42        let Some(block_history_entry) = block_history.read().await.get_block_history_entry(&block_hash).cloned() else {
43            error!("Block history entry not found in block history: {:?}", block_hash);
44            continue;
45        };
46
47        let Some(block_state_entry) = block_history.read().await.get_block_state(&block_hash).cloned() else {
48            error!("Block state not found in block history: {:?}", block_hash);
49            continue;
50        };
51
52        let Some(state_update) = block_history_entry.state_update.clone() else {
53            error!("Block {:?} has no state update", block_hash);
54            continue;
55        };
56
57        let affected_pools = get_affected_pools_from_state_update(market.clone(), &state_update).await;
58
59        if affected_pools.is_empty() {
60            error!("Could not get affected pools for block {:?}", block_hash);
61            continue;
62        };
63
64        let next_block_number = block_history_entry.number() + 1;
65        let next_block_timestamp = block_history_entry.timestamp() + 12;
66        let next_base_fee = chain_parameters.calc_next_block_base_fee_from_header(&block_history_entry.header);
67
68        let request = StateUpdateEvent::new(
69            next_block_number,
70            next_block_timestamp,
71            next_base_fee,
72            block_state_entry,
73            state_update,
74            None,
75            affected_pools,
76            Vec::new(),
77            Vec::new(),
78            "block_searcher".to_string(),
79            90_00,
80        );
81        run_sync!(state_updates_broadcaster.send(request));
82    }
83}
84
85#[derive(Accessor, Consumer, Producer)]
86pub struct BlockStateChangeProcessorActor<DB: Clone + Send + Sync + 'static, LDT: KabuDataTypesEVM + 'static> {
87    chain_parameters: ChainParameters,
88    #[accessor]
89    market: Option<SharedState<Market>>,
90    #[accessor]
91    block_history: Option<SharedState<BlockHistory<DB, LDT>>>,
92    #[consumer]
93    market_events_rx: Option<Broadcaster<MarketEvents>>,
94    #[producer]
95    state_updates_tx: Option<Broadcaster<StateUpdateEvent<DB, LDT>>>,
96}
97
98impl<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypesEVM> BlockStateChangeProcessorActor<DB, LDT> {
99    pub fn new() -> BlockStateChangeProcessorActor<DB, LDT> {
100        BlockStateChangeProcessorActor {
101            chain_parameters: ChainParameters::ethereum(),
102            market: None,
103            block_history: None,
104            market_events_rx: None,
105            state_updates_tx: None,
106        }
107    }
108
109    pub fn on_bc(self, bc: &Blockchain<LDT>, state: &BlockchainState<DB, LDT>, strategy: &Strategy<DB, LDT>) -> Self {
110        Self {
111            chain_parameters: bc.chain_parameters(),
112            market: Some(bc.market()),
113            market_events_rx: Some(bc.market_events_channel()),
114            state_updates_tx: Some(strategy.state_update_channel()),
115            block_history: Some(state.block_history()),
116        }
117    }
118}
119
120impl<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypesEVM> Default for BlockStateChangeProcessorActor<DB, LDT> {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126impl<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypesEVM> Actor for BlockStateChangeProcessorActor<DB, LDT> {
127    fn start(&self) -> ActorResult {
128        let task = tokio::task::spawn(block_state_change_worker(
129            self.chain_parameters.clone(),
130            self.market.clone().unwrap(),
131            self.block_history.clone().unwrap(),
132            self.market_events_rx.clone().unwrap(),
133            self.state_updates_tx.clone().unwrap(),
134        ));
135        Ok(vec![task])
136    }
137
138    fn name(&self) -> &'static str {
139        "BlockStateChangeProcessorActor"
140    }
141}