1use alloy_eips::BlockNumberOrTag;
2use alloy_network::Network;
3use alloy_primitives::{Address, BlockNumber, TxHash, U256};
4use alloy_provider::Provider;
5use alloy_rpc_types::state::StateOverride;
6use alloy_rpc_types::BlockOverrides;
7use alloy_rpc_types_trace::geth::GethDebugTracingCallOptions;
8use eyre::{eyre, Result};
9use lazy_static::lazy_static;
10use revm::{Database, DatabaseCommit, DatabaseRef};
11use std::collections::hash_map::Entry;
12use std::collections::HashMap;
13use std::marker::PhantomData;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16use tracing::{debug, error, warn};
17
18use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
19use kabu_core_actors_macros::{Accessor, Consumer, Producer};
20use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
21use kabu_node_debug_provider::DebugProviderExt;
22use kabu_types_blockchain::{debug_trace_call_diff, GethStateUpdateVec, KabuDataTypesEVM, KabuTx, Mempool, TRACING_CALL_OPTS};
23use kabu_types_entities::required_state::{accounts_vec_len, storage_vec_len};
24use kabu_types_entities::{LatestBlock, Market, MarketState};
25use kabu_types_events::{MarketEvents, MempoolEvents, StateUpdateEvent};
26
27use super::affected_pools_code::{get_affected_pools_from_code, is_pool_code};
28use super::affected_pools_state::get_affected_pools_from_state_update;
29
30lazy_static! {
31 static ref COINBASE: Address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326".parse().unwrap();
32}
33
34#[allow(clippy::too_many_arguments)]
36pub async fn pending_tx_state_change_task<P, N, DB, LDT>(
37 client: P,
38 tx_hash: TxHash,
39 market: SharedState<Market>,
40 mempool: SharedState<Mempool<LDT>>,
41 latest_block: SharedState<LatestBlock<LDT>>,
42 market_state: SharedState<MarketState<DB>>,
43 affecting_tx: Arc<RwLock<HashMap<TxHash, bool>>>,
44 cur_block_number: BlockNumber,
45 cur_block_time: u64,
46 cur_next_base_fee: u64,
47 cur_state_override: StateOverride,
48 state_updates_broadcaster: Broadcaster<StateUpdateEvent<DB, LDT>>,
49) -> Result<()>
50where
51 N: Network<TransactionRequest = LDT::TransactionRequest>,
52 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
53 DB: DatabaseRef + Database + DatabaseCommit + Clone + Send + Sync + 'static,
54 LDT: KabuDataTypesEVM,
55{
56 let mut state_update_vec: GethStateUpdateVec = Vec::new();
57 let mut state_required_vec: GethStateUpdateVec = Vec::new();
58
59 let mut merged_state_update_vec: GethStateUpdateVec = Vec::new();
60
61 let mempool_tx = match mempool.read().await.get_tx_by_hash(&tx_hash).cloned() {
62 Some(tx) => tx,
63 None => return Err(eyre!("MEMPOOL_TX_NOT_FOUND")),
64 };
65
66 let tx = match mempool_tx.tx.clone() {
67 Some(tx) => tx,
68 None => return Err(eyre!("NO_TX_IN_MEMPOOL")),
69 };
70
71 let source = mempool_tx.source.clone();
72
73 let transaction_request: LDT::TransactionRequest = tx.to_transaction_request();
74
75 let call_opts: GethDebugTracingCallOptions = GethDebugTracingCallOptions {
122 block_overrides: Some(BlockOverrides {
123 number: Some(U256::from(cur_block_number)),
124 time: Some(cur_block_time),
125 coinbase: Some(*COINBASE),
126 base_fee: Some(U256::from(cur_next_base_fee)),
127 ..Default::default()
128 }),
129 state_overrides: Some(cur_state_override.clone()),
130 ..TRACING_CALL_OPTS.clone()
131 };
132
133 if !(*affecting_tx.read().await.get(&tx_hash).unwrap_or(&true)) {
134 return Err(eyre!("NON_AFFECTING_TX"));
135 }
136
137 let diff_trace_result =
138 debug_trace_call_diff(client.clone(), transaction_request, BlockNumberOrTag::Latest.into(), Some(call_opts)).await;
139 match diff_trace_result {
140 Ok((pre, post)) => {
141 state_required_vec.push(pre.clone());
142 state_update_vec.push(post.clone());
143
144 merged_state_update_vec.push(pre);
145 merged_state_update_vec.push(post);
146 }
147 Err(error) => {
148 let tx_hash = tx.get_tx_hash();
149 mempool.write().await.set_failed(tx_hash);
150 debug!(block=cur_block_number, %tx_hash, %error, "debug_trace_call error for");
151 }
152 }
153
154 let affected_pools = get_affected_pools_from_state_update(market.clone(), &state_update_vec).await;
155
156 let accounts_len = accounts_vec_len(&state_update_vec);
157 let storage_len = storage_vec_len(&state_update_vec);
158
159 debug!(%tx_hash, %source, pools = affected_pools.len(), accounts = accounts_len, storage = storage_len, "Mempool affected pools");
160
161 affecting_tx.write().await.insert(tx_hash, !affected_pools.is_empty());
162
163 if let Some(latest_header) = latest_block.read().await.block_header.clone() {
165 let next_block_number = latest_header.number + 1;
166 let next_block_timestamp = latest_header.timestamp + 12;
167
168 if !affected_pools.is_empty() {
169 let cur_state_db = market_state.read().await.state_db.clone();
170 let request = StateUpdateEvent::new(
171 next_block_number,
172 next_block_timestamp,
173 cur_next_base_fee,
174 cur_state_db,
175 state_update_vec,
176 Some(state_required_vec.clone()),
177 affected_pools,
178 vec![tx_hash],
179 vec![mempool_tx.tx.clone().unwrap()],
180 "pending_tx_searcher".to_string(),
181 9000,
182 );
183 if let Err(e) = state_updates_broadcaster.send(request) {
184 error!("state_updates_broadcaster : {}", e)
185 }
186 }
187 } else {
188 error!("Latest header is empty")
189 }
190
191 if is_pool_code(&merged_state_update_vec) {
192 match get_affected_pools_from_code(client, market.clone(), &merged_state_update_vec).await {
193 Ok(affected_pools) => {
194 match affecting_tx.write().await.entry(tx_hash) {
195 Entry::Occupied(mut v) => {
196 if !v.get() {
197 v.insert(!affected_pools.is_empty());
198 }
199 }
200 Entry::Vacant(v) => {
201 v.insert(!affected_pools.is_empty());
202 }
203 };
204
205 debug!("Mempool code pools {} {} update len : {}", tx_hash, source, affected_pools.len());
206
207 if let Some(latest_header) = latest_block.read().await.block_header.clone() {
208 let block_number = latest_header.number + 1;
209 let block_timestamp = latest_header.timestamp + 12;
210
211 if !affected_pools.is_empty() {
212 let cur_state_db = market_state.read().await.state_db.clone();
213
214 let request = StateUpdateEvent::new(
215 block_number,
216 block_timestamp,
217 cur_next_base_fee,
218 cur_state_db,
219 merged_state_update_vec,
220 None,
221 affected_pools,
222 vec![tx_hash],
223 vec![mempool_tx.tx.unwrap()],
224 "poolcode_searcher".to_string(),
225 3000,
226 );
227 if let Err(e) = state_updates_broadcaster.send(request) {
228 error!("state_updates_broadcaster : {}", e)
229 }
230 }
231 } else {
232 error!("Latest header is empty")
233 }
234 }
235 Err(e) => {
236 debug!("code affected pools error : {e}")
237 }
238 }
239 }
240 Ok(())
241}
242
243#[allow(clippy::too_many_arguments)]
244pub async fn pending_tx_state_change_worker<P, N, DB, LDT>(
245 client: P,
246 market: SharedState<Market>,
247 mempool: SharedState<Mempool<LDT>>,
248 latest_block: SharedState<LatestBlock<LDT>>,
249 market_state: SharedState<MarketState<DB>>,
250 mempool_events_rx: Broadcaster<MempoolEvents>,
251 market_events_rx: Broadcaster<MarketEvents>,
252 state_updates_broadcaster: Broadcaster<StateUpdateEvent<DB, LDT>>,
253) -> WorkerResult
254where
255 N: Network<TransactionRequest = LDT::TransactionRequest>,
256 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
257 DB: DatabaseRef + Database + DatabaseCommit + Clone + Send + Sync + 'static,
258 LDT: KabuDataTypesEVM,
259{
260 subscribe!(mempool_events_rx);
261 subscribe!(market_events_rx);
262
263 let affecting_tx: Arc<RwLock<HashMap<TxHash, bool>>> = Arc::new(RwLock::new(HashMap::new()));
264 let mut cur_next_base_fee = 0;
265 let mut cur_block_number: Option<BlockNumber> = None;
266 let mut cur_block_time: Option<u64> = None;
267 let mut cur_state_override: StateOverride = StateOverride::default();
268
269 loop {
270 tokio::select! {
271 msg = market_events_rx.recv() => {
272 if let Ok(msg) = msg {
273 let market_event_msg : MarketEvents = msg;
274 if let MarketEvents::BlockHeaderUpdate{ block_number, block_hash, timestamp, base_fee, next_base_fee } = market_event_msg {
275 debug!("Block header update {} {} base_fee {} ", block_number, block_hash, base_fee);
276 cur_block_number = Some( block_number + 1);
277 cur_block_time = Some(timestamp + 12 );
278 cur_next_base_fee = next_base_fee;
279
280 for _counter in 0..5 {
281 if let Ok(msg) = market_events_rx.recv().await {
282 if matches!(msg, MarketEvents::BlockStateUpdate{..} ) {
283 cur_state_override = latest_block.read().await.node_state_override();
284 debug!("Block state update received {} {}", block_number, block_hash);
285 break;
286 }
287 }
288 }
289 }
290 }
291 }
292 msg = mempool_events_rx.recv() => {
293 if let Ok(msg) = msg {
294 let mempool_event_msg : MempoolEvents = msg;
295 if let MempoolEvents::MempoolActualTxUpdate{ tx_hash } = mempool_event_msg {
296 if cur_block_number.is_none() {
297 warn!("Did not received block header update yet!");
298 continue;
299 }
300
301 tokio::task::spawn(
302 pending_tx_state_change_task(
303 client.clone(),
304 tx_hash,
305 market.clone(),
306 mempool.clone(),
307 latest_block.clone(),
308 market_state.clone(),
309 affecting_tx.clone(),
310 cur_block_number.unwrap_or_default(),
311 cur_block_time.unwrap_or_default(),
312 cur_next_base_fee,
313 cur_state_override.clone(),
314 state_updates_broadcaster.clone(),
315 )
316 );
317 }
318 }
319 }
320 }
321 }
322}
323
324#[derive(Accessor, Consumer, Producer)]
325pub struct PendingTxStateChangeProcessorActor<P, N, DB: Clone + Send + Sync + 'static, LDT: KabuDataTypesEVM + 'static> {
326 client: P,
327 #[accessor]
328 market: Option<SharedState<Market>>,
329 #[accessor]
330 mempool: Option<SharedState<Mempool<LDT>>>,
331 #[accessor]
332 market_state: Option<SharedState<MarketState<DB>>>,
333 #[accessor]
334 latest_block: Option<SharedState<LatestBlock<LDT>>>,
335 #[consumer]
336 market_events_rx: Option<Broadcaster<MarketEvents>>,
337 #[consumer]
338 mempool_events_rx: Option<Broadcaster<MempoolEvents>>,
339 #[producer]
340 state_updates_tx: Option<Broadcaster<StateUpdateEvent<DB, LDT>>>,
341 _n: PhantomData<N>,
342}
343
344impl<P, N, DB, LDT> PendingTxStateChangeProcessorActor<P, N, DB, LDT>
345where
346 N: Network,
347 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
348 DB: DatabaseRef + Send + Sync + Clone + 'static,
349 LDT: KabuDataTypesEVM + 'static,
350{
351 pub fn new(client: P) -> PendingTxStateChangeProcessorActor<P, N, DB, LDT> {
352 PendingTxStateChangeProcessorActor {
353 client,
354 market: None,
355 mempool: None,
356 market_state: None,
357 latest_block: None,
358 market_events_rx: None,
359 mempool_events_rx: None,
360 state_updates_tx: None,
361 _n: PhantomData,
362 }
363 }
364
365 pub fn on_bc(self, bc: &Blockchain<LDT>, state: &BlockchainState<DB, LDT>, strategy: &Strategy<DB, LDT>) -> Self {
366 Self {
367 market: Some(bc.market()),
368 mempool: Some(bc.mempool()),
369 market_state: Some(state.market_state()),
370 latest_block: Some(bc.latest_block()),
371 market_events_rx: Some(bc.market_events_channel()),
372 mempool_events_rx: Some(bc.mempool_events_channel()),
373 state_updates_tx: Some(strategy.state_update_channel()),
374 ..self
375 }
376 }
377}
378
379impl<P, N, DB, LDT> Actor for PendingTxStateChangeProcessorActor<P, N, DB, LDT>
380where
381 N: Network<TransactionRequest = LDT::TransactionRequest>,
382 P: Provider<N> + DebugProviderExt<N> + Send + Sync + Clone + 'static,
383 DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
384 LDT: KabuDataTypesEVM + 'static,
385{
386 fn start(&self) -> ActorResult {
387 let task = tokio::task::spawn(pending_tx_state_change_worker(
388 self.client.clone(),
389 self.market.clone().unwrap(),
390 self.mempool.clone().unwrap(),
391 self.latest_block.clone().unwrap(),
392 self.market_state.clone().unwrap(),
393 self.mempool_events_rx.clone().unwrap(),
394 self.market_events_rx.clone().unwrap(),
395 self.state_updates_tx.clone().unwrap(),
396 ));
397 Ok(vec![task])
398 }
399
400 fn name(&self) -> &'static str {
401 "PendingTxStateChangeProcessorActor"
402 }
403}