kabu_defi_health_monitor/
pool_health_monitor.rs1use alloy_primitives::Address;
2use eyre::Result;
3use influxdb::{Timestamp, WriteQuery};
4use std::collections::{HashMap, HashSet};
5use std::time::Duration;
6use tokio::sync::broadcast::error::RecvError;
7use tracing::{debug, error, info};
8
9use kabu_core_actors::{subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
10use kabu_core_actors_macros::{Accessor, Consumer, Producer};
11use kabu_core_blockchain::Blockchain;
12use kabu_defi_address_book::TokenAddressEth;
13use kabu_types_entities::{Market, PoolId, PoolProtocol};
14use kabu_types_events::{HealthEvent, MessageHealthEvent};
15use lazy_static::lazy_static;
16
17lazy_static! {
18 static ref TRUSTED_TOKENS: HashSet<Address> = HashSet::from_iter(vec![
19 TokenAddressEth::WETH,
20 TokenAddressEth::USDC,
21 TokenAddressEth::USDT,
22 TokenAddressEth::STETH,
23 TokenAddressEth::WSTETH,
24 TokenAddressEth::WBTC,
25 TokenAddressEth::CRV,
26 TokenAddressEth::DAI
27 ]);
28}
29
30pub async fn pool_health_monitor_worker(
31 market: SharedState<Market>,
32 pool_health_monitor_rx: Broadcaster<MessageHealthEvent>,
33 influx_channel_tx: Broadcaster<WriteQuery>,
34) -> WorkerResult {
35 subscribe!(pool_health_monitor_rx);
36
37 let mut pool_errors_map: HashMap<PoolId, u32> = HashMap::new();
38 loop {
41 tokio::select! {
42 msg = pool_health_monitor_rx.recv() => {
43
44 let pool_health_update : Result<MessageHealthEvent, RecvError> = msg;
45 match pool_health_update {
46 Ok(pool_health_message)=>{
47 match pool_health_message.inner {
48 HealthEvent::SwapLineEstimationError(estimate_error) => {
49 debug!("SwapPath health_monitor message update: {} path : {}", estimate_error.msg, estimate_error.swap_path);
50 let start_time=std::time::Instant::now();
56 let mut market_guard = market.write().await;
57 debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write acquired");
58 let is_ok = market_guard.set_path_disabled(&estimate_error.swap_path, true);
59 info!("Disabling path : swap_path={} msg={} hash={:#?} ok={}", estimate_error.swap_path, estimate_error.msg, estimate_error.swap_path.get_hash(),is_ok);
60
61
62
63 for (idx, pool) in estimate_error.swap_path.pools.iter().enumerate() {
64 let tokens = [estimate_error.swap_path.tokens[idx].get_address(), estimate_error.swap_path.tokens[idx+1].get_address()];
65 if pool.get_protocol() == PoolProtocol::UniswapV2Like || pool.get_protocol() == PoolProtocol::UniswapV3Like {
66 let pool_id = pool.get_pool_id();
68
69
70
71 market_guard.set_pool_disabled(&pool_id, &tokens[0], &tokens[1], true);
73
74 match market_guard.get_pool(&pool_id) {
75 Some(pool)=>{
76 info!("Disabling pool: protocol={}, pool_id={}, msg=ESTIMATION_FAILED", pool.get_protocol(),pool_id);
77
78 let amount_f64 = -2.0f64;
79 let pool_protocol = pool.get_protocol().to_string();
80 let pool_id = pool.get_pool_id().to_string();
81 let influx_channel_clone = influx_channel_tx.clone();
82
83 if let Err(e) = tokio::time::timeout(
84 Duration::from_secs(1),
85 async move {
86 let start_time_utc = chrono::Utc::now();
87
88 let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "pool_disabled")
89 .add_field("message", "ESTIMATION_FAILED")
90 .add_field("amount", amount_f64)
91 .add_tag("id", pool_id)
92 .add_tag("protocol", pool_protocol)
93 .add_tag("token_from", tokens[0].to_string())
94 .add_tag("token_to", tokens[1].to_string());
95
96 if let Err(e) = influx_channel_clone.send(write_query) {
97 error!("Failed to failed pool to influxdb: {:?}", e);
98 }
99 }
100 ).await {
101 error!("Failed to send failed pool info to influxdb: {:?}", e);
102 }
103 }
104 _=>{
105 error!("Disabled pool missing in market: address={}", pool_id);
106 }
107 }
108 }
110
111
112
113 }
114
115
116 drop(market_guard);
117 debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write released");
118
119 }
122 HealthEvent::PoolSwapError(swap_error)=>{
123 debug!("Pool health_monitor message update: {:?} {} {} ", swap_error.pool, swap_error.msg, swap_error.amount);
124 let entry = pool_errors_map.entry(swap_error.pool).or_insert(0);
125 *entry += 1;
126 if *entry >= 10 {
127 let start_time=std::time::Instant::now();
128 let mut market_guard = market.write().await;
129 debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write acquired");
130
131 market_guard.set_pool_disabled(&swap_error.pool, &swap_error.token_from, &swap_error.token_to, true);
133
134
135 match market_guard.get_pool(&swap_error.pool) {
136 Some(pool)=>{
137 info!("Disabling pool: protocol={}, address={:?}, msg={} amount={}", pool.get_protocol(),swap_error.pool, swap_error.msg, swap_error.amount);
138
139 let amount_f64 = if let Some(token_in) = market_guard.get_token(&swap_error.token_from) {
140 token_in.to_float(swap_error.amount)
141 } else {
142 -1.0f64
143 };
144
145 let pool_protocol = pool.get_protocol().to_string();
146 let pool_id = pool.get_pool_id().to_string();
147 let influx_channel_clone = influx_channel_tx.clone();
148
149 if let Err(e) = tokio::time::timeout(
150 Duration::from_secs(1),
151 async move {
152 let start_time_utc = chrono::Utc::now();
153
154 let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "pool_disabled")
155 .add_field("message", swap_error.msg)
156 .add_field("amount", amount_f64)
157 .add_tag("id", pool_id)
158 .add_tag("protocol", pool_protocol)
159 .add_tag("token_from", swap_error.token_from.to_string())
160 .add_tag("token_to", swap_error.token_to.to_string());
161
162 if let Err(e) = influx_channel_clone.send(write_query) {
163 error!("Failed to failed pool to influxdb: {:?}", e);
164 }
165 }
166 ).await {
167 error!("Failed to send failed pool info to influxdb: {:?}", e);
168 }
169
170
171
172 }
173 _=>{
174 error!("Disabled pool missing in market: address={:?}, msg={} amount={}", swap_error.pool, swap_error.msg, swap_error.amount);
175 }
176 }
177 drop(market_guard);
180 debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write released");
181
182 }
183 }
184 _=>{}
185 }
186 }
187 Err(e)=>{
188 error!("pool_health_update error {}", e)
189 }
190 }
191
192 }
193 }
194 }
195}
196
197#[derive(Accessor, Consumer, Producer, Default)]
198pub struct PoolHealthMonitorActor {
199 #[accessor]
200 market: Option<SharedState<Market>>,
201 #[consumer]
202 pool_health_update_rx: Option<Broadcaster<MessageHealthEvent>>,
203 #[producer]
204 influxdb_tx: Option<Broadcaster<WriteQuery>>,
205}
206
207impl PoolHealthMonitorActor {
208 pub fn new() -> Self {
209 PoolHealthMonitorActor::default()
210 }
211
212 pub fn on_bc(self, bc: &Blockchain) -> Self {
213 Self {
214 market: Some(bc.market()),
215 pool_health_update_rx: Some(bc.health_monitor_channel()),
216 influxdb_tx: Some(bc.influxdb_write_channel()),
217 }
218 }
219}
220
221impl Actor for PoolHealthMonitorActor {
222 fn start(&self) -> ActorResult {
223 let task = tokio::task::spawn(pool_health_monitor_worker(
224 self.market.clone().unwrap(),
225 self.pool_health_update_rx.clone().unwrap(),
226 self.influxdb_tx.clone().unwrap(),
227 ));
228 Ok(vec![task])
229 }
230
231 fn name(&self) -> &'static str {
232 "PoolHealthMonitorActor"
233 }
234}