kabu_defi_health_monitor/
stuffing_tx_monitor.rs1use alloy_consensus::transaction::Transaction;
2use alloy_network::{Ethereum, TransactionResponse};
3use alloy_primitives::{Address, TxHash, U256};
4use alloy_provider::Provider;
5use eyre::{eyre, Result};
6use influxdb::{Timestamp, WriteQuery};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::broadcast::error::RecvError;
10use tokio::sync::broadcast::Receiver;
11use tracing::{error, info};
12
13use kabu_core_blockchain::Blockchain;
14use kabu_evm_utils::NWETH;
15use kabu_types_entities::{LatestBlock, Swap, Token};
16
17use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
18use kabu_core_actors_macros::{Accessor, Consumer, Producer};
19use kabu_types_blockchain::debug_trace_transaction;
20use kabu_types_events::{MarketEvents, MessageTxCompose, TxComposeMessageType};
21
22#[derive(Clone, Debug)]
23struct TxToCheck {
24 block: u64,
25 token_in: Token,
26 profit: U256,
27 cost: U256,
28 tips: U256,
29 swap: Swap,
30}
31
32async fn calc_coinbase_diff<P: Provider<Ethereum> + 'static>(client: P, tx_hash: TxHash, coinbase: Address) -> Result<U256> {
33 let (pre, post) = debug_trace_transaction(client, tx_hash, true).await?;
34
35 let coinbase_pre = pre.get(&coinbase).ok_or(eyre!("COINBASE_NOT_FOUND_IN_PRE"))?;
36 let coinbase_post = post.get(&coinbase).ok_or(eyre!("COINBASE_NOT_FOUND_IN_POST"))?;
37
38 let balance_diff = coinbase_post.balance.unwrap_or_default().checked_sub(coinbase_pre.balance.unwrap_or_default()).unwrap_or_default();
39 info!("Stuffing tx mined MF tx: {:?} sent to coinbase: {}", tx_hash, NWETH::to_float(balance_diff));
40
41 Ok(balance_diff)
42}
43
44pub async fn stuffing_tx_monitor_worker<P: Provider<Ethereum> + Clone + 'static>(
45 client: P,
46 latest_block: SharedState<LatestBlock>,
47 tx_compose_channel_rx: Broadcaster<MessageTxCompose>,
48 market_events_rx: Broadcaster<MarketEvents>,
49 influxdb_write_channel_tx: Broadcaster<WriteQuery>,
50) -> WorkerResult {
51 let mut tx_compose_channel_rx: Receiver<MessageTxCompose> = tx_compose_channel_rx.subscribe();
52 let mut market_events_rx: Receiver<MarketEvents> = market_events_rx.subscribe();
53
54 let mut txs_to_check: HashMap<TxHash, TxToCheck> = HashMap::new();
55
56 loop {
57 tokio::select! {
58 msg = market_events_rx.recv() => {
59 let market_event_msg : Result<MarketEvents, RecvError> = msg;
60 match market_event_msg {
61 Ok(market_event)=>{
62 if let MarketEvents::BlockTxUpdate{ block_number,..} = market_event {
63 let coinbase = latest_block.read().await.coinbase().unwrap_or_default();
64 if let Some(txs) = latest_block.read().await.txs() {
65 for (idx, tx) in txs.iter().enumerate() {
66 let tx_hash = tx.tx_hash();
67 if let Some(tx_to_check) = txs_to_check.get(&tx_hash).cloned(){
68 info!("Stuffing tx found mined {:?} block: {} -> {} idx: {} profit: {} tips: {} token: {} to: {:?} {}", tx.tx_hash(), tx_to_check.block, block_number, idx, NWETH::to_float(tx_to_check.profit), NWETH::to_float(tx_to_check.tips), tx_to_check.token_in.get_symbol(), tx.to().unwrap_or_default(), tx_to_check.swap );
69 if idx < txs.len() - 1 {
70 let others_tx = &txs[idx+1];
71 let others_tx_hash = others_tx.tx_hash();
72 let client_clone = client.clone();
73 let influx_channel_clone = influxdb_write_channel_tx.clone();
74 info!("Stuffing tx mined {:?} MF tx: {:?} to: {:?}", tx.tx_hash(), others_tx.tx_hash(), others_tx.to().unwrap_or_default() );
75 tokio::task::spawn( async move {
76 if let Ok(coinbase_diff) = calc_coinbase_diff(client_clone, others_tx_hash, coinbase).await {
77 let start_time_utc = chrono::Utc::now();
78 let bribe = NWETH::to_float(tx_to_check.tips);
79 let others_bribe = NWETH::to_float(coinbase_diff);
80 let cost = NWETH::to_float(tx_to_check.cost);
81
82 let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "stuffing_mined")
83 .add_field("our_bribe", bribe)
84 .add_field("our_cost", cost)
85 .add_field("others_bribe", others_bribe)
86 .add_tag("tx_block", tx_to_check.block)
87 .add_tag("block", block_number)
88 .add_tag("block_idx", idx as u64)
89 .add_tag("stuffing_tx", tx_hash.to_string())
90 .add_tag("other_tx", others_tx_hash.to_string());
91 if let Err(e) = influx_channel_clone.send(write_query) {
92 error!("Failed to send block latency to influxdb: {:?}", e);
93 }
94 };
95 });
96 }
97 txs_to_check.remove::<TxHash>(&tx.tx_hash());
98 }
99 }
100 }
101 info!("Stuffing txs to check : {} at block {}", txs_to_check.len(), block_number);
102
103
104 let start_time_utc = chrono::Utc::now();
105
106 let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "stuffing_waiting").add_field("value", txs_to_check.len() as u64).add_tag("block", block_number);
107 if let Err(e) = influxdb_write_channel_tx.send(write_query) {
108 error!("Failed to send block latency to influxdb: {:?}", e);
109 }
110 }
111 }
112 Err(e)=>{
113 error!("market_event_rx error : {e}")
114 }
115 }
116 },
117
118 msg = tx_compose_channel_rx.recv() => {
119 let tx_compose_update : Result<MessageTxCompose, RecvError> = msg;
120 match tx_compose_update {
121 Ok(tx_compose_msg)=>{
122 if let TxComposeMessageType::Sign(tx_compose_data) = tx_compose_msg.inner {
123 for stuffing_tx_hash in tx_compose_data.stuffing_txs_hashes.iter() {
124 let Some(swap) = & tx_compose_data.swap else {continue};
125
126 let token_in = swap.get_first_token().map_or(
127 Arc::new(Token::new(Address::repeat_byte(0x11))), |x| x.clone()
128 );
129
130 let cost = U256::from(tx_compose_data.next_block_base_fee + tx_compose_data.priority_gas_fee) * U256::from(tx_compose_data.gas);
131
132 let entry = txs_to_check.entry(*stuffing_tx_hash).or_insert(
133 TxToCheck{
134 block : tx_compose_data.next_block_number,
135 token_in : token_in.as_ref().clone(),
136 profit : U256::ZERO,
137 tips : U256::ZERO,
138 swap : swap.clone(),
139 cost,
140 }
141 );
142 let profit = swap.arb_profit();
143 let profit = token_in.calc_eth_value(profit).unwrap_or_default();
144
145 if entry.profit < profit {
146 entry.token_in = token_in.as_ref().clone();
147 entry.profit = profit;
148 entry.tips = tx_compose_data.tips.unwrap_or_default();
149
150 entry.swap = swap.clone();
151 }
152 }
153 }
154 }
155 Err(e)=>{
156 error!("tx_compose_channel_rx : {e}")
157 }
158 }
159 }
160 }
161 }
162}
163
164#[derive(Accessor, Consumer, Producer)]
165pub struct StuffingTxMonitorActor<P> {
166 client: P,
167 #[accessor]
168 latest_block: Option<SharedState<LatestBlock>>,
169 #[consumer]
170 tx_compose_channel_rx: Option<Broadcaster<MessageTxCompose>>,
171 #[consumer]
172 market_events_rx: Option<Broadcaster<MarketEvents>>,
173 #[producer]
174 influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
175}
176
177impl<P: Provider<Ethereum> + Send + Sync + Clone + 'static> StuffingTxMonitorActor<P> {
178 pub fn new(client: P) -> Self {
179 StuffingTxMonitorActor {
180 client,
181 latest_block: None,
182 tx_compose_channel_rx: None,
183 market_events_rx: None,
184 influxdb_write_channel_tx: None,
185 }
186 }
187
188 pub fn on_bc(self, bc: &Blockchain) -> Self {
189 Self {
190 latest_block: Some(bc.latest_block()),
191 tx_compose_channel_rx: Some(bc.tx_compose_channel()),
192 market_events_rx: Some(bc.market_events_channel()),
193 influxdb_write_channel_tx: Some(bc.influxdb_write_channel()),
194 ..self
195 }
196 }
197}
198
199impl<P> Actor for StuffingTxMonitorActor<P>
200where
201 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
202{
203 fn start(&self) -> ActorResult {
204 let task = tokio::task::spawn(stuffing_tx_monitor_worker(
205 self.client.clone(),
206 self.latest_block.clone().unwrap(),
207 self.tx_compose_channel_rx.clone().unwrap(),
208 self.market_events_rx.clone().unwrap(),
209 self.influxdb_write_channel_tx.clone().unwrap(),
210 ));
211 Ok(vec![task])
212 }
213
214 fn name(&self) -> &'static str {
215 "StuffingTxMonitorActor"
216 }
217}