kabu_metrics/
influxdb_actor.rs1use async_trait::async_trait;
2use eyre::eyre;
3use influxdb::{Client, ReadQuery, WriteQuery};
4use kabu_core_actors::{Actor, ActorResult, Broadcaster, Consumer, WorkerResult};
5use kabu_core_actors_macros::Consumer;
6use kabu_core_blockchain::Blockchain;
7use std::collections::HashMap;
8use std::time::Duration;
9use tokio::time::timeout;
10use tracing::{error, info, warn};
11
12pub async fn start_influxdb_worker(
13 url: String,
14 database: String,
15 tags: HashMap<String, String>,
16 event_receiver: Broadcaster<WriteQuery>,
17) -> WorkerResult {
18 let client = Client::new(url, database.clone());
19 let create_db_stmt = format!("CREATE DATABASE {database}");
20 let result = client.query(ReadQuery::new(create_db_stmt)).await;
21 match result {
22 Ok(_) => info!("Database created with name: {}", database),
23 Err(e) => info!("Database creation failed or already exists: {:?}", e),
24 }
25 let mut event_receiver = event_receiver.subscribe();
26 loop {
27 let event_result = event_receiver.recv().await;
28 match event_result {
29 Ok(mut event) => {
30 for (key, value) in tags.iter() {
31 event = event.add_tag(key, value.clone());
32 }
33 let client_clone = client.clone();
34 tokio::task::spawn(async move {
35 match timeout(Duration::from_millis(2000), client_clone.query(event)).await {
36 Ok(inner_result) => {
37 if let Err(e) = inner_result {
38 error!("InfluxDB Write failed: {:?}", e);
39 }
40 }
41 Err(elapsed) => {
42 error!("InfluxDB Query timed out: {}", elapsed);
43 }
44 }
45 });
46 }
47 Err(e) => match e {
48 tokio::sync::broadcast::error::RecvError::Closed => {
49 error!("InfluxDB channel closed");
50 return Err(eyre!("INFLUXDB_CHANNEL_CLOSED"));
51 }
52 tokio::sync::broadcast::error::RecvError::Lagged(lagged) => {
53 warn!("InfluxDB lagged: {:?}", lagged);
54 continue;
55 }
56 },
57 }
58 }
59}
60
61#[derive(Consumer)]
62pub struct InfluxDbWriterActor {
63 url: String,
64 database: String,
65 tags: HashMap<String, String>,
66 #[consumer]
67 influxdb_write_channel_rx: Option<Broadcaster<WriteQuery>>,
68}
69
70impl InfluxDbWriterActor {
71 pub fn new(url: String, database: String, tags: HashMap<String, String>) -> Self {
72 Self { url, database, tags, influxdb_write_channel_rx: None }
73 }
74
75 pub fn on_bc(self, bc: &Blockchain) -> Self {
76 Self { influxdb_write_channel_rx: Some(bc.influxdb_write_channel()), ..self }
77 }
78}
79
80#[async_trait]
81impl Actor for InfluxDbWriterActor {
82 fn start(&self) -> ActorResult {
83 let influxdb_write_channel_rx = match &self.influxdb_write_channel_rx {
84 Some(rx) => rx.clone(),
85 None => {
86 error!("InfluxDB write channel is not set.");
87 return Err(eyre!("INFLUXDB_WRITE_CHANNEL_NOT_SET"));
88 }
89 };
90 let task = tokio::task::spawn(start_influxdb_worker(
91 self.url.clone(),
92 self.database.clone(),
93 self.tags.clone(),
94 influxdb_write_channel_rx.clone(),
95 ));
96 Ok(vec![task])
97 }
98
99 fn name(&self) -> &'static str {
100 "InfluxDbWriterActor"
101 }
102}