kabu_rpc_handler/
web_actor.rs1use crate::router::router;
2use axum::Router;
3use kabu_core_actors::{Actor, ActorResult, WorkerResult};
4use kabu_core_actors_macros::Consumer;
5use kabu_core_blockchain::{Blockchain, BlockchainState};
6use kabu_rpc_state::AppState;
7use kabu_storage_db::DbPool;
8use kabu_types_blockchain::KabuDataTypesEthereum;
9use revm::{DatabaseCommit, DatabaseRef};
10use std::net::SocketAddr;
11use tokio::net::TcpListener;
12use tokio_util::sync::CancellationToken;
13use tower_http::trace::{DefaultMakeSpan, TraceLayer};
14use tracing::info;
15
16pub async fn start_web_server_worker<S, DB>(
17 host: String,
18 extra_router: Router<S>,
19 bc: Blockchain,
20 state: BlockchainState<DB, KabuDataTypesEthereum>,
21 db_pool: DbPool,
22 shutdown_token: CancellationToken,
23) -> WorkerResult
24where
25 DB: DatabaseRef<Error = kabu_evm_db::KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
26 S: Clone + Send + Sync + 'static,
27 Router: From<Router<S>>,
28{
29 let app_state = AppState { db: db_pool, bc, state };
30 let router = router(app_state);
31 let router = router.merge(extra_router);
32
33 let router = router.layer(TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::default().include_headers(true)));
35
36 info!("Webserver listening on {}", &host);
37 let listener = TcpListener::bind(host).await?;
38 axum::serve(listener, router.into_make_service_with_connect_info::<SocketAddr>())
39 .with_graceful_shutdown(async move {
40 shutdown_token.cancelled().await;
41 info!("Shutting down webserver...");
42 })
43 .await?;
44
45 Ok("Webserver shutdown".to_string())
46}
47
/// Actor that runs the web server worker as a spawned Tokio task.
///
/// Constructed with [`WebServerActor::new`]; the blockchain handles are
/// attached afterwards with `on_bc` and are required by `start`.
#[derive(Consumer)]
pub struct WebServerActor<S, DB: Clone + Send + Sync + 'static> {
    /// Address string the TCP listener is bound to.
    host: String,
    /// Additional routes merged into the base router.
    extra_router: Router<S>,
    /// Cancelling this token triggers the server's graceful shutdown.
    shutdown_token: CancellationToken,
    /// Database pool stored in the application state.
    db_pool: DbPool,
    /// Blockchain handle; `None` until `on_bc` is called.
    bc: Option<Blockchain>,
    /// Blockchain state handle; `None` until `on_bc` is called.
    state: Option<BlockchainState<DB, KabuDataTypesEthereum>>,
}
57
58impl<S, DB> WebServerActor<S, DB>
59where
60 DB: DatabaseRef<Error = kabu_evm_db::KabuDBError> + Send + Sync + Clone + Default + 'static,
61 S: Clone + Send + Sync + 'static,
62 Router: From<Router<S>>,
63{
64 pub fn new(host: String, extra_router: Router<S>, db_pool: DbPool, shutdown_token: CancellationToken) -> Self {
65 Self { host, extra_router, shutdown_token, db_pool, bc: None, state: None }
66 }
67
68 pub fn on_bc(self, bc: &Blockchain, state: &BlockchainState<DB, KabuDataTypesEthereum>) -> Self {
69 Self { bc: Some(bc.clone()), state: Some(state.clone()), ..self }
70 }
71}
72
73impl<S, DB> Actor for WebServerActor<S, DB>
74where
75 S: Clone + Send + Sync + 'static,
76 Router: From<Router<S>>,
77 DB: DatabaseRef<Error = kabu_evm_db::KabuDBError> + DatabaseCommit + Send + Sync + Clone + Default + 'static,
78{
79 fn start(&self) -> ActorResult {
80 let task = tokio::spawn(start_web_server_worker(
81 self.host.clone(),
82 self.extra_router.clone(),
83 self.bc.clone().unwrap(),
84 self.state.clone().unwrap(),
85 self.db_pool.clone(),
86 self.shutdown_token.clone(),
87 ));
88 Ok(vec![task])
89 }
90
91 fn name(&self) -> &'static str {
92 "WebServerActor"
93 }
94}