kabu_core_actors/
actor.rs1use crate::channels::Broadcaster;
2use crate::shared_state::SharedState;
3use eyre::{eyre, Result};
4use tokio::task::JoinHandle;
5use tracing::info;
6
7pub type WorkerResult = Result<String>;
8
9pub type ActorResult = Result<Vec<JoinHandle<WorkerResult>>>;
10
11pub trait Actor {
12 fn wait(&self, handles: ActorResult) -> Result<()> {
13 let handles = handles?;
14 let actor_name = self.name();
15 futures::executor::block_on(async {
16 for handle in handles {
17 match handle.await {
18 Ok(result) => match result {
19 Ok(msg) => info!("One-shot actor '{}' completed with message: {}", actor_name, msg),
20 Err(e) => return Err(eyre!("Actor '{}' failed with error: {}", actor_name, e)),
21 },
22 Err(e) => return Err(eyre!("Actor task execution failed for '{}' with error: {}", actor_name, e)),
23 }
24 }
25 Ok(())
26 })
27 }
28
29 fn start_and_wait(&self) -> Result<()> {
30 let handles = self.start();
31 self.wait(handles)
32 }
33
34 fn start(&self) -> ActorResult;
35
36 fn name(&self) -> &'static str;
37}
38
39pub trait Producer<T>
40where
41 T: Sync + Send + Clone,
42{
43 fn produce(&mut self, _broadcaster: Broadcaster<T>) -> &mut Self {
44 panic!("Not implemented");
45 }
46}
47
48pub trait Consumer<T>
49where
50 T: Sync + Send + Clone,
51{
52 fn consume(&mut self, _receiver: Broadcaster<T>) -> &mut Self {
53 panic!("Not implemented");
54 }
55}
56
57pub trait Accessor<T> {
58 fn access(&mut self, _data: SharedState<T>) -> &mut Self {
59 panic!("Not implemented");
60 }
61}
62
63#[cfg(test)]
64mod test {
65 use crate::actor::{Consumer, Producer, SharedState};
66 use crate::channels::Broadcaster;
67
68 #[allow(dead_code)]
71 #[derive(Clone)]
72 struct DataStruct0 {
73 data: Option<SharedState<i32>>,
74 }
75
76 #[allow(dead_code)]
77 #[derive(Clone)]
78 struct DataStruct1 {
79 data: String,
80 }
81
82 #[allow(dead_code)]
83 #[derive(Clone)]
84 struct DataStruct2 {
85 pub data: u32,
86 }
87
88 #[allow(dead_code)]
89 #[derive(Clone)]
90 struct DataStruct3 {
91 data: u128,
92 }
93
94 #[allow(dead_code)]
95 struct TestActor {
96 state: Option<SharedState<DataStruct0>>,
97 broadcaster0: Option<Broadcaster<DataStruct0>>,
98 broadcaster1: Option<Broadcaster<DataStruct1>>,
99 consumer2: Option<Broadcaster<DataStruct2>>,
100 }
101
102 impl TestActor {
103 pub fn new() -> Self {
104 Self { state: None, broadcaster0: None, broadcaster1: None, consumer2: None }
105 }
106
107 pub async fn start(&self) {}
108 }
109
110 impl Consumer<DataStruct2> for TestActor {
111 fn consume(&mut self, consumer: Broadcaster<DataStruct2>) -> &mut Self {
112 self.consumer2 = Some(consumer);
113 self
114 }
115 }
116
117 impl Producer<DataStruct0> for TestActor {
118 fn produce(&mut self, broadcaster: Broadcaster<DataStruct0>) -> &mut Self {
119 self.broadcaster0 = Some(broadcaster);
120 self
121 }
122 }
123
124 impl Producer<DataStruct1> for TestActor {
125 fn produce(&mut self, broadcaster: Broadcaster<DataStruct1>) -> &mut Self {
126 self.broadcaster1 = Some(broadcaster);
127 self
128 }
129 }
130
131 #[tokio::test]
132 async fn test_actor() {
133 let channel0: Broadcaster<DataStruct0> = Broadcaster::new(10);
134 let channel1: Broadcaster<DataStruct1> = Broadcaster::new(10);
135 let channel2: Broadcaster<DataStruct2> = Broadcaster::new(10);
136
137 let mut test_actor: TestActor = TestActor::new();
138 test_actor.produce(channel0).produce(channel1).consume(channel2).start().await;
139 }
140}