1use alloy_network::Ethereum;
2use alloy_primitives::{Address, B256, U256};
3use alloy_provider::Provider;
4use axum::Router;
5use eyre::{eyre, Result};
6use kabu_broadcast_accounts::{InitializeSignersOneShotBlockingActor, NonceAndBalanceMonitorActor, TxSignersActor};
7use kabu_broadcast_broadcaster::FlashbotsBroadcastActor;
8use kabu_broadcast_flashbots::client::RelayConfig;
9use kabu_broadcast_flashbots::Flashbots;
10use kabu_core_actors::{Actor, ActorsManager, SharedState};
11use kabu_core_block_history::BlockHistoryActor;
12use kabu_core_blockchain::{Blockchain, BlockchainState, Strategy};
13use kabu_core_mempool::MempoolActor;
14use kabu_core_router::SwapRouterActor;
15use kabu_defi_address_book::TokenAddressEth;
16use kabu_defi_health_monitor::{MetricsRecorderActor, PoolHealthMonitorActor, StuffingTxMonitorActor};
17use kabu_defi_market::{
18 HistoryPoolLoaderOneShotActor, NewPoolLoaderActor, PoolLoaderActor, ProtocolPoolLoaderOneShotActor, RequiredPoolLoaderActor,
19};
20use kabu_defi_pools::{PoolLoadersBuilder, PoolsLoadingConfig};
21use kabu_defi_preloader::MarketStatePreloadedOneShotActor;
22use kabu_defi_price::PriceActor;
23use kabu_evm_db::{DatabaseKabuExt, KabuDBError};
24use kabu_evm_utils::NWETH;
25use kabu_execution_estimator::{EvmEstimatorActor, GethEstimatorActor};
26use kabu_execution_multicaller::MulticallerSwapEncoder;
27use kabu_metrics::InfluxDbWriterActor;
28use kabu_node_actor_config::NodeBlockActorConfig;
29use kabu_node_debug_provider::DebugProviderExt;
30use kabu_node_json_rpc::{NodeBlockActor, NodeMempoolActor, WaitForNodeSyncOneShotBlockingActor};
31use kabu_rpc_handler::WebServerActor;
32use kabu_storage_db::DbPool;
33use kabu_strategy_backrun::{
34 BackrunConfig, BlockStateChangeProcessorActor, PendingTxStateChangeProcessorActor, StateChangeArbSearcherActor,
35};
36use kabu_strategy_merger::{ArbSwapPathMergerActor, DiffPathMergerActor, SamePathMergerActor};
37use kabu_types_blockchain::{KabuDataTypesEVM, KabuDataTypesEthereum};
38use kabu_types_entities::required_state::RequiredState;
39use kabu_types_entities::{BlockHistoryState, PoolClass, SwapEncoder, TxSigners};
40use revm::{Database, DatabaseCommit, DatabaseRef};
41use std::collections::HashMap;
42use std::marker::PhantomData;
43use std::sync::Arc;
44use tokio_util::sync::CancellationToken;
45
46pub struct BlockchainActors<
47 P,
48 N,
49 DB: Clone + Send + Sync + 'static,
50 E: Clone = MulticallerSwapEncoder,
51 LDT: KabuDataTypesEVM + 'static = KabuDataTypesEthereum,
52> {
53 provider: P,
54 bc: Blockchain<LDT>,
55 state: BlockchainState<DB, LDT>,
56 strategy: Strategy<DB, LDT>,
57 pub signers: SharedState<TxSigners<LDT>>,
58 actor_manager: ActorsManager,
59 encoder: Option<E>,
60 has_mempool: bool,
61 has_state_update: bool,
62 has_signers: bool,
63 mutlicaller_address: Option<Address>,
64 relays: Vec<RelayConfig>,
65 _n: PhantomData<N>,
66}
67
68impl<P, DB, E> BlockchainActors<P, Ethereum, DB, E, KabuDataTypesEthereum>
69where
70 P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
71 DB: DatabaseRef<Error = KabuDBError>
72 + Database<Error = KabuDBError>
73 + DatabaseCommit
74 + DatabaseKabuExt
75 + BlockHistoryState<KabuDataTypesEthereum>
76 + Send
77 + Sync
78 + Clone
79 + Default
80 + 'static,
81 E: SwapEncoder + Send + Sync + Clone + 'static,
82{
83 pub fn new(
84 provider: P,
85 encoder: E,
86 bc: Blockchain<KabuDataTypesEthereum>,
87 state: BlockchainState<DB, KabuDataTypesEthereum>,
88 strategy: Strategy<DB, KabuDataTypesEthereum>,
89 relays: Vec<RelayConfig>,
90 ) -> Self {
91 Self {
92 provider,
93 bc,
94 state,
95 strategy,
96 signers: SharedState::new(TxSigners::new()),
97 actor_manager: ActorsManager::new(),
98 encoder: Some(encoder),
99 has_mempool: false,
100 has_state_update: false,
101 has_signers: false,
102 mutlicaller_address: None,
103 relays,
104 _n: PhantomData,
105 }
106 }
107
108 pub async fn wait(self) {
109 self.actor_manager.wait().await
110 }
111
112 pub fn start(&mut self, actor: impl Actor + 'static) -> Result<&mut Self> {
114 self.actor_manager.start(actor)?;
115 Ok(self)
116 }
117
118 pub fn start_and_wait(&mut self, actor: impl Actor + Send + Sync + 'static) -> Result<&mut Self> {
120 self.actor_manager.start_and_wait(actor)?;
121 Ok(self)
122 }
123
124 pub fn initialize_signers_with_anvil(&mut self) -> Result<&mut Self> {
126 let key: B256 = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse()?;
127
128 self.actor_manager.start_and_wait(
129 InitializeSignersOneShotBlockingActor::new(Some(key.to_vec())).with_signers(self.signers.clone()).on_bc(&self.bc),
130 )?;
131 self.with_signers()?;
132 Ok(self)
133 }
134
135 pub fn initialize_signers_with_key(&mut self, key: Option<Vec<u8>>) -> Result<&mut Self> {
137 self.actor_manager
138 .start_and_wait(InitializeSignersOneShotBlockingActor::new(key).with_signers(self.signers.clone()).on_bc(&self.bc))?;
139 self.with_signers()?;
140 Ok(self)
141 }
142
143 pub fn initialize_signers_with_keys(&mut self, keys: Vec<Vec<u8>>) -> Result<&mut Self> {
145 for key in keys {
146 self.actor_manager
147 .start_and_wait(InitializeSignersOneShotBlockingActor::new(Some(key)).with_signers(self.signers.clone()).on_bc(&self.bc))?;
148 }
149 self.with_signers()?;
150 Ok(self)
151 }
152
153 pub fn initialize_signers_with_encrypted_key(&mut self, key: Vec<u8>) -> Result<&mut Self> {
155 self.actor_manager.start_and_wait(
156 InitializeSignersOneShotBlockingActor::new_from_encrypted_key(key).with_signers(self.signers.clone()).on_bc(&self.bc),
157 )?;
158 self.with_signers()?;
159 Ok(self)
160 }
161
162 pub fn initialize_signers_with_env(&mut self) -> Result<&mut Self> {
164 self.actor_manager.start_and_wait(
165 InitializeSignersOneShotBlockingActor::new_from_encrypted_env().with_signers(self.signers.clone()).on_bc(&self.bc),
166 )?;
167 self.with_signers()?;
168 Ok(self)
169 }
170
171 pub fn with_signers(&mut self) -> Result<&mut Self> {
173 if !self.has_signers {
174 self.has_signers = true;
175 self.actor_manager.start(TxSignersActor::new().on_bc(&self.bc))?;
176 }
177 Ok(self)
178 }
179
180 pub fn with_swap_encoder(&mut self, swap_encoder: E) -> Result<&mut Self> {
182 self.mutlicaller_address = Some(swap_encoder.address());
183 self.encoder = Some(swap_encoder);
184 self.actor_manager.start(
185 SwapRouterActor::<DB, KabuDataTypesEthereum>::new().with_signers(self.signers.clone()).on_bc(&self.bc, &self.strategy),
186 )?;
187 Ok(self)
188 }
189
190 pub fn with_market_state_preloader(&mut self) -> Result<&mut Self> {
192 let mut address_vec = self.signers.inner().try_read()?.get_address_vec();
193
194 if let Some(kabu_multicaller) = self.mutlicaller_address {
195 address_vec.push(kabu_multicaller);
196 }
197
198 self.actor_manager.start_and_wait(
199 MarketStatePreloadedOneShotActor::new(self.provider.clone()).with_copied_accounts(address_vec).on_bc(&self.bc, &self.state),
200 )?;
201 Ok(self)
202 }
203
204 pub fn with_market_state_preloader_virtual(&mut self, address_to_copy: Vec<Address>) -> Result<&mut Self> {
206 let address_vec = self.signers.inner().try_read()?.get_address_vec();
207
208 let mut market_state_preloader = MarketStatePreloadedOneShotActor::new(self.provider.clone());
209
210 for address in address_vec {
211 market_state_preloader = market_state_preloader.with_copied_account(address).with_token_balance(
213 TokenAddressEth::ETH_NATIVE,
214 address,
215 NWETH::from_float(10.0),
216 );
217 }
218
219 market_state_preloader = market_state_preloader.with_copied_accounts(address_to_copy);
220
221 market_state_preloader = market_state_preloader.with_new_account(
222 kabu_execution_multicaller::DEFAULT_VIRTUAL_ADDRESS,
223 0,
224 U256::ZERO,
225 kabu_execution_multicaller::MulticallerDeployer::new().account_info().code,
226 );
227
228 market_state_preloader = market_state_preloader.with_token_balance(
229 TokenAddressEth::WETH,
230 kabu_execution_multicaller::DEFAULT_VIRTUAL_ADDRESS,
231 NWETH::from_float(10.0),
232 );
233
234 self.mutlicaller_address = Some(kabu_execution_multicaller::DEFAULT_VIRTUAL_ADDRESS);
235
236 self.actor_manager.start_and_wait(market_state_preloader.on_bc(&self.bc, &self.state))?;
237 Ok(self)
238 }
239
240 pub fn with_nonce_and_balance_monitor(&mut self) -> Result<&mut Self> {
242 self.actor_manager.start(NonceAndBalanceMonitorActor::new(self.provider.clone()).on_bc(&self.bc))?;
243 Ok(self)
244 }
245
246 pub fn with_nonce_and_balance_monitor_only_once(&mut self) -> Result<&mut Self> {
247 self.actor_manager.start(NonceAndBalanceMonitorActor::new(self.provider.clone()).only_once().on_bc(&self.bc))?;
248 Ok(self)
249 }
250
251 pub fn with_block_history(&mut self) -> Result<&mut Self> {
253 self.actor_manager.start(BlockHistoryActor::new(self.provider.clone()).on_bc(&self.bc, &self.state))?;
254 Ok(self)
255 }
256
257 pub fn with_price_station(&mut self) -> Result<&mut Self> {
259 self.actor_manager.start(PriceActor::new(self.provider.clone()).on_bc(&self.bc))?;
260 Ok(self)
261 }
262
263 pub fn with_block_events(&mut self, config: NodeBlockActorConfig) -> Result<&mut Self> {
265 self.actor_manager.start(NodeBlockActor::new(self.provider.clone(), config).on_bc(&self.bc))?;
266 Ok(self)
267 }
268
269 pub fn mempool(&mut self) -> Result<&mut Self> {
271 if !self.has_mempool {
272 self.has_mempool = true;
273 self.actor_manager.start(MempoolActor::new().on_bc(&self.bc))?;
274 }
275 Ok(self)
276 }
277
278 pub fn with_local_mempool_events(&mut self) -> Result<&mut Self> {
280 self.mempool()?;
281 self.actor_manager.start(NodeMempoolActor::new(self.provider.clone()).on_bc(&self.bc))?;
282 Ok(self)
283 }
284
285 pub fn with_remote_mempool<PM>(&mut self, provider: PM) -> Result<&mut Self>
287 where
288 PM: Provider<Ethereum> + Send + Sync + Clone + 'static,
289 {
290 self.mempool()?;
291 self.actor_manager.start(NodeMempoolActor::new(provider).on_bc(&self.bc))?;
292 Ok(self)
293 }
294
295 pub fn with_flashbots_broadcaster(&mut self, allow_broadcast: bool) -> Result<&mut Self> {
297 let flashbots = match self.relays.is_empty() {
298 true => Flashbots::new(self.provider.clone(), "https://relay.flashbots.net", None).with_default_relays(),
299 false => Flashbots::new(self.provider.clone(), "https://relay.flashbots.net", None).with_relays(self.relays.clone()),
300 };
301
302 self.actor_manager.start(FlashbotsBroadcastActor::new(flashbots, allow_broadcast).on_bc(&self.bc))?;
303 Ok(self)
304 }
305
306 pub fn with_composers(&mut self, allow_broadcast: bool) -> Result<&mut Self> {
308 self.with_evm_estimator()?.with_signers()?.with_flashbots_broadcaster(allow_broadcast)
309 }
310
311 pub fn with_health_monitor_pools(&mut self) -> Result<&mut Self> {
313 self.actor_manager.start(PoolHealthMonitorActor::new().on_bc(&self.bc))?;
314 Ok(self)
315 }
316
317 pub fn with_health_monitor_stuffing_tx(&mut self) -> Result<&mut Self> {
329 self.actor_manager.start(StuffingTxMonitorActor::new(self.provider.clone()).on_bc(&self.bc))?;
330 Ok(self)
331 }
332
333 pub fn with_new_pool_loader(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
335 let pool_loader =
336 Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(self.provider.clone(), pools_config));
337 self.actor_manager.start(NewPoolLoaderActor::new(pool_loader).on_bc(&self.bc))?;
338 Ok(self)
339 }
340
341 pub fn with_pool_history_loader(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
343 let pool_loaders =
344 Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(self.provider.clone(), pools_config));
345 self.actor_manager.start(HistoryPoolLoaderOneShotActor::new(self.provider.clone(), pool_loaders).on_bc(&self.bc))?;
346 Ok(self)
347 }
348
349 pub fn with_pool_loader(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
351 let pool_loaders = Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(
352 self.provider.clone(),
353 pools_config.clone(),
354 ));
355 self.actor_manager.start(PoolLoaderActor::new(self.provider.clone(), pool_loaders, pools_config).on_bc(&self.bc, &self.state))?;
356 Ok(self)
357 }
358
359 pub fn with_curve_pool_protocol_loader(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
361 let pool_loaders =
362 Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(self.provider.clone(), pools_config));
363 self.actor_manager.start(ProtocolPoolLoaderOneShotActor::new(self.provider.clone(), pool_loaders).on_bc(&self.bc))?;
364 Ok(self)
365 }
366
367 pub fn with_pool_loaders(&mut self, pools_config: PoolsLoadingConfig) -> Result<&mut Self> {
369 if pools_config.is_enabled(PoolClass::Curve) {
370 self.with_new_pool_loader(pools_config.clone())?
371 .with_pool_history_loader(pools_config.clone())?
372 .with_curve_pool_protocol_loader(pools_config.clone())?
373 .with_pool_loader(pools_config)
374 } else {
375 self.with_new_pool_loader(pools_config.clone())?.with_pool_history_loader(pools_config.clone())?.with_pool_loader(pools_config)
376 }
377 }
378
379 pub fn with_preloaded_state(&mut self, pools: Vec<(Address, PoolClass)>, state_required: Option<RequiredState>) -> Result<&mut Self> {
381 let pool_loaders = Arc::new(PoolLoadersBuilder::<P, Ethereum, KabuDataTypesEthereum>::default_pool_loaders(
382 self.provider.clone(),
383 PoolsLoadingConfig::default(),
384 ));
385 let mut actor = RequiredPoolLoaderActor::new(self.provider.clone(), pool_loaders);
386
387 for (pool_address, pool_class) in pools {
388 actor = actor.with_pool_address(pool_address, pool_class);
389 }
390
391 if let Some(state_required) = state_required {
392 actor = actor.with_required_state(state_required);
393 }
394
395 self.actor_manager.start_and_wait(actor.on_bc(&self.bc, &self.state))?;
396 Ok(self)
397 }
398
399 pub fn with_geth_estimator(&mut self) -> Result<&mut Self> {
400 let flashbots = Flashbots::new(self.provider.clone(), "https://relay.flashbots.net", None).with_default_relays();
401
402 self.actor_manager.start(GethEstimatorActor::new(Arc::new(flashbots), self.encoder.clone().unwrap()).on_bc(&self.strategy))?;
403 Ok(self)
404 }
405
406 pub fn with_evm_estimator(&mut self) -> Result<&mut Self> {
408 self.actor_manager
409 .start(EvmEstimatorActor::<P, Ethereum, E, DB>::new(self.encoder.clone().unwrap()).on_bc(&self.bc, &self.strategy))?;
410 Ok(self)
411 }
412
413 pub fn with_evm_estimator_and_provider(&mut self) -> Result<&mut Self> {
415 self.actor_manager.start(
416 EvmEstimatorActor::new_with_provider(self.encoder.clone().unwrap(), Some(self.provider.clone()))
417 .on_bc(&self.bc, &self.strategy),
418 )?;
419 Ok(self)
420 }
421
422 pub fn with_swap_path_merger(&mut self) -> Result<&mut Self> {
424 let mutlicaller_address = self.encoder.clone().ok_or(eyre!("NO_ENCODER"))?.address();
425
426 self.actor_manager.start(ArbSwapPathMergerActor::new(mutlicaller_address).on_bc(&self.bc, &self.strategy))?;
427 Ok(self)
428 }
429
430 pub fn with_same_path_merger(&mut self) -> Result<&mut Self> {
432 self.actor_manager.start(SamePathMergerActor::new(self.provider.clone()).on_bc(&self.bc, &self.state, &self.strategy))?;
433 Ok(self)
434 }
435
436 pub fn with_diff_path_merger(&mut self) -> Result<&mut Self> {
438 self.actor_manager.start(DiffPathMergerActor::<DB>::new().on_bc(&self.bc))?;
439 Ok(self)
440 }
441
442 pub fn with_mergers(&mut self) -> Result<&mut Self> {
444 self.with_swap_path_merger()?.with_same_path_merger()?.with_diff_path_merger()
445 }
446
447 pub fn with_backrun_block(&mut self, backrun_config: BackrunConfig) -> Result<&mut Self> {
449 if !self.has_state_update {
450 self.actor_manager.start(StateChangeArbSearcherActor::new(backrun_config).on_bc(&self.bc, &self.strategy))?;
451 self.has_state_update = true
452 }
453 self.actor_manager.start(BlockStateChangeProcessorActor::new().on_bc(&self.bc, &self.state, &self.strategy))?;
454 Ok(self)
455 }
456
457 pub fn with_backrun_mempool(&mut self, backrun_config: BackrunConfig) -> Result<&mut Self> {
459 if !self.has_state_update {
460 self.actor_manager.start(StateChangeArbSearcherActor::new(backrun_config).on_bc(&self.bc, &self.strategy))?;
461 self.has_state_update = true
462 }
463 self.actor_manager.start(PendingTxStateChangeProcessorActor::new(self.provider.clone()).on_bc(
464 &self.bc,
465 &self.state,
466 &self.strategy,
467 ))?;
468 Ok(self)
469 }
470
471 pub async fn with_backrun(&mut self, backrun_config: BackrunConfig) -> Result<&mut Self> {
473 self.with_backrun_block(backrun_config.clone())?.with_backrun_mempool(backrun_config)
474 }
475
476 pub fn with_influxdb_writer(&mut self, url: String, database: String, tags: HashMap<String, String>) -> Result<&mut Self> {
478 self.actor_manager.start(InfluxDbWriterActor::new(url, database, tags).on_bc(&self.bc))?;
479 Ok(self)
480 }
481
482 pub fn with_block_latency_recorder(&mut self) -> Result<&mut Self> {
484 self.actor_manager.start(MetricsRecorderActor::new().on_bc(&self.bc, &self.state))?;
485 Ok(self)
486 }
487
488 pub fn with_web_server<S>(&mut self, host: String, router: Router<S>, db_pool: DbPool) -> Result<&mut Self>
490 where
491 S: Clone + Send + Sync + 'static,
492 Router: From<Router<S>>,
493 {
494 self.actor_manager
495 .start(WebServerActor::<S, DB>::new(host, router, db_pool, CancellationToken::new()).on_bc(&self.bc, &self.state))?;
496 Ok(self)
497 }
498
499 pub fn with_wait_for_node_sync(&mut self) -> Result<&mut Self> {
501 self.actor_manager.start_and_wait(WaitForNodeSyncOneShotBlockingActor::new(self.provider.clone()))?;
502 Ok(self)
503 }
504}