kabu_core_actors/
actor.rs

1use crate::channels::Broadcaster;
2use crate::shared_state::SharedState;
3use eyre::{eyre, Result};
4use tokio::task::JoinHandle;
5use tracing::info;
6
7pub type WorkerResult = Result<String>;
8
9pub type ActorResult = Result<Vec<JoinHandle<WorkerResult>>>;
10
11pub trait Actor {
12    fn wait(&self, handles: ActorResult) -> Result<()> {
13        let handles = handles?;
14        let actor_name = self.name();
15        futures::executor::block_on(async {
16            for handle in handles {
17                match handle.await {
18                    Ok(result) => match result {
19                        Ok(msg) => info!("One-shot actor '{}' completed with message: {}", actor_name, msg),
20                        Err(e) => return Err(eyre!("Actor '{}' failed with error: {}", actor_name, e)),
21                    },
22                    Err(e) => return Err(eyre!("Actor task execution failed for '{}' with error: {}", actor_name, e)),
23                }
24            }
25            Ok(())
26        })
27    }
28
29    fn start_and_wait(&self) -> Result<()> {
30        let handles = self.start();
31        self.wait(handles)
32    }
33
34    fn start(&self) -> ActorResult;
35
36    fn name(&self) -> &'static str;
37}
38
39pub trait Producer<T>
40where
41    T: Sync + Send + Clone,
42{
43    fn produce(&mut self, _broadcaster: Broadcaster<T>) -> &mut Self {
44        panic!("Not implemented");
45    }
46}
47
48pub trait Consumer<T>
49where
50    T: Sync + Send + Clone,
51{
52    fn consume(&mut self, _receiver: Broadcaster<T>) -> &mut Self {
53        panic!("Not implemented");
54    }
55}
56
57pub trait Accessor<T> {
58    fn access(&mut self, _data: SharedState<T>) -> &mut Self {
59        panic!("Not implemented");
60    }
61}
62
63#[cfg(test)]
64mod test {
65    use crate::actor::{Consumer, Producer, SharedState};
66    use crate::channels::Broadcaster;
67
68    //use crate::macros::*;
69
70    #[allow(dead_code)]
71    #[derive(Clone)]
72    struct DataStruct0 {
73        data: Option<SharedState<i32>>,
74    }
75
76    #[allow(dead_code)]
77    #[derive(Clone)]
78    struct DataStruct1 {
79        data: String,
80    }
81
82    #[allow(dead_code)]
83    #[derive(Clone)]
84    struct DataStruct2 {
85        pub data: u32,
86    }
87
88    #[allow(dead_code)]
89    #[derive(Clone)]
90    struct DataStruct3 {
91        data: u128,
92    }
93
94    #[allow(dead_code)]
95    struct TestActor {
96        state: Option<SharedState<DataStruct0>>,
97        broadcaster0: Option<Broadcaster<DataStruct0>>,
98        broadcaster1: Option<Broadcaster<DataStruct1>>,
99        consumer2: Option<Broadcaster<DataStruct2>>,
100    }
101
102    impl TestActor {
103        pub fn new() -> Self {
104            Self { state: None, broadcaster0: None, broadcaster1: None, consumer2: None }
105        }
106
107        pub async fn start(&self) {}
108    }
109
110    impl Consumer<DataStruct2> for TestActor {
111        fn consume(&mut self, consumer: Broadcaster<DataStruct2>) -> &mut Self {
112            self.consumer2 = Some(consumer);
113            self
114        }
115    }
116
117    impl Producer<DataStruct0> for TestActor {
118        fn produce(&mut self, broadcaster: Broadcaster<DataStruct0>) -> &mut Self {
119            self.broadcaster0 = Some(broadcaster);
120            self
121        }
122    }
123
124    impl Producer<DataStruct1> for TestActor {
125        fn produce(&mut self, broadcaster: Broadcaster<DataStruct1>) -> &mut Self {
126            self.broadcaster1 = Some(broadcaster);
127            self
128        }
129    }
130
131    #[tokio::test]
132    async fn test_actor() {
133        let channel0: Broadcaster<DataStruct0> = Broadcaster::new(10);
134        let channel1: Broadcaster<DataStruct1> = Broadcaster::new(10);
135        let channel2: Broadcaster<DataStruct2> = Broadcaster::new(10);
136
137        let mut test_actor: TestActor = TestActor::new();
138        test_actor.produce(channel0).produce(channel1).consume(channel2).start().await;
139    }
140}