kabu_core_blockchain_actors/
actor.rs

1use alloy_network::Ethereum;
2use alloy_primitives::{Address, B256, U256};
3use alloy_provider::Provider;
4use axum::Router;
5use eyre::{eyre, Result};
6use kabu_broadcast_accounts::{InitializeSignersOneShotBlockingActor, NonceAndBalanceMonitorActor, TxSignersActor};
7use kabu_broadcast_broadcaster::FlashbotsBroadcastActor;
8use kabu_broadcast_flashbots::client::RelayConfig;
9use kabu_broadcast_flashbots::Flashbots;
10use kabu_core_actors::{Actor, ActorsManager, SharedState};
11use kabu_core_block_history::BlockHistoryActor;
12use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
13use kabu_core_mempool::MempoolActor;
14use kabu_core_router::SwapRouterActor;
15use kabu_defi_address_book::TokenAddressEth;
16use kabu_defi_health_monitor::{MetricsRecorderActor, PoolHealthMonitorActor, StuffingTxMonitorActor};
17use kabu_defi_market::{
18    HistoryPoolLoaderOneShotActor, NewPoolLoaderActor, PoolLoaderActor, ProtocolPoolLoaderOneShotActor, RequiredPoolLoaderActor,
19};
20use kabu_defi_pools::{PoolLoadersBuilder, PoolsLoadingConfig};
21use kabu_defi_preloader::MarketStatePreloadedOneShotActor;
22use kabu_defi_price::PriceActor;
23use kabu_evm_db::{DatabaseKabuExt, KabuDBError};
24use kabu_evm_utils::NWETH;
25use kabu_execution_estimator::{EvmEstimatorActor, GethEstimatorActor};
26use kabu_execution_multicaller::MulticallerSwapEncoder;
27use kabu_metrics::InfluxDbWriterActor;
28use kabu_node_actor_config::NodeBlockActorConfig;
29use kabu_node_debug_provider::DebugProviderExt;
30use kabu_node_json_rpc::{NodeBlockActor, NodeMempoolActor, WaitForNodeSyncOneShotBlockingActor};
31use kabu_rpc_handler::WebServerActor;
32use kabu_storage_db::DbPool;
33use kabu_strategy_backrun::{
34    BackrunConfig, BlockStateChangeProcessorActor, PendingTxStateChangeProcessorActor, StateChangeArbSearcherActor,
35};
36use kabu_strategy_merger::{ArbSwapPathMergerActor, DiffPathMergerActor, SamePathMergerActor};
37use kabu_types_blockchain::{KabuDataTypesEVM, KabuDataTypesEthereum};
38use kabu_types_entities::required_state::RequiredState;
39use kabu_types_entities::{BlockHistoryState, PoolClass, SwapEncoder, TxSigners};
40use revm::{Database, DatabaseCommit, DatabaseRef};
41use std::collections::HashMap;
42use std::marker::PhantomData;
43use std::sync::Arc;
44use tokio_util::sync::CancellationToken;
45
46pub struct BlockchainActors<
47    P,
48    N,
49    DB: Clone + Send + Sync + 'static,
50    E: Clone = MulticallerSwapEncoder,
51    LDT: KabuDataTypesEVM + 'static = KabuDataTypesEthereum,
52> {
53    provider: P,
54    bc: Blockchain<LDT>,
55    state: BlockchainState<DB, LDT>,
56    strategy: Strategy<DB, LDT>,
57    pub signers: SharedState<TxSigners<LDT>>,
58    actor_manager: ActorsManager,
59    encoder: Option<E>,
60    has_mempool: bool,
61    has_state_update: bool,
62    has_signers: bool,
63    mutlicaller_address: Option<Address>,
64    relays: Vec<RelayConfig>,
65    _n: PhantomData<N>,
66}
67
68impl<P, DB, E> BlockchainActors<P, Ethereum, DB, E, KabuDataTypesEthereum>
69where
70    P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
71    DB: DatabaseRef<Error = KabuDBError>
72        + Database<Error = KabuDBError>
73        + DatabaseCommit
74        + DatabaseKabuExt
75        + BlockHistoryState<KabuDataTypesEthereum>
76        + Send
77        + Sync
78        + Clone
79        + Default
80        + 'static,
81    E: SwapEncoder + Send + Sync + Clone + 'static,
82{
83    pub fn new(
84        provider: P,
85        encoder: E,
86        bc: Blockchain<KabuDataTypesEthereum>,
87        state: BlockchainState<DB, KabuDataTypesEthereum>,
88        strategy: Strategy<DB, KabuDataTypesEthereum>,
89        relays: Vec<RelayConfig>,
90    ) -> Self {
91        Self {
92            provider,
93            bc,
94            state,
95            strategy,
96            signers: SharedState::new(TxSigners::new()),
97            actor_manager: ActorsManager::new(),
98            encoder: Some(encoder),
99            has_mempool: false,
100            has_state_update: false,
101            has_signers: false,
102            mutlicaller_address: None,
103            relays,
104            _n: PhantomData,
105        }
106    }
107
108    pub async fn wait(self) {
109        self.actor_manager.wait().await
110    }
111
112    /// Start a custom actor
113    pub fn start(&mut self, actor: impl Actor + 'static) -> Result<&mut Self> {
114        self.actor_manager.start(actor)?;
115        Ok(self)
116    }
117
118    /// Start a custom actor and wait for it to finish
119    pub fn start_and_wait(&mut self, actor: impl Actor + Send + Sync + 'static) -> Result<&mut Self> {
120        self.actor_manager.start_and_wait(actor)?;
121        Ok(self)
122    }
123
124    /// Initialize signers with the default anvil Private Key
125    pub fn initialize_signers_with_anvil(&mut self) -> Result<&mut Self> {
126        let key: B256 = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse()?;
127
128        self.actor_manager.start_and_wait(
129            InitializeSignersOneShotBlockingActor::new(Some(key.to_vec())).with_signers(self.signers.clone()).on_bc(&self.bc),
130        )?;
131        self.with_signers()?;
132        Ok(self)
133    }
134
135    /// Initialize signers with the private key. Random key generated if param in None
136    pub fn initialize_signers_with_key(&mut self, key: Option<Vec<u8>>) -> Result<&mut Self> {
137        self.actor_manager
138            .start_and_wait(InitializeSignersOneShotBlockingActor::new(key).with_signers(self.signers.clone()).on_bc(&self.bc))?;
139        self.with_signers()?;
140        Ok(self)
141    }
142
143    /// Initialize signers with multiple private keys
144    pub fn initialize_signers_with_keys(&mut self, keys: Vec<Vec<u8>>) -> Result<&mut Self> {
145        for key in keys {
146            self.actor_manager
147                .start_and_wait(InitializeSignersOneShotBlockingActor::new(Some(key)).with_signers(self.signers.clone()).on_bc(&self.bc))?;
148        }
149        self.with_signers()?;
150        Ok(self)
151    }
152
153    /// Initialize signers with encrypted private key
154    pub fn initialize_signers_with_encrypted_key(&mut self, key: Vec<u8>) -> Result<&mut Self> {
155        self.actor_manager.start_and_wait(
156            InitializeSignersOneShotBlockingActor::new_from_encrypted_key(key).with_signers(self.signers.clone()).on_bc(&self.bc),
157        )?;
158        self.with_signers()?;
159        Ok(self)
160    }
161
162    /// Initializes signers with encrypted key form DATA env var
163    pub fn initialize_signers_with_env(&mut self) -> Result<&mut Self> {
164        self.actor_manager.start_and_wait(
165            InitializeSignersOneShotBlockingActor::new_from_encrypted_env().with_signers(self.signers.clone()).on_bc(&self.bc),
166        )?;
167        self.with_signers()?;
168        Ok(self)
169    }
170
171    /// Starts signer actor
172    pub fn with_signers(&mut self) -> Result<&mut Self> {
173        if !self.has_signers {
174            self.has_signers = true;
175            self.actor_manager.start(TxSignersActor::new().on_bc(&self.bc))?;
176        }
177        Ok(self)
178    }
179
180    /// Initializes encoder and start encoder actor
181    pub fn with_swap_encoder(&mut self, swap_encoder: E) -> Result<&mut Self> {
182        self.mutlicaller_address = Some(swap_encoder.address());
183        self.encoder = Some(swap_encoder);
184        self.actor_manager.start(
185            SwapRouterActor::<DB, KabuDataTypesEthereum>::new().with_signers(self.signers.clone()).on_bc(&self.bc, &self.strategy),
186        )?;
187        Ok(self)
188    }
189
190    /// Starts market state preloader
191    pub fn with_market_state_preloader(&mut self) -> Result<&mut Self> {
192        let mut address_vec = self.signers.inner().try_read()?.get_address_vec();
193
194        if let Some(kabu_multicaller) = self.mutlicaller_address {
195            address_vec.push(kabu_multicaller);
196        }
197
198        self.actor_manager.start_and_wait(
199            MarketStatePreloadedOneShotActor::new(self.provider.clone()).with_copied_accounts(address_vec).on_bc(&self.bc, &self.state),
200        )?;
201        Ok(self)
202    }
203
204    /// Starts preloaded virtual artefacts
205    pub fn with_market_state_preloader_virtual(&mut self, address_to_copy: Vec<Address>) -> Result<&mut Self> {
206        let address_vec = self.signers.inner().try_read()?.get_address_vec();
207
208        let mut market_state_preloader = MarketStatePreloadedOneShotActor::new(self.provider.clone());
209
210        for address in address_vec {
211            //            market_state_preloader = market_state_preloader.with_new_account(address, 0, NWETH::from_float(10.0), None);
212            market_state_preloader = market_state_preloader.with_copied_account(address).with_token_balance(
213                TokenAddressEth::ETH_NATIVE,
214                address,
215                NWETH::from_float(10.0),
216            );
217        }
218
219        market_state_preloader = market_state_preloader.with_copied_accounts(address_to_copy);
220
221        market_state_preloader = market_state_preloader.with_new_account(
222            kabu_execution_multicaller::DEFAULT_VIRTUAL_ADDRESS,
223            0,
224            U256::ZERO,
225            kabu_execution_multicaller::MulticallerDeployer::new().account_info().code,
226        );
227
228        market_state_preloader = market_state_preloader.with_token_balance(
229            TokenAddressEth::WETH,
230            kabu_execution_multicaller::DEFAULT_VIRTUAL_ADDRESS,
231            NWETH::from_float(10.0),
232        );
233
234        self.mutlicaller_address = Some(kabu_execution_multicaller::DEFAULT_VIRTUAL_ADDRESS);
235
236        self.actor_manager.start_and_wait(market_state_preloader.on_bc(&self.bc, &self.state))?;
237        Ok(self)
238    }
239
240    /// Starts nonce and balance monitor
241    pub fn with_nonce_and_balance_monitor(&mut self) -> Result<&mut Self> {
242        self.actor_manager.start(NonceAndBalanceMonitorActor::new(self.provider.clone()).on_bc(&self.bc))?;
243        Ok(self)
244    }
245
246    pub fn with_nonce_and_balance_monitor_only_once(&mut self) -> Result<&mut Self> {
247        self.actor_manager.start(NonceAndBalanceMonitorActor::new(self.provider.clone()).only_once().on_bc(&self.bc))?;
248        Ok(self)
249    }
250
251    /// Starts block history actor
252    pub fn with_block_history(&mut self) -> Result<&mut Self> {
253        self.actor_manager.start(BlockHistoryActor::new(self.provider.clone()).on_bc(&self.bc, &self.state))?;
254        Ok(self)
255    }
256
257    /// Starts token price calculator
258    pub fn with_price_station(&mut self) -> Result<&mut Self> {
259        self.actor_manager.start(PriceActor::new(self.provider.clone()).on_bc(&self.bc))?;
260        Ok(self)
261    }
262
263    /// Starts receiving blocks events through RPC
264    pub fn with_block_events(&mut self, config: NodeBlockActorConfig) -> Result<&mut Self> {
265        self.actor_manager.start(NodeBlockActor::new(self.provider.clone(), config).on_bc(&self.bc))?;
266        Ok(self)
267    }
268
269    /// Starts mempool actor collecting pending txes from all mempools and pulling new tx hashes in mempool_events channel
270    pub fn mempool(&mut self) -> Result<&mut Self> {
271        if !self.has_mempool {
272            self.has_mempool = true;
273            self.actor_manager.start(MempoolActor::new().on_bc(&self.bc))?;
274        }
275        Ok(self)
276    }
277
278    /// Starts local node pending tx provider
279    pub fn with_local_mempool_events(&mut self) -> Result<&mut Self> {
280        self.mempool()?;
281        self.actor_manager.start(NodeMempoolActor::new(self.provider.clone()).on_bc(&self.bc))?;
282        Ok(self)
283    }
284
285    /// Starts remote node pending tx provider
286    pub fn with_remote_mempool<PM>(&mut self, provider: PM) -> Result<&mut Self>
287    where
288        PM: Provider<Ethereum> + Send + Sync + Clone + 'static,
289    {
290        self.mempool()?;
291        self.actor_manager.start(NodeMempoolActor::new(provider).on_bc(&self.bc))?;
292        Ok(self)
293    }
294
295    /// Starts flashbots broadcaster
296    pub fn with_flashbots_broadcaster(&mut self, allow_broadcast: bool) -> Result<&mut Self> {
297        let flashbots = match self.relays.is_empty() {
298            true => Flashbots::new(self.provider.clone(), "https://relay.flashbots.net", None).with_default_relays(),
299            false => Flashbots::new(self.provider.clone(), "https://relay.flashbots.net", None).with_relays(self.relays.clone()),
300        };
301
302        self.actor_manager.start(FlashbotsBroadcastActor::new(flashbots, allow_broadcast).on_bc(&self.bc))?;
303        Ok(self)
304    }
305
306    /// Start composer : estimator, signer and broadcaster
307    pub fn with_composers(&mut self, allow_broadcast: bool) -> Result<&mut Self> {
308        self.with_evm_estimator()?.with_signers()?.with_flashbots_broadcaster(allow_broadcast)
309    }
310
311    /// Starts pool health monitor
312    pub fn with_health_monitor_pools(&mut self) -> Result<&mut Self> {
313        self.actor_manager.start(PoolHealthMonitorActor::new().on_bc(&self.bc))?;
314        Ok(self)
315    }
316
317    //TODO : Move out of Blockchain
318    /*
319    /// Starts state health monitor
320    pub fn with_health_monitor_state(&mut self) -> Result<&mut Self> {
321        self.actor_manager.start(StateHealthMonitorActor::new(self.provider.clone()).on_bc(&self.bc))?;
322        Ok(self)
323    }
324
325     */
326
327    /// Starts stuffing tx monitor
328    pub fn with_health_monitor_stuffing_tx(&mut self) -> Result<&mut Self> {
329        self.actor_manager.start(StuffingTxMonitorActor::new(self.provider.clone()).on_bc(&self.bc))?;
330        Ok(self)
331    }
332
333    /// Start pool loader from new block events
334    pub fn with_new_pool_loader(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
335        let pool_loader =
336            Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(self.provider.clone(), pools_config));
337        self.actor_manager.start(NewPoolLoaderActor::new(pool_loader).on_bc(&self.bc))?;
338        Ok(self)
339    }
340
341    /// Start pool loader for last 10000 blocks
342    pub fn with_pool_history_loader(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
343        let pool_loaders =
344            Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(self.provider.clone(), pools_config));
345        self.actor_manager.start(HistoryPoolLoaderOneShotActor::new(self.provider.clone(), pool_loaders).on_bc(&self.bc))?;
346        Ok(self)
347    }
348
349    /// Start pool loader from new block events
350    pub fn with_pool_loader(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
351        let pool_loaders = Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(
352            self.provider.clone(),
353            pools_config.clone(),
354        ));
355        self.actor_manager.start(PoolLoaderActor::new(self.provider.clone(), pool_loaders, pools_config).on_bc(&self.bc, &self.state))?;
356        Ok(self)
357    }
358
359    /// Start pool loader for curve + steth + wsteth
360    pub fn with_curve_pool_protocol_loader(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
361        let pool_loaders =
362            Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(self.provider.clone(), pools_config));
363        self.actor_manager.start(ProtocolPoolLoaderOneShotActor::new(self.provider.clone(), pool_loaders).on_bc(&self.bc))?;
364        Ok(self)
365    }
366
367    /// Start all pool loaders
368    pub fn with_pool_loaders(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
369        if pools_config.is_enabled(PoolClass::Curve) {
370            self.with_new_pool_loader(pools_config.clone())?
371                .with_pool_history_loader(pools_config.clone())?
372                .with_curve_pool_protocol_loader(pools_config.clone())?
373                .with_pool_loader(pools_config)
374        } else {
375            self.with_new_pool_loader(pools_config.clone())?.with_pool_history_loader(pools_config.clone())?.with_pool_loader(pools_config)
376        }
377    }
378
379    //
380    pub fn with_preloaded_state(&mut self, pools: Vec<(Address, PoolClass)>, state_required: Option<RequiredState>) -> Result<&mut Self> {
381        let pool_loaders = Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(
382            self.provider.clone(),
383            PoolsLoadingConfig::default(),
384        ));
385        let mut actor = RequiredPoolLoaderActor::new(self.provider.clone(), pool_loaders);
386
387        for (pool_address, pool_class) in pools {
388            actor = actor.with_pool_address(pool_address, pool_class);
389        }
390
391        if let Some(state_required) = state_required {
392            actor = actor.with_required_state(state_required);
393        }
394
395        self.actor_manager.start_and_wait(actor.on_bc(&self.bc, &self.state))?;
396        Ok(self)
397    }
398
399    pub fn with_geth_estimator(&mut self) -> Result<&mut Self> {
400        let flashbots = Flashbots::new(self.provider.clone(), "https://relay.flashbots.net", None).with_default_relays();
401
402        self.actor_manager.start(GethEstimatorActor::new(Arc::new(flashbots), self.encoder.clone().unwrap()).on_bc(&self.strategy))?;
403        Ok(self)
404    }
405
406    /// Starts EVM gas estimator and tips filler
407    pub fn with_evm_estimator(&mut self) -> Result<&mut Self> {
408        self.actor_manager
409            .start(EvmEstimatorActor::<P, Ethereum, E, DB>::new(self.encoder.clone().unwrap()).on_bc(&self.bc, &self.strategy))?;
410        Ok(self)
411    }
412
413    /// Starts EVM gas estimator and tips filler
414    pub fn with_evm_estimator_and_provider(&mut self) -> Result<&mut Self> {
415        self.actor_manager.start(
416            EvmEstimatorActor::new_with_provider(self.encoder.clone().unwrap(), Some(self.provider.clone()))
417                .on_bc(&self.bc, &self.strategy),
418        )?;
419        Ok(self)
420    }
421
422    /// Start swap path merger
423    pub fn with_swap_path_merger(&mut self) -> Result<&mut Self> {
424        let mutlicaller_address = self.encoder.clone().ok_or(eyre!("NO_ENCODER"))?.address();
425
426        self.actor_manager.start(ArbSwapPathMergerActor::new(mutlicaller_address).on_bc(&self.bc, &self.strategy))?;
427        Ok(self)
428    }
429
430    /// Start same path merger
431    pub fn with_same_path_merger(&mut self) -> Result<&mut Self> {
432        self.actor_manager.start(SamePathMergerActor::new(self.provider.clone()).on_bc(&self.bc, &self.state, &self.strategy))?;
433        Ok(self)
434    }
435
436    /// Start diff path merger
437    pub fn with_diff_path_merger(&mut self) -> Result<&mut Self> {
438        self.actor_manager.start(DiffPathMergerActor::<DB>::new().on_bc(&self.bc))?;
439        Ok(self)
440    }
441
442    /// Start all mergers
443    pub fn with_mergers(&mut self) -> Result<&mut Self> {
444        self.with_swap_path_merger()?.with_same_path_merger()?.with_diff_path_merger()
445    }
446
447    /// Start backrun on block
448    pub fn with_backrun_block(&mut self, backrun_config: BackrunConfig) -> Result<&mut Self> {
449        if !self.has_state_update {
450            self.actor_manager.start(StateChangeArbSearcherActor::new(backrun_config).on_bc(&self.bc, &self.strategy))?;
451            self.has_state_update = true
452        }
453        self.actor_manager.start(BlockStateChangeProcessorActor::new().on_bc(&self.bc, &self.state, &self.strategy))?;
454        Ok(self)
455    }
456
457    /// Start backrun for pending txs
458    pub fn with_backrun_mempool(&mut self, backrun_config: BackrunConfig) -> Result<&mut Self> {
459        if !self.has_state_update {
460            self.actor_manager.start(StateChangeArbSearcherActor::new(backrun_config).on_bc(&self.bc, &self.strategy))?;
461            self.has_state_update = true
462        }
463        self.actor_manager.start(PendingTxStateChangeProcessorActor::new(self.provider.clone()).on_bc(
464            &self.bc,
465            &self.state,
466            &self.strategy,
467        ))?;
468        Ok(self)
469    }
470
471    /// Start backrun for blocks and pending txs
472    pub async fn with_backrun(&mut self, backrun_config: BackrunConfig) -> Result<&mut Self> {
473        self.with_backrun_block(backrun_config.clone())?.with_backrun_mempool(backrun_config)
474    }
475
476    /// Start influxdb writer
477    pub fn with_influxdb_writer(&mut self, url: String, database: String, tags: HashMap<String, String>) -> Result<&mut Self> {
478        self.actor_manager.start(InfluxDbWriterActor::new(url, database, tags).on_bc(&self.bc))?;
479        Ok(self)
480    }
481
482    /// Start block latency recorder
483    pub fn with_block_latency_recorder(&mut self) -> Result<&mut Self> {
484        self.actor_manager.start(MetricsRecorderActor::new().on_bc(&self.bc, &self.state))?;
485        Ok(self)
486    }
487
488    /// Start web server
489    pub fn with_web_server<S>(&mut self, host: String, router: Router<S>, db_pool: DbPool) -> Result<&mut Self>
490    where
491        S: Clone + Send + Sync + 'static,
492        Router: From<Router<S>>,
493    {
494        self.actor_manager
495            .start(WebServerActor::<S, DB>::new(host, router, db_pool, CancellationToken::new()).on_bc(&self.bc, &self.state))?;
496        Ok(self)
497    }
498
499    /// Wait for node sync
500    pub fn with_wait_for_node_sync(&mut self) -> Result<&mut Self> {
501        self.actor_manager.start_and_wait(WaitForNodeSyncOneShotBlockingActor::new(self.provider.clone()))?;
502        Ok(self)
503    }
504}