use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;

use alloy_eips::BlockNumberOrTag;
use alloy_network::Network;
use alloy_primitives::{Address, TxHash, U256};
use alloy_provider::Provider;
use alloy_rpc_types::state::StateOverride;
use alloy_rpc_types::{BlockOverrides, Transaction, TransactionRequest};
use alloy_rpc_types_trace::geth::GethDebugTracingCallOptions;
use eyre::{eyre, Result};
use lazy_static::lazy_static;
use revm::{Context, Database, DatabaseCommit, DatabaseRef, MainBuilder, MainContext};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace};

use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
use kabu_core_actors_macros::{Accessor, Consumer, Producer};
use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
use kabu_evm_db::{DatabaseHelpers, KabuDBError};
use kabu_evm_utils::evm_env::tx_req_to_env;
use kabu_evm_utils::evm_transact;
use kabu_node_debug_provider::DebugProviderExt;
use kabu_types_blockchain::{debug_trace_call_pre_state, GethStateUpdate, GethStateUpdateVec, KabuDataTypes, KabuTx, TRACING_CALL_OPTS};
use kabu_types_entities::{DataFetcher, FetchState, LatestBlock, MarketState, Swap};
use kabu_types_events::{MarketEvents, MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData};
use revm::context::{BlockEnv, ContextTr};

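// Coinbase address used in block overrides when tracing stuffing transactions against the next block.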
lazy_static! {
    static ref COINBASE: Address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326".parse().unwrap();
}

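/// Collects compose requests from `swap_paths` that share the same backrun swap path as
/// `request` but originate from a different stuffing transaction, sorted by descending
/// arbitrage profit. Returns an empty list when `request` is not a `BackrunSwapLine` swap.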
fn get_merge_list<'a, DB: Clone + 'static>(
    request: &SwapComposeData<DB>,
    swap_paths: &'a HashMap<TxHash, Vec<SwapComposeData<DB>>>,
) -> Vec<&'a SwapComposeData<DB>> {
    let swap_line = if let Swap::BackrunSwapLine(swap_line) = &request.swap {
        swap_line
    } else {
        return Vec::new();
    };

    let swap_stuffing_hash = request.first_stuffing_hash();

    let mut ret: Vec<&SwapComposeData<DB>> = swap_paths
        .iter()
        .filter_map(|(k, v)| {
            if *k != swap_stuffing_hash {
                v.iter().find(|a| if let Swap::BackrunSwapLine(a_line) = &a.swap { a_line.path == swap_line.path } else { false })
            } else {
                None
            }
        })
        .collect();

    ret.sort_by(|a, b| b.swap.arb_profit_eth().cmp(&a.swap.arb_profit_eth()));

    ret
}

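/// Attempts to merge several stuffing transactions that back the same swap path into a
/// single bundle. The task fetches (or reuses cached) `debug_trace_call` pre-states for
/// every stuffing transaction, replays them on a copy of the market state to find an
/// ordering that commits cleanly, and re-emits the original backrun swap as a new
/// `SwapComposeData` carrying the merged stuffing transactions and the resulting post-state.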
async fn same_path_merger_task<P, N, DB>(
    client: P,
    stuffing_txes: Vec<Transaction>,
    pre_states: Arc<RwLock<DataFetcher<TxHash, GethStateUpdate>>>,
    market_state: SharedState<MarketState<DB>>,
    call_opts: GethDebugTracingCallOptions,
    request: SwapComposeData<DB>,
    swap_request_tx: Broadcaster<MessageSwapCompose<DB>>,
) -> Result<()>
where
    N: Network<TransactionRequest = TransactionRequest>,
    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
    DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
{
    debug!("same_path_merger_task stuffing_txs len {}", stuffing_txes.len());

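    // Fetch the pre-state of every stuffing transaction through the shared DataFetcher so
    // that results and in-flight debug_trace_call requests can be reused across merger tasks.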
    let mut prestate_guard = pre_states.write().await;

    let mut stuffing_state_locks: Vec<(Transaction, FetchState<GethStateUpdate>)> = Vec::new();

    for tx in stuffing_txes.into_iter() {
        let client_clone = client.clone();
        let tx_clone = tx.clone();
        let tx_hash: TxHash = tx.get_tx_hash();
        let call_opts_clone = call_opts.clone();

        let lock = prestate_guard
            .fetch(tx_hash, |_tx_hash| async move {
                debug_trace_call_pre_state(client_clone, tx_clone, BlockNumberOrTag::Latest.into(), Some(call_opts_clone)).await
            })
            .await;

        stuffing_state_locks.push((tx, lock));
    }

    drop(prestate_guard);

    let mut stuffing_states: Vec<(Transaction, GethStateUpdate)> = Vec::new();

    for (tx, lock) in stuffing_state_locks.into_iter() {
        if let FetchState::Fetching(lock) = lock {
            if let Some(t) = lock.read().await.deref() {
                stuffing_states.push((tx, t.clone()));
            }
        }
    }

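    // Search for an ordering of the stuffing transactions that replays cleanly on top of
    // the current market state, retrying at most 10 times; on an execution error the
    // offending transaction is swapped one position earlier or dropped from the order.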
    let mut tx_order: Vec<usize> = (0..stuffing_states.len()).collect();

    let mut changing: Option<usize> = None;
    let mut counter = 0;

    let db_org = market_state.read().await.state_db.clone();

    let rdb: Option<DB> = loop {
        counter += 1;
        if counter > 10 {
            break None;
        }

        let mut ok = true;

        let tx_and_state: Vec<&(Transaction, GethStateUpdate)> = tx_order.iter().map(|i| stuffing_states.get(*i).unwrap()).collect();

        let states: GethStateUpdateVec = tx_and_state.iter().map(|(_tx, state)| state.clone()).collect();

        let mut db = db_org.clone();

        DatabaseHelpers::apply_geth_state_update_vec(&mut db, states);

        let block_env = BlockEnv {
            number: U256::from(request.tx_compose.next_block_number),
            timestamp: U256::from(request.tx_compose.next_block_timestamp),
            basefee: request.tx_compose.next_block_base_fee,
            ..Default::default()
        };

        let mut evm = Context::mainnet().with_db(db).with_block(block_env).build_mainnet();

        for (idx, tx_idx) in tx_order.clone().iter().enumerate() {
            let tx = &stuffing_states[*tx_idx].0;

            let tx_req: TransactionRequest = tx.to_transaction_request();
            let tx_env = tx_req_to_env(tx_req);

            match evm_transact(&mut evm, tx_env) {
                Ok(_c) => {
                    trace!("Transaction {} committed successfully {:?}", idx, tx.get_tx_hash());
                }
                Err(e) => {
                    error!("Transaction {} {:?} commit error: {}", idx, tx.get_tx_hash(), e);
                    match changing {
                        Some(changing_idx) => {
                            if (changing_idx == idx && idx == 0) || (idx > 0 && changing_idx == idx - 1) {
                                tx_order.remove(changing_idx);
                                trace!("Removing Some {idx} {changing_idx}");
                                changing = None;
                            } else if idx < tx_order.len() && idx > 0 {
                                tx_order.swap(idx, idx - 1);
                                trace!("Swapping Some {idx} {changing_idx}");
                                changing = Some(idx - 1)
                            }
                        }
                        None => {
                            if idx > 0 {
                                trace!("Swapping None {idx}");
                                tx_order.swap(idx, idx - 1);
                                changing = Some(idx - 1)
                            } else {
                                trace!("Removing None {idx}");
                                tx_order.remove(0);
                                changing = None
                            }
                        }
                    }
                    ok = false;
                    break;
                }
            }
        }

        if ok {
            debug!("Transaction sequence found {tx_order:?}");
            let db = evm.ctx.db_ref().clone();
            break Some(db);
        }
    };

    if tx_order.len() < 2 {
        return Err(eyre!("NOT_MERGED"));
    }

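    // If a clean ordering was found, re-emit the original backrun swap with the merged
    // stuffing transactions and the post-state database attached.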
    if let Some(db) = rdb {
        let _block_env = BlockEnv {
            number: U256::from(request.tx_compose.next_block_number),
            timestamp: U256::from(request.tx_compose.next_block_timestamp),
            basefee: request.tx_compose.next_block_base_fee,
            ..Default::default()
        };

        if let Swap::BackrunSwapLine(swap_line) = request.swap.clone() {
            let first_token = swap_line.get_first_token().unwrap();
            let _amount_in = first_token.calc_token_value_from_eth(U256::from(10).pow(U256::from(17))).unwrap();

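            // NOTE: the scrutinee below is a constant `Ok`, so the `Err` arm is currently
            // unreachable; it appears to be a placeholder for a swap re-optimization step.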
            match Ok::<(), eyre::Error>(()) {
                Ok(_r) => {
                    let encode_request = MessageSwapCompose::prepare(SwapComposeData {
                        tx_compose: TxComposeData {
                            stuffing_txs_hashes: tx_order.iter().map(|i| stuffing_states[*i].0.get_tx_hash()).collect(),
                            stuffing_txs: tx_order.iter().map(|i| stuffing_states[*i].0.clone()).collect(),
                            ..request.tx_compose
                        },
                        swap: Swap::BackrunSwapLine(swap_line.clone()),
                        origin: Some("samepath_merger".to_string()),
                        tips_pct: None,
                        poststate: Some(db),
                        poststate_update: None,
                        ..request
                    });

                    if let Err(e) = swap_request_tx.send(encode_request) {
                        error!("{}", e)
                    }
                    info!("+++ Calculation finished {swap_line}");
                }
                Err(e) => {
                    error!("optimization error : {e:?}")
                }
            }
        }
    }

    trace!("same_path_merger_task stuffing_states len {}", stuffing_states.len());

    Ok(())
}

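/// Worker loop of [`SamePathMergerActor`]: tracks the next-block parameters from
/// `MarketEvents`, collects ready backrun compose requests grouped by their stuffing
/// transaction hash, and spawns [`same_path_merger_task`] whenever another request with
/// the same swap path but a different stuffing transaction is already known.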
async fn same_path_merger_worker<
    N: Network<TransactionRequest = TransactionRequest>,
    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
>(
    client: P,
    latest_block: SharedState<LatestBlock>,
    market_state: SharedState<MarketState<DB>>,
    market_events_rx: Broadcaster<MarketEvents>,
    compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
) -> WorkerResult {
    subscribe!(market_events_rx);
    subscribe!(compose_channel_rx);

    let mut swap_paths: HashMap<TxHash, Vec<SwapComposeData<DB>>> = HashMap::new();

    let prestate = Arc::new(RwLock::new(DataFetcher::<TxHash, GethStateUpdate>::new()));

    let mut cur_next_base_fee: u64 = 0;
    let mut cur_block_number: Option<alloy_primitives::BlockNumber> = None;
    let mut cur_block_time: Option<u64> = None;
    let mut cur_state_override: StateOverride = StateOverride::default();

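    // Main loop: the first select arm keeps the next-block parameters and state overrides
    // up to date on every block header / state update; the second collects ready backrun
    // compose requests and spawns merger tasks for matching swap paths.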
    loop {
        tokio::select! {
            msg = market_events_rx.recv() => {
                if let Ok(msg) = msg {
                    let market_event_msg: MarketEvents = msg;
                    if let MarketEvents::BlockHeaderUpdate { block_number, block_hash, base_fee, next_base_fee, timestamp } = market_event_msg {
                        debug!("Block header update {} {} base_fee {} ", block_number, block_hash, base_fee);
                        cur_block_number = Some(block_number + 1);
                        cur_block_time = Some(timestamp + 12);
                        cur_next_base_fee = next_base_fee;
                        *prestate.write().await = DataFetcher::<TxHash, GethStateUpdate>::new();
                        swap_paths = HashMap::new();

                        let new_block_hash = block_hash;

                        for _counter in 0..5 {
                            if let Ok(MarketEvents::BlockStateUpdate { block_hash }) = market_events_rx.recv().await {
                                if new_block_hash == block_hash {
                                    cur_state_override = latest_block.read().await.node_state_override();
                                    debug!("Block state update received {} {}", block_number, block_hash);
                                    break;
                                }
                            }
                        }
                    }
                }
            }

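            // A compose request is only considered for merging when it is Ready, backed by a
            // single stuffing transaction, and carries a BackrunSwapLine swap.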
            msg = compose_channel_rx.recv() => {
                let msg: Result<MessageSwapCompose<DB>, RecvError> = msg;
                match msg {
                    Ok(compose_request) => {
                        if let SwapComposeMessage::Ready(sign_request) = compose_request.inner() {
                            if sign_request.tx_compose.stuffing_txs_hashes.len() == 1 {
                                if let Swap::BackrunSwapLine(_swap_line) = &sign_request.swap {
                                    let stuffing_tx_hash = sign_request.first_stuffing_hash();

                                    let requests_vec = get_merge_list(sign_request, &swap_paths);
                                    if !requests_vec.is_empty() {
                                        let mut stuffing_txs: Vec<Transaction> = vec![sign_request.tx_compose.stuffing_txs[0].clone()];
                                        stuffing_txs.extend(requests_vec.iter().map(|r| r.tx_compose.stuffing_txs[0].clone()).collect::<Vec<Transaction>>());
                                        let client_clone = client.clone();
                                        let prestate_clone = prestate.clone();

                                        let call_opts: GethDebugTracingCallOptions = GethDebugTracingCallOptions {
                                            block_overrides: Some(BlockOverrides {
                                                number: Some(U256::from(cur_block_number.unwrap_or_default())),
                                                time: Some(cur_block_time.unwrap_or_default()),
                                                coinbase: Some(*COINBASE),
                                                base_fee: Some(U256::from(cur_next_base_fee)),
                                                ..Default::default()
                                            }),
                                            state_overrides: Some(cur_state_override.clone()),
                                            ..TRACING_CALL_OPTS.clone()
                                        };

                                        tokio::task::spawn(same_path_merger_task(
                                            client_clone,
                                            stuffing_txs,
                                            prestate_clone,
                                            market_state.clone(),
                                            call_opts,
                                            sign_request.clone(),
                                            compose_channel_tx.clone(),
                                        ));
                                    }

                                    let e = swap_paths.entry(stuffing_tx_hash).or_default();
                                    e.push(sign_request.clone());
                                }
                            }
                        }
                    }
                    Err(e) => {
                        error!("{e}")
                    }
                }
            }
        }
    }
}

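/// Actor that merges backrun swaps which follow the same swap path but are triggered by
/// different stuffing transactions, producing a single merged compose request.
///
/// A minimal wiring sketch (the `client`, `bc`, `state`, and `strategy` values are assumed
/// to be constructed elsewhere; names are illustrative):
///
/// ```ignore
/// let actor = SamePathMergerActor::new(client.clone()).on_bc(&bc, &state, &strategy);
/// let tasks = actor.start()?;
/// ```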
#[derive(Consumer, Producer, Accessor)]
pub struct SamePathMergerActor<P, N, DB: Send + Sync + Clone + 'static> {
    client: P,
    #[accessor]
    market_state: Option<SharedState<MarketState<DB>>>,
    #[accessor]
    latest_block: Option<SharedState<LatestBlock>>,
    #[consumer]
    market_events: Option<Broadcaster<MarketEvents>>,
    #[consumer]
    compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
    #[producer]
    compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
    _n: PhantomData<N>,
}

impl<P, N, DB> SamePathMergerActor<P, N, DB>
where
    N: Network,
    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
    DB: DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
{
    pub fn new(client: P) -> Self {
        Self {
            client,
            market_state: None,
            latest_block: None,
            market_events: None,
            compose_channel_rx: None,
            compose_channel_tx: None,
            _n: PhantomData,
        }
    }

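    /// Wires the actor to the blockchain's shared state and to the strategy's swap-compose channel.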
    pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>, strategy: &Strategy<DB>) -> Self {
        Self {
            market_state: Some(state.market_state_commit()),
            latest_block: Some(bc.latest_block()),
            market_events: Some(bc.market_events_channel()),
            compose_channel_tx: Some(strategy.swap_compose_channel()),
            compose_channel_rx: Some(strategy.swap_compose_channel()),
            ..self
        }
    }
}

impl<P, N, DB> Actor for SamePathMergerActor<P, N, DB>
where
    N: Network<TransactionRequest = TransactionRequest>,
    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
{
    fn start(&self) -> ActorResult {
        let task = tokio::task::spawn(same_path_merger_worker(
            self.client.clone(),
            self.latest_block.clone().unwrap(),
            self.market_state.clone().unwrap(),
            self.market_events.clone().unwrap(),
            self.compose_channel_rx.clone().unwrap(),
            self.compose_channel_tx.clone().unwrap(),
        ));
        Ok(vec![task])
    }

    fn name(&self) -> &'static str {
        "SamePathMergerActor"
    }
}