kabu_strategy_merger/
samepath_merger_actor.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::sync::Arc;
5
6use alloy_eips::BlockNumberOrTag;
7use alloy_network::Network;
8use alloy_primitives::{Address, TxHash, U256};
9use alloy_provider::Provider;
10use alloy_rpc_types::state::StateOverride;
11use alloy_rpc_types::{BlockOverrides, Transaction, TransactionRequest};
12use alloy_rpc_types_trace::geth::GethDebugTracingCallOptions;
13use eyre::{eyre, Result};
14use lazy_static::lazy_static;
15use revm::{Context, Database, DatabaseCommit, DatabaseRef, MainBuilder, MainContext};
16use tokio::sync::broadcast::error::RecvError;
17use tokio::sync::RwLock;
18use tracing::{debug, error, info, trace};
19
20use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
21use kabu_core_actors_macros::{Accessor, Consumer, Producer};
22use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
23use kabu_evm_db::{DatabaseHelpers, KabuDBError};
24use kabu_evm_utils::evm_env::tx_req_to_env;
25use kabu_evm_utils::evm_transact;
26use kabu_node_debug_provider::DebugProviderExt;
27use kabu_types_blockchain::{debug_trace_call_pre_state, GethStateUpdate, GethStateUpdateVec, KabuDataTypes, KabuTx, TRACING_CALL_OPTS};
28use kabu_types_entities::{DataFetcher, FetchState, LatestBlock, MarketState, Swap};
29use kabu_types_events::{MarketEvents, MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData};
30use revm::context::{BlockEnv, ContextTr};
31
// Coinbase (fee recipient) address passed as a block override when the worker in this file
// builds the tracing call options for stuffing transactions.
// NOTE(review): the address is hard-coded; presumably a well-known block builder — confirm.
lazy_static! {
    static ref COINBASE: Address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326".parse().unwrap();
}
35
36fn get_merge_list<'a, DB: Clone + 'static>(
37    request: &SwapComposeData<DB>,
38    swap_paths: &'a HashMap<TxHash, Vec<SwapComposeData<DB>>>,
39) -> Vec<&'a SwapComposeData<DB>> {
40    //let mut ret : Vec<&TxComposeData> = Vec::new();
41    let swap_line = if let Swap::BackrunSwapLine(swap_line) = &request.swap {
42        swap_line
43    } else {
44        return Vec::new();
45    };
46
47    let swap_stuffing_hash = request.first_stuffing_hash();
48
49    let mut ret: Vec<&SwapComposeData<DB>> = swap_paths
50        .iter()
51        .filter_map(|(k, v)| {
52            if *k != swap_stuffing_hash {
53                v.iter().find(|a| if let Swap::BackrunSwapLine(a_line) = &a.swap { a_line.path == swap_line.path } else { false })
54            } else {
55                None
56            }
57        })
58        .collect();
59
60    ret.sort_by(|a, b| b.swap.arb_profit_eth().cmp(&a.swap.arb_profit_eth()));
61
62    ret
63}
64
65async fn same_path_merger_task<P, N, DB>(
66    client: P,
67    stuffing_txes: Vec<Transaction>,
68    pre_states: Arc<RwLock<DataFetcher<TxHash, GethStateUpdate>>>,
69    market_state: SharedState<MarketState<DB>>,
70    call_opts: GethDebugTracingCallOptions,
71    request: SwapComposeData<DB>,
72    swap_request_tx: Broadcaster<MessageSwapCompose<DB>>,
73) -> Result<()>
74where
75    N: Network<TransactionRequest = TransactionRequest>,
76    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
77    DB: Database<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
78{
79    debug!("same_path_merger_task stuffing_txs len {}", stuffing_txes.len());
80
81    let mut prestate_guard = pre_states.write().await;
82
83    let mut stuffing_state_locks: Vec<(Transaction, FetchState<GethStateUpdate>)> = Vec::new();
84
85    for tx in stuffing_txes.into_iter() {
86        let client_clone = client.clone(); //Pin::new(Box::new(client.clone()));
87        let tx_clone = tx.clone();
88        let tx_hash: TxHash = tx.get_tx_hash();
89        let call_opts_clone = call_opts.clone();
90
91        let lock = prestate_guard
92            .fetch(tx_hash, |_tx_hash| async move {
93                debug_trace_call_pre_state(client_clone, tx_clone, BlockNumberOrTag::Latest.into(), Some(call_opts_clone)).await
94            })
95            .await;
96
97        stuffing_state_locks.push((tx, lock));
98    }
99
100    drop(prestate_guard);
101
102    let mut stuffing_states: Vec<(Transaction, GethStateUpdate)> = Vec::new();
103
104    for (tx, lock) in stuffing_state_locks.into_iter() {
105        if let FetchState::Fetching(lock) = lock {
106            if let Some(t) = lock.read().await.deref() {
107                stuffing_states.push((tx, t.clone()));
108            }
109        }
110    }
111
112    let mut tx_order: Vec<usize> = (0..stuffing_states.len()).collect();
113
114    let mut changing: Option<usize> = None;
115    let mut counter = 0;
116
117    let db_org = market_state.read().await.state_db.clone();
118
119    let rdb: Option<DB> = loop {
120        counter += 1;
121        if counter > 10 {
122            break None;
123        }
124
125        let mut ok = true;
126
127        let tx_and_state: Vec<&(Transaction, GethStateUpdate)> = tx_order.iter().map(|i| stuffing_states.get(*i).unwrap()).collect();
128
129        let states: GethStateUpdateVec = tx_and_state.iter().map(|(_tx, state)| state.clone()).collect();
130
131        let mut db = db_org.clone();
132
133        DatabaseHelpers::apply_geth_state_update_vec(&mut db, states);
134
135        let block_env = BlockEnv {
136            number: U256::from(request.tx_compose.next_block_number),
137            timestamp: U256::from(request.tx_compose.next_block_timestamp),
138            basefee: request.tx_compose.next_block_base_fee,
139            ..Default::default()
140        };
141
142        let mut evm = Context::mainnet().with_db(db).with_block(block_env).build_mainnet();
143
144        for (idx, tx_idx) in tx_order.clone().iter().enumerate() {
145            // set tx context for evm
146            let tx = &stuffing_states[*tx_idx].0;
147
148            let tx_req: TransactionRequest = tx.to_transaction_request();
149            let tx_env = tx_req_to_env(tx_req);
150
151            // TODO: EVM transact functionality is placeholder for now
152
153            match evm_transact(&mut evm, tx_env) {
154                Ok(_c) => {
155                    trace!("Transaction {} committed successfully {:?}", idx, tx.get_tx_hash());
156                }
157                Err(e) => {
158                    error!("Transaction {} {:?} commit error: {}", idx, tx.get_tx_hash(), e);
159                    match changing {
160                        Some(changing_idx) => {
161                            if (changing_idx == idx && idx == 0) || (changing_idx == idx - 1) {
162                                tx_order.remove(changing_idx);
163                                trace!("Removing Some {idx} {changing_idx}");
164                                changing = None;
165                            } else if idx < tx_order.len() && idx > 0 {
166                                tx_order.swap(idx, idx - 1);
167                                trace!("Swapping Some {idx} {changing_idx}");
168                                changing = Some(idx - 1)
169                            }
170                        }
171                        None => {
172                            if idx > 0 {
173                                trace!("Swapping None {idx}");
174                                tx_order.swap(idx, idx - 1);
175                                changing = Some(idx - 1)
176                            } else {
177                                trace!("Removing None {idx}");
178                                tx_order.remove(0);
179                                changing = None
180                            }
181                        }
182                    }
183                    ok = false;
184                    break;
185                }
186            }
187        }
188
189        if ok {
190            debug!("Transaction sequence found {tx_order:?}");
191            // TODO: Consume DB properly
192            let db = evm.ctx.db_ref().clone();
193            break Some(db);
194        }
195    };
196
197    if tx_order.len() < 2 {
198        return Err(eyre!("NOT_MERGED"));
199    }
200
201    if let Some(db) = rdb {
202        let _block_env = BlockEnv {
203            number: U256::from(request.tx_compose.next_block_number),
204            timestamp: U256::from(request.tx_compose.next_block_timestamp),
205            basefee: request.tx_compose.next_block_base_fee,
206            ..Default::default()
207        };
208
209        if let Swap::BackrunSwapLine(swap_line) = request.swap.clone() {
210            let first_token = swap_line.get_first_token().unwrap();
211            let _amount_in = first_token.calc_token_value_from_eth(U256::from(10).pow(U256::from(17))).unwrap();
212
213            // TODO: Update optimize_with_in_amount to work with KabuEVMWrapper
214            match Ok::<(), eyre::Error>(()) {
215                Ok(_r) => {
216                    let encode_request = MessageSwapCompose::prepare(SwapComposeData {
217                        tx_compose: TxComposeData {
218                            stuffing_txs_hashes: tx_order.iter().map(|i| stuffing_states[*i].0.get_tx_hash()).collect(),
219                            stuffing_txs: tx_order.iter().map(|i| stuffing_states[*i].0.clone()).collect(),
220                            ..request.tx_compose
221                        },
222                        swap: Swap::BackrunSwapLine(swap_line.clone()),
223                        origin: Some("samepath_merger".to_string()),
224                        tips_pct: None,
225                        poststate: Some(db),
226                        poststate_update: None,
227                        ..request
228                    });
229
230                    if let Err(e) = swap_request_tx.send(encode_request) {
231                        error!("{}", e)
232                    }
233                    info!("+++ Calculation finished {swap_line}");
234                }
235                Err(e) => {
236                    error!("optimization error : {e:?}")
237                }
238            }
239        }
240    }
241
242    trace!("same_path_merger_task stuffing_states len {}", stuffing_states.len());
243
244    Ok(())
245}
246
247async fn same_path_merger_worker<
248    N: Network<TransactionRequest = TransactionRequest>,
249    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
250    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
251>(
252    client: P,
253    latest_block: SharedState<LatestBlock>,
254    market_state: SharedState<MarketState<DB>>,
255    market_events_rx: Broadcaster<MarketEvents>,
256    compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
257    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
258) -> WorkerResult {
259    subscribe!(market_events_rx);
260    subscribe!(compose_channel_rx);
261
262    let mut swap_paths: HashMap<TxHash, Vec<SwapComposeData<DB>>> = HashMap::new();
263
264    let prestate = Arc::new(RwLock::new(DataFetcher::<TxHash, GethStateUpdate>::new()));
265
266    //let mut affecting_tx: HashMap<TxHash, bool> = HashMap::new();
267    //let mut cur_base_fee: u128 = 0;
268    let mut cur_next_base_fee: u64 = 0;
269    let mut cur_block_number: Option<alloy_primitives::BlockNumber> = None;
270    let mut cur_block_time: Option<u64> = None;
271    let mut cur_state_override: StateOverride = StateOverride::default();
272
273    loop {
274        tokio::select! {
275            msg = market_events_rx.recv() => {
276                if let Ok(msg) = msg {
277                    let market_event_msg : MarketEvents = msg;
278                    if let MarketEvents::BlockHeaderUpdate{block_number, block_hash,  base_fee, next_base_fee, timestamp} =  market_event_msg {
279                        debug!("Block header update {} {} base_fee {} ", block_number, block_hash, base_fee);
280                        cur_block_number = Some( block_number + 1);
281                        cur_block_time = Some(timestamp + 12 );
282                        cur_next_base_fee = next_base_fee;
283                        //cur_base_fee = base_fee;
284                        *prestate.write().await = DataFetcher::<TxHash, GethStateUpdate>::new();
285                        swap_paths = HashMap::new();
286
287                        let new_block_hash = block_hash;
288
289                        for _counter in 0..5  {
290                            if let Ok(MarketEvents::BlockStateUpdate{block_hash}) = market_events_rx.recv().await {
291                                if new_block_hash == block_hash {
292                                    cur_state_override = latest_block.read().await.node_state_override();
293                                    debug!("Block state update received {} {}", block_number, block_hash);
294                                    break;
295                                }
296                            }
297                        }
298                    }
299                }
300            }
301
302
303            msg = compose_channel_rx.recv() => {
304                let msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
305                match msg {
306                    Ok(compose_request)=>{
307                        if let SwapComposeMessage::Ready(sign_request) = compose_request.inner() {
308
309                            if sign_request.tx_compose.stuffing_txs_hashes.len() == 1 {
310                                if let Swap::BackrunSwapLine( _swap_line ) = &sign_request.swap {
311                                    let stuffing_tx_hash = sign_request.first_stuffing_hash();
312
313                                    let requests_vec = get_merge_list(sign_request, &swap_paths);
314                                    if !requests_vec.is_empty() {
315
316                                        let mut stuffing_txs : Vec<Transaction> = vec![sign_request.tx_compose.stuffing_txs[0].clone()];
317                                        stuffing_txs.extend( requests_vec.iter().map(|r| r.tx_compose.stuffing_txs[0].clone() ).collect::<Vec<Transaction>>());
318                                        let client_clone = client.clone();
319                                        let prestate_clone = prestate.clone();
320
321                                        let call_opts : GethDebugTracingCallOptions = GethDebugTracingCallOptions{
322                                            block_overrides : Some(BlockOverrides {
323                                                number : Some( U256::from(cur_block_number.unwrap_or_default())),
324                                                time : Some(cur_block_time.unwrap_or_default()),
325                                                coinbase : Some(*COINBASE),
326                                                base_fee : Some(U256::from(cur_next_base_fee)),
327                                                ..Default::default()
328                                            }),
329                                            state_overrides : Some(cur_state_override.clone()),
330                                            ..TRACING_CALL_OPTS.clone()
331                                        };
332
333                                        tokio::task::spawn(
334                                            same_path_merger_task(
335                                                client_clone,
336                                                stuffing_txs,
337                                                prestate_clone,
338                                                market_state.clone(),
339                                                call_opts,
340                                                sign_request.clone(),
341                                                compose_channel_tx.clone()
342                                            )
343                                        );
344                                    }
345
346                                    let e = swap_paths.entry(stuffing_tx_hash).or_default();
347                                    e.push( sign_request.clone() );
348
349                                }
350                            }
351                        }
352                    },
353                    Err(e)=>{
354                        error!("{e}")
355                    }
356                }
357            }
358        }
359    }
360}
361
/// Actor that runs `same_path_merger_worker`: it listens to market events and swap-compose
/// messages and spawns merge tasks for backrun requests that share the same swap path.
#[derive(Consumer, Producer, Accessor)]
pub struct SamePathMergerActor<P, N, DB: Send + Sync + Clone + 'static> {
    // Node client; cloned into the worker when the actor is started.
    client: P,
    // Shared market state; the worker passes it on to the merge tasks.
    #[accessor]
    market_state: Option<SharedState<MarketState<DB>>>,
    // Latest block state; the worker reads the node state override from it.
    #[accessor]
    latest_block: Option<SharedState<LatestBlock>>,
    // Source of market events (block header / block state updates).
    #[consumer]
    market_events: Option<Broadcaster<MarketEvents>>,
    // Incoming swap-compose messages.
    #[consumer]
    compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
    // Outgoing swap-compose messages produced by the merge tasks.
    #[producer]
    compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
    // Ties the otherwise unused network type parameter `N` to the struct.
    _n: PhantomData<N>,
}
378
379impl<P, N, DB> SamePathMergerActor<P, N, DB>
380where
381    N: Network,
382    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
383    DB: DatabaseRef<Error = KabuDBError> + DatabaseRef<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
384{
385    pub fn new(client: P) -> Self {
386        Self {
387            client,
388            market_state: None,
389            latest_block: None,
390            market_events: None,
391            compose_channel_rx: None,
392            compose_channel_tx: None,
393            _n: PhantomData,
394        }
395    }
396
397    pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>, strategy: &Strategy<DB>) -> Self {
398        Self {
399            market_state: Some(state.market_state_commit()),
400            latest_block: Some(bc.latest_block()),
401            market_events: Some(bc.market_events_channel()),
402            compose_channel_tx: Some(strategy.swap_compose_channel()),
403            compose_channel_rx: Some(strategy.swap_compose_channel()),
404            ..self
405        }
406    }
407}
408
409impl<P, N, DB> Actor for SamePathMergerActor<P, N, DB>
410where
411    N: Network<TransactionRequest = TransactionRequest>,
412    P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
413    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
414{
415    fn start(&self) -> ActorResult {
416        let task = tokio::task::spawn(same_path_merger_worker(
417            self.client.clone(),
418            self.latest_block.clone().unwrap(),
419            self.market_state.clone().unwrap(),
420            self.market_events.clone().unwrap(),
421            self.compose_channel_rx.clone().unwrap(),
422            self.compose_channel_tx.clone().unwrap(),
423        ));
424        Ok(vec![task])
425    }
426
427    fn name(&self) -> &'static str {
428        "SamePathMergerActor"
429    }
430}