kabu_types_blockchain/
mempool.rs

1use crate::kabu_data_types::KabuTx;
2use crate::{AccountNonceAndTransactions, FetchState, GethStateUpdate, MempoolTx};
3use crate::{KabuDataTypes, KabuDataTypesEthereum};
4use alloy_primitives::map::HashMap;
5use alloy_primitives::BlockNumber;
6use alloy_primitives::{Address, TxHash};
7use chrono::{DateTime, Utc};
8use eyre::{eyre, Result};
9use std::collections::hash_map::Entry;
10
11#[derive(Clone, Debug, Default)]
12pub struct Mempool<LDT: KabuDataTypes = KabuDataTypesEthereum> {
13    pub txs: HashMap<TxHash, MempoolTx<LDT>>,
14    accounts: HashMap<Address, AccountNonceAndTransactions>,
15}
16
17impl<LDT: KabuDataTypes> Mempool<LDT> {
18    pub fn new() -> Mempool<KabuDataTypesEthereum> {
19        Mempool { txs: HashMap::default(), accounts: HashMap::default() }
20    }
21
22    pub fn len(&self) -> usize {
23        self.txs.len()
24    }
25
26    pub fn is_empty(&self) -> bool {
27        self.txs.is_empty()
28    }
29
30    pub fn is_tx(&self, tx_hash: &TxHash) -> bool {
31        self.txs.contains_key(tx_hash)
32    }
33
34    pub fn add_tx(&mut self, tx: LDT::Transaction) -> &mut Self {
35        let tx_hash: TxHash = tx.get_tx_hash();
36        let entry = self.txs.entry(tx_hash).or_default();
37        entry.tx = Some(tx);
38        self
39    }
40
41    pub fn add_tx_logs(&mut self, tx_hash: TxHash, logs: Vec<LDT::Log>) -> &mut Self {
42        let entry = self.txs.entry(tx_hash).or_default();
43        entry.logs = Some(logs);
44        self
45    }
46
47    pub fn add_tx_state_change(&mut self, tx_hash: TxHash, state_update: LDT::StateUpdate) -> &mut Self {
48        let entry = self.txs.entry(tx_hash).or_default();
49        entry.state_update = Some(state_update);
50        self
51    }
52
53    pub fn filter_by_gas_price(&self, gas_price: u128) -> Vec<&MempoolTx<LDT>> {
54        self.txs
55            .values()
56            .filter(|&item| item.mined.is_none() && item.tx.clone().map_or_else(|| false, |tx| tx.get_gas_price() >= gas_price))
57            .collect()
58    }
59
60    pub fn filter_ok_by_gas_price(&self, gas_price: u128) -> Vec<&MempoolTx<LDT>> {
61        self.txs
62            .values()
63            .filter(|&item| {
64                item.mined.is_none()
65                    && !item.failed.unwrap_or(false)
66                    && item.tx.clone().map_or_else(|| false, |tx| tx.get_gas_price() >= gas_price)
67            })
68            .collect()
69    }
70
71    pub fn filter_on_block(&self, block_number: BlockNumber) -> Vec<&MempoolTx<LDT>> {
72        self.txs.values().filter(|&item| item.mined == Some(block_number)).collect()
73    }
74
75    pub fn is_mined(&self, tx_hash: &TxHash) -> bool {
76        match self.txs.get(tx_hash) {
77            Some(tx) => tx.mined.is_some(),
78            None => false,
79        }
80    }
81
82    pub fn is_failed(&self, tx_hash: &TxHash) -> bool {
83        match self.txs.get(tx_hash) {
84            Some(e) => e.failed.unwrap_or(false),
85            None => false,
86        }
87    }
88
89    pub fn clean(&mut self) {
90        self.txs = Default::default();
91        self.accounts = Default::default();
92    }
93
94    pub fn clean_txs(&mut self, max_block_number: BlockNumber, max_time: DateTime<Utc>) {
95        self.txs = self
96            .txs
97            .clone()
98            .into_iter()
99            .filter(|(_, v)| v.mined.unwrap_or(max_block_number + 1) > max_block_number && v.time > max_time)
100            .collect();
101    }
102
103    pub fn set_mined(&mut self, tx_hash: TxHash, block_number: BlockNumber) -> &mut Self {
104        let entry = self.txs.entry(tx_hash).or_default();
105        entry.mined = Some(block_number);
106        self
107    }
108
109    pub fn set_failed(&mut self, tx_hash: TxHash) {
110        if let Entry::Occupied(mut e) = self.txs.entry(tx_hash) {
111            let value = e.get_mut();
112            value.failed = Some(true)
113        }
114    }
115
116    pub fn set_nonce(&mut self, account: Address, nonce: u64) -> &mut Self {
117        let entry = self.accounts.entry(account).or_default();
118        entry.set_nonce(Some(nonce));
119        self
120    }
121
122    pub fn is_valid_tx(&self, tx: &LDT::Transaction) -> bool {
123        self.accounts.get(&tx.get_from()).map_or_else(|| true, |acc| acc.nonce.map_or_else(|| true, |nonce| tx.get_nonce() == nonce + 1))
124    }
125
126    pub fn get_tx_by_hash(&self, tx_hash: &TxHash) -> Option<&MempoolTx<LDT>> {
127        self.txs.get(tx_hash)
128    }
129
130    pub fn get_or_fetch_pre_state(&mut self, _tx_hash: &TxHash) -> Result<FetchState<GethStateUpdate>> {
131        Err(eyre!("NOT_IMPLEMENTED"))
132    }
133
134    pub fn remove_tx(&mut self, tx_hash: &TxHash) -> Option<MempoolTx<LDT>> {
135        self.txs.remove(tx_hash)
136    }
137}