// kabu_node_json_rpc/wait_for_node_sync_actor.rs
use alloy_network::Ethereum;
use alloy_provider::Provider;
use alloy_rpc_types::SyncStatus;
use eyre::eyre;
use kabu_core_actors::{Actor, ActorResult, WorkerResult};
use kabu_node_debug_provider::DebugProviderExt;
use std::time::Duration;
use tokio::time::timeout;
use tracing::{error, info};

/// Pause between two consecutive sync-status polls.
const SYNC_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Upper bound on how long a single sync-status request may take.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

14async fn wait_for_node_sync_one_shot_worker<P>(client: P) -> WorkerResult
16where
17 P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
18{
19 info!("Waiting for node to sync...");
20 let mut print_count = 0;
21 loop {
22 match timeout(CLIENT_TIMEOUT, client.syncing()).await {
23 Ok(result) => match result {
24 Ok(syncing_status) => match syncing_status {
25 SyncStatus::None => {
26 break;
27 }
28 SyncStatus::Info(sync_progress) => {
29 if print_count == 0 {
30 info!("Sync progress: {:?}", sync_progress);
31 }
32 }
33 },
34 Err(e) => {
35 error!("Error retrieving syncing status: {:?}", e);
36 break;
37 }
38 },
39 Err(elapsed) => {
40 error!("Timeout during get syncing status. Elapsed time: {:?}", elapsed);
41 break;
42 }
43 }
44 tokio::time::sleep(SYNC_CHECK_INTERVAL).await;
45 print_count = if print_count > 4 { 0 } else { print_count + 1 };
46 }
47 Ok("Node is sync".to_string())
48}
49
50pub struct WaitForNodeSyncOneShotBlockingActor<P> {
51 client: P,
52}
53
54impl<P> WaitForNodeSyncOneShotBlockingActor<P>
55where
56 P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
57{
58 pub fn new(client: P) -> WaitForNodeSyncOneShotBlockingActor<P> {
59 WaitForNodeSyncOneShotBlockingActor { client }
60 }
61}
62
63impl<P> Actor for WaitForNodeSyncOneShotBlockingActor<P>
64where
65 P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
66{
67 fn start_and_wait(&self) -> eyre::Result<()> {
68 let rt = tokio::runtime::Runtime::new()?; let client_cloned = self.client.clone();
70 let handle = rt.spawn(async { wait_for_node_sync_one_shot_worker(client_cloned).await });
71
72 self.wait(Ok(vec![handle]))?;
73 rt.shutdown_background();
74
75 Ok(())
76 }
77
78 fn start(&self) -> ActorResult {
79 Err(eyre!("NEED_TO_BE_WAITED"))
80 }
81
82 fn name(&self) -> &'static str {
83 "WaitForNodeSyncOneShotBlockingActor"
84 }
85}