kabu_broadcast_broadcaster/
anvil.rs1use alloy_eips::BlockNumberOrTag;
2use alloy_network::{Ethereum, Network};
3use alloy_provider::Provider;
4use alloy_rpc_types::BlockTransactions;
5use eyre::Result;
6use tokio::sync::broadcast::error::RecvError;
7use tokio::sync::broadcast::Receiver;
8use tracing::{error, info};
9
10use kabu_core_actors::{Actor, ActorResult, Broadcaster, Consumer, WorkerResult};
11use kabu_core_actors_macros::{Accessor, Consumer};
12use kabu_core_blockchain::Blockchain;
13use kabu_node_debug_provider::AnvilProviderExt;
14use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum};
15use kabu_types_events::{MessageTxCompose, TxComposeData, TxComposeMessageType};
16
17async fn broadcast_task<P, N>(client: P, request: TxComposeData) -> Result<()>
18where
19 N: Network,
20 P: Provider<N> + AnvilProviderExt<N> + Clone + Send + Sync + 'static,
21{
22 info!("Hardhat broadcast request received : {}", request.origin.unwrap_or("UNKNOWN_ORIGIN".to_string()));
23 for tx_rlp in request.rlp_bundle.unwrap_or_default().iter() {
27 let tx_bytes = tx_rlp.clone().unwrap().clone();
28
29 match client.send_raw_transaction(&tx_bytes).await {
33 Err(e) => error!("send_raw_transaction error : {e}"),
34 Ok(_) => {
35 info!("send_raw_transaction error : Hardhat transaction broadcast successfully",);
36 }
37 }
38 }
39
40 Ok(())
41}
42
43async fn anvil_broadcaster_worker<P>(client: P, bundle_rx: Broadcaster<MessageTxCompose>) -> WorkerResult
44where
45 P: Provider<Ethereum> + AnvilProviderExt<Ethereum> + Send + Sync + Clone + 'static,
46{
47 let mut bundle_rx: Receiver<MessageTxCompose> = bundle_rx.subscribe();
48
49 loop {
50 tokio::select! {
51 msg = bundle_rx.recv() => {
52 let broadcast_msg : Result<MessageTxCompose,RecvError> = msg;
53 match broadcast_msg {
54 Ok(compose_request) => {
55 if let TxComposeMessageType::Broadcast(broadcast_request) = compose_request.inner {
56 info!("Broadcasting to hardhat:" );
57 let snap_shot = client.snapshot().await?;
58 client.set_automine(false).await?;
59 match broadcast_task(client.clone(), broadcast_request).await{
60 Err(e)=>error!("{e}"),
61 Ok(_)=>info!("Hardhat broadcast successful")
62 }
63 client.mine().await?;
64
65 let block = client.get_block_by_number(BlockNumberOrTag::Latest).await?.unwrap_or_default();
66 if let BlockTransactions::Hashes(hashes) = block.transactions {
67 for tx_hash in hashes {
68 let reciept = client.get_transaction_receipt(tx_hash).await?.unwrap();
69 info!("Block : {} Mined: {} hash: {} gas : {}", reciept.block_number.unwrap_or_default(), reciept.status(), tx_hash, reciept.gas_used, );
70 }
71 }
72 client.revert(snap_shot).await?;
73 }
74 }
75 Err(e)=>{
76 error!("{}", e)
77 }
78 }
79 }
80 }
81 }
82}
83
84#[derive(Accessor, Consumer)]
85pub struct AnvilBroadcastActor<P, LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
86 client: P,
87 #[consumer]
88 tx_compose_rx: Option<Broadcaster<MessageTxCompose<LDT>>>,
89}
90
91impl<P> AnvilBroadcastActor<P>
92where
93 P: Provider<Ethereum> + AnvilProviderExt<Ethereum> + Send + Sync + Clone + 'static,
94{
95 pub fn new(client: P) -> AnvilBroadcastActor<P> {
96 Self { client, tx_compose_rx: None }
97 }
98
99 pub fn on_bc(self, bc: &Blockchain<KabuDataTypesEthereum>) -> Self {
100 Self { tx_compose_rx: Some(bc.tx_compose_channel()), ..self }
101 }
102}
103
104impl<P> Actor for AnvilBroadcastActor<P>
105where
106 P: Provider<Ethereum> + AnvilProviderExt<Ethereum> + Send + Sync + Clone + 'static,
107{
108 fn start(&self) -> ActorResult {
109 let task = tokio::task::spawn(anvil_broadcaster_worker(self.client.clone(), self.tx_compose_rx.clone().unwrap()));
110 Ok(vec![task])
111 }
112
113 fn name(&self) -> &'static str {
114 "AnvilBroadcastActor"
115 }
116}