nodebench/
main.rs

1use crate::cli::Cli;
2use alloy::primitives::{BlockHash, BlockNumber};
3use alloy::{
4    eips::BlockNumberOrTag,
5    primitives::TxHash,
6    providers::{Provider, ProviderBuilder, RootProvider},
7    rpc::types::BlockTransactions,
8};
9use chrono::{DateTime, Duration, Local, TimeDelta};
10use clap::Parser;
11use eyre::{eyre, Result};
12use futures::future::join_all;
13use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
14use kabu_core_blockchain_actors::BlockchainActors;
15use kabu_evm_db::KabuDB;
16use kabu_execution_multicaller::MulticallerSwapEncoder;
17use kabu_node_actor_config::NodeBlockActorConfig;
18use kabu_types_blockchain::KabuDataTypesEthereum;
19use kabu_types_events::MempoolEvents;
20use std::fmt::Formatter;
21use std::{collections::HashMap, fmt::Display, sync::Arc};
22use tokio::{select, sync::RwLock, task::JoinHandle};
23
24mod cli;
25
26#[allow(dead_code)]
27#[derive(Clone, Debug, Default)]
28pub struct StatEntry {
29    first: Vec<usize>,
30    total_delay: Vec<Duration>,
31    avg_delay_ms: Vec<i64>,
32}
33
34impl Display for StatEntry {
35    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36        write!(f, "first {:?} avg delay {:?} μs", self.first, self.avg_delay_ms)
37    }
38}
39
40#[derive(Clone, Debug, Default)]
41pub struct TimeMap {
42    time_map: HashMap<usize, DateTime<Local>>,
43}
44
45impl TimeMap {
46    pub fn add_time(&mut self, id: usize, time: DateTime<Local>) {
47        self.time_map.entry(id).or_insert(time);
48    }
49    pub fn add_now(&mut self, id: usize) -> DateTime<Local> {
50        *self.time_map.entry(id).or_insert(Local::now())
51    }
52
53    pub fn get_time(&self, id: usize) -> Option<&DateTime<Local>> {
54        self.time_map.get(&id)
55    }
56
57    pub fn to_relative(&self, pings: &[TimeDelta]) -> TimeMap {
58        let rel_time: HashMap<usize, DateTime<Local>> =
59            self.time_map.iter().map(|(k, v)| (*k, *v - pings.get(*k).cloned().unwrap())).collect();
60        TimeMap { time_map: rel_time }
61    }
62
63    pub fn get_first_time(&self) -> DateTime<Local> {
64        self.time_map.values().min().cloned().unwrap_or_default()
65    }
66
67    pub fn get_time_delta(&self, id: usize) -> Option<TimeDelta> {
68        self.time_map.get(&id).map(|x| *x - self.get_first_time())
69    }
70}
71
72fn analyze_time_maps(time_map_vec: Vec<&TimeMap>, ping: Option<&[TimeDelta]>) -> StatEntry {
73    let nodes_count = time_map_vec.first();
74
75    if nodes_count.is_none() {
76        return Default::default();
77    }
78
79    let nodes_count = nodes_count.unwrap().time_map.len();
80    if nodes_count == 0 {
81        return Default::default();
82    }
83
84    let mut delays: Vec<Duration> = vec![Duration::default(); nodes_count];
85    let mut received_first: Vec<usize> = vec![0; nodes_count];
86
87    for time_map in time_map_vec.iter() {
88        for node_id in 0..nodes_count {
89            match ping {
90                Some(ping) => {
91                    if let Some(t) = time_map.to_relative(ping).get_time_delta(node_id) {
92                        delays[node_id] += t;
93                        if t.is_zero() {
94                            received_first[node_id] += 1;
95                        }
96                    }
97                }
98                None => {
99                    if let Some(t) = time_map.get_time_delta(node_id) {
100                        delays[node_id] += t;
101                        if t.is_zero() {
102                            received_first[node_id] += 1;
103                        }
104                    }
105                }
106            }
107        }
108    }
109
110    let total_entries: usize = received_first.iter().sum();
111
112    let delays_avg: Vec<i64> = delays
113        .iter()
114        .enumerate()
115        .map(|(i, x)| {
116            if total_entries - received_first[i] == 0 {
117                0
118            } else {
119                x.num_microseconds().unwrap_or_default() / ((total_entries - received_first[i]) as i64)
120            }
121        })
122        .collect();
123
124    StatEntry { first: received_first, total_delay: delays, avg_delay_ms: delays_avg }
125}
126
127#[derive(Clone, Debug, Default)]
128pub struct StatCollector {
129    ping: Vec<TimeDelta>,
130    blocks: HashMap<BlockHash, BlockNumber>,
131    block_headers: HashMap<BlockNumber, TimeMap>,
132    block_with_tx: HashMap<BlockNumber, TimeMap>,
133    block_logs: HashMap<BlockNumber, TimeMap>,
134    block_state: HashMap<BlockNumber, TimeMap>,
135    txs: HashMap<TxHash, TimeMap>,
136}
137
138impl Display for StatCollector {
139    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140        writeln!(f, "headers abs {}", analyze_time_maps(self.block_headers.values().collect(), None))?;
141        writeln!(f, "headers rel {}", analyze_time_maps(self.block_headers.values().collect(), Some(&self.ping)))?;
142        writeln!(f, "blocks abs {}", analyze_time_maps(self.block_with_tx.values().collect(), None))?;
143        writeln!(f, "blocks rel {}", analyze_time_maps(self.block_with_tx.values().collect(), Some(&self.ping)))?;
144        writeln!(f, "logs abs {}", analyze_time_maps(self.block_logs.values().collect(), None))?;
145        writeln!(f, "logs rel {}", analyze_time_maps(self.block_logs.values().collect(), Some(&self.ping)))?;
146        writeln!(f, "state abs {}", analyze_time_maps(self.block_state.values().collect(), None))?;
147        writeln!(f, "state rel {}", analyze_time_maps(self.block_state.values().collect(), Some(&self.ping)))?;
148        writeln!(f, "-----")
149    }
150}
151
152#[derive(Clone, Debug, Default)]
153pub struct TxStatCollector {
154    pub(crate) total_received_tx: usize,
155    pub(crate) total_txs: usize,
156    pub(crate) txs_received: Vec<usize>,
157    pub(crate) txs_received_first: Vec<usize>,
158    pub(crate) txs_received_first_relative: Vec<usize>,
159    pub(crate) txs_delays: Vec<Duration>,
160    pub(crate) txs_delays_relative: Vec<Duration>,
161
162    pub(crate) txs_received_outdated: Vec<usize>,
163}
164
165impl Display for TxStatCollector {
166    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
167        let total_txs: usize = self.txs_received_first.iter().sum();
168
169        let tx_delays_avg: Vec<i64> = self
170            .txs_delays
171            .iter()
172            .enumerate()
173            .map(|(i, x)| {
174                if total_txs - self.txs_received_first[i] == 0 {
175                    0
176                } else {
177                    x.num_microseconds().unwrap_or_default() / ((total_txs - self.txs_received_first[i]) as i64)
178                }
179            })
180            .collect();
181
182        let total_txs_rel: usize = self.txs_received_first_relative.iter().sum();
183
184        let tx_delays_relative_avg: Vec<i64> = self
185            .txs_delays
186            .iter()
187            .enumerate()
188            .map(|(i, x)| {
189                if total_txs_rel - self.txs_received_first_relative[i] == 0 {
190                    0
191                } else {
192                    x.num_microseconds().unwrap_or_default() / ((total_txs_rel - self.txs_received_first_relative[i]) as i64)
193                }
194            })
195            .collect();
196        writeln!(
197            f,
198            "txs total in blocks: {} received by nodes: {} per node {:?}  outdated {:?}",
199            self.total_txs, self.total_received_tx, self.txs_received, self.txs_received_outdated,
200        )?;
201        writeln!(f, "txs abs first {:?} delays avg {:?} μs", self.txs_received_first, tx_delays_avg)?;
202        writeln!(f, "txs rel first {:?} delays avg {:?} μs", self.txs_received_first_relative, tx_delays_relative_avg)?;
203
204        Ok(())
205    }
206}
207
208impl TxStatCollector {
209    pub fn new(nodes_count: usize) -> TxStatCollector {
210        TxStatCollector {
211            txs_received: vec![0; nodes_count],
212            txs_received_first: vec![0; nodes_count],
213            txs_received_first_relative: vec![0; nodes_count],
214            txs_delays: vec![Duration::default(); nodes_count],
215            txs_delays_relative: vec![Duration::default(); nodes_count],
216
217            txs_received_outdated: vec![0; nodes_count],
218            ..TxStatCollector::default()
219        }
220    }
221}
222
223async fn collect_stat_task(
224    id: usize,
225    provider: RootProvider,
226    stat: Arc<RwLock<StatCollector>>,
227    warn_up_blocks: usize,
228    blocks_needed: usize,
229    ping_time: TimeDelta,
230) -> Result<()> {
231    let bc = Blockchain::new(1);
232
233    let bc_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();
234    let strategy = Strategy::<KabuDB>::new();
235
236    let encoder = MulticallerSwapEncoder::default();
237
238    let mut bc_actors = BlockchainActors::new(provider, encoder, bc.clone(), bc_state, strategy, vec![]);
239    bc_actors.with_block_events(NodeBlockActorConfig::all_enabled())?.with_local_mempool_events()?;
240
241    let mut blocks_counter: usize = 0;
242
243    let mut block_header_subscription = bc.new_block_headers_channel().subscribe();
244    let mut block_with_tx_subscription = bc.new_block_with_tx_channel().subscribe();
245    let mut block_logs_subscription = bc.new_block_logs_channel().subscribe();
246    let mut block_state_subscription = bc.new_block_state_update_channel().subscribe();
247
248    let mut pending_tx_subscription = bc.mempool_events_channel().subscribe();
249
250    loop {
251        select! {
252            header = block_header_subscription.recv() => {
253                match header {
254                    Ok(header)=>{
255                        let block_number = header.inner.header.number;
256                        let block_hash = header.inner.header.hash;
257                        stat.write().await.blocks.insert(block_hash, block_number);
258
259                        blocks_counter += 1;
260                        if blocks_counter >= warn_up_blocks {
261                            let recv_time = stat.write().await.block_headers.entry(block_number).or_default().add_now(id);
262                            println!("{id} : {} block header received {} {}", block_number, block_hash, recv_time - ping_time);
263                        }else{
264                            println!("Warming up {id} : {block_number} block header received {block_hash}");
265                        }
266
267                        if blocks_counter >= blocks_needed + warn_up_blocks {
268                            break;
269                        }
270                    }
271                    Err(e)=>{
272                        println!("Error receiving block header {id} {e}");
273                    }
274                }
275
276            }
277            block_msg = block_with_tx_subscription.recv() => {
278                match block_msg {
279                    Ok(block_msg)=>{
280                        let block_number = block_msg.block.header.number;
281                        let block_hash = block_msg.block.header.hash;
282                        if blocks_counter >= warn_up_blocks {
283                            let recv_time = stat.write().await.block_with_tx.entry(block_number).or_default().add_now(id);
284                            println!("{id} : {} block with tx received {} {}", block_number, block_hash, recv_time - ping_time);
285                        }else{
286                            println!("Warming up {id} : {block_number} block with tx received {block_hash}");
287                        }
288                    }
289                    Err(e)=>{
290                        println!("Error receiving block with tx {id} {e}");
291                    }
292                }
293            }
294            logs = block_logs_subscription.recv() => {
295                match logs {
296                    Ok(logs)=>{
297                        let block_number = stat.read().await.blocks.get(&logs.block_header.hash).cloned().unwrap_or_default();
298
299                        if blocks_counter >= warn_up_blocks {
300                            let recv_time = stat.write().await.block_logs.entry(block_number).or_default().add_now(id);
301                            println!("{id} : {} block logs received {} {}", block_number, logs.block_header.hash, recv_time - ping_time);
302                        }else{
303                            println!("Warming up {id} : {} block logs received {}", block_number, logs.block_header.hash);
304                        }
305                    }
306                    Err(e)=>{
307                        println!("Error receiving block logs {id} {e}");
308                    }
309                }
310            }
311
312
313            state_update = block_state_subscription.recv() => {
314                match state_update  {
315                    Ok(state_update)=>{
316                        let block_number = stat.read().await.blocks.get(&state_update.block_header.hash).cloned().unwrap_or_default();
317                        let block_hash = state_update.block_header.hash;
318
319                        if blocks_counter >= warn_up_blocks {
320                            let recv_time = stat.write().await.block_state.entry(block_number).or_default().add_now(id);
321                            println!("{id} : {} block state received {} {}", block_number, block_hash, recv_time - ping_time);
322                        }else{
323                            println!("Warming up {id} : {block_number} block state tx received {block_hash}");
324                        }
325                    }
326                    Err(e)=>{
327                        println!("Error receiving block state {id} {e}");
328                    }
329                }
330            }
331
332            mempool_event = pending_tx_subscription.recv() =>{
333                match mempool_event {
334                    Ok(mempool_event) =>{
335                        if let MempoolEvents::MempoolTxUpdate{ tx_hash} = mempool_event {
336                            stat.write().await.txs.entry(tx_hash).or_default().add_now(id);
337                        }
338                    }
339                    Err(e)=>{
340                        println!("Error receiving tx {id} {e}");
341                    }
342                }
343
344            }
345
346        }
347    }
348    println!("{id} finished");
349
350    Ok(())
351}
352
353#[tokio::main]
354async fn main() -> Result<()> {
355    env_logger::init_from_env(env_logger::Env::default().default_filter_or("debug,alloy_rpc_client=info,h2=info"));
356    let cli = Cli::parse();
357
358    if cli.endpoint.is_empty() {
359        return Err(eyre!("NO_NODES_SELECTED"));
360    }
361
362    let nodes_count = cli.endpoint.len();
363
364    let stat = Arc::new(RwLock::new(StatCollector::default()));
365
366    println!("Hello, nodebench!");
367
368    let mut tasks: Vec<JoinHandle<_>> = vec![];
369
370    let mut first_provider: Option<RootProvider> = None;
371
372    for (idx, endpoint) in cli.endpoint.iter().enumerate() {
373        //let conn = WsConnect::new(endpoint.clone());
374        let provider = ProviderBuilder::new().disable_recommended_fillers().connect(endpoint.clone().as_str()).await?;
375
376        if first_provider.is_none() {
377            first_provider = Some(provider.clone());
378        }
379
380        let start_time = Local::now();
381        for _i in 0u64..10 {
382            let block_number = provider.get_block_number().await?;
383            let _ = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await?;
384        }
385        let ping_time = (Local::now() - start_time) / (10 * 2);
386        println!("Ping time {idx} : {ping_time}");
387        stat.write().await.ping.push(ping_time);
388
389        let join_handler = tokio::spawn(collect_stat_task(idx, provider, stat.clone(), 3, 10, ping_time));
390        tasks.push(join_handler);
391    }
392
393    join_all(tasks).await;
394
395    let stat = stat.read().await;
396    let first_provider = first_provider.unwrap();
397
398    let mut calc = TxStatCollector::new(cli.endpoint.len());
399
400    println!("{stat}");
401
402    for (block_number, _) in stat.block_headers.iter() {
403        println!("Getting block {block_number}");
404        let block = first_provider.get_block_by_number(BlockNumberOrTag::Number(*block_number)).await?.unwrap();
405
406        calc.total_txs += block.transactions.len();
407
408        let block_time_map = stat.block_headers.get(block_number).unwrap();
409
410        if let BlockTransactions::Hashes(tx_hash_vec) = block.transactions {
411            for tx_hash in tx_hash_vec {
412                if let Some(tx_time) = stat.txs.get(&tx_hash) {
413                    calc.total_received_tx += 1;
414                    for node_id in 0..nodes_count {
415                        let block_time_node = block_time_map.get_time(node_id).unwrap();
416
417                        if let Some(tx_local_time) = tx_time.get_time(node_id) {
418                            calc.txs_received[node_id] += 1;
419
420                            // check if tx received after block
421                            if tx_local_time > block_time_node
422                                || tx_time.get_time_delta(node_id).unwrap_or_default() > TimeDelta::seconds(2)
423                            {
424                                calc.txs_received_outdated[node_id] += 1;
425                            } else {
426                                // calc absolute delay
427                                if let Some(t) = tx_time.get_time_delta(node_id) {
428                                    calc.txs_delays[node_id] += t;
429                                    if t.is_zero() {
430                                        calc.txs_received_first[node_id] += 1;
431                                    }
432                                }
433                                //calc relative delay
434                                if let Some(t) = tx_time.to_relative(&stat.ping).get_time_delta(node_id) {
435                                    calc.txs_delays_relative[node_id] += t;
436                                    if t.is_zero() {
437                                        calc.txs_received_first_relative[node_id] += 1;
438                                    }
439                                }
440                            }
441                        }
442                    }
443                }
444            }
445        }
446    }
447    println!("{calc}");
448
449    Ok(())
450}