kabu_strategy_backrun/
block_state_change_processor.rs1use super::affected_pools_state::get_affected_pools_from_state_update;
2use eyre::eyre;
3use kabu_core_actors::{run_sync, subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
4use kabu_core_actors_macros::{Accessor, Consumer, Producer};
5use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
6use kabu_types_blockchain::ChainParameters;
7use kabu_types_blockchain::KabuDataTypesEVM;
8use kabu_types_entities::{BlockHistory, Market};
9use kabu_types_events::{MarketEvents, StateUpdateEvent};
10use revm::DatabaseRef;
11use tokio::sync::broadcast::error::RecvError;
12use tracing::error;
13
14pub async fn block_state_change_worker<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypesEVM>(
15 chain_parameters: ChainParameters,
16 market: SharedState<Market>,
17 block_history: SharedState<BlockHistory<DB, LDT>>,
18 market_events_rx: Broadcaster<MarketEvents>,
19 state_updates_broadcaster: Broadcaster<StateUpdateEvent<DB, LDT>>,
20) -> WorkerResult {
21 subscribe!(market_events_rx);
22
23 loop {
24 let market_event = match market_events_rx.recv().await {
25 Ok(market_event) => market_event,
26 Err(e) => match e {
27 RecvError::Closed => {
28 error!("Market events txs channel closed");
29 break Err(eyre!("MARKET_EVENTS_RX_CLOSED"));
30 }
31 RecvError::Lagged(lag) => {
32 error!("Market events txs channel lagged by {} messages", lag);
33 continue;
34 }
35 },
36 };
37 let block_hash = match market_event {
38 MarketEvents::BlockStateUpdate { block_hash } => block_hash,
39 _ => continue,
40 };
41
42 let Some(block_history_entry) = block_history.read().await.get_block_history_entry(&block_hash).cloned() else {
43 error!("Block history entry not found in block history: {:?}", block_hash);
44 continue;
45 };
46
47 let Some(block_state_entry) = block_history.read().await.get_block_state(&block_hash).cloned() else {
48 error!("Block state not found in block history: {:?}", block_hash);
49 continue;
50 };
51
52 let Some(state_update) = block_history_entry.state_update.clone() else {
53 error!("Block {:?} has no state update", block_hash);
54 continue;
55 };
56
57 let affected_pools = get_affected_pools_from_state_update(market.clone(), &state_update).await;
58
59 if affected_pools.is_empty() {
60 error!("Could not get affected pools for block {:?}", block_hash);
61 continue;
62 };
63
64 let next_block_number = block_history_entry.number() + 1;
65 let next_block_timestamp = block_history_entry.timestamp() + 12;
66 let next_base_fee = chain_parameters.calc_next_block_base_fee_from_header(&block_history_entry.header);
67
68 let request = StateUpdateEvent::new(
69 next_block_number,
70 next_block_timestamp,
71 next_base_fee,
72 block_state_entry,
73 state_update,
74 None,
75 affected_pools,
76 Vec::new(),
77 Vec::new(),
78 "block_searcher".to_string(),
79 90_00,
80 );
81 run_sync!(state_updates_broadcaster.send(request));
82 }
83}
84
/// Actor that runs `block_state_change_worker` as a tokio task.
///
/// Every dependency except `chain_parameters` is optional at construction time and
/// must be populated (for example through `on_bc`) before the actor is started.
#[derive(Accessor, Consumer, Producer)]
pub struct BlockStateChangeProcessorActor<DB: Clone + Send + Sync + 'static, LDT: KabuDataTypesEVM + 'static> {
    /// Chain parameters handed to the worker, which uses them to compute the next
    /// block's base fee.
    chain_parameters: ChainParameters,
    /// Shared market state; the worker passes it to the affected-pools lookup.
    #[accessor]
    market: Option<SharedState<Market>>,
    /// Shared block history; the worker reads history entries and block state from it.
    #[accessor]
    block_history: Option<SharedState<BlockHistory<DB, LDT>>>,
    /// Channel the worker subscribes to for incoming `MarketEvents`.
    #[consumer]
    market_events_rx: Option<Broadcaster<MarketEvents>>,
    /// Channel the worker sends the resulting `StateUpdateEvent`s on.
    #[producer]
    state_updates_tx: Option<Broadcaster<StateUpdateEvent<DB, LDT>>>,
}
97
98impl<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypesEVM> BlockStateChangeProcessorActor<DB, LDT> {
99 pub fn new() -> BlockStateChangeProcessorActor<DB, LDT> {
100 BlockStateChangeProcessorActor {
101 chain_parameters: ChainParameters::ethereum(),
102 market: None,
103 block_history: None,
104 market_events_rx: None,
105 state_updates_tx: None,
106 }
107 }
108
109 pub fn on_bc(self, bc: &Blockchain<LDT>, state: &BlockchainState<DB, LDT>, strategy: &Strategy<DB, LDT>) -> Self {
110 Self {
111 chain_parameters: bc.chain_parameters(),
112 market: Some(bc.market()),
113 market_events_rx: Some(bc.market_events_channel()),
114 state_updates_tx: Some(strategy.state_update_channel()),
115 block_history: Some(state.block_history()),
116 }
117 }
118}
119
120impl<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypesEVM> Default for BlockStateChangeProcessorActor<DB, LDT> {
121 fn default() -> Self {
122 Self::new()
123 }
124}
125
126impl<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypesEVM> Actor for BlockStateChangeProcessorActor<DB, LDT> {
127 fn start(&self) -> ActorResult {
128 let task = tokio::task::spawn(block_state_change_worker(
129 self.chain_parameters.clone(),
130 self.market.clone().unwrap(),
131 self.block_history.clone().unwrap(),
132 self.market_events_rx.clone().unwrap(),
133 self.state_updates_tx.clone().unwrap(),
134 ));
135 Ok(vec![task])
136 }
137
138 fn name(&self) -> &'static str {
139 "BlockStateChangeProcessorActor"
140 }
141}