replayer/
main.rs

1use kabu_evm_db::KabuDB;
2use std::env;
3use std::process::exit;
4use std::time::Duration;
5
6use alloy::network::TransactionBuilder;
7use alloy::primitives::{address, Address, U256};
8use alloy::providers::Provider;
9use alloy::rpc::types::Header;
10use alloy::{providers::ProviderBuilder, rpc::client::ClientBuilder};
11use clap::Parser;
12use eyre::Result;
13use tokio::select;
14use url::Url;
15
16use kabu_node_debug_provider::HttpCachedTransport;
17
18use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
19use kabu_core_blockchain_actors::BlockchainActors;
20use kabu_defi_abi::AbiEncoderHelper;
21use kabu_defi_address_book::{TokenAddressEth, UniswapV3PoolAddress};
22use kabu_defi_pools::state_readers::ERC20StateReader;
23use kabu_evm_db::DatabaseKabuExt;
24use kabu_evm_utils::NWETH;
25use kabu_execution_multicaller::MulticallerSwapEncoder;
26use kabu_node_player::NodeBlockPlayerActor;
27use kabu_types_entities::required_state::RequiredState;
28use kabu_types_entities::{MarketState, PoolClass, PoolId, Swap, SwapAmountType, SwapLine};
29use kabu_types_events::{MessageSwapCompose, SwapComposeData, TxComposeData};
30use tracing::{debug, error, info};
31use tracing_subscriber::layer::SubscriberExt;
32use tracing_subscriber::util::SubscriberInitExt;
33use tracing_subscriber::{fmt, EnvFilter, Layer};
34
35#[derive(Parser, Debug)]
36struct Commands {
37    /// Run replayer for the given block number count
38    #[arg(short, long)]
39    terminate_after_block_count: Option<u64>,
40}
41
42#[tokio::main]
43async fn main() -> Result<()> {
44    let start_block_number = 20179184;
45
46    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
47        "debug,alloy_rpc_client=off,kabu_node_debug_provider=info,alloy_transport_http=off,hyper_util=off,kabu_core_block_history=trace"
48            .into()
49    });
50    let fmt_layer = fmt::Layer::default().with_thread_ids(true).with_file(false).with_line_number(true).with_filter(env_filter);
51
52    tracing_subscriber::registry().with(fmt_layer).init();
53
54    let args = Commands::parse();
55    let node_url = env::var("MAINNET_HTTP")?;
56    let node_url = Url::parse(node_url.as_str())?;
57
58    let transport = HttpCachedTransport::new(node_url.clone(), Some("./.cache")).await;
59    transport.set_block_number(start_block_number);
60
61    let client = ClientBuilder::default().transport(transport.clone(), true).with_poll_interval(Duration::from_millis(50));
62    let provider = ProviderBuilder::new().disable_recommended_fillers().connect_client(client);
63
64    let node_provider = ProviderBuilder::new().disable_recommended_fillers().connect_http(node_url);
65
66    // creating singers
67    //let tx_signers = SharedState::new(TxSigners::new());
68
69    // new blockchain
70    let bc = Blockchain::new(1);
71
72    let bc_state = BlockchainState::new_with_market_state(MarketState::new(KabuDB::empty()));
73
74    let market_state = bc_state.market_state();
75
76    let strategy = Strategy::<KabuDB>::new();
77
78    let swap_encoder = MulticallerSwapEncoder::default();
79
80    const TARGET_ADDRESS: Address = address!("A69babEF1cA67A37Ffaf7a485DfFF3382056e78C");
81
82    let mut required_state = RequiredState::new();
83    required_state.add_call(TokenAddressEth::WETH, AbiEncoderHelper::encode_erc20_balance_of(TARGET_ADDRESS));
84
85    // instead fo code above
86    let mut bc_actors =
87        BlockchainActors::new(provider.clone(), swap_encoder.clone(), bc.clone(), bc_state.clone(), strategy.clone(), vec![]);
88    bc_actors
89        .with_nonce_and_balance_monitor_only_once()?
90        .with_signers()?
91        .with_market_state_preloader_virtual(vec![])?
92        .with_preloaded_state(vec![(UniswapV3PoolAddress::USDC_WETH_500, PoolClass::UniswapV3)], Some(required_state))?
93        .with_block_history()?
94        .with_swap_encoder(swap_encoder)?
95        .with_evm_estimator()?;
96
97    //Start node block player actor
98    if let Err(e) =
99        bc_actors.start(NodeBlockPlayerActor::new(provider.clone(), start_block_number, start_block_number + 200).on_bc(&bc, &bc_state))
100    {
101        panic!("Cannot start block player : {e}");
102    }
103
104    tokio::task::spawn(bc_actors.wait());
105    let compose_channel = strategy.swap_compose_channel();
106
107    let mut header_sub = bc.new_block_headers_channel().subscribe();
108    let mut block_sub = bc.new_block_with_tx_channel().subscribe();
109    let mut logs_sub = bc.new_block_logs_channel().subscribe();
110    let mut state_update_sub = bc.new_block_state_update_channel().subscribe();
111
112    //let memepool = bc.mempool();
113    let market = bc.market();
114
115    let mut cur_header: Header = Header::default();
116
117    loop {
118        select! {
119            header = header_sub.recv() => {
120                match header {
121                    Ok(message_header)=>{
122                        let header = message_header.inner.header;
123                        info!("Block header received: block_number={}, block_hash={}", header.number, header.hash);
124
125                        if let Some(terminate_after_block_count) = args.terminate_after_block_count {
126                            println!("Replay current_block={}/{}", header.number, start_block_number + terminate_after_block_count);
127                            if header.number >= start_block_number + terminate_after_block_count {
128                                println!("Successful for start_block_number={}, current_block={}, terminate_after_block_count={}", start_block_number, header.number, terminate_after_block_count);
129                                exit(0);
130                            }
131                        }
132
133                        cur_header = header.clone();
134                        if header.number % 10 == 0 {
135                            info!("Composing swap: block_number={}, block_hash={}", header.number, header.hash);
136
137                            let swap_path = market.read().await.swap_path(vec![TokenAddressEth::WETH, TokenAddressEth::USDC], vec![PoolId::Address(UniswapV3PoolAddress::USDC_WETH_500)])?;
138                            let mut swap_line = SwapLine::from(swap_path);
139                            swap_line.amount_in = SwapAmountType::Set( NWETH::from_float(0.1));
140                            swap_line.gas_used = Some(300000);
141
142                            let tx_compose_encode_msg = MessageSwapCompose::prepare(
143                                SwapComposeData{
144                                    tx_compose : TxComposeData {
145                                        next_block_base_fee : bc.chain_parameters().calc_next_block_base_fee_from_header(&header),
146                                        ..TxComposeData::default()
147                                    },
148                                    poststate : Some(market_state.read().await.state_db.clone()),
149                                    swap : Swap::ExchangeSwapLine(swap_line),
150                                    ..SwapComposeData::default()
151                                });
152
153                            if let Err(e) = compose_channel.send(tx_compose_encode_msg) {
154                                error!("compose_channel.send : {}", e)
155                            }else{
156                                debug!("compose_channel.send ok");
157                            }
158                        }
159                    }
160                    Err(e)=>{
161                        error!("Error receiving headers: {e}");
162                    }
163                }
164            }
165
166            logs = logs_sub.recv() => {
167                match logs{
168                    Ok(logs_update)=>{
169                        info!("Block logs received : {} log records : {}", logs_update.block_header.hash, logs_update.logs.len());
170                    }
171                    Err(e)=>{
172                        error!("Error receiving logs: {e}");
173                    }
174                }
175            }
176
177            block = block_sub.recv() => {
178                match block {
179                    Ok(block_msg)=>{
180                        info!("Block with tx received : {} txs : {}", block_msg.block.header.hash, block_msg.block.transactions.len());
181                    }
182                    Err(e)=>{
183                        error!("Error receiving blocks: {e}");
184                    }
185                }
186            }
187            state_udpate = state_update_sub.recv() => {
188                match state_udpate {
189                    Ok(state_update)=>{
190                        let state_update = state_update.inner;
191
192                        info!("Block state update received : {} update records : {}", state_update.block_header.hash, state_update.state_update.len() );
193                        let mut state_db = market_state.read().await.state_db.clone();
194                        state_db.apply_geth_update_vec(state_update.state_update);
195
196
197                        // Note: ERC20StateReader calls will still fail until full EVM integration is complete,
198                        // but database access is now direct
199                        if let Ok(balance) = ERC20StateReader::balance_of(&state_db, TokenAddressEth::WETH, TARGET_ADDRESS ) {
200                            info!("------WETH Balance of {} : {}", TARGET_ADDRESS, balance);
201                            let tx_req = alloy::rpc::types::TransactionRequest::default()
202                                .to(TokenAddressEth::WETH)
203                                .with_input(AbiEncoderHelper::encode_erc20_balance_of(TARGET_ADDRESS));
204                            let fetched_balance = node_provider.call(tx_req).block(cur_header.number.into()).await?;
205
206                            let fetched_balance = U256::from_be_slice(fetched_balance.to_vec().as_slice());
207                            if fetched_balance != balance {
208                                error!("Balance is wrong {}/({:#x}) need {}({:#x})", balance, balance, fetched_balance, fetched_balance);
209                                exit(1);
210                            }
211                        } else {
212                            info!("ERC20StateReader call failed - EVM integration not complete yet");
213                        }
214                        if let Ok(balance) = ERC20StateReader::balance_of(&state_db, TokenAddressEth::WETH, UniswapV3PoolAddress::USDC_WETH_500 ) {
215                            info!("------WETH Balance of {} : {}/({:#x}) ", UniswapV3PoolAddress::USDC_WETH_500, balance, balance);
216                        } else {
217                            info!("ERC20StateReader call for pool balance failed - EVM integration not complete yet");
218                        }
219
220                        info!("StateDB : Accounts: {} / {} Contracts : {} / {}", state_db.accounts_len(), state_db.ro_accounts_len(), state_db.contracts_len(), state_db.ro_contracts_len())
221
222                    }
223                    Err(e)=>{
224                        error!("Error receiving blocks: {e}");
225                    }
226                }
227            }
228        }
229    }
230}