kabu_execution_estimator/
evm.rs

1use alloy_consensus::TxEnvelope;
2use alloy_eips::eip2718::Encodable2718;
3use alloy_eips::BlockNumberOrTag;
4use alloy_evm::EvmEnv;
5use alloy_network::{Ethereum, Network};
6use alloy_primitives::{Bytes, TxKind, U256};
7use alloy_provider::Provider;
8use alloy_rpc_types::{TransactionInput, TransactionRequest};
9use eyre::{eyre, Result};
10use influxdb::{Timestamp, WriteQuery};
11use std::marker::PhantomData;
12use tokio::sync::broadcast::error::RecvError;
13use tracing::{debug, error, info, trace};
14
15use kabu_core_blockchain::{Blockchain, Strategy};
16use kabu_evm_utils::{evm_access_list, NWETH};
17use kabu_types_entities::{EstimationError, Swap, SwapEncoder};
18
19use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
20use kabu_core_actors_macros::{Consumer, Producer};
21use kabu_evm_db::{AlloyDB, DatabaseKabuExt, KabuDBError};
22use kabu_evm_utils::evm_env::tx_req_to_env;
23use kabu_types_events::{HealthEvent, MessageHealthEvent, MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData, TxState};
24use revm::context::BlockEnv;
25use revm::{Database, DatabaseCommit, DatabaseRef};
26
27async fn estimator_task<N, DB>(
28    client: Option<impl Provider<N> + 'static>,
29    swap_encoder: impl SwapEncoder,
30    estimate_request: SwapComposeData<DB>,
31    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
32    health_monitor_channel_tx: Option<Broadcaster<MessageHealthEvent>>,
33    influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
34) -> Result<()>
35where
36    N: Network,
37    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + DatabaseKabuExt + Send + Sync + Clone + 'static,
38{
39    debug!(
40        gas_limit = estimate_request.tx_compose.gas,
41        base_fee = NWETH::to_float_gwei(estimate_request.tx_compose.next_block_base_fee as u128),
42        gas_cost = NWETH::to_float_wei(estimate_request.gas_cost()),
43        stuffing_txs_len = estimate_request.tx_compose.stuffing_txs_hashes.len(),
44        "EVM estimation",
45    );
46
47    let start_time = chrono::Utc::now();
48
49    let tx_signer = estimate_request.tx_compose.signer.clone().ok_or(eyre!("NO_SIGNER"))?;
50    let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
51
52    let (to, call_value, call_data, _) = swap_encoder.encode(
53        estimate_request.swap.clone(),
54        estimate_request.tips_pct,
55        Some(estimate_request.tx_compose.next_block_number),
56        None,
57        Some(tx_signer.address()),
58        Some(estimate_request.tx_compose.eth_balance),
59    )?;
60
61    let tx_request = TransactionRequest {
62        transaction_type: Some(2),
63        chain_id: Some(1),
64        from: Some(tx_signer.address()),
65        to: Some(TxKind::Call(to)),
66        gas: Some(estimate_request.tx_compose.gas),
67        value: call_value,
68        input: TransactionInput::new(call_data.clone()),
69        nonce: Some(estimate_request.tx_compose.nonce),
70        max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
71        max_fee_per_gas: Some(
72            estimate_request.tx_compose.next_block_base_fee as u128 + estimate_request.tx_compose.priority_gas_fee as u128,
73        ),
74        ..TransactionRequest::default()
75    };
76
77    let Some(mut db) = estimate_request.poststate else {
78        error!("StateDB is None");
79        return Err(eyre!("STATE_DB_IS_NONE"));
80    };
81
82    if let Some(client) = client {
83        let ext_db = AlloyDB::new(client, BlockNumberOrTag::Latest.into());
84        if let Some(ext_db) = ext_db {
85            db.with_ext_db(ext_db)
86        } else {
87            error!("AlloyDB is None");
88        }
89    }
90
91    let evm_env = EvmEnv {
92        block_env: BlockEnv {
93            timestamp: U256::from(estimate_request.tx_compose.next_block_timestamp),
94            number: U256::from(estimate_request.tx_compose.next_block_number),
95            ..Default::default()
96        },
97        ..Default::default()
98    };
99
100    let tx_env = tx_req_to_env(tx_request);
101
102    let (gas_used, access_list) = match evm_access_list(&db, &evm_env, tx_env) {
103        Ok((gas_used, access_list)) => {
104            let pool_id_vec = estimate_request.swap.get_pool_id_vec();
105
106            tokio::task::spawn(async move {
107                for pool_id in pool_id_vec {
108                    let pool_id_string = format!("{pool_id}");
109                    let write_query = WriteQuery::new(Timestamp::from(start_time), "estimation")
110                        .add_field("success", 1i64)
111                        .add_tag("pool", pool_id_string);
112
113                    if let Some(influxdb_write_channel_tx) = &influxdb_write_channel_tx {
114                        if let Err(e) = influxdb_write_channel_tx.send(write_query) {
115                            error!("Failed to send successful estimation latency to influxdb: {:?}", e);
116                        }
117                    }
118                }
119            });
120
121            (gas_used, access_list)
122        }
123        Err(e) => {
124            trace!(
125                "evm_access_list error for block_number={}, block_timestamp={}, swap={}, err={e}",
126                estimate_request.tx_compose.next_block_number,
127                estimate_request.tx_compose.next_block_timestamp,
128                estimate_request.swap
129            );
130            // simulation has failed but this could be caused by a token / pool with unsupported fee issue
131            trace!("evm_access_list error calldata : {} {}", to, call_data);
132
133            if let Some(health_monitor_channel_tx) = &health_monitor_channel_tx {
134                if let Swap::BackrunSwapLine(swap_line) = estimate_request.swap {
135                    if let Err(e) =
136                        health_monitor_channel_tx.send(MessageHealthEvent::new(HealthEvent::SwapLineEstimationError(EstimationError {
137                            swap_path: swap_line.path,
138                            msg: e.to_string(),
139                        })))
140                    {
141                        error!("Failed to send message to health monitor channel: {:?}", e);
142                    }
143                }
144            }
145
146            return Ok(());
147        }
148    };
149    let swap = estimate_request.swap.clone();
150
151    if gas_used < 60_000 {
152        error!(gas_used, %swap, "Incorrect transaction estimation");
153        return Err(eyre!("TRANSACTION_ESTIMATED_INCORRECTLY"));
154    }
155
156    let gas_cost = U256::from(gas_used as u128 * gas_price as u128);
157
158    debug!(
159        "Swap encode swap={}, tips_pct={:?}, next_block_number={}, gas_cost={}, signer={}",
160        estimate_request.swap,
161        estimate_request.tips_pct,
162        estimate_request.tx_compose.next_block_number,
163        gas_cost,
164        tx_signer.address()
165    );
166
167    let (to, call_value, call_data, tips_vec) = match swap_encoder.encode(
168        estimate_request.swap.clone(),
169        estimate_request.tips_pct,
170        Some(estimate_request.tx_compose.next_block_number),
171        Some(gas_cost),
172        Some(tx_signer.address()),
173        Some(estimate_request.tx_compose.eth_balance),
174    ) {
175        Ok((to, call_value, call_data, tips_vec)) => (to, call_value, call_data, tips_vec),
176        Err(error) => {
177            error!(%error, %swap, "swap_encoder.encode");
178            return Err(error);
179        }
180    };
181
182    let tx_request = TransactionRequest {
183        transaction_type: Some(2),
184        chain_id: Some(1),
185        from: Some(tx_signer.address()),
186        to: Some(TxKind::Call(to)),
187        gas: Some((gas_used * 1500) / 1000),
188        value: call_value,
189        input: TransactionInput::new(call_data),
190        nonce: Some(estimate_request.tx_compose.nonce),
191        access_list: Some(access_list),
192        max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
193        max_fee_per_gas: Some(
194            estimate_request.tx_compose.priority_gas_fee as u128 + estimate_request.tx_compose.next_block_base_fee as u128,
195        ),
196        ..TransactionRequest::default()
197    };
198
199    let encoded_txes: Vec<TxEnvelope> =
200        estimate_request.tx_compose.stuffing_txs.iter().map(|item| TxEnvelope::from(item.clone())).collect();
201
202    let stuffing_txs_rlp: Vec<Bytes> = encoded_txes.into_iter().map(|x| Bytes::from(x.encoded_2718())).collect();
203
204    let mut tx_with_state: Vec<TxState> = stuffing_txs_rlp.into_iter().map(TxState::ReadyForBroadcastStuffing).collect();
205
206    tx_with_state.push(TxState::SignatureRequired(tx_request));
207
208    let total_tips = tips_vec.into_iter().map(|v| v.tips).sum();
209    let profit_eth = estimate_request.swap.arb_profit_eth();
210    let gas_cost_f64 = NWETH::to_float(gas_cost);
211    let tips_f64 = NWETH::to_float(total_tips);
212    let profit_eth_f64 = NWETH::to_float(profit_eth);
213    let profit_f64 = match estimate_request.swap.get_first_token() {
214        Some(token_in) => token_in.to_float(estimate_request.swap.arb_profit()),
215        None => profit_eth_f64,
216    };
217
218    let sign_request = MessageSwapCompose::ready(SwapComposeData {
219        tx_compose: TxComposeData { tx_bundle: Some(tx_with_state), ..estimate_request.tx_compose },
220        poststate: Some(db),
221        tips: Some(total_tips + gas_cost),
222        ..estimate_request
223    });
224
225    let result = match compose_channel_tx.send(sign_request) {
226        Err(error) => {
227            error!(%error, "compose_channel_tx.send");
228            Err(eyre!("COMPOSE_CHANNEL_SEND_ERROR"))
229        }
230        _ => Ok(()),
231    };
232
233    let sim_duration = chrono::Utc::now() - start_time;
234
235    info!(
236        cost=gas_cost_f64,
237        profit=profit_f64,
238        tips=tips_f64,
239        gas_used,
240        %swap,
241        duration=sim_duration.num_microseconds().unwrap_or_default(),
242        " +++ Simulation successful",
243    );
244
245    result
246}
247
248async fn estimator_worker<N, DB>(
249    client: Option<impl Provider<N> + Clone + 'static>,
250    encoder: impl SwapEncoder + Send + Sync + Clone + 'static,
251    compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
252    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
253    health_monitor_channel_tx: Option<Broadcaster<MessageHealthEvent>>,
254    influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
255) -> WorkerResult
256where
257    N: Network,
258    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + DatabaseKabuExt + Send + Sync + Clone + 'static,
259{
260    subscribe!(compose_channel_rx);
261
262    loop {
263        tokio::select! {
264            msg = compose_channel_rx.recv() => {
265                let compose_request_msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
266                match compose_request_msg {
267                    Ok(compose_request) =>{
268                        if let SwapComposeMessage::Estimate(estimate_request) = compose_request.inner {
269                            let compose_channel_tx_cloned = compose_channel_tx.clone();
270                            let encoder_cloned = encoder.clone();
271                            let client_cloned = client.clone();
272                            let influxdb_channel_tx_cloned = influxdb_write_channel_tx.clone();
273                            let health_monitor_channel_tx_cloned = health_monitor_channel_tx.clone();
274                            tokio::task::spawn(
275                                async move {
276                                if let Err(e) = estimator_task(
277                                        client_cloned,
278                                        encoder_cloned,
279                                        estimate_request.clone(),
280                                        compose_channel_tx_cloned,
281                                        health_monitor_channel_tx_cloned,
282                                        influxdb_channel_tx_cloned,
283                                ).await {
284                                        error!("Error in EVM estimator_task: {:?}", e);
285                                    }
286                                }
287                            );
288                        }
289                    }
290                    Err(e)=>{error!("{e}")}
291                }
292            }
293        }
294    }
295}
296
/// Actor that estimates swaps by simulating them in a local EVM.
///
/// `start` spawns a worker that subscribes to `compose_channel_rx`, runs an estimation for every
/// `SwapComposeMessage::Estimate` it receives and sends the resulting ready-to-sign message on
/// `compose_channel_tx`.
#[derive(Consumer, Producer)]
pub struct EvmEstimatorActor<P, N, E, DB: Clone + Send + Sync + 'static> {
    // Swap encoder; cloned into the worker and used to build the transaction call data.
    encoder: E,
    // Optional provider; when set, the estimation task wraps it in an `AlloyDB` and attaches it to the state DB.
    client: Option<P>,
    // Channel the worker subscribes to for incoming estimate requests.
    #[consumer]
    compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
    // Channel on which successfully estimated swaps are published.
    #[producer]
    compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
    // Optional channel that receives estimation errors for backrun swap lines.
    #[producer]
    health_monitor_channel_tx: Option<Broadcaster<MessageHealthEvent>>,
    // Optional channel that receives one "estimation" success point per pool of a simulated swap.
    #[producer]
    influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
    // Marker tying the actor to the network type `N` without storing a value of it.
    _n: PhantomData<N>,
}
311
312impl<P, N, E, DB> EvmEstimatorActor<P, N, E, DB>
313where
314    N: Network,
315    P: Provider<Ethereum>,
316    E: SwapEncoder + Send + Sync + Clone + 'static,
317    DB: DatabaseRef + DatabaseKabuExt + Send + Sync + Clone + 'static,
318{
319    pub fn new(encoder: E) -> Self {
320        Self {
321            encoder,
322            client: None,
323            compose_channel_tx: None,
324            compose_channel_rx: None,
325            health_monitor_channel_tx: None,
326            influxdb_write_channel_tx: None,
327            _n: PhantomData::<N>,
328        }
329    }
330
331    pub fn new_with_provider(encoder: E, client: Option<P>) -> Self {
332        Self {
333            encoder,
334            client,
335            compose_channel_tx: None,
336            compose_channel_rx: None,
337            health_monitor_channel_tx: None,
338            influxdb_write_channel_tx: None,
339            _n: PhantomData::<N>,
340        }
341    }
342
343    pub fn on_bc(self, bc: &Blockchain, strategy: &Strategy<DB>) -> Self {
344        Self {
345            compose_channel_tx: Some(strategy.swap_compose_channel()),
346            compose_channel_rx: Some(strategy.swap_compose_channel()),
347            health_monitor_channel_tx: Some(bc.health_monitor_channel()),
348            influxdb_write_channel_tx: Some(bc.influxdb_write_channel()),
349            ..self
350        }
351    }
352}
353
354impl<P, N, E, DB> Actor for EvmEstimatorActor<P, N, E, DB>
355where
356    N: Network,
357    P: Provider<N> + Send + Sync + Clone + 'static,
358    E: SwapEncoder + Clone + Send + Sync + 'static,
359    DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + DatabaseKabuExt + Send + Sync + Clone,
360{
361    fn start(&self) -> ActorResult {
362        let task = tokio::task::spawn(estimator_worker(
363            self.client.clone(),
364            self.encoder.clone(),
365            self.compose_channel_rx.clone().unwrap(),
366            self.compose_channel_tx.clone().unwrap(),
367            self.health_monitor_channel_tx.clone(),
368            self.influxdb_write_channel_tx.clone(),
369        ));
370        Ok(vec![task])
371    }
372    fn name(&self) -> &'static str {
373        "EvmEstimatorActor"
374    }
375}