kabu_exex/
kabu_runtime.rs1use alloy::network::Ethereum;
2use alloy::primitives::Address;
3use alloy::providers::Provider;
4use axum::Router;
5use eyre::OptionExt;
6use kabu::core::blockchain::{Blockchain, BlockchainState, Strategy};
7use kabu::core::blockchain_actors::BlockchainActors;
8use kabu::core::topology::{BroadcasterConfig, EncoderConfig, TopologyConfig};
9use kabu::defi::pools::PoolsLoadingConfig;
10use kabu::evm::db::{DatabaseKabuExt, KabuDBError};
11use kabu::execution::multicaller::MulticallerSwapEncoder;
12use kabu::node::actor_config::NodeBlockActorConfig;
13use kabu::node::debug_provider::DebugProviderExt;
14use kabu::node::exex::kabu_exex;
15use kabu::storage::db::init_db_pool;
16use kabu::strategy::backrun::{BackrunConfig, BackrunConfigSection};
17use kabu::types::blockchain::KabuDataTypesEthereum;
18use kabu::types::entities::strategy_config::load_from_file;
19use kabu::types::entities::{BlockHistoryState, PoolClass};
20use reth::api::NodeTypes;
21use reth::revm::{Database, DatabaseCommit, DatabaseRef};
22use reth_exex::ExExContext;
23use reth_node_api::FullNodeComponents;
24use reth_primitives::EthPrimitives;
25use std::env;
26use std::future::Future;
27use tracing::info;
28
29pub async fn init<Node>(
30 ctx: ExExContext<Node>,
31 bc: Blockchain,
32 config: NodeBlockActorConfig,
33) -> eyre::Result<impl Future<Output = eyre::Result<()>>>
34where
35 Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>,
36{
37 Ok(kabu_exex(ctx, bc, config.clone()))
38}
39
40pub async fn start_kabu<P, DB>(
41 provider: P,
42 bc: Blockchain,
43 bc_state: BlockchainState<DB, KabuDataTypesEthereum>,
44 strategy: Strategy<DB>,
45 topology_config: TopologyConfig,
46 kabu_config_filepath: String,
47 is_exex: bool,
48) -> eyre::Result<()>
49where
50 P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
51 DB: Database<Error = KabuDBError>
52 + DatabaseRef<Error = KabuDBError>
53 + DatabaseCommit
54 + DatabaseKabuExt
55 + BlockHistoryState<KabuDataTypesEthereum>
56 + Send
57 + Sync
58 + Clone
59 + Default
60 + 'static,
61{
62 let chain_id = provider.get_chain_id().await?;
63
64 info!(chain_id = ?chain_id, "Starting Kabu" );
65
66 let (_encoder_name, encoder) = topology_config.encoders.iter().next().ok_or_eyre("NO_ENCODER")?;
67
68 let multicaller_address: Option<Address> = match encoder {
69 EncoderConfig::SwapStep(e) => e.address.parse().ok(),
70 };
71 let multicaller_address = multicaller_address.ok_or_eyre("MULTICALLER_ADDRESS_NOT_SET")?;
72 let private_key_encrypted = hex::decode(env::var("DATA")?)?;
73 info!(address=?multicaller_address, "Multicaller");
74
75 let webserver_host = topology_config.webserver.unwrap_or_default().host;
76 let db_url = topology_config.database.unwrap().url;
77 let db_pool = init_db_pool(db_url).await?;
78
79 let relays = topology_config
81 .actors
82 .broadcaster
83 .as_ref()
84 .and_then(|b| b.get("mainnet"))
85 .map(|b| match b {
86 BroadcasterConfig::Flashbots(f) => f.relays(),
87 })
88 .unwrap_or_default();
89
90 let pools_config =
91 PoolsLoadingConfig::disable_all(PoolsLoadingConfig::default()).enable(PoolClass::UniswapV2).enable(PoolClass::UniswapV3);
92
93 let backrun_config: BackrunConfigSection = load_from_file::<BackrunConfigSection>(kabu_config_filepath.into()).await?;
94 let backrun_config: BackrunConfig = backrun_config.backrun_strategy;
95
96 let swap_encoder = MulticallerSwapEncoder::default_with_address(multicaller_address);
97
98 let mut bc_actors = BlockchainActors::new(provider.clone(), swap_encoder.clone(), bc.clone(), bc_state, strategy, relays);
99 bc_actors
100 .mempool()?
101 .with_wait_for_node_sync()? .initialize_signers_with_encrypted_key(private_key_encrypted)? .with_block_history()? .with_price_station()? .with_health_monitor_pools()? .with_health_monitor_stuffing_tx()? .with_swap_encoder(swap_encoder)? .with_evm_estimator()? .with_signers()? .with_flashbots_broadcaster( true)? .with_market_state_preloader()? .with_nonce_and_balance_monitor()? .with_pool_history_loader(pools_config.clone())? .with_new_pool_loader(pools_config.clone())? .with_pool_loader(pools_config.clone())?
118 .with_swap_path_merger()? .with_diff_path_merger()? .with_same_path_merger()? .with_backrun_block(backrun_config.clone())? .with_backrun_mempool(backrun_config)? .with_web_server(webserver_host, Router::new(), db_pool)? ;
125
126 if !is_exex {
127 bc_actors.with_block_events(NodeBlockActorConfig::all_enabled())?.with_remote_mempool(provider.clone())?;
128 }
129
130 if let Some(influxdb_config) = topology_config.influxdb {
131 bc_actors
132 .with_influxdb_writer(influxdb_config.url, influxdb_config.database, influxdb_config.tags)?
133 .with_block_latency_recorder()?;
134 }
135
136 bc_actors.wait().await;
137
138 Ok(())
139}