kabu_defi_preloader/
preloader_actor.rs

1use std::collections::btree_map::Entry;
2use std::collections::BTreeMap;
3use std::marker::PhantomData;
4
5use alloy_eips::{BlockId, BlockNumberOrTag};
6use alloy_network::Network;
7use alloy_primitives::{Address, Bytes, U256};
8use alloy_provider::Provider;
9use alloy_rpc_types_trace::geth::AccountState;
10use eyre::{eyre, Result};
11use kabu_core_actors::{Accessor, Actor, ActorResult, SharedState, WorkerResult};
12use kabu_core_actors_macros::Accessor;
13use kabu_core_blockchain::{Blockchain, BlockchainState};
14use kabu_defi_address_book::TokenAddressEth;
15use kabu_evm_utils::{BalanceCheater, NWETH};
16use kabu_types_blockchain::{GethStateUpdate, KabuDataTypes};
17use kabu_types_entities::{AccountNonceAndBalanceState, MarketState, TxSigners};
18use revm::{Database, DatabaseCommit, DatabaseRef};
19use tracing::{debug, error, trace};
20
21async fn fetch_account_state<P, N>(client: P, address: Address) -> Result<AccountState>
22where
23    N: Network,
24    P: Provider<N> + Send + Sync + Clone + 'static,
25{
26    let code = client.get_code_at(address).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await.ok();
27    let balance = client.get_balance(address).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await.ok();
28    let nonce = client.get_transaction_count(address).block_id(BlockId::Number(BlockNumberOrTag::Latest)).await.ok();
29
30    Ok(AccountState { balance, code, nonce, storage: BTreeMap::new() })
31}
32
33async fn set_monitor_token_balance(
34    account_nonce_balance_state: Option<SharedState<AccountNonceAndBalanceState>>,
35    owner: Address,
36    token: Address,
37    balance: U256,
38) {
39    if let Some(account_nonce_balance) = account_nonce_balance_state {
40        debug!("set_monitor_balance {} {} {}", owner, token, balance);
41        let mut account_nonce_balance_guard = account_nonce_balance.write().await;
42        let entry = account_nonce_balance_guard.get_entry_or_default(owner);
43        debug!("set_monitor_balance {:?}", entry);
44
45        entry.add_balance(token, balance);
46    }
47}
48
49async fn set_monitor_nonce(account_nonce_balance_state: Option<SharedState<AccountNonceAndBalanceState>>, owner: Address, nonce: u64) {
50    if let Some(account_nonce_balance) = account_nonce_balance_state {
51        debug!("set_monitor_nonce {} {}", owner, nonce);
52        let mut account_nonce_balance_guard = account_nonce_balance.write().await;
53        let entry = account_nonce_balance_guard.get_entry_or_default(owner);
54        debug!("set_monitor_nonce {:?}", entry);
55        entry.set_nonce(nonce);
56    }
57}
58
59pub async fn preload_market_state<P, N, DB>(
60    client: P,
61    copied_accounts_vec: Vec<Address>,
62    new_accounts_vec: Vec<(Address, u64, U256, Option<Bytes>)>,
63    token_balances_vec: Vec<(Address, Address, U256)>,
64    market_state: SharedState<MarketState<DB>>,
65    account_nonce_balance_state: Option<SharedState<AccountNonceAndBalanceState>>,
66) -> WorkerResult
67where
68    N: Network,
69    P: Provider<N> + Send + Sync + Clone + 'static,
70    DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
71{
72    let mut market_state_guard = market_state.write().await;
73
74    let mut state: GethStateUpdate = BTreeMap::new();
75
76    for address in copied_accounts_vec {
77        trace!("Loading address : {address}");
78        let acc_state = fetch_account_state(client.clone(), address).await?;
79
80        set_monitor_token_balance(account_nonce_balance_state.clone(), address, Address::ZERO, acc_state.balance.unwrap_or_default()).await;
81
82        set_monitor_nonce(account_nonce_balance_state.clone(), address, acc_state.nonce.unwrap_or_default()).await;
83        trace!("Loaded address : {address} {:?}", acc_state);
84
85        state.insert(address, acc_state);
86    }
87
88    for (address, nonce, balance, code) in new_accounts_vec {
89        trace!("new_accounts added {} {} {}", address, nonce, balance);
90        set_monitor_token_balance(account_nonce_balance_state.clone(), address, NWETH::NATIVE_ADDRESS, balance).await;
91        state.insert(address, AccountState { balance: Some(balance), code, nonce: Some(nonce), storage: BTreeMap::new() });
92    }
93
94    for (token, owner, balance) in token_balances_vec {
95        if token == TokenAddressEth::ETH_NATIVE {
96            match state.entry(owner) {
97                Entry::Vacant(e) => {
98                    e.insert(AccountState { balance: Some(balance), nonce: Some(0), code: None, storage: BTreeMap::new() });
99                }
100                Entry::Occupied(mut e) => {
101                    e.get_mut().balance = Some(balance);
102                }
103            }
104        } else {
105            match state.entry(token) {
106                Entry::Vacant(e) => {
107                    let mut acc_state = fetch_account_state(client.clone(), token).await?;
108                    acc_state.storage.insert(BalanceCheater::get_balance_cell(token, owner)?.into(), balance.into());
109                    e.insert(acc_state);
110                }
111                Entry::Occupied(mut e) => {
112                    e.get_mut().storage.insert(BalanceCheater::get_balance_cell(token, owner)?.into(), balance.into());
113                }
114            }
115        }
116
117        set_monitor_token_balance(account_nonce_balance_state.clone(), owner, token, balance).await;
118    }
119    market_state_guard.apply_geth_update(state);
120
121    Ok("DONE".to_string())
122}
123
124#[allow(dead_code)]
125#[derive(Accessor)]
126pub struct MarketStatePreloadedOneShotActor<P, N, DB> {
127    name: &'static str,
128    client: P,
129    copied_accounts: Vec<Address>,
130    new_accounts: Vec<(Address, u64, U256, Option<Bytes>)>,
131    token_balances: Vec<(Address, Address, U256)>,
132    #[accessor]
133    market_state: Option<SharedState<MarketState<DB>>>,
134    #[accessor]
135    account_nonce_balance_state: Option<SharedState<AccountNonceAndBalanceState>>,
136    _n: PhantomData<N>,
137}
138
139#[allow(dead_code)]
140impl<P, N, DB> MarketStatePreloadedOneShotActor<P, N, DB>
141where
142    N: Network,
143    P: Provider<N> + Send + Sync + Clone + 'static,
144    DB: DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
145{
146    fn name(&self) -> &'static str {
147        self.name
148    }
149
150    pub fn new(client: P) -> Self {
151        Self {
152            name: "MarketStatePreloadedOneShotActor",
153            client,
154            copied_accounts: Vec::new(),
155            new_accounts: Vec::new(),
156            token_balances: Vec::new(),
157            market_state: None,
158            account_nonce_balance_state: None,
159            _n: PhantomData,
160        }
161    }
162
163    pub fn with_name(self, name: &'static str) -> Self {
164        Self { name, ..self }
165    }
166
167    pub fn on_bc<LDT: KabuDataTypes>(self, bc: &Blockchain<LDT>, state: &BlockchainState<DB, LDT>) -> Self {
168        Self { account_nonce_balance_state: Some(bc.nonce_and_balance()), market_state: Some(state.market_state_commit()), ..self }
169    }
170
171    pub fn with_signers(self, tx_signers: SharedState<TxSigners>) -> Self {
172        match tx_signers.try_read() {
173            Ok(signers) => {
174                let mut addresses = self.copied_accounts;
175                addresses.extend(signers.get_address_vec());
176                Self { copied_accounts: addresses, ..self }
177            }
178            Err(e) => {
179                error!("tx_signers.try_read() {}", e);
180                self
181            }
182        }
183    }
184
185    pub fn with_copied_account(self, address: Address) -> Self {
186        let mut copied_accounts = self.copied_accounts;
187        copied_accounts.push(address);
188        Self { copied_accounts, ..self }
189    }
190
191    pub fn with_copied_accounts<T: Into<Address>>(self, address_vec: Vec<T>) -> Self {
192        let mut copied_accounts = self.copied_accounts;
193        let more: Vec<Address> = address_vec.into_iter().map(Into::<Address>::into).collect();
194        copied_accounts.extend(more);
195        Self { copied_accounts, ..self }
196    }
197
198    pub fn with_new_account(self, address: Address, nonce: u64, balance: U256, code: Option<Bytes>) -> Self {
199        let mut new_accounts = self.new_accounts;
200        new_accounts.push((address, nonce, balance, code));
201        Self { new_accounts, ..self }
202    }
203
204    pub fn with_token_balance(self, token: Address, owner: Address, balance: U256) -> Self {
205        let mut token_balances = self.token_balances;
206        token_balances.push((token, owner, balance));
207        Self { token_balances, ..self }
208    }
209}
210
211impl<P, N, DB> Actor for MarketStatePreloadedOneShotActor<P, N, DB>
212where
213    N: Network,
214    P: Provider<N> + Send + Sync + Clone + 'static,
215    DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
216{
217    fn start_and_wait(&self) -> Result<()> {
218        let rt = tokio::runtime::Runtime::new()?; // we need a different runtime to wait for the result
219        let handler = rt.spawn(preload_market_state(
220            self.client.clone(),
221            self.copied_accounts.clone(),
222            self.new_accounts.clone(),
223            self.token_balances.clone(),
224            self.market_state.clone().unwrap(),
225            self.account_nonce_balance_state.clone(),
226        ));
227
228        self.wait(Ok(vec![handler]))?;
229        rt.shutdown_background();
230        Ok(())
231    }
232
233    fn start(&self) -> ActorResult {
234        Err(eyre!("NEED_TO_BE_WAITED"))
235    }
236
237    fn name(&self) -> &'static str {
238        "MarketStatePreloadedOneShotActor"
239    }
240}