kabu_backrun/
main.rs

use alloy::providers::Provider;
use eyre::Result;
use tracing::{error, info};

use kabu::core::actors::{Accessor, Actor, Consumer, Producer};
use kabu::core::router::SwapRouterActor;
use kabu::core::topology::{Topology, TopologyConfig};
use kabu::defi::health_monitor::{MetricsRecorderActor, StateHealthMonitorActor, StuffingTxMonitorActor};
use kabu::evm::db::KabuDBType;
use kabu::execution::multicaller::MulticallerSwapEncoder;
use kabu::metrics::InfluxDbWriterActor;
use kabu::strategy::backrun::{BackrunConfig, BackrunConfigSection, StateChangeArbActor};
use kabu::strategy::merger::{ArbSwapPathMergerActor, DiffPathMergerActor, SamePathMergerActor};
use kabu::types::entities::strategy_config::load_from_file;
use kabu::types::events::MarketEvents;

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::Builder::from_env(
        env_logger::Env::default().default_filter_or("debug,tokio_tungstenite=off,tungstenite=off,alloy_rpc_client=off"),
    )
    .format_timestamp_micros()
    .init();
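    // Load the topology configuration from config.toml; the InfluxDB section is reused below for the metrics writer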
    let topology_config = TopologyConfig::load_from_file("config.toml".to_string())?;
    let influxdb_config = topology_config.influxdb.clone();

    let encoder = MulticallerSwapEncoder::default();
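    // Build the blockchains described in the topology config and start their RPC clients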
    let topology =
        Topology::<KabuDBType>::from_config(topology_config).with_swap_encoder(encoder).build_blockchains().start_clients().await?;

    let mut worker_task_vec = topology.start_actors().await?;
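    // Resolve the named client, blockchain, state, strategy and signers declared in the topology config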
    let client = topology.get_client(Some("local".to_string()).as_ref())?;
    let blockchain = topology.get_blockchain(Some("mainnet".to_string()).as_ref())?;
    let blockchain_state = topology.get_blockchain_state(Some("mainnet".to_string()).as_ref())?;
    let strategy = topology.get_strategy(Some("mainnet".to_string()).as_ref())?;

    let tx_signers = topology.get_signers(Some("env_signer".to_string()).as_ref())?;
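    // Load the backrun strategy settings from the same config file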
    let backrun_config: BackrunConfigSection = load_from_file("./config.toml".to_string().into()).await?;
    let backrun_config: BackrunConfig = backrun_config.backrun_strategy;

    let block_nr = client.get_block_number().await?;
    info!("Block : {}", block_nr);

    info!("Creating shared state");
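    // The state change arb actor searches for arbitrage opportunities whenever market or mempool state changes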
    info!("Starting state change arb actor");
    let mut state_change_arb_actor = StateChangeArbActor::new(client.clone(), true, true, backrun_config);
    match state_change_arb_actor
        .access(blockchain.mempool())
        .access(blockchain.latest_block())
        .access(blockchain.market())
        .access(blockchain_state.market_state())
        .access(blockchain_state.block_history())
        .consume(blockchain.market_events_channel())
        .consume(blockchain.mempool_events_channel())
        .produce(strategy.swap_compose_channel())
        .produce(blockchain.health_monitor_channel())
        .produce(blockchain.influxdb_write_channel())
        .start()
    {
        Err(e) => {
            error!("{}", e)
        }
        Ok(r) => {
            worker_task_vec.extend(r);
            info!("State change arb actor started successfully")
        }
    }

    let multicaller_address = topology.get_multicaller_address(None)?;
    info!("Starting swap path encoder actor with multicaller at : {}", multicaller_address);
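    // Routes composed swaps through the configured signers and emits transaction compose messages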
    let mut swap_path_encoder_actor = SwapRouterActor::new();

    match swap_path_encoder_actor
        .access(tx_signers.clone())
        .access(blockchain.nonce_and_balance())
        .consume(strategy.swap_compose_channel())
        .produce(strategy.swap_compose_channel())
        .produce(blockchain.tx_compose_channel())
        .start()
    {
        Ok(r) => {
            worker_task_vec.extend(r);
            info!("Swap path encoder actor started successfully")
        }
        Err(e) => {
            panic!("SwapRouterActor failed : {e}")
        }
    }
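    // Merges multiple arbitrage swap paths into a single combined swap executed via the multicaller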
    info!("Starting swap path merger actor");

    let mut swap_path_merger_actor = ArbSwapPathMergerActor::new(multicaller_address);

    match swap_path_merger_actor
        .access(blockchain.latest_block())
        .consume(blockchain.market_events_channel())
        .consume(strategy.swap_compose_channel())
        .produce(strategy.swap_compose_channel())
        .start()
    {
        Ok(r) => {
            worker_task_vec.extend(r);
            info!("Swap path merger actor started successfully")
        }
        Err(e) => {
            panic!("{}", e)
        }
    }
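    // Merges swaps that share the same swap path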
    let mut same_path_merger_actor = SamePathMergerActor::new(client.clone());

    match same_path_merger_actor
        .access(blockchain_state.market_state())
        .access(blockchain.latest_block())
        .consume(blockchain.market_events_channel())
        .consume(strategy.swap_compose_channel())
        .produce(strategy.swap_compose_channel())
        .start()
    {
        Ok(r) => {
            worker_task_vec.extend(r);
            info!("Same path merger actor started successfully")
        }
        Err(e) => {
            panic!("{}", e)
        }
    }
    // Merges swaps that take different paths
    let mut diff_path_merger_actor = DiffPathMergerActor::new();

    match diff_path_merger_actor
        .consume(blockchain.market_events_channel())
        .consume(strategy.swap_compose_channel())
        .produce(strategy.swap_compose_channel())
        .start()
    {
        Ok(r) => {
            worker_task_vec.extend(r);
            info!("Diff path merger actor started successfully")
        }
        Err(e) => {
            panic!("{}", e)
        }
    }
    // Monitors pool state health and disables a pool if problems are detected
    let mut state_health_monitor_actor = StateHealthMonitorActor::new(client.clone());

    match state_health_monitor_actor
        .access(blockchain_state.market_state())
        .consume(blockchain.tx_compose_channel())
        .consume(blockchain.market_events_channel())
        .start()
    {
        Err(e) => {
            panic!("State health monitor actor failed : {e}")
        }
        Ok(r) => {
            worker_task_vec.extend(r);
            info!("State health monitor actor started successfully")
        }
    }
    // Monitoring transactions we tried to attach to.
    let mut stuffing_txs_monitor_actor = StuffingTxMonitorActor::new(client.clone());
    match stuffing_txs_monitor_actor
        .access(blockchain.latest_block())
        .consume(blockchain.tx_compose_channel())
        .consume(blockchain.market_events_channel())
        .produce(blockchain.influxdb_write_channel())
        .start()
    {
        Err(e) => {
            panic!("Stuffing txs monitor actor failed : {e}")
        }
        Ok(r) => {
            worker_task_vec.extend(r);
            info!("Stuffing txs monitor actor started successfully")
        }
    }
    // Recording InfluxDB metrics
    if let Some(influxdb_config) = influxdb_config {
        let mut influxdb_writer_actor = InfluxDbWriterActor::new(influxdb_config.url, influxdb_config.database, influxdb_config.tags);
        match influxdb_writer_actor.consume(blockchain.influxdb_write_channel()).start() {
            Err(e) => {
                panic!("InfluxDB writer actor failed : {e}")
            }
            Ok(r) => {
                worker_task_vec.extend(r);
                info!("InfluxDB writer actor started successfully")
            }
        }

        let mut block_latency_recorder_actor = MetricsRecorderActor::new();
        match block_latency_recorder_actor
            .access(blockchain.market())
            .access(blockchain_state.market_state())
            .consume(blockchain.new_block_headers_channel())
            .produce(blockchain.influxdb_write_channel())
            .start()
        {
            Err(e) => {
                panic!("Block latency recorder actor failed : {e}")
            }
            Ok(r) => {
                worker_task_vec.extend(r);
                info!("Block latency recorder actor started successfully")
            }
        }
    }
    // Watch the worker tasks and log whenever one of them finishes or fails
    tokio::task::spawn(async move {
        while !worker_task_vec.is_empty() {
            let (result, index, remaining_futures) = futures::future::select_all(worker_task_vec).await;
            match result {
                Ok(work_result) => match work_result {
                    Ok(s) => {
                        info!("ActorWorker {index} finished : {s}")
                    }
                    Err(e) => {
                        error!("ActorWorker {index} error : {e}")
                    }
                },
                Err(e) => {
                    error!("ActorWorker join error {index} : {e}")
                }
            }
            worker_task_vec = remaining_futures;
        }
    });

    // Listening to MarketEvents in an infinite loop
    let mut s = blockchain.market_events_channel().subscribe();
    loop {
        let msg = s.recv().await;
        if let Ok(msg) = msg {
            match msg {
                MarketEvents::BlockTxUpdate { block_number, block_hash } => {
                    info!("New block received {} {}", block_number, block_hash);
                }
                MarketEvents::BlockStateUpdate { block_hash } => {
                    info!("New block state received {}", block_hash);
                }
                _ => {}
            }
        }
    }
}