kabu_strategy_backrun/
arb_actor.rs1use super::{PendingTxStateChangeProcessorActor, StateChangeArbSearcherActor};
2use crate::block_state_change_processor::BlockStateChangeProcessorActor;
3use crate::BackrunConfig;
4use alloy_network::Network;
5use alloy_provider::Provider;
6use influxdb::WriteQuery;
7use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
8use kabu_core_actors_macros::{Accessor, Consumer, Producer};
9use kabu_evm_db::KabuDBError;
10use kabu_node_debug_provider::DebugProviderExt;
11use kabu_types_blockchain::{KabuDataTypesEVM, Mempool};
12use kabu_types_entities::{BlockHistory, LatestBlock, Market, MarketState};
13use kabu_types_events::{MarketEvents, MempoolEvents, MessageHealthEvent, MessageSwapCompose};
14use revm::{Database, DatabaseCommit, DatabaseRef};
15use std::marker::PhantomData;
16use tokio::task::JoinHandle;
17use tracing::info;
18
19#[derive(Accessor, Consumer, Producer)]
20pub struct StateChangeArbActor<P, N, DB: Clone + Send + Sync + 'static, LDT: KabuDataTypesEVM + 'static> {
21 backrun_config: BackrunConfig,
22 client: P,
23 use_blocks: bool,
24 use_mempool: bool,
25 #[accessor]
26 market: Option<SharedState<Market>>,
27 #[accessor]
28 mempool: Option<SharedState<Mempool<LDT>>>,
29 #[accessor]
30 latest_block: Option<SharedState<LatestBlock<LDT>>>,
31 #[accessor]
32 market_state: Option<SharedState<MarketState<DB>>>,
33 #[accessor]
34 block_history: Option<SharedState<BlockHistory<DB, LDT>>>,
35 #[consumer]
36 mempool_events_tx: Option<Broadcaster<MempoolEvents>>,
37 #[consumer]
38 market_events_tx: Option<Broadcaster<MarketEvents>>,
39 #[producer]
40 compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB, LDT>>>,
41 #[producer]
42 pool_health_monitor_tx: Option<Broadcaster<MessageHealthEvent>>,
43 #[producer]
44 influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
45
46 _n: PhantomData<N>,
47}
48
49impl<P, N, DB, LDT> StateChangeArbActor<P, N, DB, LDT>
50where
51 N: Network,
52 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
53 DB: DatabaseRef + Send + Sync + Clone + 'static,
54 LDT: KabuDataTypesEVM + 'static,
55{
56 pub fn new(client: P, use_blocks: bool, use_mempool: bool, backrun_config: BackrunConfig) -> StateChangeArbActor<P, N, DB, LDT> {
57 StateChangeArbActor {
58 backrun_config,
59 client,
60 use_blocks,
61 use_mempool,
62 market: None,
63 mempool: None,
64 latest_block: None,
65 block_history: None,
66 market_state: None,
67 mempool_events_tx: None,
68 market_events_tx: None,
69 compose_channel_tx: None,
70 pool_health_monitor_tx: None,
71 influxdb_write_channel_tx: None,
72 _n: PhantomData,
73 }
74 }
75}
76
77impl<P, N, DB, LDT> Actor for StateChangeArbActor<P, N, DB, LDT>
78where
79 N: Network<TransactionRequest = LDT::TransactionRequest>,
80 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
81 DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
82 LDT: KabuDataTypesEVM + 'static,
83{
84 fn start(&self) -> ActorResult {
85 let searcher_pool_update_channel = Broadcaster::new(100);
86 let mut tasks: Vec<JoinHandle<WorkerResult>> = Vec::new();
87
88 let mut state_update_searcher = StateChangeArbSearcherActor::new(self.backrun_config.clone());
89 match state_update_searcher
90 .access(self.market.clone().unwrap())
91 .consume(searcher_pool_update_channel.clone())
92 .produce(self.compose_channel_tx.clone().unwrap())
93 .produce(self.pool_health_monitor_tx.clone().unwrap())
94 .produce(self.influxdb_write_channel_tx.clone().unwrap())
95 .start()
96 {
97 Err(e) => {
98 panic!("{}", e)
99 }
100 Ok(r) => {
101 tasks.extend(r);
102 info!("State change searcher actor started successfully")
103 }
104 }
105
106 if self.mempool_events_tx.is_some() && self.use_mempool {
107 let mut pending_tx_state_processor = PendingTxStateChangeProcessorActor::new(self.client.clone());
108 match pending_tx_state_processor
109 .access(self.mempool.clone().unwrap())
110 .access(self.latest_block.clone().unwrap())
111 .access(self.market.clone().unwrap())
112 .access(self.market_state.clone().unwrap())
113 .consume(self.mempool_events_tx.clone().unwrap())
114 .consume(self.market_events_tx.clone().unwrap())
115 .produce(searcher_pool_update_channel.clone())
116 .start()
117 {
118 Err(e) => {
119 panic!("{}", e)
120 }
121 Ok(r) => {
122 tasks.extend(r);
123 info!("Pending tx state actor started successfully")
124 }
125 }
126 }
127
128 if self.market_events_tx.is_some() && self.use_blocks {
129 let mut block_state_processor = BlockStateChangeProcessorActor::new();
130 match block_state_processor
131 .access(self.market.clone().unwrap())
132 .access(self.block_history.clone().unwrap())
133 .consume(self.market_events_tx.clone().unwrap())
134 .produce(searcher_pool_update_channel.clone())
135 .start()
136 {
137 Err(e) => {
138 panic!("{}", e)
139 }
140 Ok(r) => {
141 tasks.extend(r);
142 info!("Block change state actor started successfully")
143 }
144 }
145 }
146
147 Ok(tasks)
148 }
149
150 fn name(&self) -> &'static str {
151 "StateChangeArbActor"
152 }
153}