1use alloy_primitives::{Address, U256};
2use alloy_rpc_types::Header;
3use eyre::{eyre, Result};
4use revm::{Database, DatabaseCommit, DatabaseRef};
5use tokio::sync::broadcast::error::RecvError;
6use tracing::{debug, error, info};
7
8use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
9use kabu_core_actors_macros::{Accessor, Consumer, Producer};
10use kabu_core_blockchain::{Blockchain, Strategy};
11use kabu_evm_db::KabuDBError;
12use kabu_types_entities::{LatestBlock, Swap, SwapStep};
13use kabu_types_events::{MarketEvents, MessageSwapCompose, SwapComposeData, SwapComposeMessage};
14use revm::context::BlockEnv;
15
16async fn arb_swap_steps_optimizer_task<
17 DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone,
18>(
19 compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
20 _state_db: DB,
21 header: Header,
22 request: SwapComposeData<DB>,
23) -> Result<()> {
24 debug!("Step Simulation started");
25
26 if let Swap::BackrunSwapSteps((sp0, sp1)) = request.swap {
27 let start_time = chrono::Local::now();
28
29 let _block_env =
30 BlockEnv { number: U256::from(header.number + 1), timestamp: U256::from(header.timestamp + 12), ..Default::default() };
31
32 match Ok::<(SwapStep, SwapStep), eyre::Error>((sp0.clone(), sp1.clone())) {
34 Ok((s0, s1)) => {
35 let encode_request = MessageSwapCompose::prepare(SwapComposeData {
36 origin: Some("merger_searcher".to_string()),
37 tips_pct: None,
38 swap: Swap::BackrunSwapSteps((s0, s1)),
39 ..request
40 });
41 compose_channel_tx.send(encode_request).map_err(|_| eyre!("CANNOT_SEND"))?;
42 }
43 Err(e) => {
44 error!("Optimization error:{}", e);
45 return Err(eyre!("OPTIMIZATION_ERROR"));
46 }
47 }
48 debug!("Step Optimization finished {} + {} {}", &sp0, &sp1, chrono::Local::now() - start_time);
49 } else {
50 error!("Incorrect swap_type");
51 return Err(eyre!("INCORRECT_SWAP_TYPE"));
52 }
53
54 Ok(())
55}
56
57async fn arb_swap_path_merger_worker<
58 DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
59>(
60 multicaller_address: Address,
61 latest_block: SharedState<LatestBlock>,
62 market_events_rx: Broadcaster<MarketEvents>,
63 compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
64 compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
65) -> WorkerResult {
66 subscribe!(market_events_rx);
67 subscribe!(compose_channel_rx);
68
69 let mut ready_requests: Vec<SwapComposeData<DB>> = Vec::new();
70
71 loop {
72 tokio::select! {
73 msg = market_events_rx.recv() => {
74 let msg : Result<MarketEvents, RecvError> = msg;
75 match msg {
76 Ok(event) => {
77 match event {
78 MarketEvents::BlockHeaderUpdate{..} =>{
79 debug!("Cleaning ready requests");
80 ready_requests = Vec::new();
81 }
82 MarketEvents::BlockStateUpdate{..}=>{
83 debug!("State updated");
84 }
86 _=>{}
87 }
88 }
89 Err(e)=>{error!("{}", e)}
90 }
91
92 },
93 msg = compose_channel_rx.recv() => {
94 let msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
95 match msg {
96 Ok(swap) => {
97
98 let compose_data = match swap.inner() {
99 SwapComposeMessage::Ready(data) => data,
100 _=>continue,
101 };
102
103 let swap_path = match &compose_data.swap {
104 Swap::BackrunSwapLine(path) => path,
105 _=>continue,
106 };
107
108
109 info!("MessageSwapPathEncodeRequest received. stuffing: {:?} swap: {}", compose_data.tx_compose.stuffing_txs_hashes, compose_data.swap);
110
111 for req in ready_requests.iter() {
112
113 let req_swap = match &req.swap {
114 Swap::BackrunSwapLine(path)=>path,
115 _ => continue,
116 };
117
118 if !compose_data.same_stuffing(&req.tx_compose.stuffing_txs_hashes) {
120 continue
121 };
122
123
124 match SwapStep::merge_swap_paths( req_swap.clone(), swap_path.clone(), multicaller_address ){
125 Ok((sp0, sp1)) => {
126 let latest_block_guard = latest_block.read().await;
127 let block_header = latest_block_guard.block_header.clone().unwrap();
128 drop(latest_block_guard);
129
130 let request = SwapComposeData{
131 swap : Swap::BackrunSwapSteps((sp0,sp1)),
132 ..compose_data.clone()
133 };
134
135
136
137
138 if let Some(db) = compose_data.poststate.clone() {
139 let db_clone = db.clone();
140
141 let compose_channel_clone = compose_channel_tx.clone();
142 tokio::task::spawn( async move {
143 arb_swap_steps_optimizer_task(
144 compose_channel_clone,
145 db_clone,
146 block_header,
147 request
148 ).await
149 });
150 }
151 break; }
153 Err(e)=>{
154 error!("SwapPath merge error : {} {}", ready_requests.len(), e);
155 }
156 }
157 }
158 ready_requests.push(compose_data.clone());
159 ready_requests.sort_by(|r0,r1| r1.swap.arb_profit().cmp(&r0.swap.arb_profit()) )
160
161 }
162 Err(e)=>{error!("{}",e)}
163 }
164 }
165 }
166 }
167}
168
169#[derive(Consumer, Producer, Accessor)]
170pub struct ArbSwapPathMergerActor<DB: Send + Sync + Clone + 'static> {
171 multicaller_address: Address,
172 #[accessor]
173 latest_block: Option<SharedState<LatestBlock>>,
174 #[consumer]
175 market_events: Option<Broadcaster<MarketEvents>>,
176 #[consumer]
177 compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
178 #[producer]
179 compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
180}
181
182impl<DB> ArbSwapPathMergerActor<DB>
183where
184 DB: DatabaseRef + Send + Sync + Clone + 'static,
185{
186 pub fn new(multicaller_address: Address) -> ArbSwapPathMergerActor<DB> {
187 ArbSwapPathMergerActor {
188 multicaller_address,
189 latest_block: None,
190 market_events: None,
191 compose_channel_rx: None,
192 compose_channel_tx: None,
193 }
194 }
195 pub fn on_bc(self, bc: &Blockchain, strategy: &Strategy<DB>) -> Self {
196 Self {
197 latest_block: Some(bc.latest_block()),
198 market_events: Some(bc.market_events_channel()),
199 compose_channel_tx: Some(strategy.swap_compose_channel()),
200 compose_channel_rx: Some(strategy.swap_compose_channel()),
201 ..self
202 }
203 }
204}
205
206impl<DB> Actor for ArbSwapPathMergerActor<DB>
207where
208 DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static,
209{
210 fn start(&self) -> ActorResult {
211 let task = tokio::task::spawn(arb_swap_path_merger_worker(
212 self.multicaller_address,
213 self.latest_block.clone().unwrap(),
214 self.market_events.clone().unwrap(),
215 self.compose_channel_rx.clone().unwrap(),
216 self.compose_channel_tx.clone().unwrap(),
217 ));
218 Ok(vec![task])
219 }
220
221 fn name(&self) -> &'static str {
222 "ArbSwapPathMergerActor"
223 }
224}
225
226#[cfg(test)]
227mod test {
228 use alloy_primitives::{Address, U256};
229 use kabu_evm_db::KabuDB;
230 use kabu_types_entities::{Swap, SwapAmountType, SwapLine, SwapPath, Token};
231 use kabu_types_events::SwapComposeData;
232 use std::sync::Arc;
233
234 #[test]
235 pub fn test_sort() {
236 let mut ready_requests: Vec<SwapComposeData<KabuDB>> = Vec::new();
237 let token = Arc::new(Token::new(Address::random()));
238
239 let sp0 = SwapLine {
240 path: SwapPath {
241 tokens: vec![token.clone(), token.clone()],
242 pools: vec![],
243 disabled: false,
244 disabled_pool: Default::default(),
245 score: None,
246 },
247 amount_in: SwapAmountType::Set(U256::from(1)),
248 amount_out: SwapAmountType::Set(U256::from(2)),
249 ..Default::default()
250 };
251 ready_requests.push(SwapComposeData { swap: Swap::BackrunSwapLine(sp0), ..SwapComposeData::default() });
252
253 let sp1 = SwapLine {
254 path: SwapPath {
255 tokens: vec![token.clone(), token.clone()],
256 pools: vec![],
257 disabled: false,
258 disabled_pool: Default::default(),
259 score: None,
260 },
261 amount_in: SwapAmountType::Set(U256::from(10)),
262 amount_out: SwapAmountType::Set(U256::from(20)),
263 ..Default::default()
264 };
265 ready_requests.push(SwapComposeData { swap: Swap::BackrunSwapLine(sp1), ..SwapComposeData::default() });
266
267 let sp2 = SwapLine {
268 path: SwapPath {
269 tokens: vec![token.clone(), token.clone()],
270 pools: vec![],
271 disabled: false,
272 disabled_pool: Default::default(),
273 score: None,
274 },
275 amount_in: SwapAmountType::Set(U256::from(3)),
276 amount_out: SwapAmountType::Set(U256::from(5)),
277 ..Default::default()
278 };
279 ready_requests.push(SwapComposeData { swap: Swap::BackrunSwapLine(sp2), ..SwapComposeData::default() });
280
281 ready_requests.sort_by(|a, b| a.swap.arb_profit().cmp(&b.swap.arb_profit()));
282
283 assert_eq!(ready_requests[0].swap.arb_profit(), U256::from(1));
284 assert_eq!(ready_requests[1].swap.arb_profit(), U256::from(2));
285 assert_eq!(ready_requests[2].swap.arb_profit(), U256::from(10));
286 }
287}