kabu_broadcast_accounts/accounts_monitor/
accounts_actor.rs1use alloy_consensus::Transaction;
2use alloy_eips::{BlockId, BlockNumberOrTag};
3use alloy_network::Network;
4use alloy_primitives::{Address, Log, U256};
5use alloy_provider::Provider;
6use alloy_rpc_types::eth::Log as EthLog;
7use alloy_rpc_types::TransactionTrait;
8use alloy_sol_types::SolEventInterface;
9use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, SharedState, WorkerResult};
10use kabu_core_actors_macros::{Accessor, Consumer};
11use kabu_core_blockchain::Blockchain;
12use kabu_defi_abi::IERC20::IERC20Events;
13use kabu_types_blockchain::{KabuBlock, KabuDataTypes, KabuTx};
14use kabu_types_entities::{AccountNonceAndBalanceState, LatestBlock};
15use kabu_types_events::MarketEvents;
16use std::marker::PhantomData;
17use std::time::Duration;
18use tokio::sync::broadcast::error::RecvError;
19use tokio::time::sleep;
20use tracing::debug;
21
22pub async fn nonce_and_balance_fetcher_worker<P, N>(
23 client: P,
24 accounts_state: SharedState<AccountNonceAndBalanceState>,
25 only_once: bool,
26) -> WorkerResult
27where
28 N: Network,
29 P: Provider<N> + Send + Sync + Clone + 'static,
30{
31 let eth_addr = Address::ZERO;
32
33 loop {
34 let accounts = accounts_state.read().await.get_accounts_vec();
35 for addr in accounts.into_iter() {
36 let nonce = client.get_transaction_count(addr).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await;
37 let balance = client.get_balance(addr).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await;
38
39 if let Some(acc) = accounts_state.write().await.get_mut_account(&addr) {
40 if let Ok(nonce) = nonce {
41 acc.set_nonce(nonce);
42 }
43 if let Ok(balance) = balance {
44 acc.set_balance(eth_addr, balance);
45 }
46 };
47 debug!("Account {} nonce {:?} balance {:?}", addr, nonce, balance);
48 }
49 if only_once {
50 break;
51 }
52
53 sleep(Duration::from_secs(20)).await;
54 }
55 Ok("Nonce and balance fetcher finished".to_string())
56}
57
58pub async fn nonce_and_balance_monitor_worker<LDT>(
59 accounts_state: SharedState<AccountNonceAndBalanceState>,
60 latest_block: SharedState<LatestBlock<LDT>>,
61 market_events_rx: Broadcaster<MarketEvents>,
62) -> WorkerResult
63where
64 LDT: KabuDataTypes<Log = EthLog>,
65{
66 let mut market_events = market_events_rx.subscribe();
67
68 loop {
69 tokio::select! {
70 msg = market_events.recv() => {
71 let market_event_msg : Result<MarketEvents, RecvError> = msg;
72 if let Ok(market_event_msg) = market_event_msg {
73 match market_event_msg {
74 MarketEvents::BlockTxUpdate{ .. }=>{
75 if let Some(block) = latest_block.read().await.block_with_txs.clone() {
76 let txs = block.get_transactions();
77
78 let mut accounts_lock = accounts_state.write().await;
80
81 for tx in txs {
82 let tx_from : Address = tx.get_from();
83 if accounts_lock.is_monitored(&tx_from) {
84 if let Some(&mut ref mut account) = accounts_lock.get_mut_account(&tx_from) {
85 let spent = (TransactionTrait::max_fee_per_gas(&tx) + tx.max_priority_fee_per_gas().unwrap()) * tx.get_gas_limit() as u128 + tx.value().to::<u128>();
86 let value = U256::from(spent);
87 account.sub_balance(Address::ZERO, value).set_nonce(tx.get_nonce()+1);
88 debug!("Account {} : sub ETH balance {} -> {} nonce {}", tx_from, value, account.get_eth_balance(), tx.get_nonce()+1);
89 }
90 }
91
92 if let Some(to ) = tx.to() {
93 let to_addr: Address = to;
94 if accounts_lock.is_monitored(&to_addr) {
95 if let Some(&mut ref mut account) = accounts_lock.get_mut_account(&to_addr) {
96 account.add_balance(Address::ZERO, tx.value());
97 debug!("Account {} : add ETH balance {} -> {}", to, tx.value(), account.get_eth_balance());
98 }
99 }
100 }
101 }
102
103 }
104 },
105 MarketEvents::BlockLogsUpdate { .. }=>{
106 let latest_block_guard = latest_block.read().await;
107 if let Some(logs) = latest_block_guard.logs.clone(){
108
109 let mut accounts_lock = accounts_state.write().await;
111
112 for log_entry in logs.iter() {
113 let log_entry: Option<Log> = Log::new(log_entry.inner.address, log_entry.topics().to_vec(), log_entry.inner.data.data.clone());
114 if let Some(log_entry) = log_entry {
115 if let Ok(event) = IERC20Events::decode_log(&log_entry){
116 if let IERC20Events::Transfer(event) = event.data {
117 if accounts_lock.is_monitored(&event.to) {
119 if let Some(&mut ref mut account) = accounts_lock.get_mut_account(&event.to) {
120 account.add_balance(log_entry.address, event.value);
121 debug!("Account {} : add ERC20 {} balance {} -> {}", event.to, log_entry.address, event.value, account.get_balance(&log_entry.address));
122 }
123 } else if accounts_lock.is_monitored(&event.from) {
124 if let Some(&mut ref mut account) = accounts_lock.get_mut_account(&event.from) {
125 account.sub_balance(log_entry.address, event.value);
126 debug!("Account {} : sub ERC20 {} balance {} -> {}", event.from, log_entry.address, event.value, account.get_balance(&log_entry.address));
127 }
128 }
129 }
130 }
131 }
132 }
133 drop(accounts_lock);
134
135 }
136 }
137 _=>{}
138 }
139 }
140 }
141 }
142 }
143}
144
145#[derive(Accessor, Consumer)]
146pub struct NonceAndBalanceMonitorActor<P, N, LDT: KabuDataTypes + 'static> {
147 client: P,
148 only_once: bool,
149 with_fetcher: bool,
150 #[accessor]
151 accounts_nonce_and_balance: Option<SharedState<AccountNonceAndBalanceState>>,
152 #[accessor]
153 latest_block: Option<SharedState<LatestBlock<LDT>>>,
154 #[consumer]
155 market_events: Option<Broadcaster<MarketEvents>>,
156 _n: PhantomData<N>,
157}
158
159impl<P, N, LDT> NonceAndBalanceMonitorActor<P, N, LDT>
160where
161 N: Network,
162 P: Provider<N> + Send + Sync + Clone + 'static,
163 LDT: KabuDataTypes + 'static,
164{
165 pub fn new(client: P) -> NonceAndBalanceMonitorActor<P, N, LDT> {
166 NonceAndBalanceMonitorActor {
167 client,
168 accounts_nonce_and_balance: None,
169 latest_block: None,
170 market_events: None,
171 only_once: false,
172 with_fetcher: true,
173 _n: PhantomData,
174 }
175 }
176
177 pub fn only_once(self) -> Self {
178 Self { only_once: true, ..self }
179 }
180
181 pub fn without_fetcher(self) -> Self {
182 Self { with_fetcher: false, ..self }
183 }
184
185 pub fn on_bc(self, bc: &Blockchain<LDT>) -> NonceAndBalanceMonitorActor<P, N, LDT> {
186 NonceAndBalanceMonitorActor {
187 accounts_nonce_and_balance: Some(bc.nonce_and_balance()),
188 latest_block: Some(bc.latest_block().clone()),
189 market_events: Some(bc.market_events_channel().clone()),
190 ..self
191 }
192 }
193}
194
195impl<P, N, LDT> Actor for NonceAndBalanceMonitorActor<P, N, LDT>
196where
197 N: Network,
198 P: Provider<N> + Send + Sync + Clone + 'static,
199 LDT: KabuDataTypes<Log = EthLog> + 'static,
200{
201 fn start(&self) -> ActorResult {
202 let mut handles = Vec::new();
203
204 if self.with_fetcher {
205 let fetcher_task = tokio::task::spawn(nonce_and_balance_fetcher_worker(
206 self.client.clone(),
207 self.accounts_nonce_and_balance.clone().unwrap(),
208 self.only_once,
209 ));
210
211 if self.only_once {
212 loop {
213 if fetcher_task.is_finished() {
214 break;
215 }
216 std::thread::sleep(Duration::from_millis(100));
217 }
218 } else {
219 handles.push(fetcher_task);
220 }
221 }
222
223 let monitor_task = tokio::task::spawn(nonce_and_balance_monitor_worker(
224 self.accounts_nonce_and_balance.clone().unwrap(),
225 self.latest_block.clone().unwrap(),
226 self.market_events.clone().unwrap(),
227 ));
228 handles.push(monitor_task);
229
230 Ok(handles)
231 }
232
233 fn name(&self) -> &'static str {
234 "NonceAndBalanceMonitorActor"
235 }
236}