kabu_broadcast_accounts/signers/
signers_actor.rs

1use alloy_primitives::Bytes;
2use eyre::{eyre, Result};
3use tokio::sync::broadcast::error::RecvError;
4use tokio::sync::broadcast::Receiver;
5use tracing::{error, info};
6
7use kabu_core_actors::{Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
8use kabu_core_actors_macros::{Accessor, Consumer, Producer};
9use kabu_core_blockchain::Blockchain;
10use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum, KabuTx};
11use kabu_types_events::{MessageTxCompose, RlpState, TxComposeData, TxComposeMessageType, TxState};
12
13async fn sign_task<LDT: KabuDataTypes>(
14    sign_request: TxComposeData<LDT>,
15    compose_channel_tx: Broadcaster<MessageTxCompose<LDT>>,
16) -> Result<()> {
17    let signer = match sign_request.signer.clone() {
18        Some(signer) => signer,
19        None => {
20            error!("No signer found in sign_request");
21            return Err(eyre!("NO_SIGNER_FOUND"));
22        }
23    };
24
25    let rlp_bundle: Vec<RlpState> = sign_request
26        .tx_bundle
27        .clone()
28        .unwrap()
29        .iter()
30        .map(|tx_request| match &tx_request {
31            TxState::Stuffing(t) => RlpState::Stuffing(t.encode().into()),
32            TxState::SignatureRequired(t) => {
33                let tx = signer.sign_sync(t.clone()).unwrap();
34                let tx_hash = tx.get_tx_hash();
35                let signed_tx_bytes = Bytes::from(tx.encode());
36
37                info!("Tx signed {tx_hash:?}");
38                RlpState::Backrun(signed_tx_bytes)
39            }
40            TxState::ReadyForBroadcast(t) => RlpState::Backrun(t.clone()),
41            TxState::ReadyForBroadcastStuffing(t) => RlpState::Stuffing(t.clone()),
42        })
43        .collect();
44
45    if rlp_bundle.iter().any(|item| item.is_none()) {
46        error!("Bundle is not ready. Cannot sign");
47        return Err(eyre!("CANNOT_SIGN_BUNDLE"));
48    }
49
50    let broadcast_request = TxComposeData { rlp_bundle: Some(rlp_bundle), ..sign_request };
51
52    match compose_channel_tx.send(MessageTxCompose::broadcast(broadcast_request)) {
53        Err(e) => {
54            error!("{e}");
55            Err(eyre!("BROADCAST_ERROR"))
56        }
57        _ => Ok(()),
58    }
59}
60
61async fn request_listener_worker<LDT: KabuDataTypes>(
62    compose_channel_rx: Broadcaster<MessageTxCompose<LDT>>,
63    compose_channel_tx: Broadcaster<MessageTxCompose<LDT>>,
64) -> WorkerResult {
65    let mut compose_channel_rx: Receiver<MessageTxCompose<LDT>> = compose_channel_rx.subscribe();
66
67    loop {
68        tokio::select! {
69            msg = compose_channel_rx.recv() => {
70                let compose_request_msg : Result<MessageTxCompose<LDT>, RecvError> = msg;
71                match compose_request_msg {
72                    Ok(compose_request) =>{
73
74                        if let TxComposeMessageType::Sign( sign_request)= compose_request.inner {
75                            tokio::task::spawn(
76                                sign_task(
77                                    sign_request,
78                                    compose_channel_tx.clone(),
79                                )
80                            );
81                        }
82                    }
83                    Err(e)=>{error!("{}",e)}
84                }
85            }
86        }
87    }
88}
89
90#[derive(Accessor, Consumer, Producer)]
91pub struct TxSignersActor<LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
92    #[consumer]
93    compose_channel_rx: Option<Broadcaster<MessageTxCompose<LDT>>>,
94    #[producer]
95    compose_channel_tx: Option<Broadcaster<MessageTxCompose<LDT>>>,
96}
97
98impl<LDT: KabuDataTypes + 'static> Default for TxSignersActor<LDT> {
99    fn default() -> Self {
100        Self { compose_channel_rx: None, compose_channel_tx: None }
101    }
102}
103
104impl<LDT: KabuDataTypes> TxSignersActor<LDT> {
105    pub fn new() -> TxSignersActor<LDT> {
106        TxSignersActor::<LDT>::default()
107    }
108
109    pub fn on_bc(self, bc: &Blockchain<LDT>) -> Self {
110        Self { compose_channel_rx: Some(bc.tx_compose_channel()), compose_channel_tx: Some(bc.tx_compose_channel()) }
111    }
112}
113
114impl<LDT: KabuDataTypes> Actor for TxSignersActor<LDT> {
115    fn start(&self) -> ActorResult {
116        let task =
117            tokio::task::spawn(request_listener_worker(self.compose_channel_rx.clone().unwrap(), self.compose_channel_tx.clone().unwrap()));
118
119        Ok(vec![task])
120    }
121
122    fn name(&self) -> &'static str {
123        "SignersActor"
124    }
125}