kabu_types_entities/
datafetcher.rs1use std::collections::HashMap;
2use std::future::Future;
3use std::hash::Hash;
4use std::sync::Arc;
5
6use eyre::Result;
7use tokio::sync::RwLock;
8use tracing::error;
9
10use crate::FetchState::Fetching;
11
12#[derive(Debug, Clone)]
13pub enum FetchState<T: Clone> {
14 Fetching(Arc<RwLock<Option<T>>>),
15 Ready(T),
16}
17
18#[derive(Debug, Clone, Default)]
19pub struct DataFetcher<K, V>
20where
21 K: Clone + Default + Eq + PartialEq + Hash + Send + Sync + 'static,
22 V: Clone + Default + Send + Sync + 'static,
23{
24 data: HashMap<K, FetchState<V>>,
25}
26
27impl<K, V> DataFetcher<K, V>
28where
29 K: Clone + Default + Eq + PartialEq + Hash + Send + Sync + 'static,
30 V: Clone + Default + Send + Sync + 'static,
31{
32 pub fn new() -> Self {
33 Self {
34 data: HashMap::new(),
36 }
37 }
38
39 pub async fn fetch<F, Fut>(&mut self, key: K, fx: F) -> FetchState<V>
40 where
41 F: FnOnce(K) -> Fut + Send + 'static,
42 Fut: Future<Output = Result<V>> + Send + 'static,
43 {
44 if let Some(x) = self.data.get(&key) {
45 return x.clone();
46 }
47
48 let lock: Arc<RwLock<Option<V>>> = Arc::new(RwLock::new(None));
49
50 let lock_clone = lock.clone();
51 self.data.insert(key.clone(), Fetching(lock.clone()));
52
53 let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
54
55 tokio::task::spawn(async move {
56 let mut write_guard = lock_clone.write().await;
57 if let Err(e) = tx.send(true) {
58 error!("{}", e)
59 }
60 let fetched_data = fx(key).await;
61
62 match fetched_data {
63 Ok(v) => {
64 *write_guard = Some(v);
65 }
66 _ => {
67 *write_guard = None;
68 }
69 }
70 });
71
72 if let Err(e) = rx.await {
73 error!("{}", e)
74 };
75 Fetching(lock)
76 }
77
78 pub async fn get<F, Fut>(&mut self, key: K, fx: F) -> Result<Option<V>>
79 where
80 F: FnOnce(K) -> Fut + Send + 'static,
81 Fut: Future<Output = Result<V>> + Send + 'static,
82 {
83 match self.fetch(key.clone(), fx).await {
84 Fetching(lock) => {
85 let ret = lock.read().await;
86 Ok(ret.clone())
87 }
88 FetchState::Ready(v) => Ok(Some(v)),
89 }
90 }
91}