kabu_node_json_rpc/
node_mempool_actor.rs

1use alloy_network::{Ethereum, TransactionResponse};
2use alloy_primitives::TxHash;
3use alloy_provider::Provider;
4use futures::StreamExt;
5use tracing::error;
6
7use kabu_core_actors::{Actor, ActorResult, Broadcaster, Producer, WorkerResult};
8use kabu_core_actors_macros::*;
9use kabu_core_blockchain::Blockchain;
10use kabu_types_blockchain::KabuDataTypesEthereum;
11use kabu_types_blockchain::MempoolTx;
12use kabu_types_events::{MessageMempoolDataUpdate, NodeMempoolDataUpdate};
13
14/// Worker listens for new transactions in the node mempool and broadcasts [`MessageMempoolDataUpdate`].
15pub async fn new_node_mempool_worker<P>(client: P, name: String, mempool_tx: Broadcaster<MessageMempoolDataUpdate>) -> WorkerResult
16where
17    P: Provider<Ethereum> + Send + Sync + 'static,
18{
19    let mempool_subscription = client.subscribe_full_pending_transactions().await?;
20    let mut stream = mempool_subscription.into_stream();
21
22    while let Some(tx) = stream.next().await {
23        let tx_hash: TxHash = tx.tx_hash();
24        let update_msg: MessageMempoolDataUpdate = MessageMempoolDataUpdate::new_with_source(
25            NodeMempoolDataUpdate { tx_hash, mempool_tx: MempoolTx { tx: Some(tx), ..MempoolTx::default() } },
26            name.clone(),
27        );
28        if let Err(e) = mempool_tx.send(update_msg) {
29            error!("mempool_tx.send error : {}", e);
30            break;
31        }
32    }
33    Ok(name)
34}
35
36#[derive(Producer)]
37pub struct NodeMempoolActor<P> {
38    name: &'static str,
39    client: P,
40    #[producer]
41    mempool_tx: Option<Broadcaster<MessageMempoolDataUpdate>>,
42}
43
44impl<P> NodeMempoolActor<P>
45where
46    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
47{
48    pub fn new(client: P) -> NodeMempoolActor<P> {
49        NodeMempoolActor { client, name: "NodeMempoolActor", mempool_tx: None }
50    }
51
52    pub fn with_name(self, name: String) -> Self {
53        Self { name: Box::leak(name.into_boxed_str()), ..self }
54    }
55
56    fn get_name(&self) -> &'static str {
57        self.name
58    }
59
60    pub fn on_bc(self, bc: &Blockchain<KabuDataTypesEthereum>) -> Self {
61        Self { mempool_tx: Some(bc.new_mempool_tx_channel()), ..self }
62    }
63}
64
65impl<P> Actor for NodeMempoolActor<P>
66where
67    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
68{
69    fn start(&self) -> ActorResult {
70        let task =
71            tokio::task::spawn(new_node_mempool_worker(self.client.clone(), self.name.to_string(), self.mempool_tx.clone().unwrap()));
72        Ok(vec![task])
73    }
74
75    fn name(&self) -> &'static str {
76        self.get_name()
77    }
78}