1use alloy_consensus::TxEnvelope;
2use alloy_eips::eip2718::Encodable2718;
3use alloy_eips::BlockNumberOrTag;
4use alloy_evm::EvmEnv;
5use alloy_network::{Ethereum, Network};
6use alloy_primitives::{Bytes, TxKind, U256};
7use alloy_provider::Provider;
8use alloy_rpc_types::{TransactionInput, TransactionRequest};
9use eyre::{eyre, Result};
10use influxdb::{Timestamp, WriteQuery};
11use std::marker::PhantomData;
12use tokio::sync::broadcast::error::RecvError;
13use tracing::{debug, error, info, trace};
14
15use kabu_core_blockchain::{Blockchain, Strategy};
16use kabu_evm_utils::{evm_access_list, NWETH};
17use kabu_types_entities::{EstimationError, Swap, SwapEncoder};
18
19use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
20use kabu_core_actors_macros::{Consumer, Producer};
21use kabu_evm_db::{AlloyDB, DatabaseKabuExt, KabuDBError};
22use kabu_evm_utils::evm_env::tx_req_to_env;
23use kabu_types_events::{HealthEvent, MessageHealthEvent, MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData, TxState};
24use revm::context::BlockEnv;
25use revm::{Database, DatabaseCommit, DatabaseRef};
26
27async fn estimator_task<N, DB>(
28 client: Option<impl Provider<N> + 'static>,
29 swap_encoder: impl SwapEncoder,
30 estimate_request: SwapComposeData<DB>,
31 compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
32 health_monitor_channel_tx: Option<Broadcaster<MessageHealthEvent>>,
33 influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
34) -> Result<()>
35where
36 N: Network,
37 DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + DatabaseKabuExt + Send + Sync + Clone + 'static,
38{
39 debug!(
40 gas_limit = estimate_request.tx_compose.gas,
41 base_fee = NWETH::to_float_gwei(estimate_request.tx_compose.next_block_base_fee as u128),
42 gas_cost = NWETH::to_float_wei(estimate_request.gas_cost()),
43 stuffing_txs_len = estimate_request.tx_compose.stuffing_txs_hashes.len(),
44 "EVM estimation",
45 );
46
47 let start_time = chrono::Utc::now();
48
49 let tx_signer = estimate_request.tx_compose.signer.clone().ok_or(eyre!("NO_SIGNER"))?;
50 let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
51
52 let (to, call_value, call_data, _) = swap_encoder.encode(
53 estimate_request.swap.clone(),
54 estimate_request.tips_pct,
55 Some(estimate_request.tx_compose.next_block_number),
56 None,
57 Some(tx_signer.address()),
58 Some(estimate_request.tx_compose.eth_balance),
59 )?;
60
61 let tx_request = TransactionRequest {
62 transaction_type: Some(2),
63 chain_id: Some(1),
64 from: Some(tx_signer.address()),
65 to: Some(TxKind::Call(to)),
66 gas: Some(estimate_request.tx_compose.gas),
67 value: call_value,
68 input: TransactionInput::new(call_data.clone()),
69 nonce: Some(estimate_request.tx_compose.nonce),
70 max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
71 max_fee_per_gas: Some(
72 estimate_request.tx_compose.next_block_base_fee as u128 + estimate_request.tx_compose.priority_gas_fee as u128,
73 ),
74 ..TransactionRequest::default()
75 };
76
77 let Some(mut db) = estimate_request.poststate else {
78 error!("StateDB is None");
79 return Err(eyre!("STATE_DB_IS_NONE"));
80 };
81
82 if let Some(client) = client {
83 let ext_db = AlloyDB::new(client, BlockNumberOrTag::Latest.into());
84 if let Some(ext_db) = ext_db {
85 db.with_ext_db(ext_db)
86 } else {
87 error!("AlloyDB is None");
88 }
89 }
90
91 let evm_env = EvmEnv {
92 block_env: BlockEnv {
93 timestamp: U256::from(estimate_request.tx_compose.next_block_timestamp),
94 number: U256::from(estimate_request.tx_compose.next_block_number),
95 ..Default::default()
96 },
97 ..Default::default()
98 };
99
100 let tx_env = tx_req_to_env(tx_request);
101
102 let (gas_used, access_list) = match evm_access_list(&db, &evm_env, tx_env) {
103 Ok((gas_used, access_list)) => {
104 let pool_id_vec = estimate_request.swap.get_pool_id_vec();
105
106 tokio::task::spawn(async move {
107 for pool_id in pool_id_vec {
108 let pool_id_string = format!("{pool_id}");
109 let write_query = WriteQuery::new(Timestamp::from(start_time), "estimation")
110 .add_field("success", 1i64)
111 .add_tag("pool", pool_id_string);
112
113 if let Some(influxdb_write_channel_tx) = &influxdb_write_channel_tx {
114 if let Err(e) = influxdb_write_channel_tx.send(write_query) {
115 error!("Failed to send successful estimation latency to influxdb: {:?}", e);
116 }
117 }
118 }
119 });
120
121 (gas_used, access_list)
122 }
123 Err(e) => {
124 trace!(
125 "evm_access_list error for block_number={}, block_timestamp={}, swap={}, err={e}",
126 estimate_request.tx_compose.next_block_number,
127 estimate_request.tx_compose.next_block_timestamp,
128 estimate_request.swap
129 );
130 trace!("evm_access_list error calldata : {} {}", to, call_data);
132
133 if let Some(health_monitor_channel_tx) = &health_monitor_channel_tx {
134 if let Swap::BackrunSwapLine(swap_line) = estimate_request.swap {
135 if let Err(e) =
136 health_monitor_channel_tx.send(MessageHealthEvent::new(HealthEvent::SwapLineEstimationError(EstimationError {
137 swap_path: swap_line.path,
138 msg: e.to_string(),
139 })))
140 {
141 error!("Failed to send message to health monitor channel: {:?}", e);
142 }
143 }
144 }
145
146 return Ok(());
147 }
148 };
149 let swap = estimate_request.swap.clone();
150
151 if gas_used < 60_000 {
152 error!(gas_used, %swap, "Incorrect transaction estimation");
153 return Err(eyre!("TRANSACTION_ESTIMATED_INCORRECTLY"));
154 }
155
156 let gas_cost = U256::from(gas_used as u128 * gas_price as u128);
157
158 debug!(
159 "Swap encode swap={}, tips_pct={:?}, next_block_number={}, gas_cost={}, signer={}",
160 estimate_request.swap,
161 estimate_request.tips_pct,
162 estimate_request.tx_compose.next_block_number,
163 gas_cost,
164 tx_signer.address()
165 );
166
167 let (to, call_value, call_data, tips_vec) = match swap_encoder.encode(
168 estimate_request.swap.clone(),
169 estimate_request.tips_pct,
170 Some(estimate_request.tx_compose.next_block_number),
171 Some(gas_cost),
172 Some(tx_signer.address()),
173 Some(estimate_request.tx_compose.eth_balance),
174 ) {
175 Ok((to, call_value, call_data, tips_vec)) => (to, call_value, call_data, tips_vec),
176 Err(error) => {
177 error!(%error, %swap, "swap_encoder.encode");
178 return Err(error);
179 }
180 };
181
182 let tx_request = TransactionRequest {
183 transaction_type: Some(2),
184 chain_id: Some(1),
185 from: Some(tx_signer.address()),
186 to: Some(TxKind::Call(to)),
187 gas: Some((gas_used * 1500) / 1000),
188 value: call_value,
189 input: TransactionInput::new(call_data),
190 nonce: Some(estimate_request.tx_compose.nonce),
191 access_list: Some(access_list),
192 max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
193 max_fee_per_gas: Some(
194 estimate_request.tx_compose.priority_gas_fee as u128 + estimate_request.tx_compose.next_block_base_fee as u128,
195 ),
196 ..TransactionRequest::default()
197 };
198
199 let encoded_txes: Vec<TxEnvelope> =
200 estimate_request.tx_compose.stuffing_txs.iter().map(|item| TxEnvelope::from(item.clone())).collect();
201
202 let stuffing_txs_rlp: Vec<Bytes> = encoded_txes.into_iter().map(|x| Bytes::from(x.encoded_2718())).collect();
203
204 let mut tx_with_state: Vec<TxState> = stuffing_txs_rlp.into_iter().map(TxState::ReadyForBroadcastStuffing).collect();
205
206 tx_with_state.push(TxState::SignatureRequired(tx_request));
207
208 let total_tips = tips_vec.into_iter().map(|v| v.tips).sum();
209 let profit_eth = estimate_request.swap.arb_profit_eth();
210 let gas_cost_f64 = NWETH::to_float(gas_cost);
211 let tips_f64 = NWETH::to_float(total_tips);
212 let profit_eth_f64 = NWETH::to_float(profit_eth);
213 let profit_f64 = match estimate_request.swap.get_first_token() {
214 Some(token_in) => token_in.to_float(estimate_request.swap.arb_profit()),
215 None => profit_eth_f64,
216 };
217
218 let sign_request = MessageSwapCompose::ready(SwapComposeData {
219 tx_compose: TxComposeData { tx_bundle: Some(tx_with_state), ..estimate_request.tx_compose },
220 poststate: Some(db),
221 tips: Some(total_tips + gas_cost),
222 ..estimate_request
223 });
224
225 let result = match compose_channel_tx.send(sign_request) {
226 Err(error) => {
227 error!(%error, "compose_channel_tx.send");
228 Err(eyre!("COMPOSE_CHANNEL_SEND_ERROR"))
229 }
230 _ => Ok(()),
231 };
232
233 let sim_duration = chrono::Utc::now() - start_time;
234
235 info!(
236 cost=gas_cost_f64,
237 profit=profit_f64,
238 tips=tips_f64,
239 gas_used,
240 %swap,
241 duration=sim_duration.num_microseconds().unwrap_or_default(),
242 " +++ Simulation successful",
243 );
244
245 result
246}
247
248async fn estimator_worker<N, DB>(
249 client: Option<impl Provider<N> + Clone + 'static>,
250 encoder: impl SwapEncoder + Send + Sync + Clone + 'static,
251 compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
252 compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
253 health_monitor_channel_tx: Option<Broadcaster<MessageHealthEvent>>,
254 influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
255) -> WorkerResult
256where
257 N: Network,
258 DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + DatabaseKabuExt + Send + Sync + Clone + 'static,
259{
260 subscribe!(compose_channel_rx);
261
262 loop {
263 tokio::select! {
264 msg = compose_channel_rx.recv() => {
265 let compose_request_msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
266 match compose_request_msg {
267 Ok(compose_request) =>{
268 if let SwapComposeMessage::Estimate(estimate_request) = compose_request.inner {
269 let compose_channel_tx_cloned = compose_channel_tx.clone();
270 let encoder_cloned = encoder.clone();
271 let client_cloned = client.clone();
272 let influxdb_channel_tx_cloned = influxdb_write_channel_tx.clone();
273 let health_monitor_channel_tx_cloned = health_monitor_channel_tx.clone();
274 tokio::task::spawn(
275 async move {
276 if let Err(e) = estimator_task(
277 client_cloned,
278 encoder_cloned,
279 estimate_request.clone(),
280 compose_channel_tx_cloned,
281 health_monitor_channel_tx_cloned,
282 influxdb_channel_tx_cloned,
283 ).await {
284 error!("Error in EVM estimator_task: {:?}", e);
285 }
286 }
287 );
288 }
289 }
290 Err(e)=>{error!("{e}")}
291 }
292 }
293 }
294 }
295}
296
297#[derive(Consumer, Producer)]
298pub struct EvmEstimatorActor<P, N, E, DB: Clone + Send + Sync + 'static> {
299 encoder: E,
300 client: Option<P>,
301 #[consumer]
302 compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
303 #[producer]
304 compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
305 #[producer]
306 health_monitor_channel_tx: Option<Broadcaster<MessageHealthEvent>>,
307 #[producer]
308 influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
309 _n: PhantomData<N>,
310}
311
312impl<P, N, E, DB> EvmEstimatorActor<P, N, E, DB>
313where
314 N: Network,
315 P: Provider<Ethereum>,
316 E: SwapEncoder + Send + Sync + Clone + 'static,
317 DB: DatabaseRef + DatabaseKabuExt + Send + Sync + Clone + 'static,
318{
319 pub fn new(encoder: E) -> Self {
320 Self {
321 encoder,
322 client: None,
323 compose_channel_tx: None,
324 compose_channel_rx: None,
325 health_monitor_channel_tx: None,
326 influxdb_write_channel_tx: None,
327 _n: PhantomData::<N>,
328 }
329 }
330
331 pub fn new_with_provider(encoder: E, client: Option<P>) -> Self {
332 Self {
333 encoder,
334 client,
335 compose_channel_tx: None,
336 compose_channel_rx: None,
337 health_monitor_channel_tx: None,
338 influxdb_write_channel_tx: None,
339 _n: PhantomData::<N>,
340 }
341 }
342
343 pub fn on_bc(self, bc: &Blockchain, strategy: &Strategy<DB>) -> Self {
344 Self {
345 compose_channel_tx: Some(strategy.swap_compose_channel()),
346 compose_channel_rx: Some(strategy.swap_compose_channel()),
347 health_monitor_channel_tx: Some(bc.health_monitor_channel()),
348 influxdb_write_channel_tx: Some(bc.influxdb_write_channel()),
349 ..self
350 }
351 }
352}
353
354impl<P, N, E, DB> Actor for EvmEstimatorActor<P, N, E, DB>
355where
356 N: Network,
357 P: Provider<N> + Send + Sync + Clone + 'static,
358 E: SwapEncoder + Clone + Send + Sync + 'static,
359 DB: DatabaseRef<Error = KabuDBError> + Database<Error = KabuDBError> + DatabaseCommit + DatabaseKabuExt + Send + Sync + Clone,
360{
361 fn start(&self) -> ActorResult {
362 let task = tokio::task::spawn(estimator_worker(
363 self.client.clone(),
364 self.encoder.clone(),
365 self.compose_channel_rx.clone().unwrap(),
366 self.compose_channel_tx.clone().unwrap(),
367 self.health_monitor_channel_tx.clone(),
368 self.influxdb_write_channel_tx.clone(),
369 ));
370 Ok(vec![task])
371 }
372 fn name(&self) -> &'static str {
373 "EvmEstimatorActor"
374 }
375}