kabu_execution_estimator/
geth.rs1use revm::DatabaseRef;
2use std::sync::Arc;
3
4use alloy_consensus::TxEnvelope;
5use alloy_eips::eip2718::Encodable2718;
6use alloy_network::Ethereum;
7use alloy_primitives::{Bytes, TxKind, U256};
8use alloy_provider::Provider;
9use alloy_rpc_types::{TransactionInput, TransactionRequest};
10use eyre::{eyre, Result};
11use tokio::sync::broadcast::error::RecvError;
12use tracing::{debug, error, info};
13
14use kabu_core_blockchain::Strategy;
15use kabu_evm_utils::NWETH;
16use kabu_types_entities::{Swap, SwapEncoder};
17
18use kabu_broadcast_flashbots::Flashbots;
19use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
20use kabu_core_actors_macros::{Consumer, Producer};
21use kabu_types_blockchain::KabuTx;
22use kabu_types_events::{MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData, TxState};
23
24async fn estimator_task<P: Provider<Ethereum> + Send + Sync + Clone + 'static, DB: DatabaseRef + Send + Sync + Clone>(
25 estimate_request: SwapComposeData<DB>,
26 client: Arc<Flashbots<P>>,
27 swap_encoder: impl SwapEncoder,
28 compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
29) -> Result<()> {
30 let token_in = estimate_request.swap.get_first_token().cloned().ok_or(eyre!("NO_TOKEN"))?;
31
32 let tx_signer = estimate_request.tx_compose.signer.clone().ok_or(eyre!("NO_SIGNER"))?;
33
34 let profit = estimate_request.swap.arb_profit();
35 if profit.is_zero() {
36 return Err(eyre!("NO_PROFIT"));
37 }
38
39 let profit_eth = token_in.calc_eth_value(profit).ok_or(eyre!("CALC_ETH_VALUE_FAILED"))?;
40
41 let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
42 let gas_cost = U256::from(100_000 * gas_price);
43
44 let (to, _, call_data, _) = swap_encoder.encode(
45 estimate_request.swap.clone(),
46 estimate_request.tips_pct,
47 Some(estimate_request.tx_compose.next_block_number),
48 Some(gas_cost),
49 Some(tx_signer.address()),
50 Some(estimate_request.tx_compose.eth_balance),
51 )?;
52
53 let mut tx_request = TransactionRequest {
54 transaction_type: Some(2),
55 chain_id: Some(1),
56 from: Some(tx_signer.address()),
57 to: Some(TxKind::Call(to)),
58 gas: Some(estimate_request.tx_compose.gas),
59 value: Some(U256::from(1000)),
60 nonce: Some(estimate_request.tx_compose.nonce),
61 max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
62 max_fee_per_gas: Some(estimate_request.tx_compose.next_block_base_fee as u128),
63 input: TransactionInput::new(call_data.clone()),
64 ..TransactionRequest::default()
65 };
66
67 let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
68
69 if U256::from(200_000 * gas_price) > profit_eth {
70 error!("Profit is too small");
71 return Err(eyre!("TOO_SMALL_PROFIT"));
72 }
73
74 let encoded_txes: Vec<TxEnvelope> =
75 estimate_request.tx_compose.stuffing_txs.iter().map(|item| TxEnvelope::from(item.clone())).collect();
76
77 let stuffing_txs_rlp: Vec<Bytes> = encoded_txes.into_iter().map(|x| Bytes::from(x.encoded_2718())).collect();
78
79 let mut simulation_bundle = stuffing_txs_rlp.clone();
80
81 let tx = tx_signer.sign(tx_request.clone()).await?;
83 let tx_hash = KabuTx::get_tx_hash(&tx);
84 let tx_rlp = tx.encode();
85
86 simulation_bundle.push(Bytes::from(tx_rlp));
87
88 let start_time = chrono::Local::now();
89
90 match client.simulate_txes(simulation_bundle, estimate_request.tx_compose.next_block_number, Some(vec![tx_hash])).await {
91 Ok(sim_result) => {
92 let sim_duration = chrono::Local::now() - start_time;
93 debug!(
94 "Simulation result received Gas used : {} CB : {} {} {}",
95 sim_result.gas_used, sim_result.coinbase_tip, sim_result.coinbase_diff, sim_duration
96 );
97 debug!("Simulation swap step");
98 for tx_sim_result in sim_result.transactions.iter() {
99 let prefix = if tx_sim_result.revert.is_none() && tx_sim_result.error.is_none() { "++" } else { "--" };
100 info!("{} {}", prefix, tx_sim_result);
101 }
102
103 if let Some(tx_sim_result) = sim_result.find_tx(tx_hash) {
104 if let Some(error) = &tx_sim_result.error {
105 error!(" --- Simulation error : {} {}", error, sim_duration);
106 return Err(eyre!("TX_SIMULATION_ERROR"));
107 }
108 if let Some(revert) = &tx_sim_result.revert {
109 error!(" --- Simulation revert : {} {}", revert, sim_duration);
110 return Err(eyre!("TX_SIMULATION_REVERT"));
111 }
112
113 let gas = tx_sim_result.gas_used.to();
114
115 if let Some(access_list) = tx_sim_result.access_list.clone() {
116 let swap = estimate_request.swap.clone();
117
118 tx_request.access_list = Some(access_list.clone());
119 let gas_cost = U256::from(gas * gas_price);
120 if gas_cost < profit_eth {
121 let (to, call_value, call_data, tips_vec) = match estimate_request.swap {
122 Swap::ExchangeSwapLine(_) => (to, None, call_data, vec![]),
123 _ => swap_encoder.encode(
124 estimate_request.swap.clone(),
125 estimate_request.tips_pct,
126 Some(estimate_request.tx_compose.next_block_number),
127 Some(gas_cost),
128 Some(tx_signer.address()),
129 Some(estimate_request.tx_compose.eth_balance),
130 )?,
131 };
132
133 let tx_request = TransactionRequest {
134 transaction_type: Some(2),
135 chain_id: Some(1),
136 from: Some(tx_signer.address()),
137 to: Some(TxKind::Call(to)),
138 gas: Some((gas * 1500) / 1000),
139 value: call_value,
140 input: TransactionInput::new(call_data),
141 nonce: Some(estimate_request.tx_compose.nonce),
142 access_list: Some(access_list),
143 max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
144 max_fee_per_gas: Some(estimate_request.tx_compose.next_block_base_fee as u128), ..TransactionRequest::default()
146 };
147
148 let mut tx_with_state: Vec<TxState> =
149 stuffing_txs_rlp.into_iter().map(TxState::ReadyForBroadcastStuffing).collect();
150
151 tx_with_state.push(TxState::SignatureRequired(tx_request));
152
153 let total_tips = tips_vec.into_iter().map(|v| v.tips).sum();
154
155 let sign_request = MessageSwapCompose::ready(SwapComposeData {
156 tx_compose: TxComposeData { gas, ..estimate_request.tx_compose },
157 tips: Some(total_tips + gas_cost),
158 ..estimate_request
159 });
160
161 match compose_channel_tx.send(sign_request) {
162 Ok(_) => {
163 info!("Simulated bundle broadcast to flashbots")
164 }
165 Err(e) => {
166 error!("{}", e)
167 }
168 }
169
170 let gas_cost_f64 = NWETH::to_float(gas_cost);
171 let tips_f64 = NWETH::to_float(total_tips);
172 let profit_eth_f64 = NWETH::to_float(profit_eth);
173 let profit_f64 = token_in.to_float(profit);
174 info!(
175 " +++ Simulation successful. {:#32x} Cost {} Profit {} ProfitEth {} Tips {} {} {} {}",
176 tx_hash, gas_cost_f64, profit_f64, profit_eth_f64, tips_f64, tx_sim_result, swap, sim_duration
177 )
178 } else {
179 error!(" --- Simulation error : profit does not cover gas cost {} {} {}", gas_cost, profit, sim_duration);
180 return Err(eyre!("BAD_PROFIT"));
181 }
182 } else {
183 error!(" --- Simulation error : Access list not found in simulated transaction");
184 return Err(eyre!("ACL_NOT_FOUND_IN_SIMULATION"));
185 }
186 } else {
187 error!("Simulation error : Transaction not found in simulated bundle");
188 return Err(eyre!("TX_NOT_FOUND_IN_SIMULATION"));
189 }
190 }
191 Err(e) => {
192 error!("Simulation error {}", e);
193 return Err(eyre!("SIMULATION_ERROR"));
194 }
195 }
196
197 Ok(())
198}
199
200async fn estimator_worker<P: Provider<Ethereum> + Send + Sync + Clone + 'static, DB: DatabaseRef + Send + Sync + Clone>(
201 client: Arc<Flashbots<P>>,
202 encoder: impl SwapEncoder + Send + Sync + Clone + 'static,
203 compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
204 compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
205) -> WorkerResult {
206 subscribe!(compose_channel_rx);
207
208 loop {
209 tokio::select! {
210 msg = compose_channel_rx.recv() => {
211 let compose_request_msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
212 match compose_request_msg {
213 Ok(compose_request) =>{
214 if let SwapComposeMessage::Estimate(estimate_request) = compose_request.inner {
215 let compose_channel_tx_cloned = compose_channel_tx.clone();
216 let client_cloned = client.clone();
217 let encoder_cloned = encoder.clone();
218 tokio::task::spawn(async move {
219 if let Err(e) = estimator_task(
220 estimate_request.clone(),
221 client_cloned,
222 encoder_cloned,
223 compose_channel_tx_cloned,
224 ).await {
225 error!("Error in Geth estimator_task: {:?}", e);
226 }
227 }
228 );
229 }
230 }
231 Err(e)=>{error!("{e}")}
232 }
233 }
234 }
235 }
236}
237
238#[derive(Consumer, Producer)]
239pub struct GethEstimatorActor<P, E, DB: Clone + Send + Sync + 'static> {
240 client: Arc<Flashbots<P>>,
241 encoder: E,
242 #[consumer]
243 compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
244 #[producer]
245 compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
246}
247
248impl<P, E, DB> GethEstimatorActor<P, E, DB>
249where
250 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
251 E: SwapEncoder + Send + Sync + Clone + 'static,
252 DB: DatabaseRef + Send + Sync + Clone,
253{
254 pub fn new(client: Arc<Flashbots<P>>, encoder: E) -> Self {
255 Self { client, encoder, compose_channel_tx: None, compose_channel_rx: None }
256 }
257
258 pub fn on_bc(self, strategy: &Strategy<DB>) -> Self {
259 Self {
260 compose_channel_tx: Some(strategy.swap_compose_channel()),
261 compose_channel_rx: Some(strategy.swap_compose_channel()),
262 ..self
263 }
264 }
265}
266
267impl<P, E, DB> Actor for GethEstimatorActor<P, E, DB>
268where
269 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
270 E: SwapEncoder + Send + Sync + Clone + 'static,
271 DB: DatabaseRef + Send + Sync + Clone,
272{
273 fn start(&self) -> ActorResult {
274 let task = tokio::task::spawn(estimator_worker(
275 self.client.clone(),
276 self.encoder.clone(),
277 self.compose_channel_rx.clone().unwrap(),
278 self.compose_channel_tx.clone().unwrap(),
279 ));
280 Ok(vec![task])
281 }
282
283 fn name(&self) -> &'static str {
284 "GethEstimatorActor"
285 }
286}