use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use alloy_primitives::U256;
use eyre::{eyre, Result};
use influxdb::{Timestamp, WriteQuery};
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use revm::{Database, DatabaseCommit, DatabaseRef};
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error, info};

use crate::{BackrunConfig, SwapCalculator};
use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
use kabu_core_actors_macros::{Accessor, Consumer, Producer};
use kabu_core_blockchain::{Blockchain, Strategy};
use kabu_evm_db::{DatabaseHelpers, KabuDBError};
use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEVM};
use kabu_types_entities::strategy_config::StrategyConfig;
use kabu_types_entities::{Market, PoolWrapper, Swap, SwapDirection, SwapError, SwapLine, SwapPath};
use kabu_types_events::{
    BestTxSwapCompose, HealthEvent, Message, MessageHealthEvent, MessageSwapCompose, StateUpdateEvent, SwapComposeData, SwapComposeMessage,
    TxComposeData,
};
async fn state_change_arb_searcher_task<
    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
    LDT: KabuDataTypesEVM,
>(
    thread_pool: Arc<ThreadPool>,
    backrun_config: BackrunConfig,
    state_update_event: StateUpdateEvent<DB, LDT>,
    market: SharedState<Market>,
    swap_request_tx: Broadcaster<MessageSwapCompose<DB, LDT>>,
    pool_health_monitor_tx: Broadcaster<MessageHealthEvent>,
    influxdb_write_channel_tx: Broadcaster<WriteQuery>,
) -> Result<()> {
    debug!("Message received: origin={}, stuffing_tx={:?}", state_update_event.origin, state_update_event.stuffing_tx_hash());

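    // Work on a private copy of the market state: apply the pending state diff so the
    // path calculations below price against the post-update world.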
    let mut db = state_update_event.market_state().clone();
    DatabaseHelpers::apply_geth_state_update_vec(&mut db, state_update_event.state_update().clone());

    let start_time_utc = chrono::Utc::now();
    let start_time = std::time::Instant::now();

    #[allow(clippy::mutable_key_type)]
    let mut swap_path_set: HashSet<SwapPath> = HashSet::new();

    let market_guard_read = market.read().await;
    debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.read acquired");

    for (pool, directions) in state_update_event.directions().iter() {
        let pool_paths: Vec<SwapPath> = match market_guard_read.get_pool_paths(&pool.get_pool_id()) {
            Some(paths) => {
                // Keep the first 100 paths, plus any later path with a score above 0.97.
                paths
                    .into_iter()
                    .enumerate()
                    .filter(|(idx, swap_path)| *idx < 100 || swap_path.score.unwrap_or_default() > 0.97)
                    .map(|(_, swap_path)| swap_path)
                    .collect::<Vec<_>>()
            }
            None => {
                // No precomputed paths for this pool: build them from the updated directions.
                let mut pool_direction: BTreeMap<PoolWrapper, Vec<SwapDirection>> = BTreeMap::new();
                pool_direction.insert(pool.clone(), directions.clone());
                market_guard_read.build_swap_path_vec(&pool_direction).unwrap_or_default()
            }
        };

        for pool_path in pool_paths {
            swap_path_set.insert(pool_path);
        }
    }
    drop(market_guard_read);
    debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.read released");

    let swap_path_vec: Vec<SwapPath> = swap_path_set.into_iter().collect();

    if swap_path_vec.is_empty() {
        debug!(
            request = ?state_update_event.stuffing_txs_hashes().first(),
            elapsed = start_time.elapsed().as_micros(),
            "No swap path built"
        );
        return Err(eyre!("NO_SWAP_PATHS"));
    }
    info!("Calculation started: swap_path_vec_len={} elapsed={}", swap_path_vec.len(), start_time.elapsed().as_micros());

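    // Channel capacity equals the number of candidate paths; each path produces at most
    // one message, so try_send from the rayon workers can never fail with `Full`.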
    let channel_len = swap_path_vec.len();
    let (swap_path_tx, mut swap_line_rx) = tokio::sync::mpsc::channel(channel_len);

    let market_state_clone = db.clone();
    let swap_path_vec_len = swap_path_vec.len();

    // ThreadPool::install blocks until the parallel iteration finishes, so run it on
    // tokio's blocking pool rather than stalling an async runtime worker.
    let _task = tokio::task::spawn_blocking(move || {
        thread_pool.install(|| {
            swap_path_vec.into_par_iter().for_each(|item| {
                let mut mut_item: SwapLine = SwapLine { path: item, ..Default::default() };

                let calc_result = SwapCalculator::calculate(&mut mut_item, &market_state_clone);

                match calc_result {
                    Ok(_) => {
                        debug!("Calc result received: {}", mut_item);

                        if let Ok(profit) = mut_item.profit() {
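                            // Require the absolute ETH profit to cover the cost of
                            // 100_000 gas at the next block's base fee.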
                            if profit.is_positive() && mut_item.abs_profit_eth() > U256::from(state_update_event.next_base_fee * 100_000) {
                                if let Err(error) = swap_path_tx.try_send(Ok(mut_item)) {
                                    error!(%error, "swap_path_tx.try_send")
                                }
                            } else {
                                debug!("Profit below threshold")
                            }
                        }
                    }
                    Err(e) => {
                        debug!("Swap error: {:?}", e);

                        if let Err(error) = swap_path_tx.try_send(Err(e)) {
                            error!(%error, "swap_path_tx.try_send")
                        }
                    }
                }
            });
        });
        debug!(elapsed = start_time.elapsed().as_micros(), "Calculation iteration finished");
    });

    debug!(elapsed = start_time.elapsed().as_micros(), "Calculation results receiver started");

    let swap_request_tx_clone = swap_request_tx.clone();
    let pool_health_monitor_tx_clone = pool_health_monitor_tx.clone();

    let mut answers = 0;

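    // Best-compose filter for smart mode; the 9000 passed to new_with_pct is presumably
    // a threshold in parts per 10_000 (90%), depending on BestTxSwapCompose's semantics.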
    let mut best_answers = BestTxSwapCompose::new_with_pct(U256::from(9000));

    let mut failed_pools: HashSet<SwapError> = HashSet::new();

    while let Some(swap_line_result) = swap_line_rx.recv().await {
        match swap_line_result {
            Ok(swap_line) => {
                let prepare_request = SwapComposeMessage::Prepare(SwapComposeData {
                    tx_compose: TxComposeData::<LDT> {
                        eoa: backrun_config.eoa(),
                        next_block_number: state_update_event.next_block_number,
                        next_block_timestamp: state_update_event.next_block_timestamp,
                        next_block_base_fee: state_update_event.next_base_fee,
                        gas: swap_line.gas_used.unwrap_or(300_000),
                        stuffing_txs: state_update_event.stuffing_txs().clone(),
                        stuffing_txs_hashes: state_update_event.stuffing_txs_hashes.clone(),
                        ..TxComposeData::default()
                    },
                    swap: Swap::BackrunSwapLine(swap_line),
                    origin: Some(state_update_event.origin.clone()),
                    tips_pct: Some(state_update_event.tips_pct),
                    poststate: Some(db.clone()),
                    poststate_update: Some(state_update_event.state_update().clone()),
                    ..SwapComposeData::default()
                });

                // In smart mode, forward only composes that improve on the best seen so far.
                if !backrun_config.smart() || best_answers.check(&prepare_request) {
                    if let Err(e) = swap_request_tx_clone.send(Message::new(prepare_request)) {
                        error!("swap_request_tx_clone.send {}", e)
                    }
                }
            }
            Err(swap_error) => {
                // Report each failing pool to the health monitor only once.
                if failed_pools.insert(swap_error.clone()) {
                    if let Err(e) = pool_health_monitor_tx_clone.send(Message::new(HealthEvent::PoolSwapError(swap_error))) {
                        error!("pool_health_monitor send error: {:?}", e)
                    }
                }
            }
        }

        answers += 1;
    }

    let stuffing_tx_hash = state_update_event.stuffing_tx_hash();
    let elapsed = start_time.elapsed().as_micros();
    info!(
        origin = %state_update_event.origin,
        swap_path_vec_len,
        answers,
        elapsed,
        stuffing_hash = %stuffing_tx_hash,
        "Calculation finished"
    );

    let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "calculations")
        .add_field("calculations", swap_path_vec_len as u64)
        .add_field("answers", answers as u64)
        .add_field("elapsed", elapsed as u64)
        .add_tag("origin", state_update_event.origin)
        .add_tag("stuffing", stuffing_tx_hash.to_string());

    if let Err(e) = influxdb_write_channel_tx.send(write_query) {
        error!("Failed to send calculation metrics to influxdb: {:?}", e);
    }

    Ok(())
}

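/// Worker loop: subscribes to state-update events and spawns one
/// `state_change_arb_searcher_task` per event, all sharing a single rayon thread pool.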
pub async fn state_change_arb_searcher_worker<
    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
    LDT: KabuDataTypesEVM,
>(
    backrun_config: BackrunConfig,
    market: SharedState<Market>,
    search_request_rx: Broadcaster<StateUpdateEvent<DB, LDT>>,
    swap_request_tx: Broadcaster<MessageSwapCompose<DB, LDT>>,
    pool_health_monitor_tx: Broadcaster<MessageHealthEvent>,
    influxdb_write_channel_tx: Broadcaster<WriteQuery>,
) -> WorkerResult {
    subscribe!(search_request_rx);

    // Dedicate roughly half of the available cores to path calculations, keeping at
    // least one rayon thread even on a single-core host.
    let cpus = num_cpus::get();
    let tasks = (cpus / 2).max(1);
    info!("Starting state arb searcher cpus={cpus}, tasks={tasks}");
    let thread_pool = Arc::new(ThreadPoolBuilder::new().num_threads(tasks).build()?);

    loop {
        tokio::select! {
            msg = search_request_rx.recv() => {
                let pool_update_msg: Result<StateUpdateEvent<DB, LDT>, RecvError> = msg;
                if let Ok(msg) = pool_update_msg {
                    tokio::task::spawn(state_change_arb_searcher_task(
                        thread_pool.clone(),
                        backrun_config.clone(),
                        msg,
                        market.clone(),
                        swap_request_tx.clone(),
                        pool_health_monitor_tx.clone(),
                        influxdb_write_channel_tx.clone(),
                    ));
                }
            }
        }
    }
}

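/// Actor wrapper around `state_change_arb_searcher_worker`, wiring it to the shared
/// market state, the strategy's state-update and swap-compose channels, the pool health
/// monitor, and the InfluxDB write channel.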
#[derive(Accessor, Consumer, Producer)]
pub struct StateChangeArbSearcherActor<DB: Clone + Send + Sync + 'static, LDT: KabuDataTypes + 'static> {
    backrun_config: BackrunConfig,
    #[accessor]
    market: Option<SharedState<Market>>,
    #[consumer]
    state_update_rx: Option<Broadcaster<StateUpdateEvent<DB, LDT>>>,
    #[producer]
    compose_tx: Option<Broadcaster<MessageSwapCompose<DB, LDT>>>,
    #[producer]
    pool_health_monitor_tx: Option<Broadcaster<MessageHealthEvent>>,
    #[producer]
    influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
}

impl<
        DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
        LDT: KabuDataTypesEVM + 'static,
    > StateChangeArbSearcherActor<DB, LDT>
{
    pub fn new(backrun_config: BackrunConfig) -> StateChangeArbSearcherActor<DB, LDT> {
        StateChangeArbSearcherActor {
            backrun_config,
            market: None,
            state_update_rx: None,
            compose_tx: None,
            pool_health_monitor_tx: None,
            influxdb_write_channel_tx: None,
        }
    }

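    /// Connects the actor's channels to the given blockchain and strategy handles.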
    pub fn on_bc(self, bc: &Blockchain<LDT>, strategy: &Strategy<DB, LDT>) -> Self {
        Self {
            market: Some(bc.market()),
            pool_health_monitor_tx: Some(bc.health_monitor_channel()),
            compose_tx: Some(strategy.swap_compose_channel()),
            state_update_rx: Some(strategy.state_update_channel()),
            influxdb_write_channel_tx: Some(bc.influxdb_write_channel()),
            ..self
        }
    }
}

impl<
        DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
        LDT: KabuDataTypesEVM + 'static,
    > Actor for StateChangeArbSearcherActor<DB, LDT>
{
    fn start(&self) -> ActorResult {
        let task = tokio::task::spawn(state_change_arb_searcher_worker(
            self.backrun_config.clone(),
            self.market.clone().unwrap(),
            self.state_update_rx.clone().unwrap(),
            self.compose_tx.clone().unwrap(),
            self.pool_health_monitor_tx.clone().unwrap(),
            self.influxdb_write_channel_tx.clone().unwrap(),
        ));
        Ok(vec![task])
    }

    fn name(&self) -> &'static str {
        "StateChangeArbSearcherActor"
    }
}