1use std::collections::btree_map::Entry;
2use std::collections::BTreeMap;
3use std::marker::PhantomData;
4
5use alloy_eips::{BlockId, BlockNumberOrTag};
6use alloy_network::Network;
7use alloy_primitives::{Address, Bytes, U256};
8use alloy_provider::Provider;
9use alloy_rpc_types_trace::geth::AccountState;
10use eyre::{eyre, Result};
11use kabu_core_actors::{Accessor, Actor, ActorResult, SharedState, WorkerResult};
12use kabu_core_actors_macros::Accessor;
13use kabu_core_blockchain::{Blockchain, BlockchainState};
14use kabu_defi_address_book::TokenAddressEth;
15use kabu_evm_utils::{BalanceCheater, NWETH};
16use kabu_types_blockchain::{GethStateUpdate, KabuDataTypes};
17use kabu_types_entities::{AccountNonceAndBalanceState, MarketState, TxSigners};
18use revm::{Database, DatabaseCommit, DatabaseRef};
19use tracing::{debug, error, trace};
20
21async fn fetch_account_state<P, N>(client: P, address: Address) -> Result<AccountState>
22where
23 N: Network,
24 P: Provider<N> + Send + Sync + Clone + 'static,
25{
26 let code = client.get_code_at(address).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await.ok();
27 let balance = client.get_balance(address).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await.ok();
28 let nonce = client.get_transaction_count(address).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await.ok();
29
30 Ok(AccountState { balance, code, nonce, storage: BTreeMap::new() })
31}
32
33async fn set_monitor_token_balance(
34 account_nonce_balance_state: Option<SharedState<AccountNonceAndBalanceState>>,
35 owner: Address,
36 token: Address,
37 balance: U256,
38) {
39 if let Some(account_nonce_balance) = account_nonce_balance_state {
40 debug!("set_monitor_balance {} {} {}", owner, token, balance);
41 let mut account_nonce_balance_guard = account_nonce_balance.write().await;
42 let entry = account_nonce_balance_guard.get_entry_or_default(owner);
43 debug!("set_monitor_balance {:?}", entry);
44
45 entry.add_balance(token, balance);
46 }
47}
48
49async fn set_monitor_nonce(account_nonce_balance_state: Option<SharedState<AccountNonceAndBalanceState>>, owner: Address, nonce: u64) {
50 if let Some(account_nonce_balance) = account_nonce_balance_state {
51 debug!("set_monitor_nonce {} {}", owner, nonce);
52 let mut account_nonce_balance_guard = account_nonce_balance.write().await;
53 let entry = account_nonce_balance_guard.get_entry_or_default(owner);
54 debug!("set_monitor_nonce {:?}", entry);
55 entry.set_nonce(nonce);
56 }
57}
58
59pub async fn preload_market_state<P, N, DB>(
60 client: P,
61 copied_accounts_vec: Vec<Address>,
62 new_accounts_vec: Vec<(Address, u64, U256, Option<Bytes>)>,
63 token_balances_vec: Vec<(Address, Address, U256)>,
64 market_state: SharedState<MarketState<DB>>,
65 account_nonce_balance_state: Option<SharedState<AccountNonceAndBalanceState>>,
66) -> WorkerResult
67where
68 N: Network,
69 P: Provider<N> + Send + Sync + Clone + 'static,
70 DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
71{
72 let mut market_state_guard = market_state.write().await;
73
74 let mut state: GethStateUpdate = BTreeMap::new();
75
76 for address in copied_accounts_vec {
77 trace!("Loading address : {address}");
78 let acc_state = fetch_account_state(client.clone(), address).await?;
79
80 set_monitor_token_balance(account_nonce_balance_state.clone(), address, Address::ZERO, acc_state.balance.unwrap_or_default()).await;
81
82 set_monitor_nonce(account_nonce_balance_state.clone(), address, acc_state.nonce.unwrap_or_default()).await;
83 trace!("Loaded address : {address} {:?}", acc_state);
84
85 state.insert(address, acc_state);
86 }
87
88 for (address, nonce, balance, code) in new_accounts_vec {
89 trace!("new_accounts added {} {} {}", address, nonce, balance);
90 set_monitor_token_balance(account_nonce_balance_state.clone(), address, NWETH::NATIVE_ADDRESS, balance).await;
91 state.insert(address, AccountState { balance: Some(balance), code, nonce: Some(nonce), storage: BTreeMap::new() });
92 }
93
94 for (token, owner, balance) in token_balances_vec {
95 if token == TokenAddressEth::ETH_NATIVE {
96 match state.entry(owner) {
97 Entry::Vacant(e) => {
98 e.insert(AccountState { balance: Some(balance), nonce: Some(0), code: None, storage: BTreeMap::new() });
99 }
100 Entry::Occupied(mut e) => {
101 e.get_mut().balance = Some(balance);
102 }
103 }
104 } else {
105 match state.entry(token) {
106 Entry::Vacant(e) => {
107 let mut acc_state = fetch_account_state(client.clone(), token).await?;
108 acc_state.storage.insert(BalanceCheater::get_balance_cell(token, owner)?.into(), balance.into());
109 e.insert(acc_state);
110 }
111 Entry::Occupied(mut e) => {
112 e.get_mut().storage.insert(BalanceCheater::get_balance_cell(token, owner)?.into(), balance.into());
113 }
114 }
115 }
116
117 set_monitor_token_balance(account_nonce_balance_state.clone(), owner, token, balance).await;
118 }
119 market_state_guard.apply_geth_update(state);
120
121 Ok("DONE".to_string())
122}
123
124#[allow(dead_code)]
125#[derive(Accessor)]
126pub struct MarketStatePreloadedOneShotActor<P, N, DB> {
127 name: &'static str,
128 client: P,
129 copied_accounts: Vec<Address>,
130 new_accounts: Vec<(Address, u64, U256, Option<Bytes>)>,
131 token_balances: Vec<(Address, Address, U256)>,
132 #[accessor]
133 market_state: Option<SharedState<MarketState<DB>>>,
134 #[accessor]
135 account_nonce_balance_state: Option<SharedState<AccountNonceAndBalanceState>>,
136 _n: PhantomData<N>,
137}
138
139#[allow(dead_code)]
140impl<P, N, DB> MarketStatePreloadedOneShotActor<P, N, DB>
141where
142 N: Network,
143 P: Provider<N> + Send + Sync + Clone + 'static,
144 DB: DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
145{
146 fn name(&self) -> &'static str {
147 self.name
148 }
149
150 pub fn new(client: P) -> Self {
151 Self {
152 name: "MarketStatePreloadedOneShotActor",
153 client,
154 copied_accounts: Vec::new(),
155 new_accounts: Vec::new(),
156 token_balances: Vec::new(),
157 market_state: None,
158 account_nonce_balance_state: None,
159 _n: PhantomData,
160 }
161 }
162
163 pub fn with_name(self, name: &'static str) -> Self {
164 Self { name, ..self }
165 }
166
167 pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain<LDT>, state: &BlockchainState<DB, LDT>) -> Self {
168 Self { account_nonce_balance_state: Some(bc.nonce_and_balance()), market_state: Some(state.market_state_commit()), ..self }
169 }
170
171 pub fn with_signers(self, tx_signers: SharedState<TxSigners>) -> Self {
172 match tx_signers.try_read() {
173 Ok(signers) => {
174 let mut addresses = self.copied_accounts;
175 addresses.extend(signers.get_address_vec());
176 Self { copied_accounts: addresses, ..self }
177 }
178 Err(e) => {
179 error!("tx_signers.try_read() {}", e);
180 self
181 }
182 }
183 }
184
185 pub fn with_copied_account(self, address: Address) -> Self {
186 let mut copied_accounts = self.copied_accounts;
187 copied_accounts.push(address);
188 Self { copied_accounts, ..self }
189 }
190
191 pub fn with_copied_accounts<T: Into<Address>>(self, address_vec: Vec<T>) -> Self {
192 let mut copied_accounts = self.copied_accounts;
193 let more: Vec<Address> = address_vec.into_iter().map(Into::<Address>::into).collect();
194 copied_accounts.extend(more);
195 Self { copied_accounts, ..self }
196 }
197
198 pub fn with_new_account(self, address: Address, nonce: u64, balance: U256, code: Option<Bytes>) -> Self {
199 let mut new_accounts = self.new_accounts;
200 new_accounts.push((address, nonce, balance, code));
201 Self { new_accounts, ..self }
202 }
203
204 pub fn with_token_balance(self, token: Address, owner: Address, balance: U256) -> Self {
205 let mut token_balances = self.token_balances;
206 token_balances.push((token, owner, balance));
207 Self { token_balances, ..self }
208 }
209}
210
211impl<P, N, DB> Actor for MarketStatePreloadedOneShotActor<P, N, DB>
212where
213 N: Network,
214 P: Provider<N> + Send + Sync + Clone + 'static,
215 DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
216{
217 fn start_and_wait(&self) -> Result<()> {
218 let rt = tokio::runtime::Runtime::new()?; let handler = rt.spawn(preload_market_state(
220 self.client.clone(),
221 self.copied_accounts.clone(),
222 self.new_accounts.clone(),
223 self.token_balances.clone(),
224 self.market_state.clone().unwrap(),
225 self.account_nonce_balance_state.clone(),
226 ));
227
228 self.wait(Ok(vec![handler]))?;
229 rt.shutdown_background();
230 Ok(())
231 }
232
233 fn start(&self) -> ActorResult {
234 Err(eyre!("NEED_TO_BE_WAITED"))
235 }
236
237 fn name(&self) -> &'static str {
238 "MarketStatePreloadedOneShotActor"
239 }
240}