kabu_broadcast_accounts/accounts_monitor/
accounts_actor.rs

1use alloy_consensus::Transaction;
2use alloy_eips::{BlockId, BlockNumberOrTag};
3use alloy_network::Network;
4use alloy_primitives::{Address, Log, U256};
5use alloy_provider::Provider;
6use alloy_rpc_types::eth::Log as EthLog;
7use alloy_rpc_types::TransactionTrait;
8use alloy_sol_types::SolEventInterface;
9use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, SharedState, WorkerResult};
10use kabu_core_actors_macros::{Accessor, Consumer};
11use kabu_core_blockchain::Blockchain;
12use kabu_defi_abi::IERC20::IERC20Events;
13use kabu_types_blockchain::{KabuBlock, KabuDataTypes, KabuTx};
14use kabu_types_entities::{AccountNonceAndBalanceState, LatestBlock};
15use kabu_types_events::MarketEvents;
16use std::marker::PhantomData;
17use std::time::Duration;
18use tokio::sync::broadcast::error::RecvError;
19use tokio::time::sleep;
20use tracing::debug;
21
22pub async fn nonce_and_balance_fetcher_worker<P, N>(
23    client: P,
24    accounts_state: SharedState<AccountNonceAndBalanceState>,
25    only_once: bool,
26) -> WorkerResult
27where
28    N: Network,
29    P: Provider<N> + Send + Sync + Clone + 'static,
30{
31    let eth_addr = Address::ZERO;
32
33    loop {
34        let accounts = accounts_state.read().await.get_accounts_vec();
35        for addr in accounts.into_iter() {
36            let nonce = client.get_transaction_count(addr).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await;
37            let balance = client.get_balance(addr).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await;
38
39            if let Some(acc) = accounts_state.write().await.get_mut_account(&addr) {
40                if let Ok(nonce) = nonce {
41                    acc.set_nonce(nonce);
42                }
43                if let Ok(balance) = balance {
44                    acc.set_balance(eth_addr, balance);
45                }
46            };
47            debug!("Account {} nonce {:?} balance {:?}", addr, nonce, balance);
48        }
49        if only_once {
50            break;
51        }
52
53        sleep(Duration::from_secs(20)).await;
54    }
55    Ok("Nonce and balance fetcher finished".to_string())
56}
57
58pub async fn nonce_and_balance_monitor_worker<LDT>(
59    accounts_state: SharedState<AccountNonceAndBalanceState>,
60    latest_block: SharedState<LatestBlock<LDT>>,
61    market_events_rx: Broadcaster<MarketEvents>,
62) -> WorkerResult
63where
64    LDT: KabuDataTypes<Log = EthLog>,
65{
66    let mut market_events = market_events_rx.subscribe();
67
68    loop {
69        tokio::select! {
70            msg = market_events.recv() => {
71                let market_event_msg : Result<MarketEvents, RecvError> = msg;
72                if let Ok(market_event_msg) = market_event_msg {
73                    match market_event_msg {
74                        MarketEvents::BlockTxUpdate{  .. }=>{
75                            if let Some(block) = latest_block.read().await.block_with_txs.clone() {
76                                let txs = block.get_transactions();
77
78                                // acquire accounts shared state write lock
79                                let mut accounts_lock = accounts_state.write().await;
80
81                                for tx in txs {
82                                    let tx_from : Address = tx.get_from();
83                                    if accounts_lock.is_monitored(&tx_from) {
84                                        if let Some(&mut ref mut account) = accounts_lock.get_mut_account(&tx_from) {
85                                            let spent = (TransactionTrait::max_fee_per_gas(&tx) + tx.max_priority_fee_per_gas().unwrap()) * tx.get_gas_limit() as u128 + tx.value().to::<u128>();
86                                            let value = U256::from(spent);
87                                            account.sub_balance(Address::ZERO, value).set_nonce(tx.get_nonce()+1);
88                                            debug!("Account {} : sub ETH balance {} -> {} nonce {}", tx_from, value, account.get_eth_balance(), tx.get_nonce()+1);
89                                        }
90                                    }
91
92                                    if let Some(to )  = tx.to() {
93                                        let to_addr: Address = to;
94                                        if accounts_lock.is_monitored(&to_addr) {
95                                            if let Some(&mut ref mut account) = accounts_lock.get_mut_account(&to_addr) {
96                                                account.add_balance(Address::ZERO, tx.value());
97                                                debug!("Account {} : add ETH balance {} -> {}", to, tx.value(), account.get_eth_balance());
98                                            }
99                                        }
100                                    }
101                                }
102
103                            }
104                        },
105                        MarketEvents::BlockLogsUpdate {  .. }=>{
106                            let latest_block_guard = latest_block.read().await;
107                            if let Some(logs) = latest_block_guard.logs.clone(){
108
109                                    // acquire accounts shared state write lock
110                                    let mut accounts_lock = accounts_state.write().await;
111
112                                    for log_entry in logs.iter() {
113                                        let log_entry: Option<Log> = Log::new(log_entry.inner.address, log_entry.topics().to_vec(), log_entry.inner.data.data.clone());
114                                        if let Some(log_entry) = log_entry {
115                                            if let Ok(event) = IERC20Events::decode_log(&log_entry){
116                                                if let  IERC20Events::Transfer(event) = event.data {
117                                                    //debug!("ERC20TransferEvent {} : {:?}", log_entry.address, event);
118                                                    if accounts_lock.is_monitored(&event.to) {
119                                                        if let Some(&mut ref mut account) = accounts_lock.get_mut_account(&event.to) {
120                                                            account.add_balance(log_entry.address, event.value);
121                                                            debug!("Account {} : add ERC20 {} balance {} -> {}", event.to, log_entry.address, event.value, account.get_balance(&log_entry.address));
122                                                        }
123                                                    } else if accounts_lock.is_monitored(&event.from) {
124                                                        if let Some(&mut ref mut account) = accounts_lock.get_mut_account(&event.from) {
125                                                            account.sub_balance(log_entry.address, event.value);
126                                                            debug!("Account {} : sub ERC20 {} balance {} -> {}", event.from, log_entry.address, event.value, account.get_balance(&log_entry.address));
127                                                        }
128                                                    }
129                                                }
130                                            }
131                                        }
132                                    }
133                                    drop(accounts_lock);
134
135                            }
136                        }
137                        _=>{}
138                    }
139                }
140            }
141        }
142    }
143}
144
/// Actor that tracks nonce and balances of the accounts held in the shared
/// `AccountNonceAndBalanceState`.
///
/// When started it spawns the market-event driven monitor worker and, unless disabled
/// with `without_fetcher`, the RPC fetcher worker that reads values from `client`.
#[derive(Accessor, Consumer)]
pub struct NonceAndBalanceMonitorActor<P, N, LDT: KabuDataTypes + 'static> {
    // Provider handed (cloned) to the fetcher worker for nonce / balance requests.
    client: P,
    // When true the fetcher does a single pass and `start` waits for it to finish.
    only_once: bool,
    // Whether `start` spawns the fetcher worker at all; defaults to true in `new`.
    with_fetcher: bool,
    // Shared account state updated by both workers; set by `on_bc`, `start` panics if it is still `None`.
    #[accessor]
    accounts_nonce_and_balance: Option<SharedState<AccountNonceAndBalanceState>>,
    // Latest block state read by the monitor worker; set by `on_bc`, `start` panics if it is still `None`.
    #[accessor]
    latest_block: Option<SharedState<LatestBlock<LDT>>>,
    // Market events channel the monitor worker subscribes to; set by `on_bc`, `start` panics if it is still `None`.
    #[consumer]
    market_events: Option<Broadcaster<MarketEvents>>,
    // Ties the network type parameter `N` to the struct without storing a value of it.
    _n: PhantomData<N>,
}
158
159impl<P, N, LDT> NonceAndBalanceMonitorActor<P, N, LDT>
160where
161    N: Network,
162    P: Provider<N> + Send + Sync + Clone + 'static,
163    LDT: KabuDataTypes + 'static,
164{
165    pub fn new(client: P) -> NonceAndBalanceMonitorActor<P, N, LDT> {
166        NonceAndBalanceMonitorActor {
167            client,
168            accounts_nonce_and_balance: None,
169            latest_block: None,
170            market_events: None,
171            only_once: false,
172            with_fetcher: true,
173            _n: PhantomData,
174        }
175    }
176
177    pub fn only_once(self) -> Self {
178        Self { only_once: true, ..self }
179    }
180
181    pub fn without_fetcher(self) -> Self {
182        Self { with_fetcher: false, ..self }
183    }
184
185    pub fn on_bc(self, bc: &Blockchain<LDT>) -> NonceAndBalanceMonitorActor<P, N, LDT> {
186        NonceAndBalanceMonitorActor {
187            accounts_nonce_and_balance: Some(bc.nonce_and_balance()),
188            latest_block: Some(bc.latest_block().clone()),
189            market_events: Some(bc.market_events_channel().clone()),
190            ..self
191        }
192    }
193}
194
195impl<P, N, LDT> Actor for NonceAndBalanceMonitorActor<P, N, LDT>
196where
197    N: Network,
198    P: Provider<N> + Send + Sync + Clone + 'static,
199    LDT: KabuDataTypes<Log = EthLog> + 'static,
200{
201    fn start(&self) -> ActorResult {
202        let mut handles = Vec::new();
203
204        if self.with_fetcher {
205            let fetcher_task = tokio::task::spawn(nonce_and_balance_fetcher_worker(
206                self.client.clone(),
207                self.accounts_nonce_and_balance.clone().unwrap(),
208                self.only_once,
209            ));
210
211            if self.only_once {
212                loop {
213                    if fetcher_task.is_finished() {
214                        break;
215                    }
216                    std::thread::sleep(Duration::from_millis(100));
217                }
218            } else {
219                handles.push(fetcher_task);
220            }
221        }
222
223        let monitor_task = tokio::task::spawn(nonce_and_balance_monitor_worker(
224            self.accounts_nonce_and_balance.clone().unwrap(),
225            self.latest_block.clone().unwrap(),
226            self.market_events.clone().unwrap(),
227        ));
228        handles.push(monitor_task);
229
230        Ok(handles)
231    }
232
233    fn name(&self) -> &'static str {
234        "NonceAndBalanceMonitorActor"
235    }
236}