1use alloy_primitives::BlockNumber;
2use chrono::{Duration, Utc};
3use eyre::eyre;
4use influxdb::{Timestamp, WriteQuery};
5use tokio::sync::broadcast::error::RecvError;
6use tracing::{debug, error, info, trace};
7
8use kabu_core_actors::{run_sync, subscribe, Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
9use kabu_core_actors_macros::{Accessor, Consumer, Producer};
10use kabu_core_blockchain::Blockchain;
11use kabu_types_blockchain::{ChainParameters, Mempool, MempoolTx};
12use kabu_types_blockchain::{KabuBlock, KabuDataTypes, KabuDataTypesEthereum, KabuHeader, KabuTx};
13use kabu_types_events::{MempoolEvents, MessageBlock, MessageBlockHeader, MessageMempoolDataUpdate};
14
15pub async fn new_mempool_worker<LDT: KabuDataTypes>(
16 chain_parameters: ChainParameters,
17 mempool: SharedState<Mempool<LDT>>,
18 mempool_update_rx: Broadcaster<MessageMempoolDataUpdate<LDT>>,
19 block_header_rx: Broadcaster<MessageBlockHeader<LDT>>,
20 block_with_txs_rx: Broadcaster<MessageBlock<LDT>>,
21 broadcaster: Broadcaster<MempoolEvents>,
22 influxdb_write_channel_tx: Broadcaster<WriteQuery>,
23) -> WorkerResult {
24 subscribe!(mempool_update_rx);
25 subscribe!(block_header_rx);
26 subscribe!(block_with_txs_rx);
27
28 let mut current_gas_price: Option<u128> = None;
29 let mut last_cleaning_block: Option<BlockNumber> = None;
30
31 loop {
32 tokio::select! {
33 msg = mempool_update_rx.recv() => {
34 let mempool_update_msg = match msg {
35 Ok(mempool_update_msg) => mempool_update_msg,
36 Err(e) => {
37 match e {
38 RecvError::Closed => {
39 error!("Mempool update channel closed");
40 break Err(eyre!("MEMPOOL_UPDATE_RX_CLOSED"))
41 }
42 RecvError::Lagged(lag) => {
43 error!("Mempool update channel lagged by {} messages", lag);
44 continue;
45 }
46 }
47 }
48 };
49
50 let mut mempool_guard = mempool.write().await;
51 let tx_hash = mempool_update_msg.tx_hash;
52 let mempool_entry = mempool_guard.txs.entry(tx_hash).or_insert( MempoolTx::<LDT>{ tx_hash, source : mempool_update_msg.source(), ..MempoolTx::default()});
53 if let Some(logs) = &mempool_update_msg.mempool_tx.logs {
54 if mempool_entry.logs.is_none() {
55 mempool_entry.logs = Some(logs.clone());
56 run_sync!(broadcaster.send(MempoolEvents::MempoolLogUpdate {tx_hash } ));
57 }
58 }
59 if let Some(state_update) = &mempool_update_msg.mempool_tx.state_update {
60 if mempool_entry.state_update.is_none() {
61 mempool_entry.state_update = Some(state_update.clone());
62 run_sync!(broadcaster.send(MempoolEvents::MempoolStateUpdate{ tx_hash }));
63 }
64 }
65 if let Some(tx) = &mempool_update_msg.mempool_tx.tx {
66 if mempool_entry.tx.is_none() {
67 mempool_entry.tx = Some(tx.clone());
68 if let Some(cur_gas_price) = current_gas_price {
69 if tx.get_gas_limit() > 30000 && tx.get_gas_price() >= cur_gas_price && mempool_guard.is_valid_tx(tx) {
70 run_sync!(broadcaster.send(MempoolEvents::MempoolActualTxUpdate {tx_hash }));
71 }
72 }
73 run_sync!(broadcaster.send(MempoolEvents::MempoolTxUpdate {tx_hash }));
74 }
75 }
76 drop(mempool_guard);
77 },
78 msg = block_header_rx.recv() => {
79 let block_header = match msg {
80 Ok(message_block_header) => {message_block_header.inner}
81 Err(e) => {
82 match e {
83 RecvError::Closed => {
84 error!("Block header channel closed");
85 break Err(eyre!("BLOCK_HEADER_RX_CLOSED"))
86 }
87 RecvError::Lagged(lag) => {
88 error!("Block header channel lagged by {} messages", lag);
89 continue;
90 }
91 }
92 }
93 };
94
95 current_gas_price = block_header.header.get_base_fee();
96 let block_number = block_header.header.get_number();
97
98 let mempool_len = mempool.read().await.len();
99 debug!("Mempool len {}", mempool_len);
100
101
102 let mempool_read_guard = mempool.read().await;
103 let next_base_fee = block_header.header.get_next_base_fee(&chain_parameters);
104
105 let ok_txes = mempool_read_guard.filter_ok_by_gas_price(next_base_fee as u128);
106 debug!("Mempool gas update {} {}", next_base_fee, ok_txes.len());
107 for mempool_tx in ok_txes {
108 let tx = mempool_tx.tx.clone().unwrap();
109 if tx.get_gas_limit() < 50000 {
110 continue
111 }
112 if mempool_read_guard.is_valid_tx(&tx) {
113 let tx_hash = tx.get_tx_hash();
114 trace!("new tx ok {:?}", tx_hash);
115 run_sync!(broadcaster.send(MempoolEvents::MempoolActualTxUpdate { tx_hash }));
116 } else{
117 trace!("new tx gas change tx not valid {:?}", tx.get_tx_hash());
118 }
119 }
120 drop(mempool_read_guard);
121
122 match last_cleaning_block {
123 Some(bn)=>{
124 if block_number - bn > 20 {
125 let mut mempool_write_guard = mempool.write().await;
126 info!("Start mempool cleaning started. len : {}", mempool_write_guard.len());
127 mempool_write_guard.clean_txs( block_number - 50, Utc::now() - Duration::minutes(20) );
128 last_cleaning_block = Some(block_number);
129 info!("Start mempool cleaning finished len : {}", mempool_write_guard.len());
130 drop(mempool_write_guard)
131 }
132 }
133 None=>{
134 last_cleaning_block = Some(block_number)
135 }
136 }
137
138 },
139 msg = block_with_txs_rx.recv() => {
140 let block_with_txs = match msg {
141 Ok(block_with_txs) => block_with_txs.inner.block,
142 Err(e) => {
143 match e {
144 RecvError::Closed => {
145 error!("Block with txs channel closed");
146 break Err(eyre!("BLOCK_WITH_TXS_RX_CLOSED"))
147 }
148 RecvError::Lagged(lag) => {
149 error!("Block with txs channel lagged by {} messages", lag);
150 continue;
151 }
152 }
153 }
154 };
155 let mut mempool_write_guard = mempool.write().await;
156
157 let mut mempool_tx_counter=0;
158 let tx_count = block_with_txs.get_transactions().len();
159 let mempool_size = mempool_write_guard.len();
160
161 for tx in block_with_txs.get_transactions() {
162
163 if mempool_write_guard.is_tx(&tx.get_tx_hash()) {
164 mempool_tx_counter += 1;
165 }
166
167 mempool_write_guard
168 .set_mined(tx.get_tx_hash(), block_with_txs.get_header().get_number())
169 .set_nonce(tx.get_from(), tx.get_nonce());
170 }
171 let start_time_utc = chrono::Utc::now();
172 let write_query = WriteQuery::new(Timestamp::from(start_time_utc), "mempool")
173 .add_tag("block", block_with_txs.get_header().get_number())
174 .add_field("tx_count_block", tx_count as u64)
175 .add_field("tx_count_found", mempool_tx_counter)
176 .add_field("tx_mempool_size", mempool_size as u64);
177
178 if let Err(e) = influxdb_write_channel_tx.send(write_query) {
179 error!("Failed to send mempool stat to influxdb: {:?}", e);
180 }
181
182 drop(mempool_write_guard);
183 }
184 }
185 }
186}
187
188#[derive(Accessor, Consumer, Producer)]
189pub struct MempoolActor<LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
190 chain_parameters: ChainParameters,
191 #[accessor]
192 mempool: Option<SharedState<Mempool<LDT>>>,
193 #[consumer]
194 mempool_update_rx: Option<Broadcaster<MessageMempoolDataUpdate<LDT>>>,
195 #[consumer]
196 block_header_rx: Option<Broadcaster<MessageBlockHeader<LDT>>>,
197 #[consumer]
198 block_with_txs_rx: Option<Broadcaster<MessageBlock<LDT>>>,
199 #[producer]
200 mempool_events_tx: Option<Broadcaster<MempoolEvents>>,
201 #[producer]
202 influxdb_write_channel_tx: Option<Broadcaster<WriteQuery>>,
203}
204
205impl<LDT: KabuDataTypes> Default for MempoolActor<LDT> {
206 fn default() -> Self {
207 Self {
208 chain_parameters: ChainParameters::ethereum(),
209 mempool: None,
210 mempool_update_rx: None,
211 mempool_events_tx: None,
212 block_header_rx: None,
213 block_with_txs_rx: None,
214 influxdb_write_channel_tx: None,
215 }
216 }
217}
218
219impl<LDT: KabuDataTypes> MempoolActor<LDT> {
220 pub fn new() -> MempoolActor<LDT> {
221 MempoolActor::default()
222 }
223
224 pub fn on_bc(self, bc: &Blockchain<LDT>) -> MempoolActor<LDT> {
225 Self {
226 chain_parameters: bc.chain_parameters(),
227 mempool: Some(bc.mempool()),
228 mempool_update_rx: Some(bc.new_mempool_tx_channel()),
229 block_header_rx: Some(bc.new_block_headers_channel()),
230 block_with_txs_rx: Some(bc.new_block_with_tx_channel()),
231 mempool_events_tx: Some(bc.mempool_events_channel()),
232 influxdb_write_channel_tx: Some(bc.influxdb_write_channel()),
233 }
234 }
235}
236
237impl<LDT: KabuDataTypes> Actor for MempoolActor<LDT> {
238 fn start(&self) -> ActorResult {
239 let task = tokio::task::spawn(new_mempool_worker(
240 self.chain_parameters.clone(),
241 self.mempool.clone().unwrap(),
242 self.mempool_update_rx.clone().unwrap(),
243 self.block_header_rx.clone().unwrap(),
244 self.block_with_txs_rx.clone().unwrap(),
245 self.mempool_events_tx.clone().unwrap(),
246 self.influxdb_write_channel_tx.clone().unwrap(),
247 ));
248 Ok(vec![task])
249 }
250
251 fn name(&self) -> &'static str {
252 "MempoolActor"
253 }
254}