kabu_strategy_merger/
diffpath_merger_actor.rs

1use alloy_network::TransactionResponse;
2use alloy_primitives::{Address, TxHash};
3use alloy_rpc_types::Transaction;
4use eyre::{OptionExt, Result};
5use lazy_static::lazy_static;
6use revm::{Database, DatabaseCommit, DatabaseRef};
7use tokio::sync::broadcast::error::RecvError;
8use tokio::sync::broadcast::Receiver;
9use tracing::{debug, error, info};
10
11use kabu_core_actors::{Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
12use kabu_core_actors_macros::{Accessor, Consumer, Producer};
13use kabu_core_blockchain::{Blockchain, Strategy};
14use kabu_evm_utils::NWETH;
15use kabu_types_entities::{MarketState, Swap};
16use kabu_types_events::{MarketEvents, MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData};
17
18lazy_static! {
19    static ref COINBASE: Address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326".parse().unwrap();
20}
21
22fn get_merge_list<'a, DB: Clone + Send + Sync + 'static>(
23    request: &SwapComposeData<DB>,
24    swap_paths: &'a [SwapComposeData<DB>],
25) -> Vec<&'a SwapComposeData<DB>> {
26    let mut ret: Vec<&SwapComposeData<DB>> = Vec::new();
27    let mut pools = request.swap.get_pool_id_vec();
28    for p in swap_paths.iter() {
29        if !p.cross_pools(&pools) {
30            pools.extend(p.swap.get_pool_id_vec());
31            ret.push(p);
32        }
33    }
34    ret
35}
36
37async fn diff_path_merger_worker<DB>(
38    market_events_rx: Broadcaster<MarketEvents>,
39    compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
40    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
41) -> WorkerResult
42where
43    DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
44{
45    let mut market_events_rx: Receiver<MarketEvents> = market_events_rx.subscribe();
46
47    let mut compose_channel_rx: Receiver<MessageSwapCompose<DB>> = compose_channel_rx.subscribe();
48
49    let mut swap_paths: Vec<SwapComposeData<DB>> = Vec::new();
50
51    loop {
52        tokio::select! {
53            msg = market_events_rx.recv() => {
54                if let Ok(msg) = msg {
55                    let market_event_msg : MarketEvents = msg;
56                    if let MarketEvents::BlockHeaderUpdate{block_number, block_hash, timestamp, base_fee, next_base_fee} =  market_event_msg {
57                        debug!("Block header update {} {} ts {} base_fee {} next {} ", block_number, block_hash, timestamp, base_fee, next_base_fee);
58                        //cur_block_number = Some( block_number + 1);
59                        //cur_block_time = Some(timestamp + 12 );
60                        //cur_next_base_fee = next_base_fee;
61                        //cur_base_fee = base_fee;
62                        swap_paths = Vec::new();
63
64                        // for _counter in 0..5  {
65                        //     if let Ok(msg) = market_events_rx.recv().await {
66                        //         if matches!(msg, MarketEvents::BlockStateUpdate{ block_hash } ) {
67                        //             cur_state_override = latest_block.read().await.node_state_override();
68                        //             debug!("Block state update received {} {}", block_number, block_hash);
69                        //             break;
70                        //         }
71                        //     }
72                        // }
73                    }
74                }
75            }
76
77
78            msg = compose_channel_rx.recv() => {
79                let msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
80                match msg {
81                    Ok(compose_request)=>{
82                        if let SwapComposeMessage::Ready(sign_request) = compose_request.inner() {
83                            if matches!( sign_request.swap, Swap::BackrunSwapLine(_)) || matches!( sign_request.swap, Swap::BackrunSwapSteps(_)) {
84                                let mut merge_list = get_merge_list(sign_request, &swap_paths);
85
86                                if !merge_list.is_empty() {
87                                    let swap_vec : Vec<Swap> = merge_list.iter().map(|x|x.swap.clone()).collect();
88                                    info!("Merging started {:?}", swap_vec );
89
90                                    let mut state = MarketState::new(sign_request.poststate.clone().unwrap().clone());
91
92                                    for dbs in merge_list.iter() {
93                                        state.apply_geth_update_vec( dbs.poststate_update.clone().ok_or_eyre("NO_STATE_UPDATE")?);
94                                    }
95
96                                    merge_list.push(sign_request);
97
98                                    let mut stuffing_txs_hashes : Vec<TxHash> = Vec::new();
99                                    let mut stuffing_txs : Vec<Transaction> = Vec::new();
100
101                                    for req in merge_list.iter() {
102                                        for tx in req.tx_compose.stuffing_txs.iter() {
103                                            if !stuffing_txs_hashes.contains(&tx.tx_hash()) {
104                                                stuffing_txs_hashes.push(tx.tx_hash());
105                                                stuffing_txs.push(tx.clone());
106                                            }
107                                        }
108                                    }
109
110                                    let encode_request = MessageSwapCompose::prepare(
111                                        SwapComposeData {
112                                            tx_compose : TxComposeData {
113                                                stuffing_txs_hashes,
114                                                stuffing_txs,
115                                                ..sign_request.tx_compose.clone()
116                                            },
117                                            swap : Swap::Multiple( merge_list.iter().map(|i| i.swap.clone()  ).collect()) ,
118                                            origin : Some("diffpath_merger".to_string()),
119                                            tips_pct : Some(9000),
120                                            poststate : Some(state.state_db),
121                                            ..sign_request.clone()
122                                        }
123                                    );
124                                    info!("+++ Calculation finished. Merge list : {} profit : {}",merge_list.len(), NWETH::to_float(encode_request.inner.swap.arb_profit_eth())  );
125
126                                    if let Err(e) = compose_channel_tx.send(encode_request) {
127                                       error!("{}",e)
128                                    }
129                                }
130
131                                swap_paths.push(sign_request.clone());
132                                swap_paths.sort_by(|a, b| b.swap.arb_profit_eth().cmp(&a.swap.arb_profit_eth() ) )
133                            }
134                        }
135                    }
136                    Err(e)=>{error!("{e}")}
137                }
138
139            }
140
141
142        }
143    }
144}
145
146#[derive(Consumer, Producer, Accessor, Default)]
147pub struct DiffPathMergerActor<DB: Clone + Send + Sync + 'static> {
148    #[consumer]
149    market_events: Option<Broadcaster<MarketEvents>>,
150    #[consumer]
151    compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
152    #[producer]
153    compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
154}
155
156impl<DB> DiffPathMergerActor<DB>
157where
158    DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + Default + 'static,
159{
160    pub fn new() -> Self {
161        Self::default()
162    }
163
164    pub fn on_bc(self, bc: &Blockchain) -> Self {
165        Self { market_events: Some(bc.market_events_channel()), ..self }
166    }
167
168    pub fn on_strategy(self, strategy: &Strategy<DB>) -> Self {
169        Self {
170            compose_channel_tx: Some(strategy.swap_compose_channel()),
171            compose_channel_rx: Some(strategy.swap_compose_channel()),
172            ..self
173        }
174    }
175}
176
177impl<DB> Actor for DiffPathMergerActor<DB>
178where
179    DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + Default + 'static,
180{
181    fn start(&self) -> ActorResult {
182        let task = tokio::task::spawn(diff_path_merger_worker(
183            self.market_events.clone().unwrap(),
184            self.compose_channel_rx.clone().unwrap(),
185            self.compose_channel_tx.clone().unwrap(),
186        ));
187        Ok(vec![task])
188    }
189
190    fn name(&self) -> &'static str {
191        "DiffPathMergerActor"
192    }
193}