kabu_rpc_handler/handler/
ws.rs1use axum::extract::{ConnectInfo, State};
2use axum::{
3 extract::ws::{Message, WebSocket, WebSocketUpgrade},
4 response::IntoResponse,
5};
6
7use crate::dto::block::{BlockHeader, WebSocketMessage};
8use kabu_rpc_state::AppState;
9use kabu_types_blockchain::ChainParameters;
10use revm::{DatabaseCommit, DatabaseRef};
11use std::net::SocketAddr;
12use tracing::{error, warn};
13
14pub async fn ws_handler<DB: DatabaseRef<Error = kabu_evm_db::KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static>(
16 ws: WebSocketUpgrade,
17 ConnectInfo(addr): ConnectInfo<SocketAddr>,
18 State(app_state): State<AppState<DB>>,
19) -> impl IntoResponse {
20 ws.on_failed_upgrade(move |e| {
21 warn!("ws upgrade error: {} with {}", e, addr);
22 })
23 .on_upgrade(move |socket| on_upgrade(socket, addr, app_state))
24}
25
26async fn on_upgrade<DB: DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static>(
28 mut socket: WebSocket,
29 _who: SocketAddr,
30 app_state: AppState<DB>,
31) {
32 let mut receiver = app_state.bc.new_block_headers_channel().subscribe();
33
34 while let Ok(header) = receiver.recv().await {
35 let ws_msg = WebSocketMessage::BlockHeader(BlockHeader {
36 number: header.inner.header.number,
37 timestamp: header.inner.header.timestamp,
38 base_fee_per_gas: header.inner.header.base_fee_per_gas,
39 next_block_base_fee: ChainParameters::ethereum().calc_next_block_base_fee_from_header(&header.inner.header),
40 });
41 match serde_json::to_string(&ws_msg) {
42 Ok(json) => {
43 let _ = socket.send(Message::Text(json.into())).await;
44 }
45 Err(e) => {
46 error!("Failed to serialize block header: {}", e);
47 }
48 }
49 }
50}