1use eyre::{eyre, Result};
2use kabu_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Producer, SharedState, WorkerResult};
3use kabu_core_actors_macros::{Accessor, Consumer, Producer};
4use kabu_core_blockchain::{Blockchain, Strategy};
5use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum};
6use kabu_types_entities::{AccountNonceAndBalanceState, TxSigners};
7use kabu_types_events::{MessageSwapCompose, MessageTxCompose, SwapComposeData, SwapComposeMessage, TxComposeData};
8use revm::DatabaseRef;
9use tokio::sync::broadcast::error::RecvError;
10use tokio::sync::broadcast::Receiver;
11use tracing::{debug, error, info};
12
13async fn router_task_prepare<DB, LDT>(
15 route_request: SwapComposeData<DB, LDT>,
16 compose_channel_tx: Broadcaster<MessageSwapCompose<DB, LDT>>,
17 signers: SharedState<TxSigners<LDT>>,
18 account_monitor: SharedState<AccountNonceAndBalanceState>,
19) -> Result<()>
20where
21 DB: DatabaseRef + Send + Sync + Clone + 'static,
22 LDT: KabuDataTypes,
23{
24 debug!("router_task_prepare started {}", route_request.swap);
25
26 let signer = match route_request.tx_compose.eoa {
27 Some(eoa) => signers.read().await.get_signer_by_address(&eoa)?,
28 None => signers.read().await.get_random_signer().ok_or(eyre!("NO_SIGNER"))?,
29 };
30
31 let nonce = account_monitor.read().await.get_account(&signer.address()).unwrap().get_nonce();
32 let eth_balance = account_monitor.read().await.get_account(&signer.address()).unwrap().get_eth_balance();
33
34 if route_request.tx_compose.next_block_base_fee == 0 {
35 error!("Block base fee is not set");
36 return Err(eyre!("NO_BLOCK_GAS_FEE"));
37 }
38
39 let gas = (route_request.swap.pre_estimate_gas()) * 2;
40
41 let estimate_request = SwapComposeData::<DB, LDT> {
42 tx_compose: TxComposeData::<LDT> { signer: Some(signer), nonce, eth_balance, gas, ..route_request.tx_compose },
43 ..route_request
44 };
45 let estimate_request = MessageSwapCompose::estimate(estimate_request);
46
47 match compose_channel_tx.send(estimate_request) {
48 Err(_) => {
49 error!("compose_channel_tx.send(estimate_request)");
50 Err(eyre!("ERROR_SENDING_REQUEST"))
51 }
52 Ok(_) => Ok(()),
53 }
54}
55
56async fn router_task_broadcast<DB: DatabaseRef + Send + Sync + Clone + 'static, LDT: KabuDataTypes>(
57 route_request: SwapComposeData<DB, LDT>,
58 tx_compose_channel_tx: Broadcaster<MessageTxCompose<LDT>>,
59) -> Result<()> {
60 debug!("router_task_broadcast started {}", route_request.swap);
61
62 let tx_compose = TxComposeData { swap: Some(route_request.swap), tips: route_request.tips, ..route_request.tx_compose };
63
64 match tx_compose_channel_tx.send(MessageTxCompose::sign(tx_compose)) {
65 Err(_) => {
66 error!("compose_channel_tx.send(estimate_request)");
67 Err(eyre!("ERROR_SENDING_REQUEST"))
68 }
69 Ok(_) => Ok(()),
70 }
71}
72
73async fn swap_router_worker<DB: DatabaseRef + Clone + Send + Sync + 'static, LDT: KabuDataTypes>(
74 signers: SharedState<TxSigners<LDT>>,
75 account_monitor: SharedState<AccountNonceAndBalanceState>,
76 swap_compose_channel_rx: Broadcaster<MessageSwapCompose<DB, LDT>>,
77 swap_compose_channel_tx: Broadcaster<MessageSwapCompose<DB, LDT>>,
78 tx_compose_channel_tx: Broadcaster<MessageTxCompose<LDT>>,
79) -> WorkerResult {
80 let mut compose_channel_rx: Receiver<MessageSwapCompose<DB, LDT>> = swap_compose_channel_rx.subscribe();
81
82 info!("swap router worker started");
83
84 loop {
85 tokio::select! {
86 msg = compose_channel_rx.recv() => {
87 let msg : Result<MessageSwapCompose<DB, LDT>, RecvError> = msg;
88 match msg {
89 Ok(compose_request) => {
90 match compose_request.inner {
91 SwapComposeMessage::Prepare(swap_compose_request)=>{
92 debug!("MessageSwapComposeRequest::Prepare received. stuffing: {:?} swap: {}", swap_compose_request.tx_compose.stuffing_txs_hashes, swap_compose_request.swap);
93 tokio::task::spawn(
94 router_task_prepare(
95 swap_compose_request,
96 swap_compose_channel_tx.clone(),
97 signers.clone(),
98 account_monitor.clone(),
99 )
100 );
101 }
102 SwapComposeMessage::Ready(swap_compose_request)=>{
103 debug!("MessageSwapComposeRequest::Ready received. stuffing: {:?} swap: {}", swap_compose_request.tx_compose.stuffing_txs_hashes, swap_compose_request.swap);
104 tokio::task::spawn(
105 router_task_broadcast(
106 swap_compose_request,
107 tx_compose_channel_tx.clone(),
108 )
109 );
110 }
111 _=>{
112 error!("Unexpected message type received in swap router worker");
113 }
114
115 }
116 }
117 Err(e)=>{error!("compose_channel_rx {}",e)}
118 }
119 }
120 }
121 }
122}
123
/// Actor that routes swap compose messages.
///
/// Its worker listens on the swap compose channel, turns `Prepare` requests into
/// `estimate` messages on that same channel type, and forwards `Ready` requests to the
/// transaction compose channel as `sign` messages.
///
/// All dependencies start as `None` and must be populated before `start` is called,
/// e.g. through `with_signers` and `on_bc`. `LDT` defaults to `KabuDataTypesEthereum`.
#[derive(Consumer, Producer, Accessor, Default)]
pub struct SwapRouterActor<DB: Send + Sync + Clone + 'static, LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
    /// Shared signer registry used to pick the signer for a prepared swap.
    #[accessor]
    signers: Option<SharedState<TxSigners<LDT>>>,
    /// Shared account state from which the signer's nonce and ETH balance are read.
    #[accessor]
    account_nonce_balance: Option<SharedState<AccountNonceAndBalanceState>>,
    /// Channel the worker subscribes to for incoming swap compose messages.
    #[consumer]
    swap_compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB, LDT>>>,
    /// Channel on which `estimate` messages are published after preparation.
    #[producer]
    swap_compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB, LDT>>>,
    /// Channel on which `sign` messages are published for ready swaps.
    #[producer]
    tx_compose_channel_tx: Option<Broadcaster<MessageTxCompose<LDT>>>,
}
137
138impl<DB, LDT> SwapRouterActor<DB, LDT>
139where
140 DB: DatabaseRef + Send + Sync + Clone + Default + 'static,
141 LDT: KabuDataTypes,
142{
143 pub fn new() -> SwapRouterActor<DB, LDT> {
144 SwapRouterActor {
145 signers: None,
146 account_nonce_balance: None,
147 swap_compose_channel_rx: None,
148 swap_compose_channel_tx: None,
149 tx_compose_channel_tx: None,
150 }
151 }
152
153 pub fn with_signers(self, signers: SharedState<TxSigners<LDT>>) -> Self {
154 Self { signers: Some(signers), ..self }
155 }
156
157 pub fn on_bc(self, bc: &Blockchain<LDT>, strategy: &Strategy<DB, LDT>) -> Self {
158 Self {
159 swap_compose_channel_rx: Some(strategy.swap_compose_channel()),
160 swap_compose_channel_tx: Some(strategy.swap_compose_channel()),
161 account_nonce_balance: Some(bc.nonce_and_balance()),
162 tx_compose_channel_tx: Some(bc.tx_compose_channel()),
163 ..self
164 }
165 }
166}
167
168impl<DB, LDT> Actor for SwapRouterActor<DB, LDT>
169where
170 DB: DatabaseRef + Send + Sync + Clone + Default + 'static,
171 LDT: KabuDataTypes,
172{
173 fn start(&self) -> ActorResult {
174 let task = tokio::task::spawn(swap_router_worker(
175 self.signers.clone().unwrap(),
176 self.account_nonce_balance.clone().unwrap(),
177 self.swap_compose_channel_rx.clone().unwrap(),
178 self.swap_compose_channel_tx.clone().unwrap(),
179 self.tx_compose_channel_tx.clone().unwrap(),
180 ));
181 Ok(vec![task])
182 }
183
184 fn name(&self) -> &'static str {
185 "SwapRouterActor"
186 }
187}