kabu_execution_estimator/
hardhat.rs

1use alloy_consensus::TxEnvelope;
2use alloy_eips::eip2718::Encodable2718;
3use alloy_primitives::{Bytes, TxKind, U256};
4use alloy_provider::Provider;
5use alloy_rpc_types::{TransactionInput, TransactionRequest};
6use eyre::{eyre, Result};
7use revm::DatabaseRef;
8use tokio::sync::broadcast::error::RecvError;
9use tracing::{error, info};
10
11use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
12use kabu_core_actors_macros::{Consumer, Producer};
13use kabu_node_debug_provider::DebugProviderExt;
14use kabu_types_entities::SwapEncoder;
15use kabu_types_events::{MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData, TxState};
16
17async fn estimator_worker<DB: DatabaseRef + Send + Sync + Clone>(
18    swap_encoder: impl SwapEncoder,
19    compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
20    compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
21) -> WorkerResult {
22    subscribe!(compose_channel_rx);
23
24    loop {
25        tokio::select! {
26                    msg = compose_channel_rx.recv() => {
27                        let compose_request_msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
28                        match compose_request_msg {
29                            Ok(compose_request) =>{
30                                if let SwapComposeMessage::Estimate(estimate_request) = compose_request.inner {
31                                    info!("Hardhat estimation");
32                                    let token_in = estimate_request.swap.get_first_token().cloned().ok_or(eyre!("NO_TOKEN"))?;
33
34                                    let tx_signer = estimate_request.tx_compose.signer.clone().ok_or(eyre!("NO_SIGNER"))?;
35
36                                    let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
37                                    let gas_cost = U256::from(100_000 * gas_price);
38
39                                    let profit = estimate_request.swap.arb_profit();
40                                    if profit.is_zero() {
41                                        return Err(eyre!("NO_PROFIT"));
42                                    }
43                                    let profit_eth = token_in.calc_eth_value(profit).ok_or(eyre!("CALC_ETH_VALUE_FAILED"))?;
44
45                                    let (to, _call_value, call_data, _) = swap_encoder.encode(
46                                        estimate_request.swap.clone(),
47                                        estimate_request.tips_pct,
48                                        Some(estimate_request.tx_compose.next_block_number),
49                                        Some(gas_cost),
50                                        Some(tx_signer.address()),
51                                        Some(estimate_request.tx_compose.eth_balance),
52                                    )?;
53
54                                    let tx_request = TransactionRequest {
55                                        transaction_type : Some(2),
56                                        chain_id : Some(1),
57                                        from: Some(tx_signer.address()),
58                                        to: Some(TxKind::Call(to)),
59                                        gas: Some(estimate_request.tx_compose.gas),
60                                        value: Some(U256::from(1000)),
61                                        input: TransactionInput::new(call_data),
62                                        nonce: Some(estimate_request.tx_compose.nonce ),
63                                        max_priority_fee_per_gas: Some(estimate_request.tx_compose.priority_gas_fee as u128),
64                                        max_fee_per_gas: Some(estimate_request.tx_compose.next_block_base_fee as u128), // TODO: Why not prio + base fee?
65                                        ..TransactionRequest::default()
66                                    };
67
68                                    let gas_price = estimate_request.tx_compose.priority_gas_fee + estimate_request.tx_compose.next_block_base_fee;
69
70                                    if U256::from(300_000 * gas_price) > profit_eth {
71                                        error!("Profit is too small");
72                                        return Err(eyre!("TOO_SMALL_PROFIT"));
73                                    }
74
75                                    let enveloped_txs : Vec<TxEnvelope>= estimate_request.tx_compose.stuffing_txs.iter().map(|item| item.clone().into()).collect();
76                                    let stuffing_txs_rlp : Vec<Bytes> = enveloped_txs.into_iter().map(|x| Bytes::from(x.encoded_2718()) ).collect();
77
78                                    let mut tx_with_state: Vec<TxState> = stuffing_txs_rlp.into_iter().map(TxState::ReadyForBroadcastStuffing).collect();
79
80                                    tx_with_state.push(TxState::SignatureRequired(tx_request));
81
82                                    let sign_request = MessageSwapCompose::ready(
83                                        SwapComposeData{
84                                            tx_compose: TxComposeData{
85                                            tx_bundle : Some(tx_with_state),
86                                        ..estimate_request.tx_compose
87                                            },
88                                            ..estimate_request
89                                        }
90                                    );
91
92                                    if let Err(e) = compose_channel_tx.send(sign_request){
93                                        error!("{e}");
94                                    }
95                                }
96                            }
97                    Err(e)=>{error!("{e}")}
98                }
99            }
100        }
101    }
102}
103
104#[allow(dead_code)]
105#[derive(Consumer, Producer)]
106pub struct HardhatEstimatorActor<P, E, DB: Send + Sync + Clone + 'static> {
107    client: P,
108    encoder: E,
109    #[consumer]
110    compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
111    #[producer]
112    compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
113}
114
115impl<P, E, DB> HardhatEstimatorActor<P, E, DB>
116where
117    P: Provider + DebugProviderExt + Clone + Send + Sync + 'static,
118    E: SwapEncoder + Send + Sync + Clone + 'static,
119    DB: DatabaseRef + Send + Sync + Clone,
120{
121    pub fn new(client: P, encoder: E) -> Self {
122        Self { client, encoder, compose_channel_tx: None, compose_channel_rx: None }
123    }
124}
125
126impl<P, E, DB> Actor for HardhatEstimatorActor<P, E, DB>
127where
128    P: Provider + DebugProviderExt + Clone + Send + Sync + 'static,
129    E: SwapEncoder + Send + Sync + Clone + 'static,
130    DB: DatabaseRef + Send + Sync + Clone,
131{
132    fn start(&self) -> ActorResult {
133        let task = tokio::task::spawn(estimator_worker(
134            self.encoder.clone(),
135            self.compose_channel_rx.clone().unwrap(),
136            self.compose_channel_tx.clone().unwrap(),
137        ));
138        Ok(vec![task])
139    }
140
141    fn name(&self) -> &'static str {
142        "HardhatEstimatorActor"
143    }
144}