kabu_node_debug_provider/
httpcached.rs

1use std::collections::HashMap;
2use std::ops::RangeInclusive;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use alloy::primitives::{Address, U256};
8use alloy::rpc::types::trace::geth::GethDebugTracingCallOptions;
9use alloy::{
10    primitives::{BlockHash, BlockNumber, B128, B256, U128},
11    rpc::{
12        json_rpc::{Id, Request, RequestPacket, Response, ResponsePacket, ResponsePayload, SerializedRequest},
13        types::{trace::geth::GethDebugTracingOptions, Block, BlockNumberOrTag, TransactionRequest},
14    },
15    transports::{
16        http::{Http, ReqwestTransport},
17        TransportError, TransportErrorKind, TransportFut,
18    },
19};
20use eyre::{eyre, Result};
21use reqwest::Client;
22use serde_json::value::RawValue;
23use tokio::sync::RwLock;
24use tower::Service;
25use tracing::{debug, error, trace};
26use url::Url;
27
28use crate::cachefolder::CacheFolder;
29
30#[derive(Clone)]
31pub struct HttpCachedTransport {
32    client: Http<Client>,
33    block_number: Arc<AtomicU64>,
34    block_filters: Arc<RwLock<HashMap<U128, BlockNumber>>>,
35    block_hashes: Arc<RwLock<HashMap<BlockNumber, B256>>>,
36    cache_folder: Option<CacheFolder>,
37}
38
39impl HttpCachedTransport {
40    pub async fn new(url: Url, cache_path: Option<&str>) -> Self {
41        let client = ReqwestTransport::new(url);
42        let cache_folder = match cache_path {
43            Some(path) => Some(CacheFolder::new(path).await),
44            None => None,
45        };
46        Self {
47            client,
48            block_number: Arc::new(AtomicU64::new(0)),
49            block_filters: Arc::new(RwLock::new(HashMap::new())),
50            block_hashes: Arc::new(RwLock::new(HashMap::new())),
51            cache_folder,
52        }
53    }
54
55    pub fn set_block_number(&self, block_number: u64) -> u64 {
56        self.block_number.swap(block_number, Ordering::Relaxed)
57    }
58
59    fn convert_block_number(&self, number_or_tag: BlockNumberOrTag) -> Result<BlockNumberOrTag> {
60        let current_block = self.read_block_number();
61        match number_or_tag {
62            BlockNumberOrTag::Number(x) => {
63                if x > current_block {
64                    Err(eyre!("INCORRECT_BLOCK_NUMBER"))
65                } else {
66                    Ok(BlockNumberOrTag::Number(x))
67                }
68            }
69            BlockNumberOrTag::Earliest => Ok(BlockNumberOrTag::Earliest),
70            _ => Ok(BlockNumberOrTag::Number(current_block)),
71        }
72    }
73
74    fn convert_block_number_next(&self, number_or_tag: BlockNumberOrTag) -> Result<BlockNumberOrTag> {
75        let current_block = self.read_block_number();
76        match number_or_tag {
77            BlockNumberOrTag::Number(x) => {
78                if x > current_block {
79                    Err(eyre!("INCORRECT_BLOCK_NUMBER"))
80                } else {
81                    Ok(BlockNumberOrTag::Number(x))
82                }
83            }
84            BlockNumberOrTag::Earliest => Ok(BlockNumberOrTag::Earliest),
85            _ => Ok(BlockNumberOrTag::Number(current_block + 1)),
86        }
87    }
88
89    pub async fn read_cached(&self, method: String, params_hash: B256) -> Result<String> {
90        match &self.cache_folder {
91            Some(cf) => cf.read(method, params_hash).await,
92            None => Err(eyre!("NO_CACHE")),
93        }
94    }
95
96    pub async fn write_cached(&self, method: String, params_hash: B256, data: String) -> Result<()> {
97        match &self.cache_folder {
98            Some(cf) => cf.write(method, params_hash, data).await,
99            None => Err(eyre!("NO_CACHE")),
100        }
101    }
102
103    pub fn next_block_number(&self) -> BlockNumber {
104        self.block_number.fetch_add(1, Ordering::Relaxed)
105    }
106
107    pub async fn fetch_next_block(&self) -> Result<BlockNumber, TransportError> {
108        let next_block_number = self.read_block_number() + 1;
109
110        let new_req = Request::<(BlockNumberOrTag, bool)>::new(
111            "eth_getBlockByNumber",
112            Id::None,
113            (BlockNumberOrTag::Number(next_block_number), false),
114        );
115
116        let new_req: SerializedRequest = new_req.try_into().map_err(TransportError::SerError)?;
117
118        if let Ok(new_block_packet) = self.cached_or_execute(new_req).await {
119            trace!("fetch_next_block : {:?}", new_block_packet);
120            if let ResponsePacket::Single(new_block_response) = new_block_packet {
121                let response: Block = serde_json::from_str(new_block_response.payload.as_success().unwrap().get())
122                    .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
123                self.block_hashes.write().await.insert(next_block_number, response.header.hash);
124                self.set_block_number(next_block_number);
125            }
126        }
127
128        Ok(next_block_number)
129    }
130
131    pub fn read_block_number(&self) -> u64 {
132        self.block_number.load(Ordering::Relaxed)
133    }
134
135    pub async fn create_block_filter(&self) -> U128 {
136        let filter_id = B128::random();
137        let filter_id: U128 = filter_id.into();
138        self.block_filters.write().await.insert(filter_id, self.read_block_number());
139        filter_id
140    }
141
142    pub async fn get_block_number(self) -> Result<ResponsePacket, TransportError> {
143        let block_number = self.read_block_number();
144        let value = RawValue::from_string(format!("{block_number}").to_string()).unwrap();
145        let body = Response { id: Id::None, payload: ResponsePayload::Success(value) };
146        Ok(ResponsePacket::Single(body))
147    }
148    pub async fn new_block_filter(self) -> Result<ResponsePacket, TransportError> {
149        let filter_id = self.create_block_filter().await;
150        let value = format!("\"0x{filter_id:x}\"").to_string();
151        let value = RawValue::from_string(value).unwrap();
152        let body = Response { id: Id::None, payload: ResponsePayload::Success(value) };
153        Ok(ResponsePacket::Single(body))
154    }
155
156    pub async fn get_filter_changes(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
157        let raw_value: Vec<U128> = serde_json::from_str(req.params().unwrap().get())
158            .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
159        trace!("get_filter_changes req : {:?}", raw_value);
160        let mut block_filters_guard = self.block_filters.write().await;
161        let block_hashes_guard = self.block_hashes.read().await;
162        let current_block = self.read_block_number();
163        let mut missed_blocks: Vec<BlockHash> = Vec::new();
164
165        for filter_id in raw_value {
166            if let Some(filter_block) = block_filters_guard.get(&filter_id).cloned() {
167                if filter_block < current_block {
168                    block_filters_guard.insert(filter_id, current_block);
169                    let missed_block_range = RangeInclusive::new(filter_block + 1, current_block)
170                        .map(|block_number| block_hashes_guard.get(&block_number).cloned().unwrap_or_default())
171                        .collect();
172                    missed_blocks = missed_block_range;
173                    break;
174                }
175            }
176        }
177        let resp_string = serde_json::to_string(&missed_blocks).map_err(TransportError::SerError)?;
178
179        let new_resp = RawValue::from_string(resp_string).map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
180
181        trace!("get_filter_changes resp : {:?}", new_resp);
182
183        let body = Response { id: Id::None, payload: ResponsePayload::Success(new_resp) };
184        Ok(ResponsePacket::Single(body))
185    }
186
187    pub async fn cached_or_execute(&self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
188        let req_hash = req.params_hash();
189        let method = req.method().to_string();
190        match self.read_cached(method.clone(), req_hash).await {
191            Ok(cached) => {
192                let value = RawValue::from_string(cached).unwrap();
193                let body = Response { id: Id::None, payload: ResponsePayload::Success(value) };
194                Ok(ResponsePacket::Single(body))
195            }
196            Err(_) => {
197                let mut client = self.client.clone();
198                match client.call(RequestPacket::Single(req)).await {
199                    Ok(resp) => {
200                        if let ResponsePacket::Single(resp) = resp.clone() {
201                            if let Err(e) = self.write_cached(method, req_hash, resp.payload.as_success().unwrap().to_string()).await {
202                                error!("{}", e);
203                            }
204                        }
205                        Ok(resp)
206                    }
207                    Err(e) => {
208                        error!("client.call error {e}");
209                        Err(e)
210                    }
211                }
212            }
213        }
214    }
215
216    pub async fn eth_call(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
217        let request: (TransactionRequest, BlockNumberOrTag) = serde_json::from_str(req.params().unwrap().get())
218            .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
219        debug!("eth_call req : {:?}", request);
220
221        let new_req = Request::<(TransactionRequest, BlockNumberOrTag)>::new(
222            "eth_call",
223            req.id().clone(),
224            (request.0, self.convert_block_number(request.1).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?),
225        );
226        let new_req: SerializedRequest = new_req.try_into().unwrap();
227
228        //let resp = self.cached_or_execute(new_req.clone()).await;
229        //trace!("eth_call resp : {:?}", resp);
230        //resp
231        self.client.clone().call(RequestPacket::Single(new_req)).await
232    }
233
234    pub async fn eth_get_storage_at(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
235        let request: (Address, U256, BlockNumberOrTag) = serde_json::from_str(req.params().unwrap().get())
236            .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
237        debug!("eth_get_storage_at req : {:?}", request);
238
239        let new_req = Request::<(Address, U256, BlockNumberOrTag)>::new(
240            "eth_getStorageAt",
241            req.id().clone(),
242            (
243                request.0,
244                request.1,
245                self.convert_block_number(request.2).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?,
246            ),
247        );
248        debug!("eth_get_storage_at updated req : {:?}", new_req);
249
250        let new_req: SerializedRequest = new_req.try_into().unwrap();
251
252        let resp = self.client.clone().call(RequestPacket::Single(new_req)).await;
253        trace!("eth_get_storage_at resp : {:?}", resp);
254        resp
255    }
256
257    pub async fn eth_get_block_by_number(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
258        let request: (BlockNumberOrTag, bool) = serde_json::from_str(req.params().unwrap().get())
259            .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
260        debug!("get_block_by_number : {:?}", request);
261
262        let new_req = Request::<(BlockNumberOrTag, bool)>::new(
263            "eth_getBlockByNumber",
264            req.id().clone(),
265            (self.convert_block_number(request.0).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?, request.1),
266        );
267
268        let new_req: SerializedRequest = new_req.try_into().unwrap();
269
270        self.cached_or_execute(new_req.clone()).await
271    }
272
273    pub async fn eth_get_block_by_hash(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
274        debug!("get_block_by_hash req : {:?}", req);
275        self.cached_or_execute(req.clone()).await
276    }
277
278    pub async fn debug_trace_block_by_number(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
279        let request: (BlockNumberOrTag, GethDebugTracingOptions) = serde_json::from_str(req.params().unwrap().get())
280            .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
281        debug!("debug_trace_block_by_number : {:?}", request);
282
283        let new_req = Request::<(BlockNumberOrTag, GethDebugTracingOptions)>::new(
284            "debug_traceBlockByNumber",
285            req.id().clone(),
286            (self.convert_block_number_next(request.0).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?, request.1),
287        );
288
289        let new_req: SerializedRequest = new_req.try_into().unwrap();
290
291        self.cached_or_execute(new_req.clone()).await
292    }
293
294    pub async fn debug_trace_call(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
295        let request: (TransactionRequest, BlockNumberOrTag, GethDebugTracingCallOptions) =
296            serde_json::from_str(req.params().unwrap().get())
297                .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
298        debug!("eth_debug_trace_call req : {:?}", request);
299
300        let new_req = Request::<(TransactionRequest, BlockNumberOrTag, GethDebugTracingCallOptions)>::new(
301            "debug_traceCall",
302            req.id().clone(),
303            (
304                request.0,
305                self.convert_block_number_next(request.1).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?,
306                request.2,
307            ),
308        );
309        let new_req: SerializedRequest = new_req.try_into().unwrap();
310
311        // let resp = self.cached_or_execute(new_req.clone()).await;
312        // trace!("eth_call resp : {:?}", resp);
313        self.client.clone().call(RequestPacket::Single(new_req)).await
314    }
315
316    pub async fn debug_trace_block_by_hash(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
317        debug!("debug_trace_block_by_hash req : {:?}", req);
318        self.cached_or_execute(req.clone()).await
319    }
320
321    pub async fn eth_get_logs(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
322        debug!("eth_get_logs req  : {:?}", req);
323        self.cached_or_execute(req.clone()).await
324    }
325}
326
327impl Service<RequestPacket> for HttpCachedTransport {
328    type Response = ResponsePacket;
329    type Error = TransportError;
330    type Future = TransportFut<'static>;
331
332    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
333        self.client.poll_ready(cx)
334    }
335
336    fn call(&mut self, req: RequestPacket) -> Self::Future {
337        match req {
338            RequestPacket::Single(single_req) => {
339                trace!(
340                    "Singlereq id : {} method : {} meta : {:?} params :{:?}",
341                    single_req.id(),
342                    single_req.method(),
343                    single_req.meta(),
344                    single_req.params()
345                );
346
347                let mut self_clone = self.clone();
348                match single_req.method() {
349                    "eth_blockNumber" | "get_block_number" => Box::pin(self_clone.get_block_number()),
350                    "eth_newBlockFilter" => Box::pin(self_clone.new_block_filter()),
351                    "eth_getFilterChanges" => Box::pin(self_clone.get_filter_changes(single_req)),
352                    "eth_call" => Box::pin(self_clone.eth_call(single_req)),
353                    "eth_getStorageAt" => Box::pin(self_clone.eth_get_storage_at(single_req)),
354                    "eth_getLogs" => Box::pin(self_clone.eth_get_logs(single_req)),
355                    "eth_getBlockByNumber" => Box::pin(self_clone.eth_get_block_by_number(single_req)),
356                    "eth_getBlockByHash" => Box::pin(self_clone.eth_get_block_by_hash(single_req)),
357                    "debug_traceBlockByHash" => Box::pin(self_clone.debug_trace_block_by_hash(single_req)),
358                    "debug_traceBlockByNumber" => Box::pin(self_clone.debug_trace_block_by_number(single_req)),
359                    "debug_traceCall" => Box::pin(self_clone.debug_trace_call(single_req)),
360                    _ => Box::pin(async move {
361                        match self_clone.client.call(RequestPacket::Single(single_req)).await {
362                            Ok(response) => {
363                                match &response {
364                                    ResponsePacket::Single(single_resp) => {
365                                        trace!("responsepacket response : {:?} ", single_resp);
366                                        trace!(
367                                            "responsepacket payload id : {} len {}",
368                                            single_resp.id,
369                                            single_resp.payload.as_success().unwrap().get().len()
370                                        );
371                                    }
372                                    ResponsePacket::Batch(batch_resp) => {
373                                        error!("Batch response received {}", batch_resp.len())
374                                    }
375                                }
376                                Ok(response)
377                            }
378                            Err(e) => Err(e),
379                        }
380                    }),
381                }
382            }
383            _ => self.client.call(req),
384        }
385    }
386}
387
388#[cfg(test)]
389mod test {
390    use std::env;
391    use std::time::Duration;
392
393    use alloy::primitives::address;
394    use alloy::{
395        providers::{ext::DebugApi, Provider, ProviderBuilder},
396        rpc::{
397            client::{ClientBuilder, RpcClient},
398            types::{
399                trace::geth::{GethDebugBuiltInTracerType, GethDebugTracerType, GethDebugTracingOptions, PreStateConfig},
400                BlockNumberOrTag,
401            },
402        },
403    };
404    use eyre::Result;
405    use futures::StreamExt;
406    use tokio::select;
407    use tracing::debug;
408    use url::Url;
409
410    use crate::httpcached::HttpCachedTransport;
411
412    #[tokio::test]
413    async fn test_create_service() -> Result<()> {
414        let _ = env_logger::try_init_from_env(env_logger::Env::default().default_filter_or("info"));
415
416        dotenvy::from_filename(".env.test").ok();
417        let node_url = Url::parse(env::var("MAINNET_HTTP")?.as_str())?;
418
419        let transport = HttpCachedTransport::new(node_url, Some("./.cache")).await;
420
421        let client = RpcClient::new(transport.clone(), true);
422        let provider = ProviderBuilder::new().connect_client(client);
423
424        let block_number = provider.get_block_number().await?;
425        debug!("Hello, block {block_number}");
426        assert_eq!(block_number, 0);
427        transport.set_block_number(2000001);
428        let block_number = provider.get_block_number().await?;
429        debug!("Hello, block {block_number}");
430        assert_eq!(block_number, 2000001);
431
432        Ok(())
433    }
434
435    #[ignore]
436    #[tokio::test]
437    async fn test_get_block_number() -> Result<()> {
438        let _ = env_logger::try_init_from_env(env_logger::Env::default().default_filter_or("info,alloy_rpc_client=off,"));
439
440        dotenvy::from_filename(".env.test").ok();
441        let node_url = Url::parse(env::var("MAINNET_HTTP")?.as_str())?;
442
443        let transport = HttpCachedTransport::new(node_url, Some("./.cache")).await;
444        transport.set_block_number(20179184);
445
446        let client = ClientBuilder::default().transport(transport.clone(), true).with_poll_interval(Duration::from_millis(50));
447        let provider = ProviderBuilder::new().disable_recommended_fillers().connect_client(client);
448
449        let block_number = provider.get_block_number().await?;
450        debug!("block {block_number}");
451
452        let mut blocks_watcher = provider.watch_blocks().await?.into_stream();
453
454        let weth = kabu_defi_abi::IWETH::new(address!("c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"), provider.clone());
455
456        tokio::task::spawn(async move {
457            loop {
458                select! {
459                    block = blocks_watcher.next() => {
460                        if let Some(block_vec) = block {
461                            for block_hash in block_vec {
462                                debug!("Block : {:?}", block_hash);
463                            }
464                        }else{
465                            debug!("else block : {:?}", block);
466                            break;
467                        }
468                    }
469                }
470            }
471        });
472
473        let trace_opts = GethDebugTracingOptions::default()
474            .with_tracer(GethDebugTracerType::BuiltInTracer(GethDebugBuiltInTracerType::PreStateTracer))
475            .with_prestate_config(PreStateConfig { diff_mode: Some(true), ..Default::default() });
476
477        for i in 0..10 {
478            debug!("Set next block: {}", i);
479            tokio::time::sleep(Duration::from_millis(10)).await;
480
481            let total_supply = weth.totalSupply().call().await.unwrap();
482            debug!("Total supply : {}", total_supply);
483
484            let block_by_number = provider.get_block_by_number(BlockNumberOrTag::Latest).await?.unwrap();
485            let block_by_hash = provider.get_block_by_hash(block_by_number.header.hash).await?.unwrap();
486            assert_eq!(block_by_hash.header, block_by_number.header);
487
488            let block_number = block_by_number.header.number;
489
490            let trace_block_by_hash = provider.debug_trace_block_by_hash(block_by_number.header.hash, trace_opts.clone()).await?;
491            let trace_block_by_number =
492                provider.debug_trace_block_by_number(BlockNumberOrTag::Number(block_number), trace_opts.clone()).await?;
493            assert_eq!(trace_block_by_hash, trace_block_by_number);
494        }
495
496        Ok(())
497    }
498}