kabu_node_json_rpc/
wait_for_node_sync_actor.rs

1use alloy_network::Ethereum;
2use alloy_provider::Provider;
3use alloy_rpc_types::SyncStatus;
4use eyre::eyre;
5use kabu_core_actors::{Actor, ActorResult, WorkerResult};
6use kabu_node_debug_provider::DebugProviderExt;
7use std::time::Duration;
8use tokio::time::timeout;
9use tracing::{error, info};
10
11const SYNC_CHECK_INTERVAL: Duration = Duration::from_secs(1);
12const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
13
14/// Wait for the node to sync. This works only for http/ipc/ws providers.
15async fn wait_for_node_sync_one_shot_worker<P>(client: P) -> WorkerResult
16where
17    P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
18{
19    info!("Waiting for node to sync...");
20    let mut print_count = 0;
21    loop {
22        match timeout(CLIENT_TIMEOUT, client.syncing()).await {
23            Ok(result) => match result {
24                Ok(syncing_status) => match syncing_status {
25                    SyncStatus::None => {
26                        break;
27                    }
28                    SyncStatus::Info(sync_progress) => {
29                        if print_count == 0 {
30                            info!("Sync progress: {:?}", sync_progress);
31                        }
32                    }
33                },
34                Err(e) => {
35                    error!("Error retrieving syncing status: {:?}", e);
36                    break;
37                }
38            },
39            Err(elapsed) => {
40                error!("Timeout during get syncing status. Elapsed time: {:?}", elapsed);
41                break;
42            }
43        }
44        tokio::time::sleep(SYNC_CHECK_INTERVAL).await;
45        print_count = if print_count > 4 { 0 } else { print_count + 1 };
46    }
47    Ok("Node is sync".to_string())
48}
49
50pub struct WaitForNodeSyncOneShotBlockingActor<P> {
51    client: P,
52}
53
54impl<P> WaitForNodeSyncOneShotBlockingActor<P>
55where
56    P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
57{
58    pub fn new(client: P) -> WaitForNodeSyncOneShotBlockingActor<P> {
59        WaitForNodeSyncOneShotBlockingActor { client }
60    }
61}
62
63impl<P> Actor for WaitForNodeSyncOneShotBlockingActor<P>
64where
65    P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
66{
67    fn start_and_wait(&self) -> eyre::Result<()> {
68        let rt = tokio::runtime::Runtime::new()?; // we need a different runtime to wait for the result
69        let client_cloned = self.client.clone();
70        let handle = rt.spawn(async { wait_for_node_sync_one_shot_worker(client_cloned).await });
71
72        self.wait(Ok(vec![handle]))?;
73        rt.shutdown_background();
74
75        Ok(())
76    }
77
78    fn start(&self) -> ActorResult {
79        Err(eyre!("NEED_TO_BE_WAITED"))
80    }
81
82    fn name(&self) -> &'static str {
83        "WaitForNodeSyncOneShotBlockingActor"
84    }
85}