kabu_core_mempool/
mempool_actor.rs

1use alloy_primitives::BlockNumber;
2use chrono::{Duration, Utc};
3use eyre::eyre;
4use influxdb::{Timestamp, WriteQuery};
5use tokio::sync::broadcast::error::RecvError;
6use tracing::{debug, error, info, trace};
7
8use kabu_core_actors::{run_sync, subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
9use kabu_core_actors_macros::{Accessor, Consumer, Producer};
10use kabu_core_blockchain::Blockchain;
11use kabu_types_blockchain::{ChainParameters, Mempool, MempoolTx};
12use kabu_types_blockchain::{KabuBlock, KabuDataTypes, KabuDataTypesEthereum, KabuHeader, KabuTx};
13use kabu_types_events::{MempoolEvents, MessageBlock, MessageBlockHeader, MessageMempoolDataUpdate};
14
15pub async fn new_mempool_worker<LDT: KabuDataTypes>(
16    chain_parameters: ChainParameters,
17    mempool: SharedState<Mempool<LDT>>,
18    mempool_update_rx: Broadcaster<MessageMempoolDataUpdate<LDT>>,
19    block_header_rx: Broadcaster<MessageBlockHeader<LDT>>,
20    block_with_txs_rx: Broadcaster<MessageBlock<LDT>>,
21    broadcaster: Broadcaster<MempoolEvents>,
22    influxdb_write_channel_tx: Broadcaster<WriteQuery>,
23) -> WorkerResult {
24    subscribe!(mempool_update_rx);
25    subscribe!(block_header_rx);
26    subscribe!(block_with_txs_rx);
27
28    let mut current_gas_price: Option<u128> = None;
29    let mut last_cleaning_block: Option<BlockNumber> = None;
30
31    loop {
32        tokio::select! {
33            msg = mempool_update_rx.recv() => {
34                let mempool_update_msg = match msg {
35                    Ok(mempool_update_msg) => mempool_update_msg,
36                    Err(e) => {
37                        match e {
38                            RecvError::Closed => {
39                                error!("Mempool update channel closed");
40                                break Err(eyre!("MEMPOOL_UPDATE_RX_CLOSED"))
41                            }
42                            RecvError::Lagged(lag) => {
43                                error!("Mempool update channel lagged by {} messages", lag);
44                                continue;
45                            }
46                        }
47                    }
48                };
49
50                let mut mempool_guard = mempool.write().await;
51                let tx_hash = mempool_update_msg.tx_hash;
52                let mempool_entry = mempool_guard.txs.entry(tx_hash).or_insert( MempoolTx::<LDT>{ tx_hash,  source : mempool_update_msg.source(), ..MempoolTx::default()});
53                if let Some(logs) = &mempool_update_msg.mempool_tx.logs {
54                    if mempool_entry.logs.is_none() {
55                        mempool_entry.logs = Some(logs.clone());
56                        run_sync!(broadcaster.send(MempoolEvents::MempoolLogUpdate {tx_hash } ));
57                    }
58                }
59                if let Some(state_update) = &mempool_update_msg.mempool_tx.state_update {
60                    if mempool_entry.state_update.is_none() {
61                        mempool_entry.state_update = Some(state_update.clone());
62                        run_sync!(broadcaster.send(MempoolEvents::MempoolStateUpdate{ tx_hash }));
63                    }
64                }
65                if let Some(tx) = &mempool_update_msg.mempool_tx.tx {
66                    if mempool_entry.tx.is_none() {
67                        mempool_entry.tx = Some(tx.clone());
68                        if let Some(cur_gas_price) = current_gas_price {
69                            if tx.get_gas_limit() > 30000 && tx.get_gas_price() >= cur_gas_price && mempool_guard.is_valid_tx(tx) {
70                                run_sync!(broadcaster.send(MempoolEvents::MempoolActualTxUpdate {tx_hash }));
71                            }
72                        }
73                        run_sync!(broadcaster.send(MempoolEvents::MempoolTxUpdate {tx_hash }));
74                    }
75                }
76                drop(mempool_guard);
77            },
78            msg = block_header_rx.recv() => {
79                let block_header = match msg {
80                    Ok(message_block_header) => {message_block_header.inner}
81                     Err(e) => {
82                        match e {
83                            RecvError::Closed => {
84                                error!("Block header channel closed");
85                                break Err(eyre!("BLOCK_HEADER_RX_CLOSED"))
86                            }
87                            RecvError::Lagged(lag) => {
88                                error!("Block header channel lagged by {} messages", lag);
89                                continue;
90                            }
91                        }
92                    }
93                };
94
95                current_gas_price = block_header.header.get_base_fee();
96                let block_number = block_header.header.get_number();
97
98                let mempool_len = mempool.read().await.len();
99                debug!("Mempool len {}", mempool_len);
100
101
102                let mempool_read_guard = mempool.read().await;
103                let next_base_fee =  block_header.header.get_next_base_fee(&chain_parameters);
104
105                let ok_txes = mempool_read_guard.filter_ok_by_gas_price(next_base_fee as u128);
106                debug!("Mempool gas update {} {}", next_base_fee, ok_txes.len());
107                for mempool_tx in ok_txes {
108                    let tx = mempool_tx.tx.clone().unwrap();
109                    if tx.get_gas_limit()  < 50000 {
110                        continue
111                    }
112                    if mempool_read_guard.is_valid_tx(&tx) {
113                        let tx_hash = tx.get_tx_hash();
114                        trace!("new tx ok {:?}", tx_hash);
115                        run_sync!(broadcaster.send(MempoolEvents::MempoolActualTxUpdate { tx_hash }));
116                    } else{
117                       trace!("new tx gas change tx not valid {:?}", tx.get_tx_hash());
118                    }
119                }
120                drop(mempool_read_guard);
121
122                match last_cleaning_block {
123                    Some(bn)=>{
124                        if block_number - bn > 20 {
125                            let mut mempool_write_guard = mempool.write().await;
126                            info!("Start mempool cleaning started. len : {}", mempool_write_guard.len());
127                            mempool_write_guard.clean_txs( block_number - 50, Utc::now() - Duration::minutes(20) );
128                            last_cleaning_block = Some(block_number);
129                            info!("Start mempool cleaning finished len : {}", mempool_write_guard.len());
130                            drop(mempool_write_guard)
131                        }
132                    }
133                    None=>{
134                        last_cleaning_block = Some(block_number)
135                    }
136                }
137
138            },
139            msg = block_with_txs_rx.recv() => {
140                let block_with_txs = match msg {
141                    Ok(block_with_txs) => block_with_txs.inner.block,
142                    Err(e) => {
143                        match e {
144                            RecvError::Closed => {
145                                error!("Block with txs channel closed");
146                                break Err(eyre!("BLOCK_WITH_TXS_RX_CLOSED"))
147                            }
148                            RecvError::Lagged(lag) => {
149                                error!("Block with txs channel lagged by {} messages", lag);
150                                continue;
151                            }
152                        }
153                    }
154                };
155                let mut mempool_write_guard = mempool.write().await;
156
157                let mut mempool_tx_counter=0;
158                let tx_count = block_with_txs.get_transactions().len();
159                let mempool_size = mempool_write_guard.len();
160
161                for tx in block_with_txs.get_transactions() {
162
163                    if mempool_write_guard.is_tx(&tx.get_tx_hash()) {
164                        mempool_tx_counter += 1;
165                    }
166
167                    mempool_write_guard
168                        .set_mined(tx.get_tx_hash(), block_with_txs.get_header().get_number())
169                        .set_nonce(tx.get_from(), tx.get_nonce());
170                }
171                let start_time_utc =   chrono::Utc::now();
172                let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "mempool")
173                    .add_tag("block", block_with_txs.get_header().get_number())
174                    .add_field("tx_count_block", tx_count as u64)
175                    .add_field("tx_count_found", mempool_tx_counter)
176                    .add_field("tx_mempool_size", mempool_size as u64);
177
178                if let Err(e) = influxdb_write_channel_tx.send(write_query) {
179                       error!("Failed to send mempool stat to influxdb: {:?}", e);
180                }
181
182                drop(mempool_write_guard);
183            }
184        }
185    }
186}
187
188#[derive(Accessor, Consumer, Producer)]
189pub struct MempoolActor<LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
190    chain_parameters: ChainParameters,
191    #[accessor]
192    mempool: Option<SharedState<Mempool<LDT>>>,
193    #[consumer]
194    mempool_update_rx: Option<Broadcaster<MessageMempoolDataUpdate<LDT>>>,
195    #[consumer]
196    block_header_rx: Option<Broadcaster<MessageBlockHeader<LDT>>>,
197    #[consumer]
198    block_with_txs_rx: Option<Broadcaster<MessageBlock<LDT>>>,
199    #[producer]
200    mempool_events_tx: Option<Broadcaster<MempoolEvents>>,
201    #[producer]
202    influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
203}
204
205impl<LDT: KabuDataTypes> Default for MempoolActor<LDT> {
206    fn default() -> Self {
207        Self {
208            chain_parameters: ChainParameters::ethereum(),
209            mempool: None,
210            mempool_update_rx: None,
211            mempool_events_tx: None,
212            block_header_rx: None,
213            block_with_txs_rx: None,
214            influxdb_write_channel_tx: None,
215        }
216    }
217}
218
219impl<LDT: KabuDataTypes> MempoolActor<LDT> {
220    pub fn new() -> MempoolActor<LDT> {
221        MempoolActor::default()
222    }
223
224    pub fn on_bc(self, bc: &Blockchain<LDT>) -> MempoolActor<LDT> {
225        Self {
226            chain_parameters: bc.chain_parameters(),
227            mempool: Some(bc.mempool()),
228            mempool_update_rx: Some(bc.new_mempool_tx_channel()),
229            block_header_rx: Some(bc.new_block_headers_channel()),
230            block_with_txs_rx: Some(bc.new_block_with_tx_channel()),
231            mempool_events_tx: Some(bc.mempool_events_channel()),
232            influxdb_write_channel_tx: Some(bc.influxdb_write_channel()),
233        }
234    }
235}
236
237impl<LDT: KabuDataTypes> Actor for MempoolActor<LDT> {
238    fn start(&self) -> ActorResult {
239        let task = tokio::task::spawn(new_mempool_worker(
240            self.chain_parameters.clone(),
241            self.mempool.clone().unwrap(),
242            self.mempool_update_rx.clone().unwrap(),
243            self.block_header_rx.clone().unwrap(),
244            self.block_with_txs_rx.clone().unwrap(),
245            self.mempool_events_tx.clone().unwrap(),
246            self.influxdb_write_channel_tx.clone().unwrap(),
247        ));
248        Ok(vec![task])
249    }
250
251    fn name(&self) -> &'static str {
252        "MempoolActor"
253    }
254}