kabu_core_actors/
actor_manager.rs

1use eyre::Result;
2use tokio::task::JoinHandle;
3use tracing::{error, info};
4
5use crate::{Actor, WorkerResult};
6
7#[derive(Default)]
8pub struct ActorsManager {
9    tasks: Vec<JoinHandle<WorkerResult>>,
10}
11
12impl ActorsManager {
13    pub fn new() -> Self {
14        Self::default()
15    }
16
17    pub fn start(&mut self, actor: impl Actor + 'static) -> Result<()> {
18        match actor.start() {
19            Ok(workers) => {
20                info!("{} started successfully", actor.name());
21                self.tasks.extend(workers);
22                Ok(())
23            }
24            Err(e) => {
25                error!("Error starting {} : {}", actor.name(), e);
26                Err(e)
27            }
28        }
29    }
30
31    pub fn start_and_wait(&mut self, actor: impl Actor + Send + Sync + 'static) -> Result<()> {
32        match actor.start_and_wait() {
33            Ok(_) => {
34                info!("{} started successfully", actor.name());
35                Ok(())
36            }
37            Err(e) => {
38                error!("Error starting {} : {}", actor.name(), e);
39                Err(e)
40            }
41        }
42    }
43
44    pub async fn wait(self) {
45        let mut f_remaining_futures = self.tasks;
46        let mut futures_counter = f_remaining_futures.len();
47
48        while futures_counter > 0 {
49            let (result, _index, remaining_futures) = futures::future::select_all(f_remaining_futures).await;
50            match result {
51                Ok(work_result) => match work_result {
52                    Ok(s) => {
53                        info!("ActorWorker {_index} finished : {s}")
54                    }
55                    Err(e) => {
56                        error!("ActorWorker {_index} finished with error : {e}")
57                    }
58                },
59                Err(e) => {
60                    error!("ActorWorker join error {_index} : {e}")
61                }
62            }
63            f_remaining_futures = remaining_futures;
64            futures_counter = f_remaining_futures.len();
65        }
66    }
67}