kabu_defi_health_monitor/
stuffing_tx_monitor.rs

1use alloy_consensus::transaction::Transaction;
2use alloy_network::{Ethereum, TransactionResponse};
3use alloy_primitives::{Address, TxHash, U256};
4use alloy_provider::Provider;
5use eyre::{eyre, Result};
6use influxdb::{Timestamp, WriteQuery};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::broadcast::error::RecvError;
10use tokio::sync::broadcast::Receiver;
11use tracing::{error, info};
12
13use kabu_core_blockchain::Blockchain;
14use kabu_evm_utils::NWETH;
15use kabu_types_entities::{LatestBlock, Swap, Token};
16
17use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
18use kabu_core_actors_macros::{Accessor, Consumer, Producer};
19use kabu_types_blockchain::debug_trace_transaction;
20use kabu_types_events::{MarketEvents, MessageTxCompose, TxComposeMessageType};
21
22#[derive(Clone, Debug)]
23struct TxToCheck {
24    block: u64,
25    token_in: Token,
26    profit: U256,
27    cost: U256,
28    tips: U256,
29    swap: Swap,
30}
31
32async fn calc_coinbase_diff<P: Provider<Ethereum> + 'static>(client: P, tx_hash: TxHash, coinbase: Address) -> Result<U256> {
33    let (pre, post) = debug_trace_transaction(client, tx_hash, true).await?;
34
35    let coinbase_pre = pre.get(&coinbase).ok_or(eyre!("COINBASE_NOT_FOUND_IN_PRE"))?;
36    let coinbase_post = post.get(&coinbase).ok_or(eyre!("COINBASE_NOT_FOUND_IN_POST"))?;
37
38    let balance_diff = coinbase_post.balance.unwrap_or_default().checked_sub(coinbase_pre.balance.unwrap_or_default()).unwrap_or_default();
39    info!("Stuffing tx mined MF tx: {:?} sent to coinbase: {}", tx_hash, NWETH::to_float(balance_diff));
40
41    Ok(balance_diff)
42}
43
44pub async fn stuffing_tx_monitor_worker<P: Provider<Ethereum> + Clone + 'static>(
45    client: P,
46    latest_block: SharedState<LatestBlock>,
47    tx_compose_channel_rx: Broadcaster<MessageTxCompose>,
48    market_events_rx: Broadcaster<MarketEvents>,
49    influxdb_write_channel_tx: Broadcaster<WriteQuery>,
50) -> WorkerResult {
51    let mut tx_compose_channel_rx: Receiver<MessageTxCompose> = tx_compose_channel_rx.subscribe();
52    let mut market_events_rx: Receiver<MarketEvents> = market_events_rx.subscribe();
53
54    let mut txs_to_check: HashMap<TxHash, TxToCheck> = HashMap::new();
55
56    loop {
57        tokio::select! {
58            msg = market_events_rx.recv() => {
59                let market_event_msg : Result<MarketEvents, RecvError> = msg;
60                match market_event_msg {
61                    Ok(market_event)=>{
62                        if let MarketEvents::BlockTxUpdate{ block_number,..} = market_event {
63                            let coinbase =  latest_block.read().await.coinbase().unwrap_or_default();
64                            if let Some(txs) = latest_block.read().await.txs() {
65                                for (idx, tx) in txs.iter().enumerate() {
66                                    let tx_hash = tx.tx_hash();
67                                    if let Some(tx_to_check) = txs_to_check.get(&tx_hash).cloned(){
68                                        info!("Stuffing tx found mined {:?} block: {} -> {} idx: {} profit: {} tips: {} token: {} to: {:?} {}", tx.tx_hash(), tx_to_check.block, block_number, idx, NWETH::to_float(tx_to_check.profit), NWETH::to_float(tx_to_check.tips), tx_to_check.token_in.get_symbol(), tx.to().unwrap_or_default(), tx_to_check.swap );
69                                        if idx < txs.len() - 1 {
70                                            let others_tx = &txs[idx+1];
71                                            let others_tx_hash = others_tx.tx_hash();
72                                            let client_clone = client.clone();
73                                            let influx_channel_clone = influxdb_write_channel_tx.clone();
74                                            info!("Stuffing tx mined {:?} MF tx: {:?} to: {:?}", tx.tx_hash(), others_tx.tx_hash(), others_tx.to().unwrap_or_default() );
75                                            tokio::task::spawn( async move {
76                                                if let Ok(coinbase_diff)  = calc_coinbase_diff(client_clone, others_tx_hash, coinbase).await {
77                                                    let start_time_utc =   chrono::Utc::now();
78                                                    let bribe = NWETH::to_float(tx_to_check.tips);
79                                                    let others_bribe = NWETH::to_float(coinbase_diff);
80                                                    let cost = NWETH::to_float(tx_to_check.cost);
81
82                                                    let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "stuffing_mined")
83                                                        .add_field("our_bribe", bribe)
84                                                        .add_field("our_cost", cost)
85                                                        .add_field("others_bribe", others_bribe)
86                                                        .add_tag("tx_block", tx_to_check.block)
87                                                        .add_tag("block", block_number)
88                                                        .add_tag("block_idx", idx as u64)
89                                                        .add_tag("stuffing_tx", tx_hash.to_string())
90                                                        .add_tag("other_tx", others_tx_hash.to_string());
91                                                    if let Err(e) = influx_channel_clone.send(write_query) {
92                                                       error!("Failed to send block latency to influxdb: {:?}", e);
93                                                    }
94                                                };
95                                            });
96                                        }
97                                        txs_to_check.remove::<TxHash>(&tx.tx_hash());
98                                    }
99                                }
100                            }
101                            info!("Stuffing txs to check : {} at block {}", txs_to_check.len(), block_number);
102
103
104                            let start_time_utc =   chrono::Utc::now();
105
106                            let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "stuffing_waiting").add_field("value", txs_to_check.len() as u64).add_tag("block", block_number);
107                            if let Err(e) = influxdb_write_channel_tx.send(write_query) {
108                               error!("Failed to send block latency to influxdb: {:?}", e);
109                            }
110                        }
111                    }
112                    Err(e)=>{
113                        error!("market_event_rx error : {e}")
114                    }
115                }
116            },
117
118            msg = tx_compose_channel_rx.recv() => {
119                let tx_compose_update : Result<MessageTxCompose, RecvError>  = msg;
120                match tx_compose_update {
121                    Ok(tx_compose_msg)=>{
122                        if let TxComposeMessageType::Sign(tx_compose_data) = tx_compose_msg.inner {
123                            for stuffing_tx_hash in tx_compose_data.stuffing_txs_hashes.iter() {
124                                let Some(swap) = & tx_compose_data.swap else {continue};
125
126                                let token_in = swap.get_first_token().map_or(
127                                    Arc::new(Token::new(Address::repeat_byte(0x11))), |x| x.clone()
128                                );
129
130                                let cost = U256::from(tx_compose_data.next_block_base_fee + tx_compose_data.priority_gas_fee) * U256::from(tx_compose_data.gas);
131
132                                let entry = txs_to_check.entry(*stuffing_tx_hash).or_insert(
133                                        TxToCheck{
134                                                block : tx_compose_data.next_block_number,
135                                                token_in : token_in.as_ref().clone(),
136                                                profit : U256::ZERO,
137                                                tips : U256::ZERO,
138                                                swap : swap.clone(),
139                                                cost,
140                                        }
141                                );
142                                let profit = swap.arb_profit();
143                                let profit = token_in.calc_eth_value(profit).unwrap_or_default();
144
145                                if entry.profit < profit {
146                                    entry.token_in = token_in.as_ref().clone();
147                                    entry.profit = profit;
148                                    entry.tips = tx_compose_data.tips.unwrap_or_default();
149
150                                    entry.swap = swap.clone();
151                                }
152                            }
153                        }
154                    }
155                    Err(e)=>{
156                        error!("tx_compose_channel_rx : {e}")
157                    }
158                }
159            }
160        }
161    }
162}
163
164#[derive(Accessor, Consumer, Producer)]
165pub struct StuffingTxMonitorActor<P> {
166    client: P,
167    #[accessor]
168    latest_block: Option<SharedState<LatestBlock>>,
169    #[consumer]
170    tx_compose_channel_rx: Option<Broadcaster<MessageTxCompose>>,
171    #[consumer]
172    market_events_rx: Option<Broadcaster<MarketEvents>>,
173    #[producer]
174    influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
175}
176
177impl<P: Provider<Ethereum> + Send + Sync + Clone + 'static> StuffingTxMonitorActor<P> {
178    pub fn new(client: P) -> Self {
179        StuffingTxMonitorActor {
180            client,
181            latest_block: None,
182            tx_compose_channel_rx: None,
183            market_events_rx: None,
184            influxdb_write_channel_tx: None,
185        }
186    }
187
188    pub fn on_bc(self, bc: &Blockchain) -> Self {
189        Self {
190            latest_block: Some(bc.latest_block()),
191            tx_compose_channel_rx: Some(bc.tx_compose_channel()),
192            market_events_rx: Some(bc.market_events_channel()),
193            influxdb_write_channel_tx: Some(bc.influxdb_write_channel()),
194            ..self
195        }
196    }
197}
198
199impl<P> Actor for StuffingTxMonitorActor<P>
200where
201    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
202{
203    fn start(&self) -> ActorResult {
204        let task = tokio::task::spawn(stuffing_tx_monitor_worker(
205            self.client.clone(),
206            self.latest_block.clone().unwrap(),
207            self.tx_compose_channel_rx.clone().unwrap(),
208            self.market_events_rx.clone().unwrap(),
209            self.influxdb_write_channel_tx.clone().unwrap(),
210        ));
211        Ok(vec![task])
212    }
213
214    fn name(&self) -> &'static str {
215        "StuffingTxMonitorActor"
216    }
217}