kabu_rpc_handler/handler/
ws.rs

1use axum::extract::{ConnectInfo, State};
2use axum::{
3    extract::ws::{Message, WebSocket, WebSocketUpgrade},
4    response::IntoResponse,
5};
6
7use crate::dto::block::{BlockHeader, WebSocketMessage};
8use kabu_rpc_state::AppState;
9use kabu_types_blockchain::ChainParameters;
10use revm::{DatabaseCommit, DatabaseRef};
11use std::net::SocketAddr;
12use tracing::{error, warn};
13
14/// Handle websocket upgrade
15pub async fn ws_handler<DB: DatabaseRef<Error = kabu_evm_db::KabuDBError> + DatabaseCommit + Send + Sync + Clone + 'static>(
16    ws: WebSocketUpgrade,
17    ConnectInfo(addr): ConnectInfo<SocketAddr>,
18    State(app_state): State<AppState<DB>>,
19) -> impl IntoResponse {
20    ws.on_failed_upgrade(move |e| {
21        warn!("ws upgrade error: {} with {}", e, addr);
22    })
23    .on_upgrade(move |socket| on_upgrade(socket, addr, app_state))
24}
25
26/// Actual websocket statemachine (one will be spawned per connection)
27async fn on_upgrade<DB: DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static>(
28    mut socket: WebSocket,
29    _who: SocketAddr,
30    app_state: AppState<DB>,
31) {
32    let mut receiver = app_state.bc.new_block_headers_channel().subscribe();
33
34    while let Ok(header) = receiver.recv().await {
35        let ws_msg = WebSocketMessage::BlockHeader(BlockHeader {
36            number: header.inner.header.number,
37            timestamp: header.inner.header.timestamp,
38            base_fee_per_gas: header.inner.header.base_fee_per_gas,
39            next_block_base_fee: ChainParameters::ethereum().calc_next_block_base_fee_from_header(&header.inner.header),
40        });
41        match serde_json::to_string(&ws_msg) {
42            Ok(json) => {
43                let _ = socket.send(Message::Text(json.into())).await;
44            }
45            Err(e) => {
46                error!("Failed to serialize block header: {}", e);
47            }
48        }
49    }
50}