1use kabu_evm_db::KabuDB;
2use std::env;
3use std::process::exit;
4use std::time::Duration;
5
6use alloy::network::TransactionBuilder;
7use alloy::primitives::{address, Address, U256};
8use alloy::providers::Provider;
9use alloy::rpc::types::Header;
10use alloy::{providers::ProviderBuilder, rpc::client::ClientBuilder};
11use clap::Parser;
12use eyre::Result;
13use tokio::select;
14use url::Url;
15
16use kabu_node_debug_provider::HttpCachedTransport;
17
18use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
19use kabu_core_blockchain_actors::BlockchainActors;
20use kabu_defi_abi::AbiEncoderHelper;
21use kabu_defi_address_book::{TokenAddressEth, UniswapV3PoolAddress};
22use kabu_defi_pools::state_readers::ERC20StateReader;
23use kabu_evm_db::DatabaseKabuExt;
24use kabu_evm_utils::NWETH;
25use kabu_execution_multicaller::MulticallerSwapEncoder;
26use kabu_node_player::NodeBlockPlayerActor;
27use kabu_types_entities::required_state::RequiredState;
28use kabu_types_entities::{MarketState, PoolClass, PoolId, Swap, SwapAmountType, SwapLine};
29use kabu_types_events::{MessageSwapCompose, SwapComposeData, TxComposeData};
30use tracing::{debug, error, info};
31use tracing_subscriber::layer::SubscriberExt;
32use tracing_subscriber::util::SubscriberInitExt;
33use tracing_subscriber::{fmt, EnvFilter, Layer};
34
35#[derive(Parser, Debug)]
36struct Commands {
37 #[arg(short, long)]
39 terminate_after_block_count: Option<u64>,
40}
41
42#[tokio::main]
43async fn main() -> Result<()> {
44 let start_block_number = 20179184;
45
46 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
47 "debug,alloy_rpc_client=off,kabu_node_debug_provider=info,alloy_transport_http=off,hyper_util=off,kabu_core_block_history=trace"
48 .into()
49 });
50 let fmt_layer = fmt::Layer::default().with_thread_ids(true).with_file(false).with_line_number(true).with_filter(env_filter);
51
52 tracing_subscriber::registry().with(fmt_layer).init();
53
54 let args = Commands::parse();
55 let node_url = env::var("MAINNET_HTTP")?;
56 let node_url = Url::parse(node_url.as_str())?;
57
58 let transport = HttpCachedTransport::new(node_url.clone(), Some("./.cache")).await;
59 transport.set_block_number(start_block_number);
60
61 let client = ClientBuilder::default().transport(transport.clone(), true).with_poll_interval(Duration::from_millis(50));
62 let provider = ProviderBuilder::new().disable_recommended_fillers().connect_client(client);
63
64 let node_provider = ProviderBuilder::new().disable_recommended_fillers().connect_http(node_url);
65
66 let bc = Blockchain::new(1);
71
72 let bc_state = BlockchainState::new_with_market_state(MarketState::new(KabuDB::empty()));
73
74 let market_state = bc_state.market_state();
75
76 let strategy = Strategy::<KabuDB>::new();
77
78 let swap_encoder = MulticallerSwapEncoder::default();
79
80 const TARGET_ADDRESS: Address = address!("A69babEF1cA67A37Ffaf7a485DfFF3382056e78C");
81
82 let mut required_state = RequiredState::new();
83 required_state.add_call(TokenAddressEth::WETH, AbiEncoderHelper::encode_erc20_balance_of(TARGET_ADDRESS));
84
85 let mut bc_actors =
87 BlockchainActors::new(provider.clone(), swap_encoder.clone(), bc.clone(), bc_state.clone(), strategy.clone(), vec![]);
88 bc_actors
89 .with_nonce_and_balance_monitor_only_once()?
90 .with_signers()?
91 .with_market_state_preloader_virtual(vec![])?
92 .with_preloaded_state(vec![(UniswapV3PoolAddress::USDC_WETH_500, PoolClass::UniswapV3)], Some(required_state))?
93 .with_block_history()?
94 .with_swap_encoder(swap_encoder)?
95 .with_evm_estimator()?;
96
97 if let Err(e) =
99 bc_actors.start(NodeBlockPlayerActor::new(provider.clone(), start_block_number, start_block_number + 200).on_bc(&bc, &bc_state))
100 {
101 panic!("Cannot start block player : {e}");
102 }
103
104 tokio::task::spawn(bc_actors.wait());
105 let compose_channel = strategy.swap_compose_channel();
106
107 let mut header_sub = bc.new_block_headers_channel().subscribe();
108 let mut block_sub = bc.new_block_with_tx_channel().subscribe();
109 let mut logs_sub = bc.new_block_logs_channel().subscribe();
110 let mut state_update_sub = bc.new_block_state_update_channel().subscribe();
111
112 let market = bc.market();
114
115 let mut cur_header: Header = Header::default();
116
117 loop {
118 select! {
119 header = header_sub.recv() => {
120 match header {
121 Ok(message_header)=>{
122 let header = message_header.inner.header;
123 info!("Block header received: block_number={}, block_hash={}", header.number, header.hash);
124
125 if let Some(terminate_after_block_count) = args.terminate_after_block_count {
126 println!("Replay current_block={}/{}", header.number, start_block_number + terminate_after_block_count);
127 if header.number >= start_block_number + terminate_after_block_count {
128 println!("Successful for start_block_number={}, current_block={}, terminate_after_block_count={}", start_block_number, header.number, terminate_after_block_count);
129 exit(0);
130 }
131 }
132
133 cur_header = header.clone();
134 if header.number % 10 == 0 {
135 info!("Composing swap: block_number={}, block_hash={}", header.number, header.hash);
136
137 let swap_path = market.read().await.swap_path(vec![TokenAddressEth::WETH, TokenAddressEth::USDC], vec![PoolId::Address(UniswapV3PoolAddress::USDC_WETH_500)])?;
138 let mut swap_line = SwapLine::from(swap_path);
139 swap_line.amount_in = SwapAmountType::Set( NWETH::from_float(0.1));
140 swap_line.gas_used = Some(300000);
141
142 let tx_compose_encode_msg = MessageSwapCompose::prepare(
143 SwapComposeData{
144 tx_compose : TxComposeData {
145 next_block_base_fee : bc.chain_parameters().calc_next_block_base_fee_from_header(&header),
146 ..TxComposeData::default()
147 },
148 poststate : Some(market_state.read().await.state_db.clone()),
149 swap : Swap::ExchangeSwapLine(swap_line),
150 ..SwapComposeData::default()
151 });
152
153 if let Err(e) = compose_channel.send(tx_compose_encode_msg) {
154 error!("compose_channel.send : {}", e)
155 }else{
156 debug!("compose_channel.send ok");
157 }
158 }
159 }
160 Err(e)=>{
161 error!("Error receiving headers: {e}");
162 }
163 }
164 }
165
166 logs = logs_sub.recv() => {
167 match logs{
168 Ok(logs_update)=>{
169 info!("Block logs received : {} log records : {}", logs_update.block_header.hash, logs_update.logs.len());
170 }
171 Err(e)=>{
172 error!("Error receiving logs: {e}");
173 }
174 }
175 }
176
177 block = block_sub.recv() => {
178 match block {
179 Ok(block_msg)=>{
180 info!("Block with tx received : {} txs : {}", block_msg.block.header.hash, block_msg.block.transactions.len());
181 }
182 Err(e)=>{
183 error!("Error receiving blocks: {e}");
184 }
185 }
186 }
187 state_udpate = state_update_sub.recv() => {
188 match state_udpate {
189 Ok(state_update)=>{
190 let state_update = state_update.inner;
191
192 info!("Block state update received : {} update records : {}", state_update.block_header.hash, state_update.state_update.len() );
193 let mut state_db = market_state.read().await.state_db.clone();
194 state_db.apply_geth_update_vec(state_update.state_update);
195
196
197 if let Ok(balance) = ERC20StateReader::balance_of(&state_db, TokenAddressEth::WETH, TARGET_ADDRESS ) {
200 info!("------WETH Balance of {} : {}", TARGET_ADDRESS, balance);
201 let tx_req = alloy::rpc::types::TransactionRequest::default()
202 .to(TokenAddressEth::WETH)
203 .with_input(AbiEncoderHelper::encode_erc20_balance_of(TARGET_ADDRESS));
204 let fetched_balance = node_provider.call(tx_req).block(cur_header.number.into()).await?;
205
206 let fetched_balance = U256::from_be_slice(fetched_balance.to_vec().as_slice());
207 if fetched_balance != balance {
208 error!("Balance is wrong {}/({:#x}) need {}({:#x})", balance, balance, fetched_balance, fetched_balance);
209 exit(1);
210 }
211 } else {
212 info!("ERC20StateReader call failed - EVM integration not complete yet");
213 }
214 if let Ok(balance) = ERC20StateReader::balance_of(&state_db, TokenAddressEth::WETH, UniswapV3PoolAddress::USDC_WETH_500 ) {
215 info!("------WETH Balance of {} : {}/({:#x}) ", UniswapV3PoolAddress::USDC_WETH_500, balance, balance);
216 } else {
217 info!("ERC20StateReader call for pool balance failed - EVM integration not complete yet");
218 }
219
220 info!("StateDB : Accounts: {} / {} Contracts : {} / {}", state_db.accounts_len(), state_db.ro_accounts_len(), state_db.contracts_len(), state_db.ro_contracts_len())
221
222 }
223 Err(e)=>{
224 error!("Error receiving blocks: {e}");
225 }
226 }
227 }
228 }
229 }
230}