kabu_types_entities/
datafetcher.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::hash::Hash;
4use std::sync::Arc;
5
6use eyre::Result;
7use tokio::sync::RwLock;
8use tracing::error;
9
10use crate::FetchState::Fetching;
11
12#[derive(Debug, Clone)]
13pub enum FetchState<T: Clone> {
14    Fetching(Arc<RwLock<Option<T>>>),
15    Ready(T),
16}
17
18#[derive(Debug, Clone, Default)]
19pub struct DataFetcher<K, V>
20where
21    K: Clone + Default + Eq + PartialEq + Hash + Send + Sync + 'static,
22    V: Clone + Default + Send + Sync + 'static,
23{
24    data: HashMap<K, FetchState<V>>,
25}
26
27impl<K, V> DataFetcher<K, V>
28where
29    K: Clone + Default + Eq + PartialEq + Hash + Send + Sync + 'static,
30    V: Clone + Default + Send + Sync + 'static,
31{
32    pub fn new() -> Self {
33        Self {
34            //data : Arc::new(RwLock::new(HashMap::new())),
35            data: HashMap::new(),
36        }
37    }
38
39    pub async fn fetch<F, Fut>(&mut self, key: K, fx: F) -> FetchState<V>
40    where
41        F: FnOnce(K) -> Fut + Send + 'static,
42        Fut: Future<Output = Result<V>> + Send + 'static,
43    {
44        if let Some(x) = self.data.get(&key) {
45            return x.clone();
46        }
47
48        let lock: Arc<RwLock<Option<V>>> = Arc::new(RwLock::new(None));
49
50        let lock_clone = lock.clone();
51        self.data.insert(key.clone(), Fetching(lock.clone()));
52
53        let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
54
55        tokio::task::spawn(async move {
56            let mut write_guard = lock_clone.write().await;
57            if let Err(e) = tx.send(true) {
58                error!("{}", e)
59            }
60            let fetched_data = fx(key).await;
61
62            match fetched_data {
63                Ok(v) => {
64                    *write_guard = Some(v);
65                }
66                _ => {
67                    *write_guard = None;
68                }
69            }
70        });
71
72        if let Err(e) = rx.await {
73            error!("{}", e)
74        };
75        Fetching(lock)
76    }
77
78    pub async fn get<F, Fut>(&mut self, key: K, fx: F) -> Result<Option<V>>
79    where
80        F: FnOnce(K) -> Fut + Send + 'static,
81        Fut: Future<Output = Result<V>> + Send + 'static,
82    {
83        match self.fetch(key.clone(), fx).await {
84            Fetching(lock) => {
85                let ret = lock.read().await;
86                Ok(ret.clone())
87            }
88            FetchState::Ready(v) => Ok(Some(v)),
89        }
90    }
91}