kabu_strategy_backrun/
arb_actor.rs

1use super::{PendingTxStateChangeProcessorActor, StateChangeArbSearcherActor};
2use crate::block_state_change_processor::BlockStateChangeProcessorActor;
3use crate::BackrunConfig;
4use alloy_network::Network;
5use alloy_provider::Provider;
6use influxdb::WriteQuery;
7use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
8use kabu_core_actors_macros::{Accessor, Consumer, Producer};
9use kabu_evm_db::KabuDBError;
10use kabu_node_debug_provider::DebugProviderExt;
11use kabu_types_blockchain::{KabuDataTypesEVM, Mempool};
12use kabu_types_entities::{BlockHistory, LatestBlock, Market, MarketState};
13use kabu_types_events::{MarketEvents, MempoolEvents, MessageHealthEvent, MessageSwapCompose};
14use revm::{Database, DatabaseCommit, DatabaseRef};
15use std::marker::PhantomData;
16use tokio::task::JoinHandle;
17use tracing::info;
18
19#[derive(Accessor, Consumer, Producer)]
20pub struct StateChangeArbActor<P, N, DB: Clone + Send + Sync + 'static, LDT: KabuDataTypesEVM + 'static> {
21    backrun_config: BackrunConfig,
22    client: P,
23    use_blocks: bool,
24    use_mempool: bool,
25    #[accessor]
26    market: Option<SharedState<Market>>,
27    #[accessor]
28    mempool: Option<SharedState<Mempool<LDT>>>,
29    #[accessor]
30    latest_block: Option<SharedState<LatestBlock<LDT>>>,
31    #[accessor]
32    market_state: Option<SharedState<MarketState<DB>>>,
33    #[accessor]
34    block_history: Option<SharedState<BlockHistory<DB, LDT>>>,
35    #[consumer]
36    mempool_events_tx: Option<Broadcaster<MempoolEvents>>,
37    #[consumer]
38    market_events_tx: Option<Broadcaster<MarketEvents>>,
39    #[producer]
40    compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB, LDT>>>,
41    #[producer]
42    pool_health_monitor_tx: Option<Broadcaster<MessageHealthEvent>>,
43    #[producer]
44    influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
45
46    _n: PhantomData<N>,
47}
48
49impl<P, N, DB, LDT> StateChangeArbActor<P, N, DB, LDT>
50where
51    N: Network,
52    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
53    DB: DatabaseRef + Send + Sync + Clone + 'static,
54    LDT: KabuDataTypesEVM + 'static,
55{
56    pub fn new(client: P, use_blocks: bool, use_mempool: bool, backrun_config: BackrunConfig) -> StateChangeArbActor<P, N, DB, LDT> {
57        StateChangeArbActor {
58            backrun_config,
59            client,
60            use_blocks,
61            use_mempool,
62            market: None,
63            mempool: None,
64            latest_block: None,
65            block_history: None,
66            market_state: None,
67            mempool_events_tx: None,
68            market_events_tx: None,
69            compose_channel_tx: None,
70            pool_health_monitor_tx: None,
71            influxdb_write_channel_tx: None,
72            _n: PhantomData,
73        }
74    }
75}
76
77impl<P, N, DB, LDT> Actor for StateChangeArbActor<P, N, DB, LDT>
78where
79    N: Network<TransactionRequest = LDT::TransactionRequest>,
80    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
81    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
82    LDT: KabuDataTypesEVM + 'static,
83{
84    fn start(&self) -> ActorResult {
85        let searcher_pool_update_channel = Broadcaster::new(100);
86        let mut tasks: Vec<JoinHandle<WorkerResult>> = Vec::new();
87
88        let mut state_update_searcher = StateChangeArbSearcherActor::new(self.backrun_config.clone());
89        match state_update_searcher
90            .access(self.market.clone().unwrap())
91            .consume(searcher_pool_update_channel.clone())
92            .produce(self.compose_channel_tx.clone().unwrap())
93            .produce(self.pool_health_monitor_tx.clone().unwrap())
94            .produce(self.influxdb_write_channel_tx.clone().unwrap())
95            .start()
96        {
97            Err(e) => {
98                panic!("{}", e)
99            }
100            Ok(r) => {
101                tasks.extend(r);
102                info!("State change searcher actor started successfully")
103            }
104        }
105
106        if self.mempool_events_tx.is_some() && self.use_mempool {
107            let mut pending_tx_state_processor = PendingTxStateChangeProcessorActor::new(self.client.clone());
108            match pending_tx_state_processor
109                .access(self.mempool.clone().unwrap())
110                .access(self.latest_block.clone().unwrap())
111                .access(self.market.clone().unwrap())
112                .access(self.market_state.clone().unwrap())
113                .consume(self.mempool_events_tx.clone().unwrap())
114                .consume(self.market_events_tx.clone().unwrap())
115                .produce(searcher_pool_update_channel.clone())
116                .start()
117            {
118                Err(e) => {
119                    panic!("{}", e)
120                }
121                Ok(r) => {
122                    tasks.extend(r);
123                    info!("Pending tx state actor started successfully")
124                }
125            }
126        }
127
128        if self.market_events_tx.is_some() && self.use_blocks {
129            let mut block_state_processor = BlockStateChangeProcessorActor::new();
130            match block_state_processor
131                .access(self.market.clone().unwrap())
132                .access(self.block_history.clone().unwrap())
133                .consume(self.market_events_tx.clone().unwrap())
134                .produce(searcher_pool_update_channel.clone())
135                .start()
136            {
137                Err(e) => {
138                    panic!("{}", e)
139                }
140                Ok(r) => {
141                    tasks.extend(r);
142                    info!("Block change state actor started successfully")
143                }
144            }
145        }
146
147        Ok(tasks)
148    }
149
150    fn name(&self) -> &'static str {
151        "StateChangeArbActor"
152    }
153}