kabu_broadcast_broadcaster/
flashbots.rs

1use std::sync::Arc;
2
3use alloy_network::Ethereum;
4use alloy_primitives::Bytes;
5use alloy_provider::Provider;
6use eyre::{eyre, Result};
7use tokio::sync::broadcast::error::RecvError;
8use tracing::error;
9
10use kabu_broadcast_flashbots::Flashbots;
11use kabu_core_actors::{subscribe, Actor, ActorResult, Broadcaster, Consumer, WorkerResult};
12use kabu_core_actors_macros::{Accessor, Consumer};
13use kabu_core_blockchain::Blockchain;
14use kabu_types_events::{MessageTxCompose, RlpState, TxComposeData, TxComposeMessageType};
15
16async fn broadcast_task<P>(broadcast_request: TxComposeData, client: Arc<Flashbots<P>>) -> Result<()>
17where
18    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
19{
20    let block_number = broadcast_request.next_block_number;
21
22    if let Some(rlp_bundle) = broadcast_request.rlp_bundle.clone() {
23        let stuffing_rlp_bundle: Vec<Bytes> = rlp_bundle.iter().map(|item| item.unwrap()).collect();
24        let backrun_rlp_bundle: Vec<Bytes> =
25            rlp_bundle.iter().filter(|item| matches!(item, RlpState::Backrun(_))).map(|item| item.unwrap()).collect();
26
27        if stuffing_rlp_bundle.iter().any(|i| i.is_empty()) || backrun_rlp_bundle.iter().any(|i| i.is_empty()) {
28            Err(eyre!("RLP_BUNDLE_IS_INCORRECT"))
29        } else {
30            client.broadcast_txes(backrun_rlp_bundle.clone(), block_number).await?;
31            client.broadcast_txes(stuffing_rlp_bundle.clone(), block_number).await?;
32
33            Ok(())
34        }
35    } else {
36        error!("rlp_bundle is None");
37        Err(eyre!("RLP_BUNDLE_IS_NONE"))
38    }
39}
40
41async fn flashbots_broadcaster_worker<P>(
42    client: Arc<Flashbots<P>>,
43    bundle_rx: Broadcaster<MessageTxCompose>,
44    allow_broadcast: bool,
45) -> WorkerResult
46where
47    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
48{
49    subscribe!(bundle_rx);
50
51    //let mut current_block: u64 = 0;
52
53    loop {
54        tokio::select! {
55            msg = bundle_rx.recv() => {
56                let broadcast_msg : Result<MessageTxCompose, RecvError> = msg;
57                match broadcast_msg {
58                    Ok(compose_request) => {
59                        if let TxComposeMessageType::Broadcast(broadcast_request)  = compose_request.inner {
60                            if allow_broadcast {
61                                      tokio::task::spawn(
62                                        broadcast_task(
63                                            broadcast_request,
64                                            client.clone(),
65                                        )
66                                    );
67                                }
68
69                            //TODO : Move smart mode to Strategy router
70                            /*
71                            if smart_mode {
72                                if current_block < broadcast_request.next_block_number {
73                                    current_block = broadcast_request.next_block_number;
74                                    best_request = BestTxSwapCompose::new_with_pct( U256::from(8000));
75                                }
76
77                                if best_request.check(&broadcast_request) {
78                                    if allow_broadcast {
79                                         tokio::task::spawn(
80                                            broadcast_task(
81                                            broadcast_request,
82                                            client.clone(),
83                                            )
84                                        );
85                                    } else {
86                                       info!("broadcast_request (best_request)");
87                                    }
88                                }
89                            } else if allow_broadcast {
90                                      tokio::task::spawn(
91                                        broadcast_task(
92                                            broadcast_request,
93                                            client.clone(),
94                                        )
95                                    );
96                            } else {
97                                info!("broadcast_request");
98                            }
99
100                             */
101                        }
102                    }
103                    Err(e)=>{
104                        error!("flashbots_broadcaster_worker {}", e)
105                    }
106                }
107            }
108        }
109    }
110}
111
112#[derive(Accessor, Consumer)]
113pub struct FlashbotsBroadcastActor<P> {
114    client: Arc<Flashbots<P>>,
115    #[consumer]
116    tx_compose_channel_rx: Option<Broadcaster<MessageTxCompose>>,
117    allow_broadcast: bool,
118}
119
120impl<P> FlashbotsBroadcastActor<P>
121where
122    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
123{
124    pub fn new(client: Flashbots<P>, allow_broadcast: bool) -> FlashbotsBroadcastActor<P> {
125        FlashbotsBroadcastActor { client: Arc::new(client), tx_compose_channel_rx: None, allow_broadcast }
126    }
127
128    pub fn on_bc(self, bc: &Blockchain) -> Self {
129        Self { tx_compose_channel_rx: Some(bc.tx_compose_channel()), ..self }
130    }
131}
132
133impl<P> Actor for FlashbotsBroadcastActor<P>
134where
135    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
136{
137    fn start(&self) -> ActorResult {
138        let task = tokio::task::spawn(flashbots_broadcaster_worker(
139            self.client.clone(),
140            self.tx_compose_channel_rx.clone().unwrap(),
141            self.allow_broadcast,
142        ));
143        Ok(vec![task])
144    }
145
146    fn name(&self) -> &'static str {
147        "FlashbotsBroadcastActor"
148    }
149}