kabu_execution_estimator/
hardhat.rs1use alloy_consensus::TxEnvelope;
2use alloy_eips::eip2718::Encodable2718;
3use alloy_primitives::{Bytes, TxKind, U256};
4use alloy_provider::Provider;
5use alloy_rpc_types::{TransactionInput, TransactionRequest};
6use eyre::{eyre, Result};
7use revm::DatabaseRef;
8use tokio::sync::broadcast::error::RecvError;
9use tracing::{error, info};
10
11use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
12use kabu_core_actors_macros::{Consumer, Producer};
13use kabu_node_debug_provider::DebugProviderExt;
14use kabu_types_entities::SwapEncoder;
15use kabu_types_events::{MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData, TxState};
16
17async fn estimator_worker<DB: DatabaseRef + Send + Sync + Clone>(
18 swap_encoder: impl SwapEncoder,
19 compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
20 compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
21) -> WorkerResult {
22 subscribe!(compose_channel_rx);
23
24 loop {
25 tokio::select! {
26 msg = compose_channel_rx.recv() => {
27 let compose_request_msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
28 match compose_request_msg {
29 Ok(compose_request) =>{
30 if let SwapComposeMessage::Estimate(estimate_request) = compose_request.inner {
31 info!("Hardhat estimation");
32 let token_in = estimate_request.swap.get_first_token().cloned().ok_or(eyre!("NO_TOKEN"))?;
33
34 let tx_signer = estimate_request.tx_compose.signer.clone().ok_or(eyre!("NO_SIGNER"))?;
35
36 let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
37 let gas_cost = U256::from(100_000 * gas_price);
38
39 let profit = estimate_request.swap.arb_profit();
40 if profit.is_zero() {
41 return Err(eyre!("NO_PROFIT"));
42 }
43 let profit_eth = token_in.calc_eth_value(profit).ok_or(eyre!("CALC_ETH_VALUE_FAILED"))?;
44
45 let (to, _call_value, call_data, _) = swap_encoder.encode(
46 estimate_request.swap.clone(),
47 estimate_request.tips_pct,
48 Some(estimate_request.tx_compose.next_block_number),
49 Some(gas_cost),
50 Some(tx_signer.address()),
51 Some(estimate_request.tx_compose.eth_balance),
52 )?;
53
54 let tx_request = TransactionRequest {
55 transaction_type : Some(2),
56 chain_id : Some(1),
57 from: Some(tx_signer.address()),
58 to: Some(TxKind::Call(to)),
59 gas: Some(estimate_request.tx_compose.gas),
60 value: Some(U256::from(1000)),
61 input: TransactionInput::new(call_data),
62 nonce: Some(estimate_request.tx_compose.nonce ),
63 max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
64 max_fee_per_gas: Some(estimate_request.tx_compose.next_block_base_fee as u128), ..TransactionRequest::default()
66 };
67
68 let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
69
70 if U256::from(300_000 * gas_price) > profit_eth {
71 error!("Profit is too small");
72 return Err(eyre!("TOO_SMALL_PROFIT"));
73 }
74
75 let enveloped_txs : Vec<TxEnvelope>= estimate_request.tx_compose.stuffing_txs.iter().map(|item| item.clone().into()).collect();
76 let stuffing_txs_rlp : Vec<Bytes> = enveloped_txs.into_iter().map(|x| Bytes::from(x.encoded_2718()) ).collect();
77
78 let mut tx_with_state: Vec<TxState> = stuffing_txs_rlp.into_iter().map(TxState::ReadyForBroadcastStuffing).collect();
79
80 tx_with_state.push(TxState::SignatureRequired(tx_request));
81
82 let sign_request = MessageSwapCompose::ready(
83 SwapComposeData{
84 tx_compose: TxComposeData{
85 tx_bundle : Some(tx_with_state),
86 ..estimate_request.tx_compose
87 },
88 ..estimate_request
89 }
90 );
91
92 if let Err(e) = compose_channel_tx.send(sign_request){
93 error!("{e}");
94 }
95 }
96 }
97 Err(e)=>{error!("{e}")}
98 }
99 }
100 }
101 }
102}
103
104#[allow(dead_code)]
105#[derive(Consumer, Producer)]
106pub struct HardhatEstimatorActor<P, E, DB: Send + Sync + Clone + 'static> {
107 client: P,
108 encoder: E,
109 #[consumer]
110 compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
111 #[producer]
112 compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
113}
114
115impl<P, E, DB> HardhatEstimatorActor<P, E, DB>
116where
117 P: Provider + DebugProviderExt + Clone + Send + Sync + 'static,
118 E: SwapEncoder + Send + Sync + Clone + 'static,
119 DB: DatabaseRef + Send + Sync + Clone,
120{
121 pub fn new(client: P, encoder: E) -> Self {
122 Self { client, encoder, compose_channel_tx: None, compose_channel_rx: None }
123 }
124}
125
126impl<P, E, DB> Actor for HardhatEstimatorActor<P, E, DB>
127where
128 P: Provider + DebugProviderExt + Clone + Send + Sync + 'static,
129 E: SwapEncoder + Send + Sync + Clone + 'static,
130 DB: DatabaseRef + Send + Sync + Clone,
131{
132 fn start(&self) -> ActorResult {
133 let task = tokio::task::spawn(estimator_worker(
134 self.encoder.clone(),
135 self.compose_channel_rx.clone().unwrap(),
136 self.compose_channel_tx.clone().unwrap(),
137 ));
138 Ok(vec![task])
139 }
140
141 fn name(&self) -> &'static str {
142 "HardhatEstimatorActor"
143 }
144}