kabu_exex/
main.rs

1use crate::arguments::{AppArgs, Command, KabuArgs};
2use alloy::eips::BlockId;
3use alloy::providers::{IpcConnect, ProviderBuilder, WsConnect};
4use alloy::rpc::client::ClientBuilder;
5use clap::{CommandFactory, FromArgMatches, Parser};
6use kabu::core::blockchain::{Blockchain, BlockchainState, Strategy};
7use kabu::core::topology::TopologyConfig;
8use kabu::evm::db::{AlloyDB, KabuDB};
9use kabu::node::actor_config::NodeBlockActorConfig;
10use kabu::node::exex::mempool_worker;
11use kabu::types::blockchain::KabuDataTypesEthereum;
12use kabu::types::entities::MarketState;
13use reth::chainspec::{Chain, EthereumChainSpecParser};
14use reth::cli::Cli;
15use reth_node_ethereum::node::EthereumAddOns;
16use reth_node_ethereum::EthereumNode;
17use reth_provider::providers::BlockchainProvider;
18use std::time::Duration;
19use tokio::{signal, task};
20use tracing::{error, info};
21use tracing_subscriber::layer::SubscriberExt;
22use tracing_subscriber::util::SubscriberInitExt;
23use tracing_subscriber::{fmt, EnvFilter, Layer};
24
25mod arguments;
26mod kabu_runtime;
27
28fn main() -> eyre::Result<()> {
29    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());
30    let fmt_layer = fmt::Layer::default().with_thread_ids(true).with_file(false).with_line_number(true).with_filter(env_filter);
31    tracing_subscriber::registry().with(fmt_layer).init();
32
33    // ignore arguments used by reth
34    let app_args = AppArgs::from_arg_matches_mut(&mut AppArgs::command().ignore_errors(true).get_matches())?;
35    match app_args.command {
36        Command::Node(_) => Cli::<EthereumChainSpecParser, KabuArgs>::parse().run(|builder, kabu_args: KabuArgs| async move {
37            let topology_config = TopologyConfig::load_from_file(kabu_args.kabu_config.clone())?;
38
39            let bc = Blockchain::new(builder.config().chain.chain.id());
40            let bc_clone = bc.clone();
41
42            let handle = builder
43                .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
44                .with_components(EthereumNode::components())
45                .with_add_ons(EthereumAddOns::default())
46                .install_exex("kabu-exex", |node_ctx| kabu_runtime::init(node_ctx, bc_clone, NodeBlockActorConfig::all_enabled()))
47                .launch()
48                .await?;
49
50            let mempool = handle.node.pool.clone();
51            let ipc_provider =
52                ProviderBuilder::new().disable_recommended_fillers().connect_ipc(IpcConnect::new(handle.node.config.rpc.ipcpath)).await?;
53            let alloy_db = AlloyDB::new(ipc_provider.clone(), BlockId::latest()).unwrap();
54
55            let state_db = KabuDB::new().with_ext_db(alloy_db);
56
57            let bc_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new_with_market_state(MarketState::new(state_db));
58
59            let strategy = Strategy::<KabuDB>::new();
60
61            let bc_clone = bc.clone();
62            tokio::task::spawn(async move {
63                if let Err(e) = kabu_runtime::start_kabu(
64                    ipc_provider,
65                    bc_clone,
66                    bc_state,
67                    strategy,
68                    topology_config,
69                    kabu_args.kabu_config.clone(),
70                    true,
71                )
72                .await
73                {
74                    error!("Error starting kabu: {:?}", e);
75                }
76            });
77            tokio::task::spawn(mempool_worker(mempool, bc));
78
79            handle.node_exit_future.await
80        }),
81        Command::Remote(kabu_args) => {
82            let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;
83
84            rt.block_on(async {
85                info!("Loading config from {}", kabu_args.kabu_config);
86                let topology_config = TopologyConfig::load_from_file(kabu_args.kabu_config.clone())?;
87
88                let client_config = topology_config.clients.get("remote").unwrap();
89                let transport = WsConnect::new(client_config.url.clone());
90                let client = ClientBuilder::default().ws(transport).await?;
91                let provider = ProviderBuilder::new().disable_recommended_fillers().connect_client(client);
92                let bc = Blockchain::new(Chain::mainnet().id());
93                let bc_clone = bc.clone();
94
95                let bc_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();
96
97                let strategy = Strategy::<KabuDB>::new();
98
99                if let Err(e) =
100                    kabu_runtime::start_kabu(provider, bc_clone, bc_state, strategy, topology_config, kabu_args.kabu_config.clone(), false)
101                        .await
102                {
103                    error!("Error starting kabu: {:#?}", e);
104                    panic!("{}", e)
105                }
106
107                // keep kabu running
108                tokio::select! {
109                    _ = signal::ctrl_c() => {
110                    info!("CTRL+C received... exiting");
111                }
112                _ = async {
113                        loop {
114                        tokio::time::sleep(Duration::from_secs(60)).await;
115                        task::yield_now().await;
116                        }
117                    } => {}
118                }
119                Ok::<(), eyre::Error>(())
120            })?;
121            Ok(())
122        }
123    }
124}