kabu_node_player/
compose.rs

1use kabu_core_actors::{Broadcaster, SharedState, WorkerResult};
2use kabu_evm_utils::reth_types::decode_into_transaction;
3use kabu_types_blockchain::Mempool;
4use kabu_types_events::{MessageTxCompose, RlpState, TxComposeMessageType};
5use tokio::select;
6use tracing::{error, info};
7
8pub(crate) async fn replayer_compose_worker(mempool: SharedState<Mempool>, compose_channel: Broadcaster<MessageTxCompose>) -> WorkerResult {
9    let mut compose_channel_rx = compose_channel.subscribe();
10
11    loop {
12        select! {
13            msg = compose_channel_rx.recv() => {
14                if let Ok(msg) = msg {
15                    if let TxComposeMessageType::Broadcast(broadcast_msg) = msg.inner {
16                        info!("Broadcast compose message received. {:?}", broadcast_msg.tx_bundle);
17                        for tx in broadcast_msg.rlp_bundle.unwrap_or_default() {
18                            match tx {
19                                RlpState::Backrun( rlp_tx) | RlpState::Stuffing( rlp_tx)=>{
20                                    match decode_into_transaction( &rlp_tx ) {
21                                        Ok(new_tx)=>{
22                                            mempool.write().await.add_tx(new_tx);
23                                        }
24                                        Err(e)=>{
25                                            error!("decode_into_transaction {}", e);
26                                        }
27                                    }
28
29                                }
30                                _=>{
31                                    error!("Unknown RLP tx type");
32                                }
33                            }
34                        }
35                    }
36                }
37            }
38        }
39    }
40}