kabu_rpc_handler/
web_actor.rs

1use crate::router::router;
2use axum::Router;
3use kabu_core_actors::{Actor, ActorResult, WorkerResult};
4use kabu_core_actors_macros::Consumer;
5use kabu_core_blockchain::{Blockchain, BlockchainState};
6use kabu_rpc_state::AppState;
7use kabu_storage_db::DbPool;
8use kabu_types_blockchain::KabuDataTypesEthereum;
9use revm::{DatabaseCommit, DatabaseRef};
10use std::net::SocketAddr;
11use tokio::net::TcpListener;
12use tokio_util::sync::CancellationToken;
13use tower_http::trace::{DefaultMakeSpan, TraceLayer};
14use tracing::info;
15
16pub async fn start_web_server_worker<S, DB>(
17    host: String,
18    extra_router: Router<S>,
19    bc: Blockchain,
20    state: BlockchainState<DB, KabuDataTypesEthereum>,
21    db_pool: DbPool,
22    shutdown_token: CancellationToken,
23) -> WorkerResult
24where
25    DB: DatabaseRef<Error = kabu_evm_db::KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
26    S: Clone + Send + Sync + 'static,
27    Router: From<Router<S>>,
28{
29    let app_state = AppState { db: db_pool, bc, state };
30    let router = router(app_state);
31    let router = router.merge(extra_router);
32
33    // logging
34    let router = router.layer(TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::default().include_headers(true)));
35
36    info!("Webserver listening on {}", &host);
37    let listener = TcpListener::bind(host).await?;
38    axum::serve(listener, router.into_make_service_with_connect_info::<SocketAddr>())
39        .with_graceful_shutdown(async move {
40            shutdown_token.cancelled().await;
41            info!("Shutting down webserver...");
42        })
43        .await?;
44
45    Ok("Webserver shutdown".to_string())
46}
47
48#[derive(Consumer)]
49pub struct WebServerActor<S, DB: Clone + Send + Sync + 'static> {
50    host: String,
51    extra_router: Router<S>,
52    shutdown_token: CancellationToken,
53    db_pool: DbPool,
54    bc: Option<Blockchain>,
55    state: Option<BlockchainState<DB, KabuDataTypesEthereum>>,
56}
57
58impl<S, DB> WebServerActor<S, DB>
59where
60    DB: DatabaseRef<Error = kabu_evm_db::KabuDBError> + Send + Sync + Clone + Default + 'static,
61    S: Clone + Send + Sync + 'static,
62    Router: From<Router<S>>,
63{
64    pub fn new(host: String, extra_router: Router<S>, db_pool: DbPool, shutdown_token: CancellationToken) -> Self {
65        Self { host, extra_router, shutdown_token, db_pool, bc: None, state: None }
66    }
67
68    pub fn on_bc(self, bc: &Blockchain, state: &BlockchainState<DB, KabuDataTypesEthereum>) -> Self {
69        Self { bc: Some(bc.clone()), state: Some(state.clone()), ..self }
70    }
71}
72
73impl<S, DB> Actor for WebServerActor<S, DB>
74where
75    S: Clone + Send + Sync + 'static,
76    Router: From<Router<S>>,
77    DB: DatabaseRef<Error = kabu_evm_db::KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
78{
79    fn start(&self) -> ActorResult {
80        let task = tokio::spawn(start_web_server_worker(
81            self.host.clone(),
82            self.extra_router.clone(),
83            self.bc.clone().unwrap(),
84            self.state.clone().unwrap(),
85            self.db_pool.clone(),
86            self.shutdown_token.clone(),
87        ));
88        Ok(vec![task])
89    }
90
91    fn name(&self) -> &'static str {
92        "WebServerActor"
93    }
94}