kabu_broadcast_broadcaster/
flashbots.rs1use std::sync::Arc;
2
3use alloy_network::Ethereum;
4use alloy_primitives::Bytes;
5use alloy_provider::Provider;
6use eyre::{eyre, Result};
7use tokio::sync::broadcast::error::RecvError;
8use tracing::error;
9
10use kabu_broadcast_flashbots::Flashbots;
11use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, WorkerResult};
12use kabu_core_actors_macros::{Accessor, Consumer};
13use kabu_core_blockchain::Blockchain;
14use kabu_types_events::{MessageTxCompose, RlpState, TxComposeData, TxComposeMessageType};
15
16async fn broadcast_task<P>(broadcast_request: TxComposeData, client: Arc<Flashbots<P>>) -> Result<()>
17where
18 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
19{
20 let block_number = broadcast_request.next_block_number;
21
22 if let Some(rlp_bundle) = broadcast_request.rlp_bundle.clone() {
23 let stuffing_rlp_bundle: Vec<Bytes> = rlp_bundle.iter().map(|item| item.unwrap()).collect();
24 let backrun_rlp_bundle: Vec<Bytes> =
25 rlp_bundle.iter().filter(|item| matches!(item, RlpState::Backrun(_))).map(|item| item.unwrap()).collect();
26
27 if stuffing_rlp_bundle.iter().any(|i| i.is_empty()) || backrun_rlp_bundle.iter().any(|i| i.is_empty()) {
28 Err(eyre!("RLP_BUNDLE_IS_INCORRECT"))
29 } else {
30 client.broadcast_txes(backrun_rlp_bundle.clone(), block_number).await?;
31 client.broadcast_txes(stuffing_rlp_bundle.clone(), block_number).await?;
32
33 Ok(())
34 }
35 } else {
36 error!("rlp_bundle is None");
37 Err(eyre!("RLP_BUNDLE_IS_NONE"))
38 }
39}
40
41async fn flashbots_broadcaster_worker<P>(
42 client: Arc<Flashbots<P>>,
43 bundle_rx: Broadcaster<MessageTxCompose>,
44 allow_broadcast: bool,
45) -> WorkerResult
46where
47 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
48{
49 subscribe!(bundle_rx);
50
51 loop {
54 tokio::select! {
55 msg = bundle_rx.recv() => {
56 let broadcast_msg : Result<MessageTxCompose, RecvError> = msg;
57 match broadcast_msg {
58 Ok(compose_request) => {
59 if let TxComposeMessageType::Broadcast(broadcast_request) = compose_request.inner {
60 if allow_broadcast {
61 tokio::task::spawn(
62 broadcast_task(
63 broadcast_request,
64 client.clone(),
65 )
66 );
67 }
68
69 }
102 }
103 Err(e)=>{
104 error!("flashbots_broadcaster_worker {}", e)
105 }
106 }
107 }
108 }
109 }
110}
111
112#[derive(Accessor, Consumer)]
113pub struct FlashbotsBroadcastActor<P> {
114 client: Arc<Flashbots<P>>,
115 #[consumer]
116 tx_compose_channel_rx: Option<Broadcaster<MessageTxCompose>>,
117 allow_broadcast: bool,
118}
119
120impl<P> FlashbotsBroadcastActor<P>
121where
122 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
123{
124 pub fn new(client: Flashbots<P>, allow_broadcast: bool) -> FlashbotsBroadcastActor<P> {
125 FlashbotsBroadcastActor { client: Arc::new(client), tx_compose_channel_rx: None, allow_broadcast }
126 }
127
128 pub fn on_bc(self, bc: &Blockchain) -> Self {
129 Self { tx_compose_channel_rx: Some(bc.tx_compose_channel()), ..self }
130 }
131}
132
133impl<P> Actor for FlashbotsBroadcastActor<P>
134where
135 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
136{
137 fn start(&self) -> ActorResult {
138 let task = tokio::task::spawn(flashbots_broadcaster_worker(
139 self.client.clone(),
140 self.tx_compose_channel_rx.clone().unwrap(),
141 self.allow_broadcast,
142 ));
143 Ok(vec![task])
144 }
145
146 fn name(&self) -> &'static str {
147 "FlashbotsBroadcastActor"
148 }
149}