kabu_strategy_merger/
swappath_merger_actor.rs

1use alloy_primitives::{Address, U256};
2use alloy_rpc_types::Header;
3use eyre::{eyre, Result};
4use revm::{Database, DatabaseCommit, DatabaseRef};
5use tokio::sync::broadcast::error::RecvError;
6use tracing::{debug, error, info};
7
8use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
9use kabu_core_actors_macros::{Accessor, Consumer, Producer};
10use kabu_core_blockchain::{Blockchain, Strategy};
11use kabu_evm_db::KabuDBError;
12use kabu_types_entities::{LatestBlock, Swap, SwapStep};
13use kabu_types_events::{MarketEvents, MessageSwapCompose, SwapComposeData, SwapComposeMessage};
14use revm::context::BlockEnv;
15
16async fn arb_swap_steps_optimizer_task<
17    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone,
18>(
19    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
20    _state_db: DB,
21    header: Header,
22    request: SwapComposeData<DB>,
23) -> Result<()> {
24    debug!("Step Simulation started");
25
26    if let Swap::BackrunSwapSteps((sp0, sp1)) = request.swap {
27        let start_time = chrono::Local::now();
28
29        let _block_env =
30            BlockEnv { number: U256::from(header.number + 1), timestamp: U256::from(header.timestamp + 12), ..Default::default() };
31
32        // TODO: Update optimize_swap_steps to work with KabuEVMWrapper
33        match Ok::<(SwapStep, SwapStep), eyre::Error>((sp0.clone(), sp1.clone())) {
34            Ok((s0, s1)) => {
35                let encode_request = MessageSwapCompose::prepare(SwapComposeData {
36                    origin: Some("merger_searcher".to_string()),
37                    tips_pct: None,
38                    swap: Swap::BackrunSwapSteps((s0, s1)),
39                    ..request
40                });
41                compose_channel_tx.send(encode_request).map_err(|_| eyre!("CANNOT_SEND"))?;
42            }
43            Err(e) => {
44                error!("Optimization error:{}", e);
45                return Err(eyre!("OPTIMIZATION_ERROR"));
46            }
47        }
48        debug!("Step Optimization finished {} + {} {}", &sp0, &sp1, chrono::Local::now() - start_time);
49    } else {
50        error!("Incorrect swap_type");
51        return Err(eyre!("INCORRECT_SWAP_TYPE"));
52    }
53
54    Ok(())
55}
56
57async fn arb_swap_path_merger_worker<
58    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
59>(
60    multicaller_address: Address,
61    latest_block: SharedState<LatestBlock>,
62    market_events_rx: Broadcaster<MarketEvents>,
63    compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
64    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
65) -> WorkerResult {
66    subscribe!(market_events_rx);
67    subscribe!(compose_channel_rx);
68
69    let mut ready_requests: Vec<SwapComposeData<DB>> = Vec::new();
70
71    loop {
72        tokio::select! {
73            msg = market_events_rx.recv() => {
74                let msg : Result<MarketEvents, RecvError> = msg;
75                match msg {
76                    Ok(event) => {
77                        match event {
78                            MarketEvents::BlockHeaderUpdate{..} =>{
79                                debug!("Cleaning ready requests");
80                                ready_requests = Vec::new();
81                            }
82                            MarketEvents::BlockStateUpdate{..}=>{
83                                debug!("State updated");
84                                //state_db = market_state.read().await.state_db.clone();
85                            }
86                            _=>{}
87                        }
88                    }
89                    Err(e)=>{error!("{}", e)}
90                }
91
92            },
93            msg = compose_channel_rx.recv() => {
94                let msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
95                match msg {
96                    Ok(swap) => {
97
98                        let compose_data = match swap.inner() {
99                            SwapComposeMessage::Ready(data) => data,
100                            _=>continue,
101                        };
102
103                        let swap_path = match &compose_data.swap {
104                            Swap::BackrunSwapLine(path) => path,
105                            _=>continue,
106                        };
107
108
109                        info!("MessageSwapPathEncodeRequest received. stuffing: {:?} swap: {}", compose_data.tx_compose.stuffing_txs_hashes, compose_data.swap);
110
111                        for req in ready_requests.iter() {
112
113                            let req_swap = match &req.swap {
114                                Swap::BackrunSwapLine(path)=>path,
115                                _ => continue,
116                            };
117
118                            // todo!() mega bundle merge
119                            if !compose_data.same_stuffing(&req.tx_compose.stuffing_txs_hashes) {
120                                continue
121                            };
122
123
124                            match SwapStep::merge_swap_paths( req_swap.clone(), swap_path.clone(), multicaller_address ){
125                                Ok((sp0, sp1)) => {
126                                    let latest_block_guard = latest_block.read().await;
127                                    let block_header = latest_block_guard.block_header.clone().unwrap();
128                                    drop(latest_block_guard);
129
130                                    let request = SwapComposeData{
131                                        swap : Swap::BackrunSwapSteps((sp0,sp1)),
132                                        ..compose_data.clone()
133                                    };
134
135
136
137
138                                    if let Some(db) = compose_data.poststate.clone() {
139                                        let db_clone = db.clone();
140
141                                        let compose_channel_clone = compose_channel_tx.clone();
142                                        tokio::task::spawn( async move {
143                                                arb_swap_steps_optimizer_task(
144                                                    compose_channel_clone,
145                                                    db_clone,
146                                                    block_header,
147                                                    request
148                                                ).await
149                                        });
150                                    }
151                                    break; // only first
152                                }
153                                Err(e)=>{
154                                    error!("SwapPath merge error : {} {}", ready_requests.len(), e);
155                                }
156                            }
157                        }
158                        ready_requests.push(compose_data.clone());
159                        ready_requests.sort_by(|r0,r1| r1.swap.arb_profit().cmp(&r0.swap.arb_profit())  )
160
161                    }
162                    Err(e)=>{error!("{}",e)}
163                }
164            }
165        }
166    }
167}
168
/// Actor that runs `arb_swap_path_merger_worker`.
///
/// Holds the multicaller address together with the shared state and channels the worker needs.
/// All optional fields are `None` after `new` and are filled in by `on_bc` (presumably also by
/// the methods generated through the `Accessor`/`Consumer`/`Producer` derives - confirm in
/// `kabu_core_actors_macros`). `start` unwraps them, so they must be set before starting.
#[derive(Consumer, Producer, Accessor)]
pub struct ArbSwapPathMergerActor<DB: Send + Sync + Clone + 'static> {
    // Forwarded to the worker, which passes it to `SwapStep::merge_swap_paths`.
    multicaller_address: Address,
    // Shared latest-block state; the worker reads `block_header` from it.
    #[accessor]
    latest_block: Option<SharedState<LatestBlock>>,
    // Market events channel; the worker reacts to block header / block state updates.
    #[consumer]
    market_events: Option<Broadcaster<MarketEvents>>,
    // Channel the worker receives swap compose messages from.
    #[consumer]
    compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
    // Channel merged swap requests are published to.
    #[producer]
    compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
}
181
182impl<DB> ArbSwapPathMergerActor<DB>
183where
184    DB: DatabaseRef + Send + Sync + Clone + 'static,
185{
186    pub fn new(multicaller_address: Address) -> ArbSwapPathMergerActor<DB> {
187        ArbSwapPathMergerActor {
188            multicaller_address,
189            latest_block: None,
190            market_events: None,
191            compose_channel_rx: None,
192            compose_channel_tx: None,
193        }
194    }
195    pub fn on_bc(self, bc: &Blockchain, strategy: &Strategy<DB>) -> Self {
196        Self {
197            latest_block: Some(bc.latest_block()),
198            market_events: Some(bc.market_events_channel()),
199            compose_channel_tx: Some(strategy.swap_compose_channel()),
200            compose_channel_rx: Some(strategy.swap_compose_channel()),
201            ..self
202        }
203    }
204}
205
206impl<DB> Actor for ArbSwapPathMergerActor<DB>
207where
208    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
209{
210    fn start(&self) -> ActorResult {
211        let task = tokio::task::spawn(arb_swap_path_merger_worker(
212            self.multicaller_address,
213            self.latest_block.clone().unwrap(),
214            self.market_events.clone().unwrap(),
215            self.compose_channel_rx.clone().unwrap(),
216            self.compose_channel_tx.clone().unwrap(),
217        ));
218        Ok(vec![task])
219    }
220
221    fn name(&self) -> &'static str {
222        "ArbSwapPathMergerActor"
223    }
224}
225
#[cfg(test)]
mod test {
    use alloy_primitives::{Address, U256};
    use kabu_evm_db::KabuDB;
    use kabu_types_entities::{Swap, SwapAmountType, SwapLine, SwapPath, Token};
    use kabu_types_events::SwapComposeData;
    use std::sync::Arc;

    /// Sorting compose requests by `arb_profit()` in ascending order must yield
    /// profits 1, 2 and 10 for the three fixtures below.
    #[test]
    pub fn test_sort() {
        let token = Arc::new(Token::new(Address::random()));

        // (amount_in, amount_out) of each request, in insertion order.
        let amounts: [(u64, u64); 3] = [(1, 2), (10, 20), (3, 5)];

        let mut ready_requests: Vec<SwapComposeData<KabuDB>> = amounts
            .iter()
            .map(|&(amount_in, amount_out)| {
                let line = SwapLine {
                    path: SwapPath {
                        tokens: vec![token.clone(), token.clone()],
                        pools: vec![],
                        disabled: false,
                        disabled_pool: Default::default(),
                        score: None,
                    },
                    amount_in: SwapAmountType::Set(U256::from(amount_in)),
                    amount_out: SwapAmountType::Set(U256::from(amount_out)),
                    ..Default::default()
                };
                SwapComposeData { swap: Swap::BackrunSwapLine(line), ..SwapComposeData::default() }
            })
            .collect();

        ready_requests.sort_by_key(|request| request.swap.arb_profit());

        // Guard the zip below against silently checking fewer elements than expected.
        assert_eq!(ready_requests.len(), 3);
        for (request, expected_profit) in ready_requests.iter().zip([1u64, 2, 10]) {
            assert_eq!(request.swap.arb_profit(), U256::from(expected_profit));
        }
    }
}