kabu_broadcast_broadcaster/
anvil.rs

1use alloy_eips::BlockNumberOrTag;
2use alloy_network::{Ethereum, Network};
3use alloy_provider::Provider;
4use alloy_rpc_types::BlockTransactions;
5use eyre::Result;
6use tokio::sync::broadcast::error::RecvError;
7use tokio::sync::broadcast::Receiver;
8use tracing::{error, info};
9
10use kabu_core_actors::{Actor, ActorResult, Broadcaster, Consumer, WorkerResult};
11use kabu_core_actors_macros::{Accessor, Consumer};
12use kabu_core_blockchain::Blockchain;
13use kabu_node_debug_provider::AnvilProviderExt;
14use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum};
15use kabu_types_events::{MessageTxCompose, TxComposeData, TxComposeMessageType};
16
17async fn broadcast_task<P, N>(client: P, request: TxComposeData) -> Result<()>
18where
19    N: Network,
20    P: Provider<N> + AnvilProviderExt<N> + Clone + Send + Sync + 'static,
21{
22    info!("Hardhat broadcast request received : {}", request.origin.unwrap_or("UNKNOWN_ORIGIN".to_string()));
23    //let snap = client.dev_rpc().snapshot().await?;
24    //info!("Hardhat snapshot created {snap}");
25
26    for tx_rlp in request.rlp_bundle.unwrap_or_default().iter() {
27        let tx_bytes = tx_rlp.clone().unwrap().clone();
28
29        //let envelope = TxEnvelope::decode_2718(&mut tx_bytes.as_ref())?;
30        //debug!("sending tx to anvil: {} {:?}", tx_bytes.len(), envelope);
31
32        match client.send_raw_transaction(&tx_bytes).await {
33            Err(e) => error!("send_raw_transaction error : {e}"),
34            Ok(_) => {
35                info!("send_raw_transaction error : Hardhat transaction broadcast successfully",);
36            }
37        }
38    }
39
40    Ok(())
41}
42
43async fn anvil_broadcaster_worker<P>(client: P, bundle_rx: Broadcaster<MessageTxCompose>) -> WorkerResult
44where
45    P: Provider<Ethereum> + AnvilProviderExt<Ethereum> + Send + Sync + Clone + 'static,
46{
47    let mut bundle_rx: Receiver<MessageTxCompose> = bundle_rx.subscribe();
48
49    loop {
50        tokio::select! {
51            msg = bundle_rx.recv() => {
52                let broadcast_msg : Result<MessageTxCompose,RecvError> = msg;
53                match broadcast_msg {
54                    Ok(compose_request) => {
55                        if let TxComposeMessageType::Broadcast(broadcast_request) = compose_request.inner {
56                            info!("Broadcasting to hardhat:" );
57                            let snap_shot = client.snapshot().await?;
58                            client.set_automine(false).await?;
59                            match broadcast_task(client.clone(), broadcast_request).await{
60                                Err(e)=>error!("{e}"),
61                                Ok(_)=>info!("Hardhat broadcast successful")
62                            }
63                            client.mine().await?;
64
65                            let block = client.get_block_by_number(BlockNumberOrTag::Latest).await?.unwrap_or_default();
66                            if let BlockTransactions::Hashes(hashes) = block.transactions {
67                                for tx_hash in hashes {
68                                    let reciept = client.get_transaction_receipt(tx_hash).await?.unwrap();
69                                    info!("Block : {} Mined: {} hash:  {} gas : {}", reciept.block_number.unwrap_or_default(), reciept.status(), tx_hash, reciept.gas_used, );
70                                }
71                            }
72                            client.revert(snap_shot).await?;
73                        }
74                    }
75                    Err(e)=>{
76                        error!("{}", e)
77                    }
78                }
79            }
80        }
81    }
82}
83
/// Actor that forwards transaction-compose broadcast requests to a node reached through `client`.
///
/// `P` is the provider type used to talk to the node; `LDT` selects the data
/// types carried by the compose messages and defaults to `KabuDataTypesEthereum`.
#[derive(Accessor, Consumer)]
pub struct AnvilBroadcastActor<P, LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
    /// Provider handle; cloned into the worker task when the actor starts.
    client: P,
    /// Channel the worker subscribes to for compose messages. `None` until a
    /// channel is attached.
    // NOTE(review): `#[consumer]` presumably makes the derived `Consumer` impl set this field — confirm against the macro.
    #[consumer]
    tx_compose_rx: Option<Broadcaster<MessageTxCompose<LDT>>>,
}
90
91impl<P> AnvilBroadcastActor<P>
92where
93    P: Provider<Ethereum> + AnvilProviderExt<Ethereum> + Send + Sync + Clone + 'static,
94{
95    pub fn new(client: P) -> AnvilBroadcastActor<P> {
96        Self { client, tx_compose_rx: None }
97    }
98
99    pub fn on_bc(self, bc: &Blockchain<KabuDataTypesEthereum>) -> Self {
100        Self { tx_compose_rx: Some(bc.tx_compose_channel()), ..self }
101    }
102}
103
104impl<P> Actor for AnvilBroadcastActor<P>
105where
106    P: Provider<Ethereum> + AnvilProviderExt<Ethereum> + Send + Sync + Clone + 'static,
107{
108    fn start(&self) -> ActorResult {
109        let task = tokio::task::spawn(anvil_broadcaster_worker(self.client.clone(), self.tx_compose_rx.clone().unwrap()));
110        Ok(vec![task])
111    }
112
113    fn name(&self) -> &'static str {
114        "AnvilBroadcastActor"
115    }
116}