1use std::env;
2use std::fmt::{Display, Formatter};
3use std::process::exit;
4use std::sync::Arc;
5use std::time::Duration;
6
7use alloy_rpc_types_eth::TransactionTrait;
8
9use alloy_provider::network::TransactionResponse;
10
11use crate::flashbots_mock::mount_flashbots_mock;
12use crate::flashbots_mock::BundleRequest;
13use crate::test_config::TestConfig;
14use alloy_primitives::{address, TxHash, U256};
15use alloy_provider::network::eip2718::Encodable2718;
16use alloy_provider::Provider;
17use alloy_rpc_types::{BlockId, BlockNumberOrTag};
18use clap::Parser;
19use kabu::node::debug_provider::AnvilDebugProviderFactory;
20
21use eyre::{OptionExt, Result};
22use influxdb::WriteQuery;
23use kabu::broadcast::accounts::{InitializeSignersOneShotBlockingActor, NonceAndBalanceMonitorActor, TxSignersActor};
24use kabu::broadcast::broadcaster::{AnvilBroadcastActor, FlashbotsBroadcastActor};
25use kabu::broadcast::flashbots::client::RelayConfig;
26use kabu::broadcast::flashbots::Flashbots;
27use kabu::core::actors::{Accessor, Actor, Broadcaster, Consumer, Producer, SharedState};
28use kabu::core::block_history::BlockHistoryActor;
29use kabu::core::router::SwapRouterActor;
30use kabu::defi::address_book::TokenAddressEth;
31use kabu::defi::health_monitor::StuffingTxMonitorActor;
32use kabu::defi::market::{fetch_and_add_pool_by_pool_id, fetch_state_and_add_pool};
33use kabu::defi::pools::protocols::CurveProtocol;
34use kabu::defi::pools::{CurvePool, PoolLoadersBuilder, PoolsLoadingConfig};
35use kabu::defi::preloader::MarketStatePreloadedOneShotActor;
36use kabu::defi::price::PriceActor;
37use kabu::evm::db::KabuDBType;
38use kabu::evm::utils::NWETH;
39use kabu::execution::estimator::EvmEstimatorActor;
40use kabu::execution::multicaller::{MulticallerDeployer, MulticallerSwapEncoder};
41use kabu::node::actor_config::NodeBlockActorConfig;
42use kabu::node::json_rpc::NodeBlockActor;
43use kabu::strategy::backrun::{BackrunConfig, StateChangeArbActor};
44use kabu::strategy::merger::{ArbSwapPathMergerActor, DiffPathMergerActor, SamePathMergerActor};
45use kabu::types::blockchain::{debug_trace_block, ChainParameters, KabuDataTypesEthereum, Mempool};
46use kabu::types::entities::{
47 AccountNonceAndBalanceState, BlockHistory, LatestBlock, Market, MarketState, PoolClass, PoolId, Swap, Token, TxSigners,
48};
49use kabu::types::events::{
50 MarketEvents, MempoolEvents, MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate, MessageHealthEvent,
51 MessageSwapCompose, MessageTxCompose, SwapComposeMessage,
52};
53use tracing::{debug, error, info};
54use tracing_subscriber::layer::SubscriberExt;
55use tracing_subscriber::util::SubscriberInitExt;
56use tracing_subscriber::{fmt, EnvFilter, Layer};
57use wiremock::MockServer;
58
59mod flashbots_mock;
60mod test_config;
61
62#[derive(Clone, Default, Debug)]
63struct Stat {
64 found_counter: usize,
65 sign_counter: usize,
66 best_profit_eth: U256,
67 best_swap: Option<Swap>,
68}
69
70impl Display for Stat {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 match &self.best_swap {
73 Some(swap) => match swap.get_first_token() {
74 Some(token) => {
75 write!(
76 f,
77 "Found: {} Ok: {} Profit : {} / ProfitEth : {} Path : {} ",
78 self.found_counter,
79 self.sign_counter,
80 token.to_float(swap.arb_profit()),
81 NWETH::to_float(swap.arb_profit_eth()),
82 swap
83 )
84 }
85 None => {
86 write!(
87 f,
88 "Found: {} Ok: {} Profit : {} / ProfitEth : {} Path : {} ",
89 self.found_counter,
90 self.sign_counter,
91 swap.arb_profit(),
92 swap.arb_profit_eth(),
93 swap
94 )
95 }
96 },
97 _ => {
98 write!(f, "NO BEST SWAP")
99 }
100 }
101 }
102}
103
104#[allow(dead_code)]
105fn parse_tx_hashes(tx_hash_vec: Vec<&str>) -> Result<Vec<TxHash>> {
106 let mut ret: Vec<TxHash> = Vec::new();
107 for tx_hash in tx_hash_vec {
108 ret.push(tx_hash.parse()?);
109 }
110 Ok(ret)
111}
112
113#[derive(Parser, Debug)]
114struct Commands {
115 #[arg(short, long)]
116 config: String,
117
118 #[arg(short, long, default_value = "10")]
120 timeout: u64,
121
122 #[arg(short, long, default_value = "1")]
124 wait_init: u64,
125}
126
127#[tokio::main]
128async fn main() -> Result<()> {
129 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "debug,alloy_rpc_client=off,kabu_multicaller=trace".into());
130 let fmt_layer = fmt::Layer::default().with_thread_ids(true).with_file(false).with_line_number(true).with_filter(env_filter);
131
132 tracing_subscriber::registry().with(fmt_layer).init();
133
134 let args = Commands::parse();
135 let test_config = TestConfig::from_file(args.config.clone()).await?;
136 let node_url = env::var("MAINNET_WS")?;
137 let client = AnvilDebugProviderFactory::from_node_on_block(node_url, test_config.settings.block).await?;
138 let priv_key = client.privkey()?.to_bytes().to_vec();
139
140 let mut mock_server: Option<MockServer> = None;
141 if test_config.modules.flashbots {
142 mock_server = Some(MockServer::start().await);
144 mount_flashbots_mock(mock_server.as_ref().unwrap()).await;
145 }
146
147 let multicaller_address = MulticallerDeployer::new()
149 .set_code(client.clone(), address!("FCfCfcfC0AC30164AFdaB927F441F2401161F358"))
150 .await?
151 .address()
152 .ok_or_eyre("MULTICALLER_NOT_DEPLOYED")?;
153 info!("Multicaller deployed at {:?}", multicaller_address);
154
155 let multicaller_encoder = MulticallerSwapEncoder::default_with_address(multicaller_address);
156
157 let block_number = client.get_block_number().await?;
158 info!("Current block_number={}", block_number);
159
160 let block_header = client.get_block(block_number.into()).await?.unwrap().header;
161 info!("Current block_header={:?}", block_header);
162
163 let block_header_with_txes = client.get_block(block_number.into()).await?.unwrap();
164
165 let cache_db = KabuDBType::default();
166 let mut market_instance = Market::default();
167 let market_state_instance = MarketState::new(cache_db.clone());
168
169 let usdc_token = Token::new_with_data(TokenAddressEth::USDC, Some("USDC".to_string()), None, Some(6), true, false);
171 let usdt_token = Token::new_with_data(TokenAddressEth::USDT, Some("USDT".to_string()), None, Some(6), true, false);
172 let wbtc_token = Token::new_with_data(TokenAddressEth::WBTC, Some("WBTC".to_string()), None, Some(8), true, false);
173 let dai_token = Token::new_with_data(TokenAddressEth::DAI, Some("DAI".to_string()), None, Some(18), true, false);
174 market_instance.add_token(usdc_token);
175 market_instance.add_token(usdt_token);
176 market_instance.add_token(wbtc_token);
177 market_instance.add_token(dai_token);
178
179 let mempool_instance = Mempool::<KabuDataTypesEthereum>::new();
180
181 info!("Creating channels");
182 let new_block_headers_channel: Broadcaster<MessageBlockHeader> = Broadcaster::new(10);
183 let new_block_with_tx_channel: Broadcaster<MessageBlock> = Broadcaster::new(10);
184 let new_block_state_update_channel: Broadcaster<MessageBlockStateUpdate> = Broadcaster::new(10);
185 let new_block_logs_channel: Broadcaster<MessageBlockLogs> = Broadcaster::new(10);
186
187 let market_events_channel: Broadcaster<MarketEvents> = Broadcaster::new(100);
188 let mempool_events_channel: Broadcaster<MempoolEvents> = Broadcaster::new(500);
189 let pool_health_monitor_channel: Broadcaster<MessageHealthEvent> = Broadcaster::new(100);
190
191 let influx_channel: Broadcaster<WriteQuery> = Broadcaster::new(100);
192
193 let market_instance = SharedState::new(market_instance);
194 let market_state = SharedState::new(market_state_instance);
195 let mempool_instance = SharedState::new(mempool_instance);
196 let block_history_state = SharedState::new(BlockHistory::new(10));
197
198 let tx_signers = TxSigners::new();
199 let accounts_state = AccountNonceAndBalanceState::new();
200
201 let tx_signers = SharedState::new(tx_signers);
202 let accounts_state = SharedState::new(accounts_state);
203
204 let latest_block = SharedState::new(LatestBlock::new(block_number, block_header.hash));
205
206 let (_, post) = debug_trace_block(client.clone(), BlockId::Number(BlockNumberOrTag::Number(block_number)), true).await?;
207 latest_block.write().await.update(
208 block_number,
209 block_header.hash,
210 Some(block_header.clone()),
211 Some(block_header_with_txes),
212 None,
213 Some(post),
214 );
215
216 info!("Starting initialize signers actor");
217
218 let mut initialize_signers_actor = InitializeSignersOneShotBlockingActor::new(Some(priv_key));
219 match initialize_signers_actor.access(tx_signers.clone()).access(accounts_state.clone()).start_and_wait() {
220 Err(e) => {
221 error!("{}", e);
222 panic!("Cannot initialize signers");
223 }
224 _ => info!("Signers have been initialized"),
225 }
226
227 for (token_name, token_config) in test_config.tokens {
228 let symbol = token_config.symbol.unwrap_or(token_config.address.to_checksum(None));
229 let name = token_config.name.unwrap_or(symbol.clone());
230 let token = Token::new_with_data(
231 token_config.address,
232 Some(symbol),
233 Some(name),
234 Some(token_config.decimals.map_or(18, |x| x)),
235 token_config.basic.unwrap_or_default(),
236 token_config.middle.unwrap_or_default(),
237 );
238 if let Some(price_float) = token_config.price {
239 let price_u256 = NWETH::from_float(price_float) * token.get_exp() / NWETH::get_exp();
240 debug!("Setting price : {} -> {} ({})", token_name, price_u256, price_u256.to::<u128>());
241
242 token.set_eth_price(Some(price_u256));
243 };
244
245 market_instance.write().await.add_token(token);
246 }
247
248 info!("Starting market state preload actor");
249 let mut market_state_preload_actor = MarketStatePreloadedOneShotActor::new(client.clone())
250 .with_copied_account(multicaller_encoder.get_contract_address())
251 .with_signers(tx_signers.clone());
252 match market_state_preload_actor.access(market_state.clone()).start_and_wait() {
253 Err(e) => {
254 error!("{}", e)
255 }
256 _ => {
257 info!("Market state preload actor started successfully")
258 }
259 }
260
261 info!("Starting node actor");
262 let mut node_block_actor = NodeBlockActor::new(client.clone(), NodeBlockActorConfig::all_enabled());
263 match node_block_actor
264 .produce(new_block_headers_channel.clone())
265 .produce(new_block_with_tx_channel.clone())
266 .produce(new_block_logs_channel.clone())
267 .produce(new_block_state_update_channel.clone())
268 .start()
269 {
270 Err(e) => {
271 error!("{}", e)
272 }
273 _ => {
274 info!("Node actor started successfully")
275 }
276 }
277
278 info!("Starting nonce and balance monitor actor");
279 let mut nonce_and_balance_monitor = NonceAndBalanceMonitorActor::new(client.clone());
280 match nonce_and_balance_monitor
281 .access(accounts_state.clone())
282 .access(latest_block.clone())
283 .consume(market_events_channel.clone())
284 .start()
285 {
286 Err(e) => {
287 error!("{}", e);
288 panic!("Cannot initialize nonce and balance monitor");
289 }
290 _ => info!("Nonce monitor has been initialized"),
291 }
292
293 info!("Starting price actor");
294 let mut price_actor = PriceActor::new(client.clone()).only_once();
295 match price_actor.access(market_instance.clone()).start_and_wait() {
296 Err(e) => {
297 error!("{}", e);
298 panic!("Cannot initialize price actor");
299 }
300 _ => info!("Price actor has been initialized"),
301 }
302
303 let pool_loaders =
304 Arc::new(PoolLoadersBuilder::<_, _, KabuDataTypesEthereum>::default_pool_loaders(client.clone(), PoolsLoadingConfig::default()));
305
306 for (pool_name, pool_config) in test_config.pools {
307 match pool_config.class {
308 PoolClass::UniswapV2 | PoolClass::UniswapV3 => {
309 debug!(address=%pool_config.address, class=%pool_config.class, "Loading pool");
310 fetch_and_add_pool_by_pool_id(
311 client.clone(),
312 market_instance.clone(),
313 market_state.clone(),
314 pool_loaders.clone(),
315 PoolId::Address(pool_config.address),
316 pool_config.class,
317 )
318 .await?;
319 debug!(address=%pool_config.address, class=%pool_config.class, "Loaded pool");
320 }
321 PoolClass::Curve => {
322 debug!("Loading curve pool");
323 if let Ok(curve_contract) = CurveProtocol::get_contract_from_code(client.clone(), pool_config.address).await {
324 let curve_pool = CurvePool::fetch_pool_data_with_default_encoder(client.clone(), curve_contract).await?;
325 fetch_state_and_add_pool::<_, _, _, KabuDataTypesEthereum>(
326 client.clone(),
327 market_instance.clone(),
328 market_state.clone(),
329 curve_pool.into(),
330 )
331 .await?;
332 } else {
333 error!("CURVE_POOL_NOT_LOADED");
334 }
335 debug!("Loaded curve pool");
336 }
337 _ => {
338 error!("Unknown pool class")
339 }
340 }
341 let swap_path_len = market_instance.read().await.get_pool_paths(&PoolId::Address(pool_config.address)).unwrap_or_default().len();
342 info!(
343 "Loaded pool '{}' with address={}, pool_class={}, swap_paths={}",
344 pool_name, pool_config.address, pool_config.class, swap_path_len
345 );
346 }
347
348 info!("Starting block history actor");
349 let mut block_history_actor = BlockHistoryActor::new(client.clone());
350 match block_history_actor
351 .access(latest_block.clone())
352 .access(market_state.clone())
353 .access(block_history_state.clone())
354 .consume(new_block_headers_channel.clone())
355 .consume(new_block_with_tx_channel.clone())
356 .consume(new_block_logs_channel.clone())
357 .consume(new_block_state_update_channel.clone())
358 .produce(market_events_channel.clone())
359 .start()
360 {
361 Err(e) => {
362 error!("{}", e)
363 }
364 _ => {
365 info!("Block history actor started successfully")
366 }
367 }
368
369 let swap_compose_channel: Broadcaster<MessageSwapCompose<KabuDBType>> = Broadcaster::new(100);
370 let tx_compose_channel: Broadcaster<MessageTxCompose> = Broadcaster::new(100);
371
372 let mut broadcast_actor = AnvilBroadcastActor::new(client.clone());
373 match broadcast_actor.consume(tx_compose_channel.clone()).start() {
374 Err(e) => error!("{}", e),
375 _ => {
376 info!("Broadcast actor started successfully")
377 }
378 }
379
380 let mut estimator_actor = EvmEstimatorActor::new_with_provider(multicaller_encoder.clone(), Some(client.clone()));
382 match estimator_actor.consume(swap_compose_channel.clone()).produce(swap_compose_channel.clone()).start() {
383 Err(e) => error!("{e}"),
384 _ => {
385 info!("Estimate actor started successfully")
386 }
387 }
388
389 let mut health_monitor_actor = StuffingTxMonitorActor::new(client.clone());
390 match health_monitor_actor
391 .access(latest_block.clone())
392 .consume(market_events_channel.clone())
393 .consume(tx_compose_channel.clone())
394 .produce(influx_channel.clone())
395 .start()
396 {
397 Ok(_) => {
398 info!("Stuffing tx monitor actor started")
400 }
401 Err(e) => {
402 panic!("StuffingTxMonitorActor error {e}")
403 }
404 }
405
406 if test_config.modules.encoder {
408 info!("Starting swap router actor");
409
410 let mut swap_router_actor = SwapRouterActor::new();
411
412 match swap_router_actor
413 .access(tx_signers.clone())
414 .access(accounts_state.clone())
415 .consume(swap_compose_channel.clone())
416 .produce(swap_compose_channel.clone())
417 .produce(tx_compose_channel.clone())
418 .start()
419 {
420 Err(e) => {
421 error!("{}", e)
422 }
423 _ => {
424 info!("Swap router actor started successfully")
425 }
426 }
427 }
428
429 if test_config.modules.signer {
431 info!("Starting signers actor");
432 let mut signers_actor = TxSignersActor::new();
433 match signers_actor.consume(tx_compose_channel.clone()).produce(tx_compose_channel.clone()).start() {
434 Err(e) => {
435 error!("{}", e);
436 panic!("Cannot start signers");
437 }
438 _ => info!("Signers actor started"),
439 }
440 }
441
442 if test_config.modules.arb_block || test_config.modules.arb_mempool {
444 info!("Starting state change arb actor");
445 let mut state_change_arb_actor = StateChangeArbActor::new(
446 client.clone(),
447 test_config.modules.arb_block,
448 test_config.modules.arb_mempool,
449 BackrunConfig::new_dumb(),
450 );
451 match state_change_arb_actor
452 .access(mempool_instance.clone())
453 .access(latest_block.clone())
454 .access(market_instance.clone())
455 .access(market_state.clone())
456 .access(block_history_state.clone())
457 .consume(market_events_channel.clone())
458 .consume(mempool_events_channel.clone())
459 .produce(swap_compose_channel.clone())
460 .produce(pool_health_monitor_channel.clone())
461 .produce(influx_channel.clone())
462 .start()
463 {
464 Err(e) => {
465 error!("{}", e)
466 }
467 _ => {
468 info!("State change arb actor started successfully")
469 }
470 }
471 }
472
473 if test_config.modules.arb_path_merger {
475 info!("Starting swap path merger actor");
476
477 let mut swap_path_merger_actor = ArbSwapPathMergerActor::new(multicaller_address);
478 match swap_path_merger_actor
479 .access(latest_block.clone())
480 .consume(swap_compose_channel.clone())
481 .consume(market_events_channel.clone())
482 .produce(swap_compose_channel.clone())
483 .start()
484 {
485 Err(e) => {
486 error!("{}", e)
487 }
488 _ => {
489 info!("Swap path merger actor started successfully")
490 }
491 }
492 }
493
494 if test_config.modules.same_path_merger {
496 let mut same_path_merger_actor = SamePathMergerActor::new(client.clone());
497 match same_path_merger_actor
498 .access(market_state.clone())
499 .access(latest_block.clone())
500 .consume(swap_compose_channel.clone())
501 .consume(market_events_channel.clone())
502 .produce(swap_compose_channel.clone())
503 .start()
504 {
505 Err(e) => {
506 error!("{}", e)
507 }
508 _ => {
509 info!("Same path merger actor started successfully")
510 }
511 }
512 }
513 if test_config.modules.flashbots {
514 let relays = vec![RelayConfig { id: 1, url: mock_server.as_ref().unwrap().uri(), name: "relay".to_string(), no_sign: Some(false) }];
515 let flashbots = Flashbots::new(client.clone(), "https://unused", None).with_relays(relays);
516 let mut flashbots_broadcast_actor = FlashbotsBroadcastActor::new(flashbots, true);
517 match flashbots_broadcast_actor.consume(tx_compose_channel.clone()).start() {
518 Err(e) => {
519 error!("{}", e)
520 }
521 _ => {
522 info!("Flashbots broadcast actor started successfully")
523 }
524 }
525 }
526
527 let mut diff_path_merger_actor = DiffPathMergerActor::new();
529 match diff_path_merger_actor
530 .consume(swap_compose_channel.clone())
531 .consume(market_events_channel.clone())
532 .produce(swap_compose_channel.clone())
533 .start()
534 {
535 Err(e) => {
536 error!("{}", e)
537 }
538 _ => {
539 info!("Diff path merger actor started successfully")
540 }
541 }
542
543 tokio::time::sleep(Duration::from_secs(args.wait_init)).await;
546
547 let next_block_base_fee = ChainParameters::ethereum().calc_next_block_base_fee(
548 block_header.gas_used,
549 block_header.gas_limit,
550 block_header.base_fee_per_gas.unwrap_or_default(),
551 );
552
553 let market_events_channel_clone = market_events_channel.clone();
554
555 if let Err(e) = market_events_channel_clone.send(MarketEvents::BlockHeaderUpdate {
557 block_number: block_header.number,
558 block_hash: block_header.hash,
559 timestamp: block_header.timestamp,
560 base_fee: block_header.base_fee_per_gas.unwrap_or_default(),
561 next_base_fee: next_block_base_fee,
562 }) {
563 error!("{}", e);
564 }
565
566 if let Err(e) = market_events_channel_clone.send(MarketEvents::BlockStateUpdate { block_hash: block_header.hash }) {
568 error!("{}", e);
569 }
570
571 let client_clone = client.clone();
574 tokio::spawn(async move {
575 info!("Re-broadcaster task started");
576
577 for (_, tx_config) in test_config.txs.iter() {
578 debug!("Fetching original tx {}", tx_config.hash);
579 let Some(tx) = client_clone.get_transaction_by_hash(tx_config.hash).await.unwrap() else {
580 panic!("Cannot get tx: {}", tx_config.hash);
581 };
582
583 let from = tx.from();
584 let to = tx.to().unwrap_or_default();
585
586 match tx_config.send.to_lowercase().as_str() {
587 "mempool" => {
588 let mut mempool_guard = mempool_instance.write().await;
589 let tx_hash: TxHash = tx.tx_hash();
590
591 mempool_guard.add_tx(tx.clone());
592 if let Err(e) = mempool_events_channel.send(MempoolEvents::MempoolActualTxUpdate { tx_hash }) {
593 error!("{e}");
594 }
595 }
596 "block" => match client_clone.send_raw_transaction(tx.inner.encoded_2718().as_slice()).await {
597 Ok(p) => {
598 debug!("Transaction sent {}", p.tx_hash());
599 }
600 Err(e) => {
601 error!("Error sending transaction : {e}");
602 }
603 },
604 _ => {
605 debug!("Incorrect action {} for : hash {} from {} to {} ", tx_config.send, tx.tx_hash(), from, to);
606 }
607 }
608 }
609 });
610
611 println!("Test '{}' is started!", args.config);
612
613 let mut tx_compose_sub = swap_compose_channel.subscribe();
614
615 let mut stat = Stat::default();
616 let timeout_duration = Duration::from_secs(args.timeout);
617
618 loop {
619 tokio::select! {
620 msg = tx_compose_sub.recv() => {
621 match msg {
622 Ok(msg) => match msg.inner {
623 SwapComposeMessage::Ready(ready_message) => {
624 debug!(swap=%ready_message.swap, "Ready message");
625 stat.sign_counter += 1;
626
627 if stat.best_profit_eth < ready_message.swap.arb_profit_eth() {
628 stat.best_profit_eth = ready_message.swap.arb_profit_eth();
629 stat.best_swap = Some(ready_message.swap.clone());
630 }
631
632 if let Some(swaps_ok) = test_config.assertions.swaps_ok {
633 if stat.sign_counter >= swaps_ok {
634 break;
635 }
636 }
637 }
638 SwapComposeMessage::Prepare(encode_message) => {
639 debug!(swap=%encode_message.swap, "Prepare message");
640 stat.found_counter += 1;
641 }
642 _ => {}
643 },
644 Err(error) => {
645 error!(%error, "tx_compose_sub.recv")
646 }
647 }
648 }
649 msg = tokio::time::sleep(timeout_duration) => {
650 debug!(?msg, "Timed out");
651 break;
652 }
653 }
654 }
655 if test_config.modules.flashbots {
656 tokio::time::sleep(Duration::from_secs(2)).await;
658 if let Some(last_requests) = mock_server.unwrap().received_requests().await {
659 if last_requests.is_empty() {
660 println!("Mock server did not received any request!")
661 } else {
662 println!("Received {} flashbots requests", last_requests.len());
663 for request in last_requests {
664 let bundle_request: BundleRequest = serde_json::from_slice(&request.body)?;
665 println!(
666 "bundle_count={}, target_blocks={:?}, txs_in_bundles={:?}",
667 bundle_request.params.len(),
668 bundle_request.params.iter().map(|b| b.target_block).collect::<Vec<_>>(),
669 bundle_request.params.iter().map(|b| b.transactions.len()).collect::<Vec<_>>()
670 );
671 for bundle in bundle_request.params {
673 println!("Bundle with {} transactions", bundle.transactions.len());
674 }
675 }
676 }
677 } else {
678 println!("Mock server did not received any request!")
679 }
680 }
681
682 println!("\n\n-------------------\nStat : {stat}\n-------------------\n");
683
684 if let Some(swaps_encoded) = test_config.assertions.swaps_encoded {
685 if swaps_encoded > stat.found_counter {
686 println!("Test failed. Not enough encoded swaps : {} need {}", stat.found_counter, swaps_encoded);
687 exit(1)
688 } else {
689 println!("Test passed. Encoded swaps : {} required {}", stat.found_counter, swaps_encoded);
690 }
691 }
692 if let Some(swaps_ok) = test_config.assertions.swaps_ok {
693 if swaps_ok > stat.sign_counter {
694 println!("Test failed. Not enough verified swaps : {} need {}", stat.sign_counter, swaps_ok);
695 exit(1)
696 } else {
697 println!("Test passed. swaps : {} required {}", stat.sign_counter, swaps_ok);
698 }
699 }
700 if let Some(best_profit) = test_config.assertions.best_profit_eth {
701 if NWETH::from_float(best_profit) > stat.best_profit_eth {
702 println!("Profit is too small {} need {}", NWETH::to_float(stat.best_profit_eth), best_profit);
703 exit(1)
704 } else {
705 println!("Test passed. best profit : {} > {}", NWETH::to_float(stat.best_profit_eth), best_profit);
706 }
707 }
708
709 Ok(())
710}