1use alloy::providers::Provider;
2use eyre::Result;
3use tracing::{error, info};
4
5use kabu::core::actors::{Accessor, Actor, Consumer, Producer};
6use kabu::core::router::SwapRouterActor;
7use kabu::core::topology::{Topology, TopologyConfig};
8use kabu::defi::health_monitor::{MetricsRecorderActor, StateHealthMonitorActor, StuffingTxMonitorActor};
9use kabu::evm::db::KabuDBType;
10use kabu::execution::multicaller::MulticallerSwapEncoder;
11use kabu::metrics::InfluxDbWriterActor;
12use kabu::strategy::backrun::{BackrunConfig, BackrunConfigSection, StateChangeArbActor};
13use kabu::strategy::merger::{ArbSwapPathMergerActor, DiffPathMergerActor, SamePathMergerActor};
14use kabu::types::entities::strategy_config::load_from_file;
15use kabu::types::events::MarketEvents;
16
17#[tokio::main]
18async fn main() -> Result<()> {
19 env_logger::Builder::from_env(
20 env_logger::Env::default().default_filter_or("debug,tokio_tungstenite=off,tungstenite=off,alloy_rpc_client=off"),
21 )
22 .format_timestamp_micros()
23 .init();
24
25 let topology_config = TopologyConfig::load_from_file("config.toml".to_string())?;
26 let influxdb_config = topology_config.influxdb.clone();
27
28 let encoder = MulticallerSwapEncoder::default();
29
30 let topology =
31 Topology::<KabuDBType>::from_config(topology_config).with_swap_encoder(encoder).build_blockchains().start_clients().await?;
32
33 let mut worker_task_vec = topology.start_actors().await?;
34
35 let client = topology.get_client(Some("local".to_string()).as_ref())?;
40 let blockchain = topology.get_blockchain(Some("mainnet".to_string()).as_ref())?;
41 let blockchain_state = topology.get_blockchain_state(Some("mainnet".to_string()).as_ref())?;
42 let strategy = topology.get_strategy(Some("mainnet".to_string()).as_ref())?;
43
44 let tx_signers = topology.get_signers(Some("env_signer".to_string()).as_ref())?;
45
46 let backrun_config: BackrunConfigSection = load_from_file("./config.toml".to_string().into()).await?;
47 let backrun_config: BackrunConfig = backrun_config.backrun_strategy;
48
49 let block_nr = client.get_block_number().await?;
50 info!("Block : {}", block_nr);
51
52 info!("Creating shared state");
53
54 info!("Starting state change arb actor");
55 let mut state_change_arb_actor = StateChangeArbActor::new(client.clone(), true, true, backrun_config);
56 match state_change_arb_actor
57 .access(blockchain.mempool())
58 .access(blockchain.latest_block())
59 .access(blockchain.market())
60 .access(blockchain_state.market_state())
61 .access(blockchain_state.block_history())
62 .consume(blockchain.market_events_channel())
63 .consume(blockchain.mempool_events_channel())
64 .produce(strategy.swap_compose_channel())
65 .produce(blockchain.health_monitor_channel())
66 .produce(blockchain.influxdb_write_channel())
67 .start()
68 {
69 Err(e) => {
70 error!("{}", e)
71 }
72 Ok(r) => {
73 worker_task_vec.extend(r);
74 info!("State change arb actor started successfully")
75 }
76 }
77
78 let multicaller_address = topology.get_multicaller_address(None)?;
79 info!("Starting swap path encoder actor with multicaller at : {}", multicaller_address);
80
81 let mut swap_path_encoder_actor = SwapRouterActor::new();
82
83 match swap_path_encoder_actor
84 .access(tx_signers.clone())
85 .access(blockchain.nonce_and_balance())
86 .consume(strategy.swap_compose_channel())
87 .produce(strategy.swap_compose_channel())
88 .produce(blockchain.tx_compose_channel())
89 .start()
90 {
91 Ok(r) => {
92 worker_task_vec.extend(r);
93 info!("Swap path encoder actor started successfully")
94 }
95 Err(e) => {
96 panic!("ArbSwapPathEncoderActor {e}")
97 }
98 }
99
100 info!("Starting swap path merger actor");
101
102 let mut swap_path_merger_actor = ArbSwapPathMergerActor::new(multicaller_address);
103
104 match swap_path_merger_actor
105 .access(blockchain.latest_block())
106 .consume(blockchain.market_events_channel())
107 .consume(strategy.swap_compose_channel())
108 .produce(strategy.swap_compose_channel())
109 .start()
110 {
111 Ok(r) => {
112 worker_task_vec.extend(r);
113 info!("Swap path merger actor started successfully")
114 }
115 Err(e) => {
116 panic!("{}", e)
117 }
118 }
119
120 let mut same_path_merger_actor = SamePathMergerActor::new(client.clone());
121
122 match same_path_merger_actor
123 .access(blockchain_state.market_state())
124 .access(blockchain.latest_block())
125 .consume(blockchain.market_events_channel())
126 .consume(strategy.swap_compose_channel())
127 .produce(strategy.swap_compose_channel())
128 .start()
129 {
130 Ok(r) => {
131 worker_task_vec.extend(r);
132 info!("Same path merger actor started successfully")
133 }
134 Err(e) => {
135 panic!("{}", e)
136 }
137 }
138
139 let mut diff_path_merger_actor = DiffPathMergerActor::new();
141
142 match diff_path_merger_actor
143 .consume(blockchain.market_events_channel())
144 .consume(strategy.swap_compose_channel())
145 .produce(strategy.swap_compose_channel())
146 .start()
147 {
148 Ok(r) => {
149 worker_task_vec.extend(r);
150 info!("Diff path merger actor started successfully")
151 }
152 Err(e) => {
153 panic!("{}", e)
154 }
155 }
156
157 let mut state_health_monitor_actor = StateHealthMonitorActor::new(client.clone());
159
160 match state_health_monitor_actor
161 .access(blockchain_state.market_state())
162 .consume(blockchain.tx_compose_channel())
163 .consume(blockchain.market_events_channel())
164 .start()
165 {
166 Err(e) => {
167 panic!("State health monitor actor failed : {e}")
168 }
169 Ok(r) => {
170 worker_task_vec.extend(r);
171 info!("State health monitor actor started successfully")
172 }
173 }
174
175 let mut stuffing_txs_monitor_actor = StuffingTxMonitorActor::new(client.clone());
177 match stuffing_txs_monitor_actor
178 .access(blockchain.latest_block())
179 .consume(blockchain.tx_compose_channel())
180 .consume(blockchain.market_events_channel())
181 .produce(blockchain.influxdb_write_channel())
182 .start()
183 {
184 Err(e) => {
185 panic!("Stuffing txs monitor actor failed : {e}")
186 }
187 Ok(r) => {
188 worker_task_vec.extend(r);
189 info!("Stuffing txs monitor actor started successfully")
190 }
191 }
192
193 if let Some(influxdb_config) = influxdb_config {
195 let mut influxdb_writer_actor = InfluxDbWriterActor::new(influxdb_config.url, influxdb_config.database, influxdb_config.tags);
196 match influxdb_writer_actor.consume(blockchain.influxdb_write_channel()).start() {
197 Err(e) => {
198 panic!("InfluxDB writer actor failed : {e}")
199 }
200 Ok(r) => {
201 worker_task_vec.extend(r);
202 info!("InfluxDB writer actor started successfully")
203 }
204 }
205
206 let mut block_latency_recorder_actor = MetricsRecorderActor::new();
207 match block_latency_recorder_actor
208 .access(blockchain.market())
209 .access(blockchain_state.market_state())
210 .consume(blockchain.new_block_headers_channel())
211 .produce(blockchain.influxdb_write_channel())
212 .start()
213 {
214 Err(e) => {
215 panic!("Block latency recorder actor failed : {e}")
216 }
217 Ok(r) => {
218 worker_task_vec.extend(r);
219 info!("Block latency recorder actor started successfully")
220 }
221 }
222 }
223
224 tokio::task::spawn(async move {
226 while !worker_task_vec.is_empty() {
227 let (result, _index, remaining_futures) = futures::future::select_all(worker_task_vec).await;
228 match result {
229 Ok(work_result) => match work_result {
230 Ok(s) => {
231 info!("ActorWorker {_index} finished : {s}")
232 }
233 Err(e) => {
234 error!("ActorWorker {_index} error : {e}")
235 }
236 },
237 Err(e) => {
238 error!("ActorWorker join error {_index} : {e}")
239 }
240 }
241 worker_task_vec = remaining_futures;
242 }
243 });
244
245 let mut s = blockchain.market_events_channel().subscribe();
247 loop {
248 let msg = s.recv().await;
249 if let Ok(msg) = msg {
250 match msg {
251 MarketEvents::BlockTxUpdate { block_number, block_hash } => {
252 info!("New block received {} {}", block_number, block_hash);
253 }
254 MarketEvents::BlockStateUpdate { block_hash } => {
255 info!("New block state received {}", block_hash);
256 }
257 _ => {}
258 }
259 }
260 }
261}