kabu_execution_estimator/
geth.rs

1use revm::DatabaseRef;
2use std::sync::Arc;
3
4use alloy_consensus::TxEnvelope;
5use alloy_eips::eip2718::Encodable2718;
6use alloy_network::Ethereum;
7use alloy_primitives::{Bytes, TxKind, U256};
8use alloy_provider::Provider;
9use alloy_rpc_types::{TransactionInput, TransactionRequest};
10use eyre::{eyre, Result};
11use tokio::sync::broadcast::error::RecvError;
12use tracing::{debug, error, info};
13
14use kabu_core_blockchain::Strategy;
15use kabu_evm_utils::NWETH;
16use kabu_types_entities::{Swap, SwapEncoder};
17
18use kabu_broadcast_flashbots::Flashbots;
19use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
20use kabu_core_actors_macros::{Consumer, Producer};
21use kabu_types_blockchain::KabuTx;
22use kabu_types_events::{MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData, TxState};
23
24async fn estimator_task<P: Provider<Ethereum> + Send + Sync + Clone + 'static, DB: DatabaseRef + Send + Sync + Clone>(
25    estimate_request: SwapComposeData<DB>,
26    client: Arc<Flashbots<P>>,
27    swap_encoder: impl SwapEncoder,
28    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
29) -> Result<()> {
30    let token_in = estimate_request.swap.get_first_token().cloned().ok_or(eyre!("NO_TOKEN"))?;
31
32    let tx_signer = estimate_request.tx_compose.signer.clone().ok_or(eyre!("NO_SIGNER"))?;
33
34    let profit = estimate_request.swap.arb_profit();
35    if profit.is_zero() {
36        return Err(eyre!("NO_PROFIT"));
37    }
38
39    let profit_eth = token_in.calc_eth_value(profit).ok_or(eyre!("CALC_ETH_VALUE_FAILED"))?;
40
41    let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
42    let gas_cost = U256::from(100_000 * gas_price);
43
44    let (to, _, call_data, _) = swap_encoder.encode(
45        estimate_request.swap.clone(),
46        estimate_request.tips_pct,
47        Some(estimate_request.tx_compose.next_block_number),
48        Some(gas_cost),
49        Some(tx_signer.address()),
50        Some(estimate_request.tx_compose.eth_balance),
51    )?;
52
53    let mut tx_request = TransactionRequest {
54        transaction_type: Some(2),
55        chain_id: Some(1),
56        from: Some(tx_signer.address()),
57        to: Some(TxKind::Call(to)),
58        gas: Some(estimate_request.tx_compose.gas),
59        value: Some(U256::from(1000)),
60        nonce: Some(estimate_request.tx_compose.nonce),
61        max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
62        max_fee_per_gas: Some(estimate_request.tx_compose.next_block_base_fee as u128),
63        input: TransactionInput::new(call_data.clone()),
64        ..TransactionRequest::default()
65    };
66
67    let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
68
69    if U256::from(200_000 * gas_price) > profit_eth {
70        error!("Profit is too small");
71        return Err(eyre!("TOO_SMALL_PROFIT"));
72    }
73
74    let encoded_txes: Vec<TxEnvelope> =
75        estimate_request.tx_compose.stuffing_txs.iter().map(|item| TxEnvelope::from(item.clone())).collect();
76
77    let stuffing_txs_rlp: Vec<Bytes> = encoded_txes.into_iter().map(|x| Bytes::from(x.encoded_2718())).collect();
78
79    let mut simulation_bundle = stuffing_txs_rlp.clone();
80
81    //let typed_tx = tx_request.clone().into();
82    let tx = tx_signer.sign(tx_request.clone()).await?;
83    let tx_hash = KabuTx::get_tx_hash(&tx);
84    let tx_rlp = tx.encode();
85
86    simulation_bundle.push(Bytes::from(tx_rlp));
87
88    let start_time = chrono::Local::now();
89
90    match client.simulate_txes(simulation_bundle, estimate_request.tx_compose.next_block_number, Some(vec![tx_hash])).await {
91        Ok(sim_result) => {
92            let sim_duration = chrono::Local::now() - start_time;
93            debug!(
94                "Simulation result received Gas used : {} CB : {}  {} {}",
95                sim_result.gas_used, sim_result.coinbase_tip, sim_result.coinbase_diff, sim_duration
96            );
97            debug!("Simulation swap step");
98            for tx_sim_result in sim_result.transactions.iter() {
99                let prefix = if tx_sim_result.revert.is_none() && tx_sim_result.error.is_none() { "++" } else { "--" };
100                info!("{} {}", prefix, tx_sim_result);
101            }
102
103            if let Some(tx_sim_result) = sim_result.find_tx(tx_hash) {
104                if let Some(error) = &tx_sim_result.error {
105                    error!(" --- Simulation error : {} {}", error, sim_duration);
106                    return Err(eyre!("TX_SIMULATION_ERROR"));
107                }
108                if let Some(revert) = &tx_sim_result.revert {
109                    error!(" --- Simulation revert : {} {}", revert, sim_duration);
110                    return Err(eyre!("TX_SIMULATION_REVERT"));
111                }
112
113                let gas = tx_sim_result.gas_used.to();
114
115                if let Some(access_list) = tx_sim_result.access_list.clone() {
116                    let swap = estimate_request.swap.clone();
117
118                    tx_request.access_list = Some(access_list.clone());
119                    let gas_cost = U256::from(gas * gas_price);
120                    if gas_cost < profit_eth {
121                        let (to, call_value, call_data, tips_vec) = match estimate_request.swap {
122                            Swap::ExchangeSwapLine(_) => (to, None, call_data, vec![]),
123                            _ => swap_encoder.encode(
124                                estimate_request.swap.clone(),
125                                estimate_request.tips_pct,
126                                Some(estimate_request.tx_compose.next_block_number),
127                                Some(gas_cost),
128                                Some(tx_signer.address()),
129                                Some(estimate_request.tx_compose.eth_balance),
130                            )?,
131                        };
132
133                        let tx_request = TransactionRequest {
134                            transaction_type: Some(2),
135                            chain_id: Some(1),
136                            from: Some(tx_signer.address()),
137                            to: Some(TxKind::Call(to)),
138                            gas: Some((gas * 1500) / 1000),
139                            value: call_value,
140                            input: TransactionInput::new(call_data),
141                            nonce: Some(estimate_request.tx_compose.nonce),
142                            access_list: Some(access_list),
143                            max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
144                            max_fee_per_gas: Some(estimate_request.tx_compose.next_block_base_fee as u128), // TODO: Why not prio + base fee?
145                            ..TransactionRequest::default()
146                        };
147
148                        let mut tx_with_state: Vec<TxState> =
149                            stuffing_txs_rlp.into_iter().map(TxState::ReadyForBroadcastStuffing).collect();
150
151                        tx_with_state.push(TxState::SignatureRequired(tx_request));
152
153                        let total_tips = tips_vec.into_iter().map(|v| v.tips).sum();
154
155                        let sign_request = MessageSwapCompose::ready(SwapComposeData {
156                            tx_compose: TxComposeData { gas, ..estimate_request.tx_compose },
157                            tips: Some(total_tips + gas_cost),
158                            ..estimate_request
159                        });
160
161                        match compose_channel_tx.send(sign_request) {
162                            Ok(_) => {
163                                info!("Simulated bundle broadcast to flashbots")
164                            }
165                            Err(e) => {
166                                error!("{}", e)
167                            }
168                        }
169
170                        let gas_cost_f64 = NWETH::to_float(gas_cost);
171                        let tips_f64 = NWETH::to_float(total_tips);
172                        let profit_eth_f64 = NWETH::to_float(profit_eth);
173                        let profit_f64 = token_in.to_float(profit);
174                        info!(
175                            " +++ Simulation successful. {:#32x} Cost {} Profit {} ProfitEth {} Tips {} {} {} {}",
176                            tx_hash, gas_cost_f64, profit_f64, profit_eth_f64, tips_f64, tx_sim_result, swap, sim_duration
177                        )
178                    } else {
179                        error!(" --- Simulation error : profit does not cover gas cost {} {} {}", gas_cost, profit, sim_duration);
180                        return Err(eyre!("BAD_PROFIT"));
181                    }
182                } else {
183                    error!(" --- Simulation error : Access list not found in simulated transaction");
184                    return Err(eyre!("ACL_NOT_FOUND_IN_SIMULATION"));
185                }
186            } else {
187                error!("Simulation error : Transaction not found in simulated bundle");
188                return Err(eyre!("TX_NOT_FOUND_IN_SIMULATION"));
189            }
190        }
191        Err(e) => {
192            error!("Simulation error {}", e);
193            return Err(eyre!("SIMULATION_ERROR"));
194        }
195    }
196
197    Ok(())
198}
199
200async fn estimator_worker<P: Provider<Ethereum> + Send + Sync + Clone + 'static, DB: DatabaseRef + Send + Sync + Clone>(
201    client: Arc<Flashbots<P>>,
202    encoder: impl SwapEncoder + Send + Sync + Clone + 'static,
203    compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
204    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
205) -> WorkerResult {
206    subscribe!(compose_channel_rx);
207
208    loop {
209        tokio::select! {
210            msg = compose_channel_rx.recv() => {
211                let compose_request_msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
212                match compose_request_msg {
213                    Ok(compose_request) =>{
214                        if let SwapComposeMessage::Estimate(estimate_request) = compose_request.inner {
215                            let compose_channel_tx_cloned = compose_channel_tx.clone();
216                            let client_cloned = client.clone();
217                            let encoder_cloned = encoder.clone();
218                            tokio::task::spawn(async move {
219                                if let Err(e) = estimator_task(
220                                    estimate_request.clone(),
221                                    client_cloned,
222                                    encoder_cloned,
223                                    compose_channel_tx_cloned,
224                                ).await {
225                                        error!("Error in Geth estimator_task: {:?}", e);
226                                    }
227                                }
228                            );
229                        }
230                    }
231                    Err(e)=>{error!("{e}")}
232                }
233            }
234        }
235    }
236}
237
238#[derive(Consumer, Producer)]
239pub struct GethEstimatorActor<P, E, DB: Clone + Send + Sync + 'static> {
240    client: Arc<Flashbots<P>>,
241    encoder: E,
242    #[consumer]
243    compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
244    #[producer]
245    compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
246}
247
248impl<P, E, DB> GethEstimatorActor<P, E, DB>
249where
250    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
251    E: SwapEncoder + Send + Sync + Clone + 'static,
252    DB: DatabaseRef + Send + Sync + Clone,
253{
254    pub fn new(client: Arc<Flashbots<P>>, encoder: E) -> Self {
255        Self { client, encoder, compose_channel_tx: None, compose_channel_rx: None }
256    }
257
258    pub fn on_bc(self, strategy: &Strategy<DB>) -> Self {
259        Self {
260            compose_channel_tx: Some(strategy.swap_compose_channel()),
261            compose_channel_rx: Some(strategy.swap_compose_channel()),
262            ..self
263        }
264    }
265}
266
267impl<P, E, DB> Actor for GethEstimatorActor<P, E, DB>
268where
269    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
270    E: SwapEncoder + Send + Sync + Clone + 'static,
271    DB: DatabaseRef + Send + Sync + Clone,
272{
273    fn start(&self) -> ActorResult {
274        let task = tokio::task::spawn(estimator_worker(
275            self.client.clone(),
276            self.encoder.clone(),
277            self.compose_channel_rx.clone().unwrap(),
278            self.compose_channel_tx.clone().unwrap(),
279        ));
280        Ok(vec![task])
281    }
282
283    fn name(&self) -> &'static str {
284        "GethEstimatorActor"
285    }
286}