kabu_defi_health_monitor/
pool_health_monitor.rs

1use alloy_primitives::Address;
2use eyre::Result;
3use influxdb::{Timestamp, WriteQuery};
4use std::collections::{HashMap, HashSet};
5use std::time::Duration;
6use tokio::sync::broadcast::error::RecvError;
7use tracing::{debug, error, info};
8
9use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
10use kabu_core_actors_macros::{Accessor, Consumer, Producer};
11use kabu_core_blockchain::Blockchain;
12use kabu_defi_address_book::TokenAddressEth;
13use kabu_types_entities::{Market, PoolId, PoolProtocol};
14use kabu_types_events::{HealthEvent, MessageHealthEvent};
15use lazy_static::lazy_static;
16
17lazy_static! {
18    static ref TRUSTED_TOKENS: HashSet<Address> = HashSet::from_iter(vec![
19        TokenAddressEth::WETH,
20        TokenAddressEth::USDC,
21        TokenAddressEth::USDT,
22        TokenAddressEth::STETH,
23        TokenAddressEth::WSTETH,
24        TokenAddressEth::WBTC,
25        TokenAddressEth::CRV,
26        TokenAddressEth::DAI
27    ]);
28}
29
30pub async fn pool_health_monitor_worker(
31    market: SharedState<Market>,
32    pool_health_monitor_rx: Broadcaster<MessageHealthEvent>,
33    influx_channel_tx: Broadcaster<WriteQuery>,
34) -> WorkerResult {
35    subscribe!(pool_health_monitor_rx);
36
37    let mut pool_errors_map: HashMap<PoolId, u32> = HashMap::new();
38    //let mut estimate_errors_map: HashMap<u64, u32> = HashMap::new();
39
40    loop {
41        tokio::select! {
42                    msg = pool_health_monitor_rx.recv() => {
43
44                        let pool_health_update : Result<MessageHealthEvent, RecvError>  = msg;
45                        match pool_health_update {
46                            Ok(pool_health_message)=>{
47                                match pool_health_message.inner {
48                                    HealthEvent::SwapLineEstimationError(estimate_error) => {
49                                        debug!("SwapPath health_monitor message update: {} path : {}", estimate_error.msg, estimate_error.swap_path);
50        //                                let entry = estimate_errors_map.entry(estimate_error.swap_path.get_hash()).or_insert(0);
51        //                                *entry += 1;
52        //                                if *entry >= 10 {
53                                            //info!("Disabling path : swap_path={} msg={} counter={}", estimate_error.swap_path, estimate_error.msg, entry);
54
55                                            let start_time=std::time::Instant::now();
56                                            let mut market_guard = market.write().await;
57                                            debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write acquired");
58                                            let is_ok = market_guard.set_path_disabled(&estimate_error.swap_path, true);
59                                            info!("Disabling path : swap_path={} msg={} hash={:#?} ok={}", estimate_error.swap_path, estimate_error.msg, estimate_error.swap_path.get_hash(),is_ok);
60
61
62
63                                            for (idx, pool) in estimate_error.swap_path.pools.iter().enumerate() {
64                                                let tokens = [estimate_error.swap_path.tokens[idx].get_address(), estimate_error.swap_path.tokens[idx+1].get_address()];
65                                                if pool.get_protocol() == PoolProtocol::UniswapV2Like || pool.get_protocol() == PoolProtocol::UniswapV3Like {
66                                                //tokens.iter().any(|token_address| !TRUSTED_TOKENS.contains(token_address) ) {
67                                                    let pool_id = pool.get_pool_id();
68
69
70
71                                                    //if !market_guard.is_pool_disabled(&pool_id) {
72                                                            market_guard.set_pool_disabled(&pool_id, &tokens[0], &tokens[1], true);
73
74                                                            match market_guard.get_pool(&pool_id) {
75                                                                Some(pool)=>{
76                                                                    info!("Disabling pool: protocol={}, pool_id={}, msg=ESTIMATION_FAILED", pool.get_protocol(),pool_id);
77
78                                                                    let amount_f64 = -2.0f64;
79                                                                    let pool_protocol = pool.get_protocol().to_string();
80                                                                    let pool_id = pool.get_pool_id().to_string();
81                                                                    let influx_channel_clone = influx_channel_tx.clone();
82
83                                                                    if let Err(e) = tokio::time::timeout(
84                                                                        Duration::from_secs(1),
85                                                                        async move {
86                                                                            let start_time_utc =   chrono::Utc::now();
87
88                                                                            let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "pool_disabled")
89                                                                                .add_field("message", "ESTIMATION_FAILED")
90                                                                                .add_field("amount", amount_f64)
91                                                                                .add_tag("id", pool_id)
92                                                                                .add_tag("protocol", pool_protocol)
93                                                                                .add_tag("token_from", tokens[0].to_string())
94                                                                                .add_tag("token_to", tokens[1].to_string());
95
96                                                                            if let Err(e) = influx_channel_clone.send(write_query) {
97                                                                               error!("Failed to failed pool to influxdb: {:?}", e);
98                                                                            }
99                                                                        }
100                                                                    ).await {
101                                                                        error!("Failed to send failed pool info to influxdb: {:?}", e);
102                                                                    }
103                                                                }
104                                                                _=>{
105                                                                    error!("Disabled pool missing in market: address={}", pool_id);
106                                                                }
107                                                            }
108                                                        //}
109                                                }
110
111
112
113                                            }
114
115
116                                            drop(market_guard);
117                                            debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write released");
118
119        //                                }
120
121                                    }
122                                    HealthEvent::PoolSwapError(swap_error)=>{
123                                        debug!("Pool health_monitor message update: {:?} {} {} ", swap_error.pool, swap_error.msg, swap_error.amount);
124                                        let entry = pool_errors_map.entry(swap_error.pool).or_insert(0);
125                                        *entry += 1;
126                                        if *entry >= 10 {
127                                            let start_time=std::time::Instant::now();
128                                            let mut market_guard = market.write().await;
129                                            debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write acquired");
130
131                                            //if !market_guard.is_pool_disabled(&swap_error.pool) {
132                                                market_guard.set_pool_disabled(&swap_error.pool, &swap_error.token_from, &swap_error.token_to, true);
133
134
135                                                match market_guard.get_pool(&swap_error.pool) {
136                                                    Some(pool)=>{
137                                                        info!("Disabling pool: protocol={}, address={:?}, msg={} amount={}", pool.get_protocol(),swap_error.pool, swap_error.msg, swap_error.amount);
138
139                                                        let amount_f64 = if let Some(token_in) = market_guard.get_token(&swap_error.token_from) {
140                                                            token_in.to_float(swap_error.amount)
141                                                        } else {
142                                                            -1.0f64
143                                                        };
144
145                                                        let pool_protocol = pool.get_protocol().to_string();
146                                                        let pool_id = pool.get_pool_id().to_string();
147                                                        let influx_channel_clone = influx_channel_tx.clone();
148
149                                                        if let Err(e) = tokio::time::timeout(
150                                                            Duration::from_secs(1),
151                                                            async move {
152                                                                let start_time_utc =   chrono::Utc::now();
153
154                                                                let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "pool_disabled")
155                                                                    .add_field("message", swap_error.msg)
156                                                                    .add_field("amount", amount_f64)
157                                                                    .add_tag("id", pool_id)
158                                                                    .add_tag("protocol", pool_protocol)
159                                                                    .add_tag("token_from", swap_error.token_from.to_string())
160                                                                    .add_tag("token_to", swap_error.token_to.to_string());
161
162                                                                if let Err(e) = influx_channel_clone.send(write_query) {
163                                                                   error!("Failed to failed pool to influxdb: {:?}", e);
164                                                                }
165                                                            }
166                                                        ).await {
167                                                            error!("Failed to send failed pool info to influxdb: {:?}", e);
168                                                        }
169
170
171
172                                                    }
173                                                    _=>{
174                                                        error!("Disabled pool missing in market: address={:?}, msg={} amount={}", swap_error.pool, swap_error.msg, swap_error.amount);
175                                                    }
176                                                }
177                                            //}
178
179                                            drop(market_guard);
180                                            debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write released");
181
182                                        }
183                                    }
184                                    _=>{}
185                                }
186                            }
187                            Err(e)=>{
188                                error!("pool_health_update error {}", e)
189                            }
190                        }
191
192                    }
193                }
194    }
195}
196
197#[derive(Accessor, Consumer, Producer, Default)]
198pub struct PoolHealthMonitorActor {
199    #[accessor]
200    market: Option<SharedState<Market>>,
201    #[consumer]
202    pool_health_update_rx: Option<Broadcaster<MessageHealthEvent>>,
203    #[producer]
204    influxdb_tx: Option<Broadcaster<WriteQuery>>,
205}
206
207impl PoolHealthMonitorActor {
208    pub fn new() -> Self {
209        PoolHealthMonitorActor::default()
210    }
211
212    pub fn on_bc(self, bc: &Blockchain) -> Self {
213        Self {
214            market: Some(bc.market()),
215            pool_health_update_rx: Some(bc.health_monitor_channel()),
216            influxdb_tx: Some(bc.influxdb_write_channel()),
217        }
218    }
219}
220
221impl Actor for PoolHealthMonitorActor {
222    fn start(&self) -> ActorResult {
223        let task = tokio::task::spawn(pool_health_monitor_worker(
224            self.market.clone().unwrap(),
225            self.pool_health_update_rx.clone().unwrap(),
226            self.influxdb_tx.clone().unwrap(),
227        ));
228        Ok(vec![task])
229    }
230
231    fn name(&self) -> &'static str {
232        "PoolHealthMonitorActor"
233    }
234}