kabu_defi_health_monitor/
state_health_monitor.rs

use std::collections::HashMap;

use alloy_eips::BlockNumberOrTag;
use alloy_network::Ethereum;
use alloy_provider::Provider;
use chrono::{DateTime, Duration, Local, Utc};
use eyre::Result;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tracing::{error, info, warn};

use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, SharedState, WorkerResult};
use kabu_core_actors_macros::{Accessor, Consumer};
use kabu_core_blockchain::{Blockchain, BlockchainState};
use kabu_evm_db::DatabaseKabuExt;
use kabu_types_blockchain::KabuDataTypes;
use kabu_types_entities::{MarketState, PoolId};
use kabu_types_events::{MarketEvents, MessageTxCompose, TxComposeMessageType};
use revm::DatabaseRef;
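/// Re-checks a pool's cached storage against the node at the latest block and patches any
/// cell in the in-memory `MarketState` database whose value differs on-chain. Cells listed
/// as read-only in the market state config are skipped, as are cells whose on-chain value
/// is zero. Only `PoolId::Address` pools can be verified; `PoolId::B256` returns an error.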
async fn verify_pool_state_task<P: Provider<Ethereum> + 'static, DB: DatabaseKabuExt>(
    client: P,
    address: PoolId,
    market_state: SharedState<MarketState<DB>>,
) -> Result<()> {
    info!("Verifying state {address:?}");
    let address = match address {
        PoolId::Address(addr) => addr,
        PoolId::B256(_) => return Err(eyre::eyre!("B256 pool ID not supported for verification")),
    };
    let account = market_state.write().await.state_db.load_account(address).cloned()?;
    let read_only_cell_hash_set = market_state.read().await.config.read_only_cells.get(&address).cloned().unwrap_or_default();

    for (cell, current_value) in account.storage.iter() {
        if read_only_cell_hash_set.contains(cell) {
            continue;
        }
        match client.get_storage_at(address, *cell).block_id(BlockNumberOrTag::Latest.into()).await {
            Ok(actual_value) => {
                if actual_value.is_zero() {
                    continue;
                }
                if *current_value != actual_value {
                    warn!("verify : account storage is different : {address:?} {cell:?} {current_value:#32x} -> {actual_value:#32x} storage size : {}", account.storage.len());
                    if let Err(e) = market_state.write().await.state_db.insert_account_storage(address, *cell, actual_value) {
                        error!("{e}");
                    }
                }
            }
            Err(e) => {
                error!("Cannot read storage {:?} {:?} : {}", account, cell, e)
            }
        }
    }

    Ok(())
}
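/// Worker loop behind [`StateHealthMonitorActor`]: whenever a swap reaches the signing stage,
/// the pools it touches are queued for verification (at most once per pool per 60 seconds);
/// when the next `MarketEvents::BlockStateUpdate` arrives, a `verify_pool_state_task` is
/// spawned for every queued pool and the queue is cleared.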
pub async fn state_health_monitor_worker<
    P: Provider<Ethereum> + Clone + 'static,
    DB: DatabaseRef + DatabaseKabuExt + Send + Sync + Clone + 'static,
>(
    client: P,
    market_state: SharedState<MarketState<DB>>,
    tx_compose_channel_rx: Broadcaster<MessageTxCompose>,
    market_events_rx: Broadcaster<MarketEvents>,
) -> WorkerResult {
    let mut tx_compose_channel_rx: Receiver<MessageTxCompose> = tx_compose_channel_rx.subscribe();
    let mut market_events_rx: Receiver<MarketEvents> = market_events_rx.subscribe();

    let mut check_time_map: HashMap<PoolId, DateTime<Local>> = HashMap::new();
    let mut pool_address_to_verify_vec: Vec<PoolId> = Vec::new();
    loop {
        tokio::select! {
            msg = market_events_rx.recv() => {
                let market_event_msg: Result<MarketEvents, RecvError> = msg;
                match market_event_msg {
                    Ok(market_event) => {
                        // A new block's state update has landed: verify every queued pool against on-chain storage.
                        if matches!(market_event, MarketEvents::BlockStateUpdate{..}) {
                            for pool_address in pool_address_to_verify_vec {
                                tokio::task::spawn(
                                    verify_pool_state_task(
                                        client.clone(),
                                        pool_address,
                                        market_state.clone()
                                    )
                                );
                            }
                            pool_address_to_verify_vec = Vec::new();
                        }
                    }
                    Err(e) => { error!("market_event_rx error : {e}") }
                }
            },

            msg = tx_compose_channel_rx.recv() => {
                let tx_compose_update: Result<MessageTxCompose, RecvError> = msg;
                match tx_compose_update {
                    Ok(tx_compose_msg) => {
                        if let TxComposeMessageType::Sign(sign_request_data) = tx_compose_msg.inner {
                            if let Some(swap) = sign_request_data.swap {
                                let pool_address_vec = swap.get_pool_address_vec();
                                let now = chrono::Local::now();
                                // Queue every pool touched by the swap, at most once per pool per 60 seconds.
                                for pool_address in pool_address_vec {
                                    if now - check_time_map.get(&pool_address).cloned().unwrap_or(DateTime::<Utc>::MIN_UTC.into()) > Duration::seconds(60) {
                                        check_time_map.insert(pool_address, Local::now());
                                        if !pool_address_to_verify_vec.contains(&pool_address) {
                                            pool_address_to_verify_vec.push(pool_address)
                                        }
                                    }
                                }
                            }
                        }
                    }
                    Err(e) => {
                        error!("tx_compose_channel_rx : {e}")
                    }
                }
            }
        }
    }
}
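/// Actor wrapper around [`state_health_monitor_worker`]: holds the provider together with the
/// shared market state and the two broadcast channels the worker consumes.
///
/// A minimal wiring sketch (the variable names `client`, `bc` and `bc_state` are illustrative
/// and assume an already-configured provider, [`Blockchain`] and [`BlockchainState`]):
///
/// ```ignore
/// let actor = StateHealthMonitorActor::new(client).on_bc(&bc, &bc_state);
/// let tasks = actor.start()?; // join handles for the spawned worker
/// ```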
#[derive(Accessor, Consumer)]
pub struct StateHealthMonitorActor<P, DB: Clone + Send + Sync + 'static> {
    client: P,
    #[accessor]
    market_state: Option<SharedState<MarketState<DB>>>,
    #[consumer]
    tx_compose_channel_rx: Option<Broadcaster<MessageTxCompose>>,
    #[consumer]
    market_events_rx: Option<Broadcaster<MarketEvents>>,
}
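// `new` only takes the provider; `on_bc` fills in the market state and subscribes the
// consumers from an existing `Blockchain` / `BlockchainState` pair before the actor is started.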
impl<P, DB> StateHealthMonitorActor<P, DB>
where
    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
    DB: DatabaseRef + DatabaseKabuExt + Send + Sync + Clone + Default + 'static,
{
    pub fn new(client: P) -> Self {
        StateHealthMonitorActor { client, market_state: None, tx_compose_channel_rx: None, market_events_rx: None }
    }

    pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>) -> Self {
        Self {
            market_state: Some(state.market_state()),
            tx_compose_channel_rx: Some(bc.tx_compose_channel()),
            market_events_rx: Some(bc.market_events_channel()),
            ..self
        }
    }
}
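// Actor glue: `start` spawns `state_health_monitor_worker` with the injected provider, market
// state and channels, and returns the join handle. The `unwrap` calls assume `on_bc` (or the
// equivalent accessor/consumer setters) populated the `Option` fields before start-up.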
impl<P, DB> Actor for StateHealthMonitorActor<P, DB>
where
    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
    DB: DatabaseRef + DatabaseKabuExt + Send + Sync + Clone + 'static,
{
    fn start(&self) -> ActorResult {
        let task = tokio::task::spawn(state_health_monitor_worker(
            self.client.clone(),
            self.market_state.clone().unwrap(),
            self.tx_compose_channel_rx.clone().unwrap(),
            self.market_events_rx.clone().unwrap(),
        ));
        Ok(vec![task])
    }

    fn name(&self) -> &'static str {
        "StateHealthMonitorActor"
    }
}