kabu_node_player/
compose.rs1use kabu_core_actors::{Broadcaster, SharedState, WorkerResult};
2use kabu_evm_utils::reth_types::decode_into_transaction;
3use kabu_types_blockchain::Mempool;
4use kabu_types_events::{MessageTxCompose, RlpState, TxComposeMessageType};
5use tokio::select;
6use tracing::{error, info};
7
8pub(crate) async fn replayer_compose_worker(mempool: SharedState<Mempool>, compose_channel: Broadcaster<MessageTxCompose>) -> WorkerResult {
9 let mut compose_channel_rx = compose_channel.subscribe();
10
11 loop {
12 select! {
13 msg = compose_channel_rx.recv() => {
14 if let Ok(msg) = msg {
15 if let TxComposeMessageType::Broadcast(broadcast_msg) = msg.inner {
16 info!("Broadcast compose message received. {:?}", broadcast_msg.tx_bundle);
17 for tx in broadcast_msg.rlp_bundle.unwrap_or_default() {
18 match tx {
19 RlpState::Backrun( rlp_tx) | RlpState::Stuffing( rlp_tx)=>{
20 match decode_into_transaction( &rlp_tx ) {
21 Ok(new_tx)=>{
22 mempool.write().await.add_tx(new_tx);
23 }
24 Err(e)=>{
25 error!("decode_into_transaction {}", e);
26 }
27 }
28
29 }
30 _=>{
31 error!("Unknown RLP tx type");
32 }
33 }
34 }
35 }
36 }
37 }
38 }
39 }
40}