use crate::cli::Cli;
use alloy::primitives::{BlockHash, BlockNumber};
use alloy::{
    eips::BlockNumberOrTag,
    primitives::TxHash,
    providers::{Provider, ProviderBuilder, RootProvider},
    rpc::types::BlockTransactions,
};
use chrono::{DateTime, Duration, Local, TimeDelta};
use clap::Parser;
use eyre::{eyre, Result};
use futures::future::join_all;
use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
use kabu_core_blockchain_actors::BlockchainActors;
use kabu_evm_db::KabuDB;
use kabu_execution_multicaller::MulticallerSwapEncoder;
use kabu_node_actor_config::NodeBlockActorConfig;
use kabu_types_blockchain::KabuDataTypesEthereum;
use kabu_types_events::MempoolEvents;
use std::fmt::Formatter;
use std::{collections::HashMap, fmt::Display, sync::Arc};
use tokio::{select, sync::RwLock, task::JoinHandle};

mod cli;

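/// Aggregated result of [`analyze_time_maps`]: for each node, how often it was the
/// first to deliver an event, its summed delay, and the average delay in microseconds.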
#[allow(dead_code)]
#[derive(Clone, Debug, Default)]
pub struct StatEntry {
    first: Vec<usize>,
    total_delay: Vec<Duration>,
    avg_delay_us: Vec<i64>,
}

impl Display for StatEntry {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "first {:?} avg delay {:?} μs", self.first, self.avg_delay_us)
    }
}

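/// Local receive times of a single event (block header, block with transactions,
/// logs, state update or mempool transaction), keyed by node index.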
#[derive(Clone, Debug, Default)]
pub struct TimeMap {
    time_map: HashMap<usize, DateTime<Local>>,
}

impl TimeMap {
    pub fn add_time(&mut self, id: usize, time: DateTime<Local>) {
        self.time_map.entry(id).or_insert(time);
    }

    pub fn add_now(&mut self, id: usize) -> DateTime<Local> {
        *self.time_map.entry(id).or_insert(Local::now())
    }

    pub fn get_time(&self, id: usize) -> Option<&DateTime<Local>> {
        self.time_map.get(&id)
    }

    pub fn to_relative(&self, pings: &[TimeDelta]) -> TimeMap {
        // Subtract each node's measured ping so the remaining offsets are comparable;
        // a missing ping entry is treated as zero instead of panicking.
        let rel_time: HashMap<usize, DateTime<Local>> =
            self.time_map.iter().map(|(k, v)| (*k, *v - pings.get(*k).copied().unwrap_or_default())).collect();
        TimeMap { time_map: rel_time }
    }

    pub fn get_first_time(&self) -> DateTime<Local> {
        self.time_map.values().min().cloned().unwrap_or_default()
    }

    pub fn get_time_delta(&self, id: usize) -> Option<TimeDelta> {
        self.time_map.get(&id).map(|x| *x - self.get_first_time())
    }
}

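/// Folds a set of per-event [`TimeMap`]s into a [`StatEntry`]: counts how often each
/// node received an event first and sums its delay behind the fastest node. When
/// `ping` is given, receive times are first adjusted by each node's measured ping.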
fn analyze_time_maps(time_map_vec: Vec<&TimeMap>, ping: Option<&[TimeDelta]>) -> StatEntry {
    let Some(first_map) = time_map_vec.first() else {
        return Default::default();
    };

    let nodes_count = first_map.time_map.len();
    if nodes_count == 0 {
        return Default::default();
    }

    let mut delays: Vec<Duration> = vec![Duration::default(); nodes_count];
    let mut received_first: Vec<usize> = vec![0; nodes_count];

    for time_map in time_map_vec.iter() {
        for node_id in 0..nodes_count {
            match ping {
                Some(ping) => {
                    if let Some(t) = time_map.to_relative(ping).get_time_delta(node_id) {
                        delays[node_id] += t;
                        if t.is_zero() {
                            received_first[node_id] += 1;
                        }
                    }
                }
                None => {
                    if let Some(t) = time_map.get_time_delta(node_id) {
                        delays[node_id] += t;
                        if t.is_zero() {
                            received_first[node_id] += 1;
                        }
                    }
                }
            }
        }
    }

    let total_entries: usize = received_first.iter().sum();

    let delays_avg: Vec<i64> = delays
        .iter()
        .enumerate()
        .map(|(i, x)| {
            if total_entries - received_first[i] == 0 {
                0
            } else {
                x.num_microseconds().unwrap_or_default() / ((total_entries - received_first[i]) as i64)
            }
        })
        .collect();

    StatEntry { first: received_first, total_delay: delays, avg_delay_us: delays_avg }
}

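/// Timing data shared by all node tasks: per-node receive times for every block
/// header, block with transactions, log set, state update and mempool transaction,
/// plus the measured ping of each node.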
#[derive(Clone, Debug, Default)]
pub struct StatCollector {
    ping: Vec<TimeDelta>,
    blocks: HashMap<BlockHash, BlockNumber>,
    block_headers: HashMap<BlockNumber, TimeMap>,
    block_with_tx: HashMap<BlockNumber, TimeMap>,
    block_logs: HashMap<BlockNumber, TimeMap>,
    block_state: HashMap<BlockNumber, TimeMap>,
    txs: HashMap<TxHash, TimeMap>,
}

impl Display for StatCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "headers abs {}", analyze_time_maps(self.block_headers.values().collect(), None))?;
        writeln!(f, "headers rel {}", analyze_time_maps(self.block_headers.values().collect(), Some(&self.ping)))?;
        writeln!(f, "blocks abs {}", analyze_time_maps(self.block_with_tx.values().collect(), None))?;
        writeln!(f, "blocks rel {}", analyze_time_maps(self.block_with_tx.values().collect(), Some(&self.ping)))?;
        writeln!(f, "logs abs {}", analyze_time_maps(self.block_logs.values().collect(), None))?;
        writeln!(f, "logs rel {}", analyze_time_maps(self.block_logs.values().collect(), Some(&self.ping)))?;
        writeln!(f, "state abs {}", analyze_time_maps(self.block_state.values().collect(), None))?;
        writeln!(f, "state rel {}", analyze_time_maps(self.block_state.values().collect(), Some(&self.ping)))?;
        writeln!(f, "-----")
    }
}

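/// Per-node transaction propagation counters gathered by replaying the observed
/// blocks against the recorded mempool events.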
#[derive(Clone, Debug, Default)]
pub struct TxStatCollector {
    pub(crate) total_received_tx: usize,
    pub(crate) total_txs: usize,
    pub(crate) txs_received: Vec<usize>,
    pub(crate) txs_received_first: Vec<usize>,
    pub(crate) txs_received_first_relative: Vec<usize>,
    pub(crate) txs_delays: Vec<Duration>,
    pub(crate) txs_delays_relative: Vec<Duration>,

    pub(crate) txs_received_outdated: Vec<usize>,
}

impl Display for TxStatCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let total_txs: usize = self.txs_received_first.iter().sum();

        let tx_delays_avg: Vec<i64> = self
            .txs_delays
            .iter()
            .enumerate()
            .map(|(i, x)| {
                if total_txs - self.txs_received_first[i] == 0 {
                    0
                } else {
                    x.num_microseconds().unwrap_or_default() / ((total_txs - self.txs_received_first[i]) as i64)
                }
            })
            .collect();

        let total_txs_rel: usize = self.txs_received_first_relative.iter().sum();

        // Average the ping-adjusted delays (not the absolute ones) for the relative view.
        let tx_delays_relative_avg: Vec<i64> = self
            .txs_delays_relative
            .iter()
            .enumerate()
            .map(|(i, x)| {
                if total_txs_rel - self.txs_received_first_relative[i] == 0 {
                    0
                } else {
                    x.num_microseconds().unwrap_or_default() / ((total_txs_rel - self.txs_received_first_relative[i]) as i64)
                }
            })
            .collect();
        writeln!(
            f,
            "txs total in blocks: {} received by nodes: {} per node {:?} outdated {:?}",
            self.total_txs, self.total_received_tx, self.txs_received, self.txs_received_outdated,
        )?;
        writeln!(f, "txs abs first {:?} delays avg {:?} μs", self.txs_received_first, tx_delays_avg)?;
        writeln!(f, "txs rel first {:?} delays avg {:?} μs", self.txs_received_first_relative, tx_delays_relative_avg)?;

        Ok(())
    }
}

impl TxStatCollector {
    pub fn new(nodes_count: usize) -> TxStatCollector {
        TxStatCollector {
            txs_received: vec![0; nodes_count],
            txs_received_first: vec![0; nodes_count],
            txs_received_first_relative: vec![0; nodes_count],
            txs_delays: vec![Duration::default(); nodes_count],
            txs_delays_relative: vec![Duration::default(); nodes_count],

            txs_received_outdated: vec![0; nodes_count],
            ..TxStatCollector::default()
        }
    }
}

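/// Subscribes to the block header, block-with-transactions, logs, state-update and
/// mempool channels of a single node and records the local receive time of every
/// event into the shared [`StatCollector`]. Events arriving before `warm_up_blocks`
/// headers have been counted are only logged; the task returns once
/// `blocks_needed + warm_up_blocks` headers have been seen.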
async fn collect_stat_task(
    id: usize,
    provider: RootProvider,
    stat: Arc<RwLock<StatCollector>>,
    warm_up_blocks: usize,
    blocks_needed: usize,
    ping_time: TimeDelta,
) -> Result<()> {
    let bc = Blockchain::new(1);

    let bc_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();
    let strategy = Strategy::<KabuDB>::new();

    let encoder = MulticallerSwapEncoder::default();

    let mut bc_actors = BlockchainActors::new(provider, encoder, bc.clone(), bc_state, strategy, vec![]);
    bc_actors.with_block_events(NodeBlockActorConfig::all_enabled())?.with_local_mempool_events()?;

    let mut blocks_counter: usize = 0;

    let mut block_header_subscription = bc.new_block_headers_channel().subscribe();
    let mut block_with_tx_subscription = bc.new_block_with_tx_channel().subscribe();
    let mut block_logs_subscription = bc.new_block_logs_channel().subscribe();
    let mut block_state_subscription = bc.new_block_state_update_channel().subscribe();

    let mut pending_tx_subscription = bc.mempool_events_channel().subscribe();

    loop {
        select! {
            header = block_header_subscription.recv() => {
                match header {
                    Ok(header) => {
                        let block_number = header.inner.header.number;
                        let block_hash = header.inner.header.hash;
                        stat.write().await.blocks.insert(block_hash, block_number);

                        blocks_counter += 1;
                        if blocks_counter >= warm_up_blocks {
                            let recv_time = stat.write().await.block_headers.entry(block_number).or_default().add_now(id);
                            println!("{id} : {} block header received {} {}", block_number, block_hash, recv_time - ping_time);
                        } else {
                            println!("Warming up {id} : {block_number} block header received {block_hash}");
                        }

                        if blocks_counter >= blocks_needed + warm_up_blocks {
                            break;
                        }
                    }
                    Err(e) => {
                        println!("Error receiving block header {id} {e}");
                    }
                }
            }
            block_msg = block_with_tx_subscription.recv() => {
                match block_msg {
                    Ok(block_msg) => {
                        let block_number = block_msg.block.header.number;
                        let block_hash = block_msg.block.header.hash;
                        if blocks_counter >= warm_up_blocks {
                            let recv_time = stat.write().await.block_with_tx.entry(block_number).or_default().add_now(id);
                            println!("{id} : {} block with tx received {} {}", block_number, block_hash, recv_time - ping_time);
                        } else {
                            println!("Warming up {id} : {block_number} block with tx received {block_hash}");
                        }
                    }
                    Err(e) => {
                        println!("Error receiving block with tx {id} {e}");
                    }
                }
            }
            logs = block_logs_subscription.recv() => {
                match logs {
                    Ok(logs) => {
                        let block_number = stat.read().await.blocks.get(&logs.block_header.hash).cloned().unwrap_or_default();

                        if blocks_counter >= warm_up_blocks {
                            let recv_time = stat.write().await.block_logs.entry(block_number).or_default().add_now(id);
                            println!("{id} : {} block logs received {} {}", block_number, logs.block_header.hash, recv_time - ping_time);
                        } else {
                            println!("Warming up {id} : {} block logs received {}", block_number, logs.block_header.hash);
                        }
                    }
                    Err(e) => {
                        println!("Error receiving block logs {id} {e}");
                    }
                }
            }
            state_update = block_state_subscription.recv() => {
                match state_update {
                    Ok(state_update) => {
                        let block_number = stat.read().await.blocks.get(&state_update.block_header.hash).cloned().unwrap_or_default();
                        let block_hash = state_update.block_header.hash;

                        if blocks_counter >= warm_up_blocks {
                            let recv_time = stat.write().await.block_state.entry(block_number).or_default().add_now(id);
                            println!("{id} : {} block state received {} {}", block_number, block_hash, recv_time - ping_time);
                        } else {
                            println!("Warming up {id} : {block_number} block state received {block_hash}");
                        }
                    }
                    Err(e) => {
                        println!("Error receiving block state {id} {e}");
                    }
                }
            }
            mempool_event = pending_tx_subscription.recv() => {
                match mempool_event {
                    Ok(mempool_event) => {
                        if let MempoolEvents::MempoolTxUpdate { tx_hash } = mempool_event {
                            stat.write().await.txs.entry(tx_hash).or_default().add_now(id);
                        }
                    }
                    Err(e) => {
                        println!("Error receiving tx {id} {e}");
                    }
                }
            }
        }
    }
    println!("{id} finished");

    Ok(())
}

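/// Connects to every endpoint given on the command line, estimates its request
/// round-trip time, runs one [`collect_stat_task`] per endpoint and finally prints
/// the aggregated block statistics and a per-transaction comparison built from the
/// blocks fetched via the first endpoint.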
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("debug,alloy_rpc_client=info,h2=info"));
    let cli = Cli::parse();

    if cli.endpoint.is_empty() {
        return Err(eyre!("NO_NODES_SELECTED"));
    }

    let nodes_count = cli.endpoint.len();

    let stat = Arc::new(RwLock::new(StatCollector::default()));

    println!("Hello, nodebench!");

    let mut tasks: Vec<JoinHandle<_>> = vec![];

    let mut first_provider: Option<RootProvider> = None;

    for (idx, endpoint) in cli.endpoint.iter().enumerate() {
        let provider = ProviderBuilder::new().disable_recommended_fillers().connect(endpoint.as_str()).await?;

        if first_provider.is_none() {
            first_provider = Some(provider.clone());
        }

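        // Estimate the request round-trip time: 10 iterations of two RPC calls each,
        // so the elapsed wall-clock time is divided by 20.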
        let start_time = Local::now();
        for _ in 0..10 {
            let block_number = provider.get_block_number().await?;
            let _ = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await?;
        }
        let ping_time = (Local::now() - start_time) / (10 * 2);
        println!("Ping time {idx} : {ping_time}");
        stat.write().await.ping.push(ping_time);

        // warm_up_blocks = 3, blocks_needed = 10 (see collect_stat_task)
        let join_handler = tokio::spawn(collect_stat_task(idx, provider, stat.clone(), 3, 10, ping_time));
        tasks.push(join_handler);
    }

    join_all(tasks).await;

    let stat = stat.read().await;
    let first_provider = first_provider.unwrap();

    let mut calc = TxStatCollector::new(cli.endpoint.len());

    println!("{stat}");

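    // Re-fetch every measured block from the first endpoint and compare, per node,
    // when each of its transactions was seen in the mempool versus the block itself.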
    for (block_number, block_time_map) in stat.block_headers.iter() {
        println!("Getting block {block_number}");
        let block = first_provider
            .get_block_by_number(BlockNumberOrTag::Number(*block_number))
            .await?
            .ok_or_else(|| eyre!("BLOCK_NOT_FOUND {block_number}"))?;

        calc.total_txs += block.transactions.len();

        if let BlockTransactions::Hashes(tx_hash_vec) = block.transactions {
            for tx_hash in tx_hash_vec {
                if let Some(tx_time) = stat.txs.get(&tx_hash) {
                    calc.total_received_tx += 1;
                    for node_id in 0..nodes_count {
                        let Some(block_time_node) = block_time_map.get_time(node_id) else {
                            continue;
                        };

                        if let Some(tx_local_time) = tx_time.get_time(node_id) {
                            calc.txs_received[node_id] += 1;

                            // A transaction seen only after the block header (or more than two
                            // seconds behind the fastest node) is counted as outdated.
                            if tx_local_time > block_time_node
                                || tx_time.get_time_delta(node_id).unwrap_or_default() > TimeDelta::seconds(2)
                            {
                                calc.txs_received_outdated[node_id] += 1;
                            } else {
                                if let Some(t) = tx_time.get_time_delta(node_id) {
                                    calc.txs_delays[node_id] += t;
                                    if t.is_zero() {
                                        calc.txs_received_first[node_id] += 1;
                                    }
                                }
                                if let Some(t) = tx_time.to_relative(&stat.ping).get_time_delta(node_id) {
                                    calc.txs_delays_relative[node_id] += t;
                                    if t.is_zero() {
                                        calc.txs_received_first_relative[node_id] += 1;
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    println!("{calc}");

    Ok(())
}
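
// A minimal sanity check of the timing aggregation above (illustrative only; the
// node ids and the 500 µs offset are arbitrary values chosen for the example).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn analyze_time_maps_tracks_first_receiver_and_delay() {
        let base = Local::now();

        // Node 0 delivers the event first, node 1 arrives 500 µs later.
        let mut time_map = TimeMap::default();
        time_map.add_time(0, base);
        time_map.add_time(1, base + TimeDelta::microseconds(500));

        let stat = analyze_time_maps(vec![&time_map], None);
        assert_eq!(stat.first, vec![1, 0]);
        assert_eq!(stat.avg_delay_us, vec![0i64, 500]);
    }
}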