kabu_strategy_merger/
diffpath_merger_actor.rs1use alloy_network::TransactionResponse;
2use alloy_primitives::{Address, TxHash};
3use alloy_rpc_types::Transaction;
4use eyre::{OptionExt, Result};
5use lazy_static::lazy_static;
6use revm::{Database, DatabaseCommit, DatabaseRef};
7use tokio::sync::broadcast::error::RecvError;
8use tokio::sync::broadcast::Receiver;
9use tracing::{debug, error, info};
10
11use kabu_core_actors::{Actor, ActorResult, Broadcaster, Consumer, Producer, WorkerResult};
12use kabu_core_actors_macros::{Accessor, Consumer, Producer};
13use kabu_core_blockchain::{Blockchain, Strategy};
14use kabu_evm_utils::NWETH;
15use kabu_types_entities::{MarketState, Swap};
16use kabu_types_events::{MarketEvents, MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData};
17
18lazy_static! {
19 static ref COINBASE: Address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326".parse().unwrap();
20}
21
22fn get_merge_list<'a, DB: Clone + Send + Sync + 'static>(
23 request: &SwapComposeData<DB>,
24 swap_paths: &'a [SwapComposeData<DB>],
25) -> Vec<&'a SwapComposeData<DB>> {
26 let mut ret: Vec<&SwapComposeData<DB>> = Vec::new();
27 let mut pools = request.swap.get_pool_id_vec();
28 for p in swap_paths.iter() {
29 if !p.cross_pools(&pools) {
30 pools.extend(p.swap.get_pool_id_vec());
31 ret.push(p);
32 }
33 }
34 ret
35}
36
37async fn diff_path_merger_worker<DB>(
38 market_events_rx: Broadcaster<MarketEvents>,
39 compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
40 compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
41) -> WorkerResult
42where
43 DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + 'static,
44{
45 let mut market_events_rx: Receiver<MarketEvents> = market_events_rx.subscribe();
46
47 let mut compose_channel_rx: Receiver<MessageSwapCompose<DB>> = compose_channel_rx.subscribe();
48
49 let mut swap_paths: Vec<SwapComposeData<DB>> = Vec::new();
50
51 loop {
52 tokio::select! {
53 msg = market_events_rx.recv() => {
54 if let Ok(msg) = msg {
55 let market_event_msg : MarketEvents = msg;
56 if let MarketEvents::BlockHeaderUpdate{block_number, block_hash, timestamp, base_fee, next_base_fee} = market_event_msg {
57 debug!("Block header update {} {} ts {} base_fee {} next {} ", block_number, block_hash, timestamp, base_fee, next_base_fee);
58 swap_paths = Vec::new();
63
64 }
74 }
75 }
76
77
78 msg = compose_channel_rx.recv() => {
79 let msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
80 match msg {
81 Ok(compose_request)=>{
82 if let SwapComposeMessage::Ready(sign_request) = compose_request.inner() {
83 if matches!( sign_request.swap, Swap::BackrunSwapLine(_)) || matches!( sign_request.swap, Swap::BackrunSwapSteps(_)) {
84 let mut merge_list = get_merge_list(sign_request, &swap_paths);
85
86 if !merge_list.is_empty() {
87 let swap_vec : Vec<Swap> = merge_list.iter().map(|x|x.swap.clone()).collect();
88 info!("Merging started {:?}", swap_vec );
89
90 let mut state = MarketState::new(sign_request.poststate.clone().unwrap().clone());
91
92 for dbs in merge_list.iter() {
93 state.apply_geth_update_vec( dbs.poststate_update.clone().ok_or_eyre("NO_STATE_UPDATE")?);
94 }
95
96 merge_list.push(sign_request);
97
98 let mut stuffing_txs_hashes : Vec<TxHash> = Vec::new();
99 let mut stuffing_txs : Vec<Transaction> = Vec::new();
100
101 for req in merge_list.iter() {
102 for tx in req.tx_compose.stuffing_txs.iter() {
103 if !stuffing_txs_hashes.contains(&tx.tx_hash()) {
104 stuffing_txs_hashes.push(tx.tx_hash());
105 stuffing_txs.push(tx.clone());
106 }
107 }
108 }
109
110 let encode_request = MessageSwapCompose::prepare(
111 SwapComposeData {
112 tx_compose : TxComposeData {
113 stuffing_txs_hashes,
114 stuffing_txs,
115 ..sign_request.tx_compose.clone()
116 },
117 swap : Swap::Multiple( merge_list.iter().map(|i| i.swap.clone() ).collect()) ,
118 origin : Some("diffpath_merger".to_string()),
119 tips_pct : Some(9000),
120 poststate : Some(state.state_db),
121 ..sign_request.clone()
122 }
123 );
124 info!("+++ Calculation finished. Merge list : {} profit : {}",merge_list.len(), NWETH::to_float(encode_request.inner.swap.arb_profit_eth()) );
125
126 if let Err(e) = compose_channel_tx.send(encode_request) {
127 error!("{}",e)
128 }
129 }
130
131 swap_paths.push(sign_request.clone());
132 swap_paths.sort_by(|a, b| b.swap.arb_profit_eth().cmp(&a.swap.arb_profit_eth() ) )
133 }
134 }
135 }
136 Err(e)=>{error!("{e}")}
137 }
138
139 }
140
141
142 }
143 }
144}
145
146#[derive(Consumer, Producer, Accessor, Default)]
147pub struct DiffPathMergerActor<DB: Clone + Send + Sync + 'static> {
148 #[consumer]
149 market_events: Option<Broadcaster<MarketEvents>>,
150 #[consumer]
151 compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
152 #[producer]
153 compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
154}
155
156impl<DB> DiffPathMergerActor<DB>
157where
158 DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + Default + 'static,
159{
160 pub fn new() -> Self {
161 Self::default()
162 }
163
164 pub fn on_bc(self, bc: &Blockchain) -> Self {
165 Self { market_events: Some(bc.market_events_channel()), ..self }
166 }
167
168 pub fn on_strategy(self, strategy: &Strategy<DB>) -> Self {
169 Self {
170 compose_channel_tx: Some(strategy.swap_compose_channel()),
171 compose_channel_rx: Some(strategy.swap_compose_channel()),
172 ..self
173 }
174 }
175}
176
177impl<DB> Actor for DiffPathMergerActor<DB>
178where
179 DB: DatabaseRef + Database + DatabaseCommit + Send + Sync + Clone + Default + 'static,
180{
181 fn start(&self) -> ActorResult {
182 let task = tokio::task::spawn(diff_path_merger_worker(
183 self.market_events.clone().unwrap(),
184 self.compose_channel_rx.clone().unwrap(),
185 self.compose_channel_tx.clone().unwrap(),
186 ));
187 Ok(vec![task])
188 }
189
190 fn name(&self) -> &'static str {
191 "DiffPathMergerActor"
192 }
193}