kabu_exex/
kabu_runtime.rs

1use alloy::network::Ethereum;
2use alloy::primitives::Address;
3use alloy::providers::Provider;
4use axum::Router;
5use eyre::OptionExt;
6use kabu::core::blockchain::{Blockchain, BlockchainState, Strategy};
7use kabu::core::blockchain_actors::BlockchainActors;
8use kabu::core::topology::{BroadcasterConfig, EncoderConfig, TopologyConfig};
9use kabu::defi::pools::PoolsLoadingConfig;
10use kabu::evm::db::{DatabaseKabuExt, KabuDBError};
11use kabu::execution::multicaller::MulticallerSwapEncoder;
12use kabu::node::actor_config::NodeBlockActorConfig;
13use kabu::node::debug_provider::DebugProviderExt;
14use kabu::node::exex::kabu_exex;
15use kabu::storage::db::init_db_pool;
16use kabu::strategy::backrun::{BackrunConfig, BackrunConfigSection};
17use kabu::types::blockchain::KabuDataTypesEthereum;
18use kabu::types::entities::strategy_config::load_from_file;
19use kabu::types::entities::{BlockHistoryState, PoolClass};
20use reth::api::NodeTypes;
21use reth::revm::{Database, DatabaseCommit, DatabaseRef};
22use reth_exex::ExExContext;
23use reth_node_api::FullNodeComponents;
24use reth_primitives::EthPrimitives;
25use std::env;
26use std::future::Future;
27use tracing::info;
28
29pub async fn init<Node>(
30    ctx: ExExContext<Node>,
31    bc: Blockchain,
32    config: NodeBlockActorConfig,
33) -> eyre::Result<impl Future<Output = eyre::Result<()>>>
34where
35    Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>,
36{
37    Ok(kabu_exex(ctx, bc, config.clone()))
38}
39
40pub async fn start_kabu<P, DB>(
41    provider: P,
42    bc: Blockchain,
43    bc_state: BlockchainState<DB, KabuDataTypesEthereum>,
44    strategy: Strategy<DB>,
45    topology_config: TopologyConfig,
46    kabu_config_filepath: String,
47    is_exex: bool,
48) -> eyre::Result<()>
49where
50    P: Provider<Ethereum> + DebugProviderExt<Ethereum> + Send + Sync + Clone + 'static,
51    DB: Database<Error = KabuDBError>
52        + DatabaseRef<Error = KabuDBError>
53        + DatabaseCommit
54        + DatabaseKabuExt
55        + BlockHistoryState<KabuDataTypesEthereum>
56        + Send
57        + Sync
58        + Clone
59        + Default
60        + 'static,
61{
62    let chain_id = provider.get_chain_id().await?;
63
64    info!(chain_id = ?chain_id, "Starting Kabu" );
65
66    let (_encoder_name, encoder) = topology_config.encoders.iter().next().ok_or_eyre("NO_ENCODER")?;
67
68    let multicaller_address: Option<Address> = match encoder {
69        EncoderConfig::SwapStep(e) => e.address.parse().ok(),
70    };
71    let multicaller_address = multicaller_address.ok_or_eyre("MULTICALLER_ADDRESS_NOT_SET")?;
72    let private_key_encrypted = hex::decode(env::var("DATA")?)?;
73    info!(address=?multicaller_address, "Multicaller");
74
75    let webserver_host = topology_config.webserver.unwrap_or_default().host;
76    let db_url = topology_config.database.unwrap().url;
77    let db_pool = init_db_pool(db_url).await?;
78
79    // Get flashbots relays from config
80    let relays = topology_config
81        .actors
82        .broadcaster
83        .as_ref()
84        .and_then(|b| b.get("mainnet"))
85        .map(|b| match b {
86            BroadcasterConfig::Flashbots(f) => f.relays(),
87        })
88        .unwrap_or_default();
89
90    let pools_config =
91        PoolsLoadingConfig::disable_all(PoolsLoadingConfig::default()).enable(PoolClass::UniswapV2).enable(PoolClass::UniswapV3);
92
93    let backrun_config: BackrunConfigSection = load_from_file::<BackrunConfigSection>(kabu_config_filepath.into()).await?;
94    let backrun_config: BackrunConfig = backrun_config.backrun_strategy;
95
96    let swap_encoder = MulticallerSwapEncoder::default_with_address(multicaller_address);
97
98    let mut bc_actors = BlockchainActors::new(provider.clone(), swap_encoder.clone(), bc.clone(), bc_state, strategy, relays);
99    bc_actors
100        .mempool()?
101        .with_wait_for_node_sync()? // wait for node to sync before
102        .initialize_signers_with_encrypted_key(private_key_encrypted)? // initialize signer with encrypted key
103        .with_block_history()? // collect blocks
104        .with_price_station()? // calculate price fo tokens
105        .with_health_monitor_pools()? // monitor pools health to disable empty
106        //.with_health_monitor_state()? // monitor state health
107        .with_health_monitor_stuffing_tx()? // collect stuffing tx information
108        .with_swap_encoder(swap_encoder)? // convert swaps to opcodes and passes to estimator
109        .with_evm_estimator()? // estimate gas, add tips
110        .with_signers()? // start signer actor that signs transactions before broadcasting
111        .with_flashbots_broadcaster( true)? // broadcast signed txes to flashbots
112        .with_market_state_preloader()? // preload contracts to market state
113        .with_nonce_and_balance_monitor()? // start monitoring balances of
114        .with_pool_history_loader(pools_config.clone())? // load pools used in latest 10000 blocks
115        //.with_curve_pool_protocol_loader()? // load curve + steth + wsteth
116        .with_new_pool_loader(pools_config.clone())? // load new pools
117        .with_pool_loader(pools_config.clone())?
118        .with_swap_path_merger()? // load merger for multiple swap paths
119        .with_diff_path_merger()? // load merger for different swap paths
120        .with_same_path_merger()? // load merger for same swap paths with different stuffing txes
121        .with_backrun_block(backrun_config.clone())? // load backrun searcher for incoming block
122        .with_backrun_mempool(backrun_config)? // load backrun searcher for mempool txes
123        .with_web_server(webserver_host, Router::new(), db_pool)? // start web server
124    ;
125
126    if !is_exex {
127        bc_actors.with_block_events(NodeBlockActorConfig::all_enabled())?.with_remote_mempool(provider.clone())?;
128    }
129
130    if let Some(influxdb_config) = topology_config.influxdb {
131        bc_actors
132            .with_influxdb_writer(influxdb_config.url, influxdb_config.database, influxdb_config.tags)?
133            .with_block_latency_recorder()?;
134    }
135
136    bc_actors.wait().await;
137
138    Ok(())
139}