kabu_broadcast_accounts/signers/
signers_actor.rs1use alloy_primitives::Bytes;
2use eyre::{eyre, Result};
3use tokio::sync::broadcast::error::RecvError;
4use tokio::sync::broadcast::Receiver;
5use tracing::{error, info};
6
7use kabu_core_actors::{Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
8use kabu_core_actors_macros::{Accessor, Consumer, Producer};
9use kabu_core_blockchain::Blockchain;
10use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum, KabuTx};
11use kabu_types_events::{MessageTxCompose, RlpState, TxComposeData, TxComposeMessageType, TxState};
12
13async fn sign_task<LDT: KabuDataTypes>(
14 sign_request: TxComposeData<LDT>,
15 compose_channel_tx: Broadcaster<MessageTxCompose<LDT>>,
16) -> Result<()> {
17 let signer = match sign_request.signer.clone() {
18 Some(signer) => signer,
19 None => {
20 error!("No signer found in sign_request");
21 return Err(eyre!("NO_SIGNER_FOUND"));
22 }
23 };
24
25 let rlp_bundle: Vec<RlpState> = sign_request
26 .tx_bundle
27 .clone()
28 .unwrap()
29 .iter()
30 .map(|tx_request| match &tx_request {
31 TxState::Stuffing(t) => RlpState::Stuffing(t.encode().into()),
32 TxState::SignatureRequired(t) => {
33 let tx = signer.sign_sync(t.clone()).unwrap();
34 let tx_hash = tx.get_tx_hash();
35 let signed_tx_bytes = Bytes::from(tx.encode());
36
37 info!("Tx signed {tx_hash:?}");
38 RlpState::Backrun(signed_tx_bytes)
39 }
40 TxState::ReadyForBroadcast(t) => RlpState::Backrun(t.clone()),
41 TxState::ReadyForBroadcastStuffing(t) => RlpState::Stuffing(t.clone()),
42 })
43 .collect();
44
45 if rlp_bundle.iter().any(|item| item.is_none()) {
46 error!("Bundle is not ready. Cannot sign");
47 return Err(eyre!("CANNOT_SIGN_BUNDLE"));
48 }
49
50 let broadcast_request = TxComposeData { rlp_bundle: Some(rlp_bundle), ..sign_request };
51
52 match compose_channel_tx.send(MessageTxCompose::broadcast(broadcast_request)) {
53 Err(e) => {
54 error!("{e}");
55 Err(eyre!("BROADCAST_ERROR"))
56 }
57 _ => Ok(()),
58 }
59}
60
61async fn request_listener_worker<LDT: KabuDataTypes>(
62 compose_channel_rx: Broadcaster<MessageTxCompose<LDT>>,
63 compose_channel_tx: Broadcaster<MessageTxCompose<LDT>>,
64) -> WorkerResult {
65 let mut compose_channel_rx: Receiver<MessageTxCompose<LDT>> = compose_channel_rx.subscribe();
66
67 loop {
68 tokio::select! {
69 msg = compose_channel_rx.recv() => {
70 let compose_request_msg : Result<MessageTxCompose<LDT>, RecvError> = msg;
71 match compose_request_msg {
72 Ok(compose_request) =>{
73
74 if let TxComposeMessageType::Sign( sign_request)= compose_request.inner {
75 tokio::task::spawn(
76 sign_task(
77 sign_request,
78 compose_channel_tx.clone(),
79 )
80 );
81 }
82 }
83 Err(e)=>{error!("{}",e)}
84 }
85 }
86 }
87 }
88}
89
90#[derive(Accessor, Consumer, Producer)]
91pub struct TxSignersActor<LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
92 #[consumer]
93 compose_channel_rx: Option<Broadcaster<MessageTxCompose<LDT>>>,
94 #[producer]
95 compose_channel_tx: Option<Broadcaster<MessageTxCompose<LDT>>>,
96}
97
98impl<LDT: KabuDataTypes + 'static> Default for TxSignersActor<LDT> {
99 fn default() -> Self {
100 Self { compose_channel_rx: None, compose_channel_tx: None }
101 }
102}
103
104impl<LDT: KabuDataTypes> TxSignersActor<LDT> {
105 pub fn new() -> TxSignersActor<LDT> {
106 TxSignersActor::<LDT>::default()
107 }
108
109 pub fn on_bc(self, bc: &Blockchain<LDT>) -> Self {
110 Self { compose_channel_rx: Some(bc.tx_compose_channel()), compose_channel_tx: Some(bc.tx_compose_channel()) }
111 }
112}
113
114impl<LDT: KabuDataTypes> Actor for TxSignersActor<LDT> {
115 fn start(&self) -> ActorResult {
116 let task =
117 tokio::task::spawn(request_listener_worker(self.compose_channel_rx.clone().unwrap(), self.compose_channel_tx.clone().unwrap()));
118
119 Ok(vec![task])
120 }
121
122 fn name(&self) -> &'static str {
123 "SignersActor"
124 }
125}