kabu_metrics/
influxdb_actor.rs

1use async_trait::async_trait;
2use eyre::eyre;
3use influxdb::{Client, ReadQuery, WriteQuery};
4use kabu_core_actors::{Actor, ActorResult, Broadcaster, Consumer, WorkerResult};
5use kabu_core_actors_macros::Consumer;
6use kabu_core_blockchain::Blockchain;
7use std::collections::HashMap;
8use std::time::Duration;
9use tokio::time::timeout;
10use tracing::{error, info, warn};
11
12pub async fn start_influxdb_worker(
13    url: String,
14    database: String,
15    tags: HashMap<String, String>,
16    event_receiver: Broadcaster<WriteQuery>,
17) -> WorkerResult {
18    let client = Client::new(url, database.clone());
19    let create_db_stmt = format!("CREATE DATABASE {database}");
20    let result = client.query(ReadQuery::new(create_db_stmt)).await;
21    match result {
22        Ok(_) => info!("Database created with name: {}", database),
23        Err(e) => info!("Database creation failed or already exists: {:?}", e),
24    }
25    let mut event_receiver = event_receiver.subscribe();
26    loop {
27        let event_result = event_receiver.recv().await;
28        match event_result {
29            Ok(mut event) => {
30                for (key, value) in tags.iter() {
31                    event = event.add_tag(key, value.clone());
32                }
33                let client_clone = client.clone();
34                tokio::task::spawn(async move {
35                    match timeout(Duration::from_millis(2000), client_clone.query(event)).await {
36                        Ok(inner_result) => {
37                            if let Err(e) = inner_result {
38                                error!("InfluxDB Write failed: {:?}", e);
39                            }
40                        }
41                        Err(elapsed) => {
42                            error!("InfluxDB Query timed out: {}", elapsed);
43                        }
44                    }
45                });
46            }
47            Err(e) => match e {
48                tokio::sync::broadcast::error::RecvError::Closed => {
49                    error!("InfluxDB channel closed");
50                    return Err(eyre!("INFLUXDB_CHANNEL_CLOSED"));
51                }
52                tokio::sync::broadcast::error::RecvError::Lagged(lagged) => {
53                    warn!("InfluxDB lagged: {:?}", lagged);
54                    continue;
55                }
56            },
57        }
58    }
59}
60
/// Actor that spawns the InfluxDB writer worker (`start_influxdb_worker`).
///
/// The write channel must be attached (for example via `on_bc`) before `start`
/// is called; otherwise `start` fails.
#[derive(Consumer)]
pub struct InfluxDbWriterActor {
    /// InfluxDB URL handed to the worker's client.
    url: String,
    /// Name of the database the worker creates and writes to.
    database: String,
    /// Tags the worker adds to every query before writing it.
    tags: HashMap<String, String>,
    /// Channel the worker subscribes to for incoming write queries; `None` until attached.
    // NOTE(review): `#[consumer]` presumably makes the `Consumer` derive generate
    // the attach method for this field - confirm against the macro crate.
    #[consumer]
    influxdb_write_channel_rx: Option<Broadcaster<WriteQuery>>,
}
69
70impl InfluxDbWriterActor {
71    pub fn new(url: String, database: String, tags: HashMap<String, String>) -> Self {
72        Self { url, database, tags, influxdb_write_channel_rx: None }
73    }
74
75    pub fn on_bc(self, bc: &Blockchain) -> Self {
76        Self { influxdb_write_channel_rx: Some(bc.influxdb_write_channel()), ..self }
77    }
78}
79
80#[async_trait]
81impl Actor for InfluxDbWriterActor {
82    fn start(&self) -> ActorResult {
83        let influxdb_write_channel_rx = match &self.influxdb_write_channel_rx {
84            Some(rx) => rx.clone(),
85            None => {
86                error!("InfluxDB write channel is not set.");
87                return Err(eyre!("INFLUXDB_WRITE_CHANNEL_NOT_SET"));
88            }
89        };
90        let task = tokio::task::spawn(start_influxdb_worker(
91            self.url.clone(),
92            self.database.clone(),
93            self.tags.clone(),
94            influxdb_write_channel_rx.clone(),
95        ));
96        Ok(vec![task])
97    }
98
99    fn name(&self) -> &'static str {
100        "InfluxDbWriterActor"
101    }
102}