kabu_anvil/
main.rs

1use std::env;
2use std::fmt::{Display, Formatter};
3use std::process::exit;
4use std::sync::Arc;
5use std::time::Duration;
6
7use alloy_rpc_types_eth::TransactionTrait;
8
9use alloy_provider::network::TransactionResponse;
10
11use crate::flashbots_mock::mount_flashbots_mock;
12use crate::flashbots_mock::BundleRequest;
13use crate::test_config::TestConfig;
14use alloy_primitives::{address, TxHash, U256};
15use alloy_provider::network::eip2718::Encodable2718;
16use alloy_provider::Provider;
17use alloy_rpc_types::{BlockId, BlockNumberOrTag};
18use clap::Parser;
19use kabu::node::debug_provider::AnvilDebugProviderFactory;
20
21use eyre::{OptionExt, Result};
22use influxdb::WriteQuery;
23use kabu::broadcast::accounts::{InitializeSignersOneShotBlockingActor, NonceAndBalanceMonitorActor, TxSignersActor};
24use kabu::broadcast::broadcaster::{AnvilBroadcastActor, FlashbotsBroadcastActor};
25use kabu::broadcast::flashbots::client::RelayConfig;
26use kabu::broadcast::flashbots::Flashbots;
27use kabu::core::actors::{Accessor, Actor, Broadcaster, Consumer, Producer, SharedState};
28use kabu::core::block_history::BlockHistoryActor;
29use kabu::core::router::SwapRouterActor;
30use kabu::defi::address_book::TokenAddressEth;
31use kabu::defi::health_monitor::StuffingTxMonitorActor;
32use kabu::defi::market::{fetch_and_add_pool_by_pool_id, fetch_state_and_add_pool};
33use kabu::defi::pools::protocols::CurveProtocol;
34use kabu::defi::pools::{CurvePool, PoolLoadersBuilder, PoolsLoadingConfig};
35use kabu::defi::preloader::MarketStatePreloadedOneShotActor;
36use kabu::defi::price::PriceActor;
37use kabu::evm::db::KabuDBType;
38use kabu::evm::utils::NWETH;
39use kabu::execution::estimator::EvmEstimatorActor;
40use kabu::execution::multicaller::{MulticallerDeployer, MulticallerSwapEncoder};
41use kabu::node::actor_config::NodeBlockActorConfig;
42use kabu::node::json_rpc::NodeBlockActor;
43use kabu::strategy::backrun::{BackrunConfig, StateChangeArbActor};
44use kabu::strategy::merger::{ArbSwapPathMergerActor, DiffPathMergerActor, SamePathMergerActor};
45use kabu::types::blockchain::{debug_trace_block, ChainParameters, KabuDataTypesEthereum, Mempool};
46use kabu::types::entities::{
47    AccountNonceAndBalanceState, BlockHistory, LatestBlock, Market, MarketState, PoolClass, PoolId, Swap, Token, TxSigners,
48};
49use kabu::types::events::{
50    MarketEvents, MempoolEvents, MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate, MessageHealthEvent,
51    MessageSwapCompose, MessageTxCompose, SwapComposeMessage,
52};
53use tracing::{debug, error, info};
54use tracing_subscriber::layer::SubscriberExt;
55use tracing_subscriber::util::SubscriberInitExt;
56use tracing_subscriber::{fmt, EnvFilter, Layer};
57use wiremock::MockServer;
58
59mod flashbots_mock;
60mod test_config;
61
62#[derive(Clone, Default, Debug)]
63struct Stat {
64    found_counter: usize,
65    sign_counter: usize,
66    best_profit_eth: U256,
67    best_swap: Option<Swap>,
68}
69
70impl Display for Stat {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        match &self.best_swap {
73            Some(swap) => match swap.get_first_token() {
74                Some(token) => {
75                    write!(
76                        f,
77                        "Found: {} Ok: {} Profit : {} / ProfitEth : {} Path : {} ",
78                        self.found_counter,
79                        self.sign_counter,
80                        token.to_float(swap.arb_profit()),
81                        NWETH::to_float(swap.arb_profit_eth()),
82                        swap
83                    )
84                }
85                None => {
86                    write!(
87                        f,
88                        "Found: {} Ok: {} Profit : {} / ProfitEth : {} Path : {} ",
89                        self.found_counter,
90                        self.sign_counter,
91                        swap.arb_profit(),
92                        swap.arb_profit_eth(),
93                        swap
94                    )
95                }
96            },
97            _ => {
98                write!(f, "NO BEST SWAP")
99            }
100        }
101    }
102}
103
104#[allow(dead_code)]
105fn parse_tx_hashes(tx_hash_vec: Vec<&str>) -> Result<Vec<TxHash>> {
106    let mut ret: Vec<TxHash> = Vec::new();
107    for tx_hash in tx_hash_vec {
108        ret.push(tx_hash.parse()?);
109    }
110    Ok(ret)
111}
112
113#[derive(Parser, Debug)]
114struct Commands {
115    #[arg(short, long)]
116    config: String,
117
118    /// Timout in seconds after the test fails
119    #[arg(short, long, default_value = "10")]
120    timeout: u64,
121
122    /// Wait xx seconds before start re-broadcasting
123    #[arg(short, long, default_value = "1")]
124    wait_init: u64,
125}
126
127#[tokio::main]
128async fn main() -> Result<()> {
129    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "debug,alloy_rpc_client=off,kabu_multicaller=trace".into());
130    let fmt_layer = fmt::Layer::default().with_thread_ids(true).with_file(false).with_line_number(true).with_filter(env_filter);
131
132    tracing_subscriber::registry().with(fmt_layer).init();
133
134    let args = Commands::parse();
135    let test_config = TestConfig::from_file(args.config.clone()).await?;
136    let node_url = env::var("MAINNET_WS")?;
137    let client = AnvilDebugProviderFactory::from_node_on_block(node_url, test_config.settings.block).await?;
138    let priv_key = client.privkey()?.to_bytes().to_vec();
139
140    let mut mock_server: Option<MockServer> = None;
141    if test_config.modules.flashbots {
142        // Start flashbots mock server
143        mock_server = Some(MockServer::start().await);
144        mount_flashbots_mock(mock_server.as_ref().unwrap()).await;
145    }
146
147    //let multicaller_address = MulticallerDeployer::new().deploy(client.clone(), priv_key.clone()).await?.address().ok_or_eyre("MULTICALLER_NOT_DEPLOYED")?;
148    let multicaller_address = MulticallerDeployer::new()
149        .set_code(client.clone(), address!("FCfCfcfC0AC30164AFdaB927F441F2401161F358"))
150        .await?
151        .address()
152        .ok_or_eyre("MULTICALLER_NOT_DEPLOYED")?;
153    info!("Multicaller deployed at {:?}", multicaller_address);
154
155    let multicaller_encoder = MulticallerSwapEncoder::default_with_address(multicaller_address);
156
157    let block_number = client.get_block_number().await?;
158    info!("Current block_number={}", block_number);
159
160    let block_header = client.get_block(block_number.into()).await?.unwrap().header;
161    info!("Current block_header={:?}", block_header);
162
163    let block_header_with_txes = client.get_block(block_number.into()).await?.unwrap();
164
165    let cache_db = KabuDBType::default();
166    let mut market_instance = Market::default();
167    let market_state_instance = MarketState::new(cache_db.clone());
168
169    // Add default tokens for price actor
170    let usdc_token = Token::new_with_data(TokenAddressEth::USDC, Some("USDC".to_string()), None, Some(6), true, false);
171    let usdt_token = Token::new_with_data(TokenAddressEth::USDT, Some("USDT".to_string()), None, Some(6), true, false);
172    let wbtc_token = Token::new_with_data(TokenAddressEth::WBTC, Some("WBTC".to_string()), None, Some(8), true, false);
173    let dai_token = Token::new_with_data(TokenAddressEth::DAI, Some("DAI".to_string()), None, Some(18), true, false);
174    market_instance.add_token(usdc_token);
175    market_instance.add_token(usdt_token);
176    market_instance.add_token(wbtc_token);
177    market_instance.add_token(dai_token);
178
179    let mempool_instance = Mempool::<KabuDataTypesEthereum>::new();
180
181    info!("Creating channels");
182    let new_block_headers_channel: Broadcaster<MessageBlockHeader> = Broadcaster::new(10);
183    let new_block_with_tx_channel: Broadcaster<MessageBlock> = Broadcaster::new(10);
184    let new_block_state_update_channel: Broadcaster<MessageBlockStateUpdate> = Broadcaster::new(10);
185    let new_block_logs_channel: Broadcaster<MessageBlockLogs> = Broadcaster::new(10);
186
187    let market_events_channel: Broadcaster<MarketEvents> = Broadcaster::new(100);
188    let mempool_events_channel: Broadcaster<MempoolEvents> = Broadcaster::new(500);
189    let pool_health_monitor_channel: Broadcaster<MessageHealthEvent> = Broadcaster::new(100);
190
191    let influx_channel: Broadcaster<WriteQuery> = Broadcaster::new(100);
192
193    let market_instance = SharedState::new(market_instance);
194    let market_state = SharedState::new(market_state_instance);
195    let mempool_instance = SharedState::new(mempool_instance);
196    let block_history_state = SharedState::new(BlockHistory::new(10));
197
198    let tx_signers = TxSigners::new();
199    let accounts_state = AccountNonceAndBalanceState::new();
200
201    let tx_signers = SharedState::new(tx_signers);
202    let accounts_state = SharedState::new(accounts_state);
203
204    let latest_block = SharedState::new(LatestBlock::new(block_number, block_header.hash));
205
206    let (_, post) = debug_trace_block(client.clone(), BlockId::Number(BlockNumberOrTag::Number(block_number)), true).await?;
207    latest_block.write().await.update(
208        block_number,
209        block_header.hash,
210        Some(block_header.clone()),
211        Some(block_header_with_txes),
212        None,
213        Some(post),
214    );
215
216    info!("Starting initialize signers actor");
217
218    let mut initialize_signers_actor = InitializeSignersOneShotBlockingActor::new(Some(priv_key));
219    match initialize_signers_actor.access(tx_signers.clone()).access(accounts_state.clone()).start_and_wait() {
220        Err(e) => {
221            error!("{}", e);
222            panic!("Cannot initialize signers");
223        }
224        _ => info!("Signers have been initialized"),
225    }
226
227    for (token_name, token_config) in test_config.tokens {
228        let symbol = token_config.symbol.unwrap_or(token_config.address.to_checksum(None));
229        let name = token_config.name.unwrap_or(symbol.clone());
230        let token = Token::new_with_data(
231            token_config.address,
232            Some(symbol),
233            Some(name),
234            Some(token_config.decimals.map_or(18, |x| x)),
235            token_config.basic.unwrap_or_default(),
236            token_config.middle.unwrap_or_default(),
237        );
238        if let Some(price_float) = token_config.price {
239            let price_u256 = NWETH::from_float(price_float) * token.get_exp() / NWETH::get_exp();
240            debug!("Setting price : {} -> {} ({})", token_name, price_u256, price_u256.to::<u128>());
241
242            token.set_eth_price(Some(price_u256));
243        };
244
245        market_instance.write().await.add_token(token);
246    }
247
248    info!("Starting market state preload actor");
249    let mut market_state_preload_actor = MarketStatePreloadedOneShotActor::new(client.clone())
250        .with_copied_account(multicaller_encoder.get_contract_address())
251        .with_signers(tx_signers.clone());
252    match market_state_preload_actor.access(market_state.clone()).start_and_wait() {
253        Err(e) => {
254            error!("{}", e)
255        }
256        _ => {
257            info!("Market state preload actor started successfully")
258        }
259    }
260
261    info!("Starting node actor");
262    let mut node_block_actor = NodeBlockActor::new(client.clone(), NodeBlockActorConfig::all_enabled());
263    match node_block_actor
264        .produce(new_block_headers_channel.clone())
265        .produce(new_block_with_tx_channel.clone())
266        .produce(new_block_logs_channel.clone())
267        .produce(new_block_state_update_channel.clone())
268        .start()
269    {
270        Err(e) => {
271            error!("{}", e)
272        }
273        _ => {
274            info!("Node actor started successfully")
275        }
276    }
277
278    info!("Starting nonce and balance monitor actor");
279    let mut nonce_and_balance_monitor = NonceAndBalanceMonitorActor::new(client.clone());
280    match nonce_and_balance_monitor
281        .access(accounts_state.clone())
282        .access(latest_block.clone())
283        .consume(market_events_channel.clone())
284        .start()
285    {
286        Err(e) => {
287            error!("{}", e);
288            panic!("Cannot initialize nonce and balance monitor");
289        }
290        _ => info!("Nonce monitor has been initialized"),
291    }
292
293    info!("Starting price actor");
294    let mut price_actor = PriceActor::new(client.clone()).only_once();
295    match price_actor.access(market_instance.clone()).start_and_wait() {
296        Err(e) => {
297            error!("{}", e);
298            panic!("Cannot initialize price actor");
299        }
300        _ => info!("Price actor has been initialized"),
301    }
302
303    let pool_loaders =
304        Arc::new(PoolLoadersBuilder::<_, _, KabuDataTypesEthereum>::default_pool_loaders(client.clone(), PoolsLoadingConfig::default()));
305
306    for (pool_name, pool_config) in test_config.pools {
307        match pool_config.class {
308            PoolClass::UniswapV2 | PoolClass::UniswapV3 => {
309                debug!(address=%pool_config.address, class=%pool_config.class, "Loading pool");
310                fetch_and_add_pool_by_pool_id(
311                    client.clone(),
312                    market_instance.clone(),
313                    market_state.clone(),
314                    pool_loaders.clone(),
315                    PoolId::Address(pool_config.address),
316                    pool_config.class,
317                )
318                .await?;
319                debug!(address=%pool_config.address, class=%pool_config.class, "Loaded pool");
320            }
321            PoolClass::Curve => {
322                debug!("Loading curve pool");
323                if let Ok(curve_contract) = CurveProtocol::get_contract_from_code(client.clone(), pool_config.address).await {
324                    let curve_pool = CurvePool::fetch_pool_data_with_default_encoder(client.clone(), curve_contract).await?;
325                    fetch_state_and_add_pool::<_, _, _, KabuDataTypesEthereum>(
326                        client.clone(),
327                        market_instance.clone(),
328                        market_state.clone(),
329                        curve_pool.into(),
330                    )
331                    .await?;
332                } else {
333                    error!("CURVE_POOL_NOT_LOADED");
334                }
335                debug!("Loaded curve pool");
336            }
337            _ => {
338                error!("Unknown pool class")
339            }
340        }
341        let swap_path_len = market_instance.read().await.get_pool_paths(&PoolId::Address(pool_config.address)).unwrap_or_default().len();
342        info!(
343            "Loaded pool '{}' with address={}, pool_class={}, swap_paths={}",
344            pool_name, pool_config.address, pool_config.class, swap_path_len
345        );
346    }
347
348    info!("Starting block history actor");
349    let mut block_history_actor = BlockHistoryActor::new(client.clone());
350    match block_history_actor
351        .access(latest_block.clone())
352        .access(market_state.clone())
353        .access(block_history_state.clone())
354        .consume(new_block_headers_channel.clone())
355        .consume(new_block_with_tx_channel.clone())
356        .consume(new_block_logs_channel.clone())
357        .consume(new_block_state_update_channel.clone())
358        .produce(market_events_channel.clone())
359        .start()
360    {
361        Err(e) => {
362            error!("{}", e)
363        }
364        _ => {
365            info!("Block history actor started successfully")
366        }
367    }
368
369    let swap_compose_channel: Broadcaster<MessageSwapCompose<KabuDBType>> = Broadcaster::new(100);
370    let tx_compose_channel: Broadcaster<MessageTxCompose> = Broadcaster::new(100);
371
372    let mut broadcast_actor = AnvilBroadcastActor::new(client.clone());
373    match broadcast_actor.consume(tx_compose_channel.clone()).start() {
374        Err(e) => error!("{}", e),
375        _ => {
376            info!("Broadcast actor started successfully")
377        }
378    }
379
380    // Start estimator actor
381    let mut estimator_actor = EvmEstimatorActor::new_with_provider(multicaller_encoder.clone(), Some(client.clone()));
382    match estimator_actor.consume(swap_compose_channel.clone()).produce(swap_compose_channel.clone()).start() {
383        Err(e) => error!("{e}"),
384        _ => {
385            info!("Estimate actor started successfully")
386        }
387    }
388
389    let mut health_monitor_actor = StuffingTxMonitorActor::new(client.clone());
390    match health_monitor_actor
391        .access(latest_block.clone())
392        .consume(market_events_channel.clone())
393        .consume(tx_compose_channel.clone())
394        .produce(influx_channel.clone())
395        .start()
396    {
397        Ok(_) => {
398            //tasks.extend(r);
399            info!("Stuffing tx monitor actor started")
400        }
401        Err(e) => {
402            panic!("StuffingTxMonitorActor error {e}")
403        }
404    }
405
406    // Start actor that encodes paths found
407    if test_config.modules.encoder {
408        info!("Starting swap router actor");
409
410        let mut swap_router_actor = SwapRouterActor::new();
411
412        match swap_router_actor
413            .access(tx_signers.clone())
414            .access(accounts_state.clone())
415            .consume(swap_compose_channel.clone())
416            .produce(swap_compose_channel.clone())
417            .produce(tx_compose_channel.clone())
418            .start()
419        {
420            Err(e) => {
421                error!("{}", e)
422            }
423            _ => {
424                info!("Swap router actor started successfully")
425            }
426        }
427    }
428
429    // Start signer actor that signs paths before broadcasting
430    if test_config.modules.signer {
431        info!("Starting signers actor");
432        let mut signers_actor = TxSignersActor::new();
433        match signers_actor.consume(tx_compose_channel.clone()).produce(tx_compose_channel.clone()).start() {
434            Err(e) => {
435                error!("{}", e);
436                panic!("Cannot start signers");
437            }
438            _ => info!("Signers actor started"),
439        }
440    }
441
442    // Start state change arb actor
443    if test_config.modules.arb_block || test_config.modules.arb_mempool {
444        info!("Starting state change arb actor");
445        let mut state_change_arb_actor = StateChangeArbActor::new(
446            client.clone(),
447            test_config.modules.arb_block,
448            test_config.modules.arb_mempool,
449            BackrunConfig::new_dumb(),
450        );
451        match state_change_arb_actor
452            .access(mempool_instance.clone())
453            .access(latest_block.clone())
454            .access(market_instance.clone())
455            .access(market_state.clone())
456            .access(block_history_state.clone())
457            .consume(market_events_channel.clone())
458            .consume(mempool_events_channel.clone())
459            .produce(swap_compose_channel.clone())
460            .produce(pool_health_monitor_channel.clone())
461            .produce(influx_channel.clone())
462            .start()
463        {
464            Err(e) => {
465                error!("{}", e)
466            }
467            _ => {
468                info!("State change arb actor started successfully")
469            }
470        }
471    }
472
473    // Swap path merger tries to build swap steps from swap lines
474    if test_config.modules.arb_path_merger {
475        info!("Starting swap path merger actor");
476
477        let mut swap_path_merger_actor = ArbSwapPathMergerActor::new(multicaller_address);
478        match swap_path_merger_actor
479            .access(latest_block.clone())
480            .consume(swap_compose_channel.clone())
481            .consume(market_events_channel.clone())
482            .produce(swap_compose_channel.clone())
483            .start()
484        {
485            Err(e) => {
486                error!("{}", e)
487            }
488            _ => {
489                info!("Swap path merger actor started successfully")
490            }
491        }
492    }
493
494    // Same path merger tries to merge different stuffing tx to optimize swap line
495    if test_config.modules.same_path_merger {
496        let mut same_path_merger_actor = SamePathMergerActor::new(client.clone());
497        match same_path_merger_actor
498            .access(market_state.clone())
499            .access(latest_block.clone())
500            .consume(swap_compose_channel.clone())
501            .consume(market_events_channel.clone())
502            .produce(swap_compose_channel.clone())
503            .start()
504        {
505            Err(e) => {
506                error!("{}", e)
507            }
508            _ => {
509                info!("Same path merger actor started successfully")
510            }
511        }
512    }
513    if test_config.modules.flashbots {
514        let relays = vec![RelayConfig { id: 1, url: mock_server.as_ref().unwrap().uri(), name: "relay".to_string(), no_sign: Some(false) }];
515        let flashbots = Flashbots::new(client.clone(), "https://unused", None).with_relays(relays);
516        let mut flashbots_broadcast_actor = FlashbotsBroadcastActor::new(flashbots, true);
517        match flashbots_broadcast_actor.consume(tx_compose_channel.clone()).start() {
518            Err(e) => {
519                error!("{}", e)
520            }
521            _ => {
522                info!("Flashbots broadcast actor started successfully")
523            }
524        }
525    }
526
527    // Diff path merger tries to merge all found swaplines into one transaction s
528    let mut diff_path_merger_actor = DiffPathMergerActor::new();
529    match diff_path_merger_actor
530        .consume(swap_compose_channel.clone())
531        .consume(market_events_channel.clone())
532        .produce(swap_compose_channel.clone())
533        .start()
534    {
535        Err(e) => {
536            error!("{}", e)
537        }
538        _ => {
539            info!("Diff path merger actor started successfully")
540        }
541    }
542
543    // #### Blockchain events
544    // we need to wait for all actors to start. For the CI it can be a bit longer
545    tokio::time::sleep(Duration::from_secs(args.wait_init)).await;
546
547    let next_block_base_fee = ChainParameters::ethereum().calc_next_block_base_fee(
548        block_header.gas_used,
549        block_header.gas_limit,
550        block_header.base_fee_per_gas.unwrap_or_default(),
551    );
552
553    let market_events_channel_clone = market_events_channel.clone();
554
555    // Sending block header update message
556    if let Err(e) = market_events_channel_clone.send(MarketEvents::BlockHeaderUpdate {
557        block_number: block_header.number,
558        block_hash: block_header.hash,
559        timestamp: block_header.timestamp,
560        base_fee: block_header.base_fee_per_gas.unwrap_or_default(),
561        next_base_fee: next_block_base_fee,
562    }) {
563        error!("{}", e);
564    }
565
566    // Sending block state update message
567    if let Err(e) = market_events_channel_clone.send(MarketEvents::BlockStateUpdate { block_hash: block_header.hash }) {
568        error!("{}", e);
569    }
570
571    // #### RE-BROADCASTER
572    //starting broadcasting transactions from eth to anvil
573    let client_clone = client.clone();
574    tokio::spawn(async move {
575        info!("Re-broadcaster task started");
576
577        for (_, tx_config) in test_config.txs.iter() {
578            debug!("Fetching original tx {}", tx_config.hash);
579            let Some(tx) = client_clone.get_transaction_by_hash(tx_config.hash).await.unwrap() else {
580                panic!("Cannot get tx: {}", tx_config.hash);
581            };
582
583            let from = tx.from();
584            let to = tx.to().unwrap_or_default();
585
586            match tx_config.send.to_lowercase().as_str() {
587                "mempool" => {
588                    let mut mempool_guard = mempool_instance.write().await;
589                    let tx_hash: TxHash = tx.tx_hash();
590
591                    mempool_guard.add_tx(tx.clone());
592                    if let Err(e) = mempool_events_channel.send(MempoolEvents::MempoolActualTxUpdate { tx_hash }) {
593                        error!("{e}");
594                    }
595                }
596                "block" => match client_clone.send_raw_transaction(tx.inner.encoded_2718().as_slice()).await {
597                    Ok(p) => {
598                        debug!("Transaction sent {}", p.tx_hash());
599                    }
600                    Err(e) => {
601                        error!("Error sending transaction : {e}");
602                    }
603                },
604                _ => {
605                    debug!("Incorrect action {} for : hash {} from {} to {}  ", tx_config.send, tx.tx_hash(), from, to);
606                }
607            }
608        }
609    });
610
611    println!("Test '{}' is started!", args.config);
612
613    let mut tx_compose_sub = swap_compose_channel.subscribe();
614
615    let mut stat = Stat::default();
616    let timeout_duration = Duration::from_secs(args.timeout);
617
618    loop {
619        tokio::select! {
620            msg = tx_compose_sub.recv() => {
621                match msg {
622                    Ok(msg) => match msg.inner {
623                        SwapComposeMessage::Ready(ready_message) => {
624                            debug!(swap=%ready_message.swap, "Ready message");
625                            stat.sign_counter += 1;
626
627                            if stat.best_profit_eth < ready_message.swap.arb_profit_eth() {
628                                stat.best_profit_eth = ready_message.swap.arb_profit_eth();
629                                stat.best_swap = Some(ready_message.swap.clone());
630                            }
631
632                            if let Some(swaps_ok) = test_config.assertions.swaps_ok {
633                                if stat.sign_counter >= swaps_ok  {
634                                    break;
635                                }
636                            }
637                        }
638                        SwapComposeMessage::Prepare(encode_message) => {
639                            debug!(swap=%encode_message.swap, "Prepare message");
640                            stat.found_counter += 1;
641                        }
642                        _ => {}
643                    },
644                    Err(error) => {
645                        error!(%error, "tx_compose_sub.recv")
646                    }
647                }
648            }
649            msg = tokio::time::sleep(timeout_duration) => {
650                debug!(?msg, "Timed out");
651                break;
652            }
653        }
654    }
655    if test_config.modules.flashbots {
656        // wait for flashbots mock server to receive all requests
657        tokio::time::sleep(Duration::from_secs(2)).await;
658        if let Some(last_requests) = mock_server.unwrap().received_requests().await {
659            if last_requests.is_empty() {
660                println!("Mock server did not received any request!")
661            } else {
662                println!("Received {} flashbots requests", last_requests.len());
663                for request in last_requests {
664                    let bundle_request: BundleRequest = serde_json::from_slice(&request.body)?;
665                    println!(
666                        "bundle_count={}, target_blocks={:?}, txs_in_bundles={:?}",
667                        bundle_request.params.len(),
668                        bundle_request.params.iter().map(|b| b.target_block).collect::<Vec<_>>(),
669                        bundle_request.params.iter().map(|b| b.transactions.len()).collect::<Vec<_>>()
670                    );
671                    // print all transactions
672                    for bundle in bundle_request.params {
673                        println!("Bundle with {} transactions", bundle.transactions.len());
674                    }
675                }
676            }
677        } else {
678            println!("Mock server did not received any request!")
679        }
680    }
681
682    println!("\n\n-------------------\nStat : {stat}\n-------------------\n");
683
684    if let Some(swaps_encoded) = test_config.assertions.swaps_encoded {
685        if swaps_encoded > stat.found_counter {
686            println!("Test failed. Not enough encoded swaps : {} need {}", stat.found_counter, swaps_encoded);
687            exit(1)
688        } else {
689            println!("Test passed. Encoded swaps : {} required {}", stat.found_counter, swaps_encoded);
690        }
691    }
692    if let Some(swaps_ok) = test_config.assertions.swaps_ok {
693        if swaps_ok > stat.sign_counter {
694            println!("Test failed. Not enough verified swaps : {} need {}", stat.sign_counter, swaps_ok);
695            exit(1)
696        } else {
697            println!("Test passed. swaps : {} required {}", stat.sign_counter, swaps_ok);
698        }
699    }
700    if let Some(best_profit) = test_config.assertions.best_profit_eth {
701        if NWETH::from_float(best_profit) > stat.best_profit_eth {
702            println!("Profit is too small {} need {}", NWETH::to_float(stat.best_profit_eth), best_profit);
703            exit(1)
704        } else {
705            println!("Test passed. best profit : {} > {}", NWETH::to_float(stat.best_profit_eth), best_profit);
706        }
707    }
708
709    Ok(())
710}