kabu_defi_health_monitor/
state_health_monitor.rs1use std::collections::HashMap;
2
3use alloy_eips::BlockNumberOrTag;
4use alloy_network::Ethereum;
5use alloy_provider::Provider;
6use chrono::{DateTime, Duration, Local};
7use eyre::Result;
8use tokio::sync::broadcast::error::RecvError;
9use tokio::sync::broadcast::Receiver;
10use tracing::{error, info, warn};
11
12use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, SharedState, WorkerResult};
13use kabu_core_actors_macros::{Accessor, Consumer};
14use kabu_core_blockchain::{Blockchain, BlockchainState};
15use kabu_evm_db::DatabaseKabuExt;
16use kabu_types_blockchain::KabuDataTypes;
17use kabu_types_entities::{MarketState, PoolId};
18use kabu_types_events::{MarketEvents, MessageTxCompose, TxComposeMessageType};
19use revm::DatabaseRef;
20
21async fn verify_pool_state_task<P: Provider<Ethereum> + 'static, DB: DatabaseKabuExt>(
22 client: P,
23 address: PoolId,
24 market_state: SharedState<MarketState<DB>>,
25) -> Result<()> {
26 info!("Verifying state {address:?}");
27 let address = match address {
28 PoolId::Address(addr) => addr,
29 PoolId::B256(_) => return Err(eyre::eyre!("B256 pool ID not supported for verification")),
30 };
31 let account = market_state.write().await.state_db.load_account(address).cloned()?;
32 let read_only_cell_hash_set = market_state.read().await.config.read_only_cells.get(&address).cloned().unwrap_or_default();
33
34 for (cell, current_value) in account.storage.iter() {
35 if read_only_cell_hash_set.contains(cell) {
36 continue;
37 }
38 match client.get_storage_at(address, *cell).block_id(BlockNumberOrTag::Latest.into()).await {
39 Ok(actual_value) => {
40 if actual_value.is_zero() {
41 continue;
42 }
43 if *current_value != actual_value {
44 warn!("verify : account storage is different : {address:?} {cell:?} {current_value:#32x} -> {actual_value:#32x} storage size : {}", account.storage.len());
45 if let Err(e) = market_state.write().await.state_db.insert_account_storage(address, *cell, actual_value) {
46 error!("{e}");
47 }
48 }
49 }
50 Err(e) => {
51 error!("Cannot read storage {:?} {:?} : {}", account, cell, e)
52 }
53 }
54 }
55
56 Ok(())
57}
58
59pub async fn state_health_monitor_worker<
60 P: Provider<Ethereum> + Clone + 'static,
61 DB: DatabaseRef + DatabaseKabuExt + Send + Sync + Clone + 'static,
62>(
63 client: P,
64 market_state: SharedState<MarketState<DB>>,
65 tx_compose_channel_rx: Broadcaster<MessageTxCompose>,
66 market_events_rx: Broadcaster<MarketEvents>,
67) -> WorkerResult {
68 let mut tx_compose_channel_rx: Receiver<MessageTxCompose> = tx_compose_channel_rx.subscribe();
69 let mut market_events_rx: Receiver<MarketEvents> = market_events_rx.subscribe();
70
71 let mut check_time_map: HashMap<PoolId, DateTime<Local>> = HashMap::new();
72 let mut pool_address_to_verify_vec: Vec<PoolId> = Vec::new();
73
74 loop {
75 tokio::select! {
76 msg = market_events_rx.recv() => {
77 let market_event_msg : Result<MarketEvents, RecvError> = msg;
78 match market_event_msg {
79 Ok(market_event)=>{
80 if matches!(market_event, MarketEvents::BlockStateUpdate{..}) {
81 for pool_address in pool_address_to_verify_vec {
82 tokio::task::spawn(
83 verify_pool_state_task(
84 client.clone(),
85 pool_address,
86 market_state.clone()
87 )
88 );
89 }
90 pool_address_to_verify_vec = Vec::new();
91 }
92 }
93 Err(e)=>{error!("market_event_rx error : {e}")}
94 }
95 },
96
97 msg = tx_compose_channel_rx.recv() => {
98 let tx_compose_update : Result<MessageTxCompose, RecvError> = msg;
99 match tx_compose_update {
100 Ok(tx_compose_msg)=>{
101 if let TxComposeMessageType::Sign(sign_request_data)= tx_compose_msg.inner {
102 if let Some(swap) = sign_request_data.swap {
103 let pool_address_vec = swap.get_pool_address_vec();
104 let now = chrono::Local::now();
105 for pool_address in pool_address_vec {
106 if now - check_time_map.get(&pool_address).cloned().unwrap_or(DateTime::<Local>::MIN_UTC.into()) > Duration::seconds(60) {
107 check_time_map.insert(pool_address, Local::now());
108 if !pool_address_to_verify_vec.contains(&pool_address){
109 pool_address_to_verify_vec.push(pool_address)
110 }
111 }
112 }
113
114 }
115 }
116
117 }
118 Err(e)=>{
119 error!("tx_compose_channel_rx : {e}")
120 }
121 }
122
123 }
124
125 }
126 }
127}
128
/// Actor that launches `state_health_monitor_worker`.
///
/// `client` is provided at construction time. The three optional fields start
/// as `None` in `new` and are filled by `on_bc`; they are marked `#[accessor]` /
/// `#[consumer]` for the `Accessor` and `Consumer` derive macros. All three must
/// be set before `start` is called, because `start` unwraps them.
#[derive(Accessor, Consumer)]
pub struct StateHealthMonitorActor<P, DB: Clone + Send + Sync + 'static> {
    // Provider handed to the worker, which uses it to read on-chain storage.
    client: P,
    // Shared market state whose cached pool storage is verified.
    #[accessor]
    market_state: Option<SharedState<MarketState<DB>>>,
    // Source of tx compose messages; `Sign` requests select the pools to verify.
    #[consumer]
    tx_compose_channel_rx: Option<Broadcaster<MessageTxCompose>>,
    // Source of market events; `BlockStateUpdate` triggers the queued verifications.
    #[consumer]
    market_events_rx: Option<Broadcaster<MarketEvents>>,
}
139
140impl<P, DB> StateHealthMonitorActor<P, DB>
141where
142 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
143 DB: DatabaseRef + DatabaseKabuExt + Send + Sync + Clone + Default + 'static,
144{
145 pub fn new(client: P) -> Self {
146 StateHealthMonitorActor { client, market_state: None, tx_compose_channel_rx: None, market_events_rx: None }
147 }
148
149 pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain, state: &BlockchainState<DB, LDT>) -> Self {
150 Self {
151 market_state: Some(state.market_state()),
152 tx_compose_channel_rx: Some(bc.tx_compose_channel()),
153 market_events_rx: Some(bc.market_events_channel()),
154 ..self
155 }
156 }
157}
158
159impl<P, DB> Actor for StateHealthMonitorActor<P, DB>
160where
161 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
162 DB: DatabaseRef + DatabaseKabuExt + Send + Sync + Clone + 'static,
163{
164 fn start(&self) -> ActorResult {
165 let task = tokio::task::spawn(state_health_monitor_worker(
166 self.client.clone(),
167 self.market_state.clone().unwrap(),
168 self.tx_compose_channel_rx.clone().unwrap(),
169 self.market_events_rx.clone().unwrap(),
170 ));
171 Ok(vec![task])
172 }
173
174 fn name(&self) -> &'static str {
175 "StateHealthMonitorActor"
176 }
177}