1use crate::arguments::{AppArgs, Command, KabuArgs};
2use alloy::eips::BlockId;
3use alloy::providers::{IpcConnect, ProviderBuilder, WsConnect};
4use alloy::rpc::client::ClientBuilder;
5use clap::{CommandFactory, FromArgMatches, Parser};
6use kabu::core::blockchain::{Blockchain, BlockchainState, Strategy};
7use kabu::core::topology::TopologyConfig;
8use kabu::evm::db::{AlloyDB, KabuDB};
9use kabu::node::actor_config::NodeBlockActorConfig;
10use kabu::node::exex::mempool_worker;
11use kabu::types::blockchain::KabuDataTypesEthereum;
12use kabu::types::entities::MarketState;
13use reth::chainspec::{Chain, EthereumChainSpecParser};
14use reth::cli::Cli;
15use reth_node_ethereum::node::EthereumAddOns;
16use reth_node_ethereum::EthereumNode;
17use reth_provider::providers::BlockchainProvider;
18use std::time::Duration;
19use tokio::{signal, task};
20use tracing::{error, info};
21use tracing_subscriber::layer::SubscriberExt;
22use tracing_subscriber::util::SubscriberInitExt;
23use tracing_subscriber::{fmt, EnvFilter, Layer};
24
25mod arguments;
26mod kabu_runtime;
27
28fn main() -> eyre::Result<()> {
29 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());
30 let fmt_layer = fmt::Layer::default().with_thread_ids(true).with_file(false).with_line_number(true).with_filter(env_filter);
31 tracing_subscriber::registry().with(fmt_layer).init();
32
33 let app_args = AppArgs::from_arg_matches_mut(&mut AppArgs::command().ignore_errors(true).get_matches())?;
35 match app_args.command {
36 Command::Node(_) => Cli::<EthereumChainSpecParser, KabuArgs>::parse().run(|builder, kabu_args: KabuArgs| async move {
37 let topology_config = TopologyConfig::load_from_file(kabu_args.kabu_config.clone())?;
38
39 let bc = Blockchain::new(builder.config().chain.chain.id());
40 let bc_clone = bc.clone();
41
42 let handle = builder
43 .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
44 .with_components(EthereumNode::components())
45 .with_add_ons(EthereumAddOns::default())
46 .install_exex("kabu-exex", |node_ctx| kabu_runtime::init(node_ctx, bc_clone, NodeBlockActorConfig::all_enabled()))
47 .launch()
48 .await?;
49
50 let mempool = handle.node.pool.clone();
51 let ipc_provider =
52 ProviderBuilder::new().disable_recommended_fillers().connect_ipc(IpcConnect::new(handle.node.config.rpc.ipcpath)).await?;
53 let alloy_db = AlloyDB::new(ipc_provider.clone(), BlockId::latest()).unwrap();
54
55 let state_db = KabuDB::new().with_ext_db(alloy_db);
56
57 let bc_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new_with_market_state(MarketState::new(state_db));
58
59 let strategy = Strategy::<KabuDB>::new();
60
61 let bc_clone = bc.clone();
62 tokio::task::spawn(async move {
63 if let Err(e) = kabu_runtime::start_kabu(
64 ipc_provider,
65 bc_clone,
66 bc_state,
67 strategy,
68 topology_config,
69 kabu_args.kabu_config.clone(),
70 true,
71 )
72 .await
73 {
74 error!("Error starting kabu: {:?}", e);
75 }
76 });
77 tokio::task::spawn(mempool_worker(mempool, bc));
78
79 handle.node_exit_future.await
80 }),
81 Command::Remote(kabu_args) => {
82 let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;
83
84 rt.block_on(async {
85 info!("Loading config from {}", kabu_args.kabu_config);
86 let topology_config = TopologyConfig::load_from_file(kabu_args.kabu_config.clone())?;
87
88 let client_config = topology_config.clients.get("remote").unwrap();
89 let transport = WsConnect::new(client_config.url.clone());
90 let client = ClientBuilder::default().ws(transport).await?;
91 let provider = ProviderBuilder::new().disable_recommended_fillers().connect_client(client);
92 let bc = Blockchain::new(Chain::mainnet().id());
93 let bc_clone = bc.clone();
94
95 let bc_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();
96
97 let strategy = Strategy::<KabuDB>::new();
98
99 if let Err(e) =
100 kabu_runtime::start_kabu(provider, bc_clone, bc_state, strategy, topology_config, kabu_args.kabu_config.clone(), false)
101 .await
102 {
103 error!("Error starting kabu: {:#?}", e);
104 panic!("{}", e)
105 }
106
107 tokio::select! {
109 _ = signal::ctrl_c() => {
110 info!("CTRL+C received... exiting");
111 }
112 _ = async {
113 loop {
114 tokio::time::sleep(Duration::from_secs(60)).await;
115 task::yield_now().await;
116 }
117 } => {}
118 }
119 Ok::<(), eyre::Error>(())
120 })?;
121 Ok(())
122 }
123 }
124}