kabu_core_actors/
actor_manager.rs1use eyre::Result;
2use tokio::task::JoinHandle;
3use tracing::{error, info};
4
5use crate::{Actor, WorkerResult};
6
7#[derive(Default)]
8pub struct ActorsManager {
9 tasks: Vec<JoinHandle<WorkerResult>>,
10}
11
12impl ActorsManager {
13 pub fn new() -> Self {
14 Self::default()
15 }
16
17 pub fn start(&mut self, actor: impl Actor + 'static) -> Result<()> {
18 match actor.start() {
19 Ok(workers) => {
20 info!("{} started successfully", actor.name());
21 self.tasks.extend(workers);
22 Ok(())
23 }
24 Err(e) => {
25 error!("Error starting {} : {}", actor.name(), e);
26 Err(e)
27 }
28 }
29 }
30
31 pub fn start_and_wait(&mut self, actor: impl Actor + Send + Sync + 'static) -> Result<()> {
32 match actor.start_and_wait() {
33 Ok(_) => {
34 info!("{} started successfully", actor.name());
35 Ok(())
36 }
37 Err(e) => {
38 error!("Error starting {} : {}", actor.name(), e);
39 Err(e)
40 }
41 }
42 }
43
44 pub async fn wait(self) {
45 let mut f_remaining_futures = self.tasks;
46 let mut futures_counter = f_remaining_futures.len();
47
48 while futures_counter > 0 {
49 let (result, _index, remaining_futures) = futures::future::select_all(f_remaining_futures).await;
50 match result {
51 Ok(work_result) => match work_result {
52 Ok(s) => {
53 info!("ActorWorker {_index} finished : {s}")
54 }
55 Err(e) => {
56 error!("ActorWorker {_index} finished with error : {e}")
57 }
58 },
59 Err(e) => {
60 error!("ActorWorker join error {_index} : {e}")
61 }
62 }
63 f_remaining_futures = remaining_futures;
64 futures_counter = f_remaining_futures.len();
65 }
66 }
67}