kabu_node_json_rpc/
node_mempool_actor.rs1use alloy_network::{Ethereum, TransactionResponse};
2use alloy_primitives::TxHash;
3use alloy_provider::Provider;
4use futures::StreamExt;
5use tracing::error;
6
7use kabu_core_actors::{Actor, ActorResult, Broadcaster, Producer, WorkerResult};
8use kabu_core_actors_macros::*;
9use kabu_core_blockchain::Blockchain;
10use kabu_types_blockchain::KabuDataTypesEthereum;
11use kabu_types_blockchain::MempoolTx;
12use kabu_types_events::{MessageMempoolDataUpdate, NodeMempoolDataUpdate};
13
14pub async fn new_node_mempool_worker<P>(client: P, name: String, mempool_tx: Broadcaster<MessageMempoolDataUpdate>) -> WorkerResult
16where
17 P: Provider<Ethereum> + Send + Sync + 'static,
18{
19 let mempool_subscription = client.subscribe_full_pending_transactions().await?;
20 let mut stream = mempool_subscription.into_stream();
21
22 while let Some(tx) = stream.next().await {
23 let tx_hash: TxHash = tx.tx_hash();
24 let update_msg: MessageMempoolDataUpdate = MessageMempoolDataUpdate::new_with_source(
25 NodeMempoolDataUpdate { tx_hash, mempool_tx: MempoolTx { tx: Some(tx), ..MempoolTx::default() } },
26 name.clone(),
27 );
28 if let Err(e) = mempool_tx.send(update_msg) {
29 error!("mempool_tx.send error : {}", e);
30 break;
31 }
32 }
33 Ok(name)
34}
35
36#[derive(Producer)]
37pub struct NodeMempoolActor<P> {
38 name: &'static str,
39 client: P,
40 #[producer]
41 mempool_tx: Option<Broadcaster<MessageMempoolDataUpdate>>,
42}
43
44impl<P> NodeMempoolActor<P>
45where
46 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
47{
48 pub fn new(client: P) -> NodeMempoolActor<P> {
49 NodeMempoolActor { client, name: "NodeMempoolActor", mempool_tx: None }
50 }
51
52 pub fn with_name(self, name: String) -> Self {
53 Self { name: Box::leak(name.into_boxed_str()), ..self }
54 }
55
56 fn get_name(&self) -> &'static str {
57 self.name
58 }
59
60 pub fn on_bc(self, bc: &Blockchain<KabuDataTypesEthereum>) -> Self {
61 Self { mempool_tx: Some(bc.new_mempool_tx_channel()), ..self }
62 }
63}
64
65impl<P> Actor for NodeMempoolActor<P>
66where
67 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
68{
69 fn start(&self) -> ActorResult {
70 let task =
71 tokio::task::spawn(new_node_mempool_worker(self.client.clone(), self.name.to_string(), self.mempool_tx.clone().unwrap()));
72 Ok(vec![task])
73 }
74
75 fn name(&self) -> &'static str {
76 self.get_name()
77 }
78}