kabu_core_router/
swap_router_actor.rs

1use eyre::{eyre, Result};
2use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
3use kabu_core_actors_macros::{Accessor, Consumer, Producer};
4use kabu_core_blockchain::{Blockchain, Strategy};
5use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum};
6use kabu_types_entities::{AccountNonceAndBalanceState, TxSigners};
7use kabu_types_events::{MessageSwapCompose, MessageTxCompose, SwapComposeData, SwapComposeMessage, TxComposeData};
8use revm::DatabaseRef;
9use tokio::sync::broadcast::error::RecvError;
10use tokio::sync::broadcast::Receiver;
11use tracing::{debug, error, info};
12
13/// encoder task performs initial routing for swap request
14async fn router_task_prepare<DB, LDT>(
15    route_request: SwapComposeData<DB, LDT>,
16    compose_channel_tx: Broadcaster<MessageSwapCompose<DB, LDT>>,
17    signers: SharedState<TxSigners<LDT>>,
18    account_monitor: SharedState<AccountNonceAndBalanceState>,
19) -> Result<()>
20where
21    DB: DatabaseRef + Send + Sync + Clone + 'static,
22    LDT: KabuDataTypes,
23{
24    debug!("router_task_prepare started {}", route_request.swap);
25
26    let signer = match route_request.tx_compose.eoa {
27        Some(eoa) => signers.read().await.get_signer_by_address(&eoa)?,
28        None => signers.read().await.get_random_signer().ok_or(eyre!("NO_SIGNER"))?,
29    };
30
31    let nonce = account_monitor.read().await.get_account(&signer.address()).unwrap().get_nonce();
32    let eth_balance = account_monitor.read().await.get_account(&signer.address()).unwrap().get_eth_balance();
33
34    if route_request.tx_compose.next_block_base_fee == 0 {
35        error!("Block base fee is not set");
36        return Err(eyre!("NO_BLOCK_GAS_FEE"));
37    }
38
39    let gas = (route_request.swap.pre_estimate_gas()) * 2;
40
41    let estimate_request = SwapComposeData::<DB, LDT> {
42        tx_compose: TxComposeData::<LDT> { signer: Some(signer), nonce, eth_balance, gas, ..route_request.tx_compose },
43        ..route_request
44    };
45    let estimate_request = MessageSwapCompose::estimate(estimate_request);
46
47    match compose_channel_tx.send(estimate_request) {
48        Err(_) => {
49            error!("compose_channel_tx.send(estimate_request)");
50            Err(eyre!("ERROR_SENDING_REQUEST"))
51        }
52        Ok(_) => Ok(()),
53    }
54}
55
56async fn router_task_broadcast<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypes>(
57    route_request: SwapComposeData<DB, LDT>,
58    tx_compose_channel_tx: Broadcaster<MessageTxCompose<LDT>>,
59) -> Result<()> {
60    debug!("router_task_broadcast started {}", route_request.swap);
61
62    let tx_compose = TxComposeData { swap: Some(route_request.swap), tips: route_request.tips, ..route_request.tx_compose };
63
64    match tx_compose_channel_tx.send(MessageTxCompose::sign(tx_compose)) {
65        Err(_) => {
66            error!("compose_channel_tx.send(estimate_request)");
67            Err(eyre!("ERROR_SENDING_REQUEST"))
68        }
69        Ok(_) => Ok(()),
70    }
71}
72
73async fn swap_router_worker<DB: DatabaseRef + Clone + Send + Sync + 'static, LDT: KabuDataTypes>(
74    signers: SharedState<TxSigners<LDT>>,
75    account_monitor: SharedState<AccountNonceAndBalanceState>,
76    swap_compose_channel_rx: Broadcaster<MessageSwapCompose<DB, LDT>>,
77    swap_compose_channel_tx: Broadcaster<MessageSwapCompose<DB, LDT>>,
78    tx_compose_channel_tx: Broadcaster<MessageTxCompose<LDT>>,
79) -> WorkerResult {
80    let mut compose_channel_rx: Receiver<MessageSwapCompose<DB, LDT>> = swap_compose_channel_rx.subscribe();
81
82    info!("swap router worker started");
83
84    loop {
85        tokio::select! {
86            msg = compose_channel_rx.recv() => {
87                let msg : Result<MessageSwapCompose<DB, LDT>, RecvError> = msg;
88                match msg {
89                    Ok(compose_request) => {
90                        match compose_request.inner {
91                            SwapComposeMessage::Prepare(swap_compose_request)=>{
92                                debug!("MessageSwapComposeRequest::Prepare received. stuffing: {:?} swap: {}", swap_compose_request.tx_compose.stuffing_txs_hashes, swap_compose_request.swap);
93                                tokio::task::spawn(
94                                    router_task_prepare(
95                                        swap_compose_request,
96                                        swap_compose_channel_tx.clone(),
97                                        signers.clone(),
98                                        account_monitor.clone(),
99                                    )
100                                );
101                            }
102                            SwapComposeMessage::Ready(swap_compose_request)=>{
103                                debug!("MessageSwapComposeRequest::Ready received. stuffing: {:?} swap: {}", swap_compose_request.tx_compose.stuffing_txs_hashes, swap_compose_request.swap);
104                                tokio::task::spawn(
105                                    router_task_broadcast(
106                                        swap_compose_request,
107                                        tx_compose_channel_tx.clone(),
108                                    )
109                                );
110                            }
111                            _=>{
112                                error!("Unexpected message type received in swap router worker");
113                            }
114
115                        }
116                    }
117                    Err(e)=>{error!("compose_channel_rx {}",e)}
118                }
119            }
120        }
121    }
122}
123
124#[derive(Consumer, Producer, Accessor, Default)]
125pub struct SwapRouterActor<DB: Send + Sync + Clone + 'static, LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
126    #[accessor]
127    signers: Option<SharedState<TxSigners<LDT>>>,
128    #[accessor]
129    account_nonce_balance: Option<SharedState<AccountNonceAndBalanceState>>,
130    #[consumer]
131    swap_compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB, LDT>>>,
132    #[producer]
133    swap_compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB, LDT>>>,
134    #[producer]
135    tx_compose_channel_tx: Option<Broadcaster<MessageTxCompose<LDT>>>,
136}
137
138impl<DB, LDT> SwapRouterActor<DB, LDT>
139where
140    DB: DatabaseRef + Send + Sync + Clone + Default + 'static,
141    LDT: KabuDataTypes,
142{
143    pub fn new() -> SwapRouterActor<DB, LDT> {
144        SwapRouterActor {
145            signers: None,
146            account_nonce_balance: None,
147            swap_compose_channel_rx: None,
148            swap_compose_channel_tx: None,
149            tx_compose_channel_tx: None,
150        }
151    }
152
153    pub fn with_signers(self, signers: SharedState<TxSigners<LDT>>) -> Self {
154        Self { signers: Some(signers), ..self }
155    }
156
157    pub fn on_bc(self, bc: &Blockchain<LDT>, strategy: &Strategy<DB, LDT>) -> Self {
158        Self {
159            swap_compose_channel_rx: Some(strategy.swap_compose_channel()),
160            swap_compose_channel_tx: Some(strategy.swap_compose_channel()),
161            account_nonce_balance: Some(bc.nonce_and_balance()),
162            tx_compose_channel_tx: Some(bc.tx_compose_channel()),
163            ..self
164        }
165    }
166}
167
168impl<DB, LDT> Actor for SwapRouterActor<DB, LDT>
169where
170    DB: DatabaseRef + Send + Sync + Clone + Default + 'static,
171    LDT: KabuDataTypes,
172{
173    fn start(&self) -> ActorResult {
174        let task = tokio::task::spawn(swap_router_worker(
175            self.signers.clone().unwrap(),
176            self.account_nonce_balance.clone().unwrap(),
177            self.swap_compose_channel_rx.clone().unwrap(),
178            self.swap_compose_channel_tx.clone().unwrap(),
179            self.tx_compose_channel_tx.clone().unwrap(),
180        ));
181        Ok(vec![task])
182    }
183
184    fn name(&self) -> &'static str {
185        "SwapRouterActor"
186    }
187}