1use std::collections::HashMap;
2use std::ops::RangeInclusive;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use alloy::primitives::{Address, U256};
8use alloy::rpc::types::trace::geth::GethDebugTracingCallOptions;
9use alloy::{
10 primitives::{BlockHash, BlockNumber, B128, B256, U128},
11 rpc::{
12 json_rpc::{Id, Request, RequestPacket, Response, ResponsePacket, ResponsePayload, SerializedRequest},
13 types::{trace::geth::GethDebugTracingOptions, Block, BlockNumberOrTag, TransactionRequest},
14 },
15 transports::{
16 http::{Http, ReqwestTransport},
17 TransportError, TransportErrorKind, TransportFut,
18 },
19};
20use eyre::{eyre, Result};
21use reqwest::Client;
22use serde_json::value::RawValue;
23use tokio::sync::RwLock;
24use tower::Service;
25use tracing::{debug, error, trace};
26use url::Url;
27
28use crate::cachefolder::CacheFolder;
29
30#[derive(Clone)]
31pub struct HttpCachedTransport {
32 client: Http<Client>,
33 block_number: Arc<AtomicU64>,
34 block_filters: Arc<RwLock<HashMap<U128, BlockNumber>>>,
35 block_hashes: Arc<RwLock<HashMap<BlockNumber, B256>>>,
36 cache_folder: Option<CacheFolder>,
37}
38
39impl HttpCachedTransport {
40 pub async fn new(url: Url, cache_path: Option<&str>) -> Self {
41 let client = ReqwestTransport::new(url);
42 let cache_folder = match cache_path {
43 Some(path) => Some(CacheFolder::new(path).await),
44 None => None,
45 };
46 Self {
47 client,
48 block_number: Arc::new(AtomicU64::new(0)),
49 block_filters: Arc::new(RwLock::new(HashMap::new())),
50 block_hashes: Arc::new(RwLock::new(HashMap::new())),
51 cache_folder,
52 }
53 }
54
55 pub fn set_block_number(&self, block_number: u64) -> u64 {
56 self.block_number.swap(block_number, Ordering::Relaxed)
57 }
58
59 fn convert_block_number(&self, number_or_tag: BlockNumberOrTag) -> Result<BlockNumberOrTag> {
60 let current_block = self.read_block_number();
61 match number_or_tag {
62 BlockNumberOrTag::Number(x) => {
63 if x > current_block {
64 Err(eyre!("INCORRECT_BLOCK_NUMBER"))
65 } else {
66 Ok(BlockNumberOrTag::Number(x))
67 }
68 }
69 BlockNumberOrTag::Earliest => Ok(BlockNumberOrTag::Earliest),
70 _ => Ok(BlockNumberOrTag::Number(current_block)),
71 }
72 }
73
74 fn convert_block_number_next(&self, number_or_tag: BlockNumberOrTag) -> Result<BlockNumberOrTag> {
75 let current_block = self.read_block_number();
76 match number_or_tag {
77 BlockNumberOrTag::Number(x) => {
78 if x > current_block {
79 Err(eyre!("INCORRECT_BLOCK_NUMBER"))
80 } else {
81 Ok(BlockNumberOrTag::Number(x))
82 }
83 }
84 BlockNumberOrTag::Earliest => Ok(BlockNumberOrTag::Earliest),
85 _ => Ok(BlockNumberOrTag::Number(current_block + 1)),
86 }
87 }
88
89 pub async fn read_cached(&self, method: String, params_hash: B256) -> Result<String> {
90 match &self.cache_folder {
91 Some(cf) => cf.read(method, params_hash).await,
92 None => Err(eyre!("NO_CACHE")),
93 }
94 }
95
96 pub async fn write_cached(&self, method: String, params_hash: B256, data: String) -> Result<()> {
97 match &self.cache_folder {
98 Some(cf) => cf.write(method, params_hash, data).await,
99 None => Err(eyre!("NO_CACHE")),
100 }
101 }
102
103 pub fn next_block_number(&self) -> BlockNumber {
104 self.block_number.fetch_add(1, Ordering::Relaxed)
105 }
106
107 pub async fn fetch_next_block(&self) -> Result<BlockNumber, TransportError> {
108 let next_block_number = self.read_block_number() + 1;
109
110 let new_req = Request::<(BlockNumberOrTag, bool)>::new(
111 "eth_getBlockByNumber",
112 Id::None,
113 (BlockNumberOrTag::Number(next_block_number), false),
114 );
115
116 let new_req: SerializedRequest = new_req.try_into().map_err(TransportError::SerError)?;
117
118 if let Ok(new_block_packet) = self.cached_or_execute(new_req).await {
119 trace!("fetch_next_block : {:?}", new_block_packet);
120 if let ResponsePacket::Single(new_block_response) = new_block_packet {
121 let response: Block = serde_json::from_str(new_block_response.payload.as_success().unwrap().get())
122 .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
123 self.block_hashes.write().await.insert(next_block_number, response.header.hash);
124 self.set_block_number(next_block_number);
125 }
126 }
127
128 Ok(next_block_number)
129 }
130
131 pub fn read_block_number(&self) -> u64 {
132 self.block_number.load(Ordering::Relaxed)
133 }
134
135 pub async fn create_block_filter(&self) -> U128 {
136 let filter_id = B128::random();
137 let filter_id: U128 = filter_id.into();
138 self.block_filters.write().await.insert(filter_id, self.read_block_number());
139 filter_id
140 }
141
142 pub async fn get_block_number(self) -> Result<ResponsePacket, TransportError> {
143 let block_number = self.read_block_number();
144 let value = RawValue::from_string(format!("{block_number}").to_string()).unwrap();
145 let body = Response { id: Id::None, payload: ResponsePayload::Success(value) };
146 Ok(ResponsePacket::Single(body))
147 }
148 pub async fn new_block_filter(self) -> Result<ResponsePacket, TransportError> {
149 let filter_id = self.create_block_filter().await;
150 let value = format!("\"0x{filter_id:x}\"").to_string();
151 let value = RawValue::from_string(value).unwrap();
152 let body = Response { id: Id::None, payload: ResponsePayload::Success(value) };
153 Ok(ResponsePacket::Single(body))
154 }
155
156 pub async fn get_filter_changes(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
157 let raw_value: Vec<U128> = serde_json::from_str(req.params().unwrap().get())
158 .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
159 trace!("get_filter_changes req : {:?}", raw_value);
160 let mut block_filters_guard = self.block_filters.write().await;
161 let block_hashes_guard = self.block_hashes.read().await;
162 let current_block = self.read_block_number();
163 let mut missed_blocks: Vec<BlockHash> = Vec::new();
164
165 for filter_id in raw_value {
166 if let Some(filter_block) = block_filters_guard.get(&filter_id).cloned() {
167 if filter_block < current_block {
168 block_filters_guard.insert(filter_id, current_block);
169 let missed_block_range = RangeInclusive::new(filter_block + 1, current_block)
170 .map(|block_number| block_hashes_guard.get(&block_number).cloned().unwrap_or_default())
171 .collect();
172 missed_blocks = missed_block_range;
173 break;
174 }
175 }
176 }
177 let resp_string = serde_json::to_string(&missed_blocks).map_err(TransportError::SerError)?;
178
179 let new_resp = RawValue::from_string(resp_string).map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
180
181 trace!("get_filter_changes resp : {:?}", new_resp);
182
183 let body = Response { id: Id::None, payload: ResponsePayload::Success(new_resp) };
184 Ok(ResponsePacket::Single(body))
185 }
186
187 pub async fn cached_or_execute(&self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
188 let req_hash = req.params_hash();
189 let method = req.method().to_string();
190 match self.read_cached(method.clone(), req_hash).await {
191 Ok(cached) => {
192 let value = RawValue::from_string(cached).unwrap();
193 let body = Response { id: Id::None, payload: ResponsePayload::Success(value) };
194 Ok(ResponsePacket::Single(body))
195 }
196 Err(_) => {
197 let mut client = self.client.clone();
198 match client.call(RequestPacket::Single(req)).await {
199 Ok(resp) => {
200 if let ResponsePacket::Single(resp) = resp.clone() {
201 if let Err(e) = self.write_cached(method, req_hash, resp.payload.as_success().unwrap().to_string()).await {
202 error!("{}", e);
203 }
204 }
205 Ok(resp)
206 }
207 Err(e) => {
208 error!("client.call error {e}");
209 Err(e)
210 }
211 }
212 }
213 }
214 }
215
216 pub async fn eth_call(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
217 let request: (TransactionRequest, BlockNumberOrTag) = serde_json::from_str(req.params().unwrap().get())
218 .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
219 debug!("eth_call req : {:?}", request);
220
221 let new_req = Request::<(TransactionRequest, BlockNumberOrTag)>::new(
222 "eth_call",
223 req.id().clone(),
224 (request.0, self.convert_block_number(request.1).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?),
225 );
226 let new_req: SerializedRequest = new_req.try_into().unwrap();
227
228 self.client.clone().call(RequestPacket::Single(new_req)).await
232 }
233
234 pub async fn eth_get_storage_at(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
235 let request: (Address, U256, BlockNumberOrTag) = serde_json::from_str(req.params().unwrap().get())
236 .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
237 debug!("eth_get_storage_at req : {:?}", request);
238
239 let new_req = Request::<(Address, U256, BlockNumberOrTag)>::new(
240 "eth_getStorageAt",
241 req.id().clone(),
242 (
243 request.0,
244 request.1,
245 self.convert_block_number(request.2).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?,
246 ),
247 );
248 debug!("eth_get_storage_at updated req : {:?}", new_req);
249
250 let new_req: SerializedRequest = new_req.try_into().unwrap();
251
252 let resp = self.client.clone().call(RequestPacket::Single(new_req)).await;
253 trace!("eth_get_storage_at resp : {:?}", resp);
254 resp
255 }
256
257 pub async fn eth_get_block_by_number(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
258 let request: (BlockNumberOrTag, bool) = serde_json::from_str(req.params().unwrap().get())
259 .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
260 debug!("get_block_by_number : {:?}", request);
261
262 let new_req = Request::<(BlockNumberOrTag, bool)>::new(
263 "eth_getBlockByNumber",
264 req.id().clone(),
265 (self.convert_block_number(request.0).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?, request.1),
266 );
267
268 let new_req: SerializedRequest = new_req.try_into().unwrap();
269
270 self.cached_or_execute(new_req.clone()).await
271 }
272
273 pub async fn eth_get_block_by_hash(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
274 debug!("get_block_by_hash req : {:?}", req);
275 self.cached_or_execute(req.clone()).await
276 }
277
278 pub async fn debug_trace_block_by_number(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
279 let request: (BlockNumberOrTag, GethDebugTracingOptions) = serde_json::from_str(req.params().unwrap().get())
280 .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
281 debug!("debug_trace_block_by_number : {:?}", request);
282
283 let new_req = Request::<(BlockNumberOrTag, GethDebugTracingOptions)>::new(
284 "debug_traceBlockByNumber",
285 req.id().clone(),
286 (self.convert_block_number_next(request.0).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?, request.1),
287 );
288
289 let new_req: SerializedRequest = new_req.try_into().unwrap();
290
291 self.cached_or_execute(new_req.clone()).await
292 }
293
294 pub async fn debug_trace_call(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
295 let request: (TransactionRequest, BlockNumberOrTag, GethDebugTracingCallOptions) =
296 serde_json::from_str(req.params().unwrap().get())
297 .map_err(|e| TransportError::DeserError { err: e, text: "err".to_string() })?;
298 debug!("eth_debug_trace_call req : {:?}", request);
299
300 let new_req = Request::<(TransactionRequest, BlockNumberOrTag, GethDebugTracingCallOptions)>::new(
301 "debug_traceCall",
302 req.id().clone(),
303 (
304 request.0,
305 self.convert_block_number_next(request.1).map_err(|e| TransportErrorKind::custom_str(e.to_string().as_str()))?,
306 request.2,
307 ),
308 );
309 let new_req: SerializedRequest = new_req.try_into().unwrap();
310
311 self.client.clone().call(RequestPacket::Single(new_req)).await
314 }
315
316 pub async fn debug_trace_block_by_hash(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
317 debug!("debug_trace_block_by_hash req : {:?}", req);
318 self.cached_or_execute(req.clone()).await
319 }
320
321 pub async fn eth_get_logs(self, req: SerializedRequest) -> Result<ResponsePacket, TransportError> {
322 debug!("eth_get_logs req : {:?}", req);
323 self.cached_or_execute(req.clone()).await
324 }
325}
326
327impl Service<RequestPacket> for HttpCachedTransport {
328 type Response = ResponsePacket;
329 type Error = TransportError;
330 type Future = TransportFut<'static>;
331
332 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
333 self.client.poll_ready(cx)
334 }
335
336 fn call(&mut self, req: RequestPacket) -> Self::Future {
337 match req {
338 RequestPacket::Single(single_req) => {
339 trace!(
340 "Singlereq id : {} method : {} meta : {:?} params :{:?}",
341 single_req.id(),
342 single_req.method(),
343 single_req.meta(),
344 single_req.params()
345 );
346
347 let mut self_clone = self.clone();
348 match single_req.method() {
349 "eth_blockNumber" | "get_block_number" => Box::pin(self_clone.get_block_number()),
350 "eth_newBlockFilter" => Box::pin(self_clone.new_block_filter()),
351 "eth_getFilterChanges" => Box::pin(self_clone.get_filter_changes(single_req)),
352 "eth_call" => Box::pin(self_clone.eth_call(single_req)),
353 "eth_getStorageAt" => Box::pin(self_clone.eth_get_storage_at(single_req)),
354 "eth_getLogs" => Box::pin(self_clone.eth_get_logs(single_req)),
355 "eth_getBlockByNumber" => Box::pin(self_clone.eth_get_block_by_number(single_req)),
356 "eth_getBlockByHash" => Box::pin(self_clone.eth_get_block_by_hash(single_req)),
357 "debug_traceBlockByHash" => Box::pin(self_clone.debug_trace_block_by_hash(single_req)),
358 "debug_traceBlockByNumber" => Box::pin(self_clone.debug_trace_block_by_number(single_req)),
359 "debug_traceCall" => Box::pin(self_clone.debug_trace_call(single_req)),
360 _ => Box::pin(async move {
361 match self_clone.client.call(RequestPacket::Single(single_req)).await {
362 Ok(response) => {
363 match &response {
364 ResponsePacket::Single(single_resp) => {
365 trace!("responsepacket response : {:?} ", single_resp);
366 trace!(
367 "responsepacket payload id : {} len {}",
368 single_resp.id,
369 single_resp.payload.as_success().unwrap().get().len()
370 );
371 }
372 ResponsePacket::Batch(batch_resp) => {
373 error!("Batch response received {}", batch_resp.len())
374 }
375 }
376 Ok(response)
377 }
378 Err(e) => Err(e),
379 }
380 }),
381 }
382 }
383 _ => self.client.call(req),
384 }
385 }
386}
387
#[cfg(test)]
mod test {
    use std::env;
    use std::time::Duration;

    use alloy::primitives::address;
    use alloy::{
        providers::{ext::DebugApi, Provider, ProviderBuilder},
        rpc::{
            client::{ClientBuilder, RpcClient},
            types::{
                trace::geth::{GethDebugBuiltInTracerType, GethDebugTracerType, GethDebugTracingOptions, PreStateConfig},
                BlockNumberOrTag,
            },
        },
    };
    use eyre::Result;
    use futures::StreamExt;
    use tracing::debug;
    use url::Url;

    use crate::httpcached::HttpCachedTransport;

    /// Loads `.env.test` (if present) and builds a transport for the node named by
    /// `MAINNET_HTTP`, caching under `./.cache`.
    async fn mainnet_transport() -> Result<HttpCachedTransport> {
        dotenvy::from_filename(".env.test").ok();
        let node_url = Url::parse(env::var("MAINNET_HTTP")?.as_str())?;
        Ok(HttpCachedTransport::new(node_url, Some("./.cache")).await)
    }

    /// `eth_blockNumber` is answered from the transport's own counter: 0 initially,
    /// then whatever `set_block_number` stored.
    #[tokio::test]
    async fn test_create_service() -> Result<()> {
        let _ = env_logger::try_init_from_env(env_logger::Env::default().default_filter_or("info"));

        let transport = mainnet_transport().await?;

        let rpc_client = RpcClient::new(transport.clone(), true);
        let provider = ProviderBuilder::new().connect_client(rpc_client);

        let initial = provider.get_block_number().await?;
        debug!("Hello, block {initial}");
        assert_eq!(initial, 0);

        transport.set_block_number(2000001);
        let updated = provider.get_block_number().await?;
        debug!("Hello, block {updated}");
        assert_eq!(updated, 2000001);

        Ok(())
    }

    /// Exercises the provider against a live node: block lookup by number and by
    /// hash must agree, and so must block traces by number and by hash.
    #[ignore]
    #[tokio::test]
    async fn test_get_block_number() -> Result<()> {
        let _ = env_logger::try_init_from_env(env_logger::Env::default().default_filter_or("info,alloy_rpc_client=off,"));

        let transport = mainnet_transport().await?;
        transport.set_block_number(20179184);

        let rpc_client = ClientBuilder::default().transport(transport.clone(), true).with_poll_interval(Duration::from_millis(50));
        let provider = ProviderBuilder::new().disable_recommended_fillers().connect_client(rpc_client);

        let block_number = provider.get_block_number().await?;
        debug!("block {block_number}");

        let mut blocks_watcher = provider.watch_blocks().await?.into_stream();

        let weth = kabu_defi_abi::IWETH::new(address!("c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"), provider.clone());

        // Detached task that logs every block hash delivered by the watcher until
        // the stream ends.
        tokio::task::spawn(async move {
            loop {
                let block = blocks_watcher.next().await;
                match block {
                    Some(block_vec) => {
                        for block_hash in block_vec {
                            debug!("Block : {:?}", block_hash);
                        }
                    }
                    None => {
                        debug!("else block : {:?}", block);
                        break;
                    }
                }
            }
        });

        let prestate_tracer = GethDebugTracerType::BuiltInTracer(GethDebugBuiltInTracerType::PreStateTracer);
        let trace_opts = GethDebugTracingOptions::default()
            .with_tracer(prestate_tracer)
            .with_prestate_config(PreStateConfig { diff_mode: Some(true), ..Default::default() });

        for i in 0..10 {
            debug!("Set next block: {}", i);
            tokio::time::sleep(Duration::from_millis(10)).await;

            let total_supply = weth.totalSupply().call().await.unwrap();
            debug!("Total supply : {}", total_supply);

            let block_by_number = provider.get_block_by_number(BlockNumberOrTag::Latest).await?.unwrap();
            let block_hash = block_by_number.header.hash;
            let block_by_hash = provider.get_block_by_hash(block_hash).await?.unwrap();
            assert_eq!(block_by_hash.header, block_by_number.header);

            let block_number = block_by_number.header.number;

            let trace_block_by_hash = provider.debug_trace_block_by_hash(block_hash, trace_opts.clone()).await?;
            let trace_block_by_number =
                provider.debug_trace_block_by_number(BlockNumberOrTag::Number(block_number), trace_opts.clone()).await?;
            assert_eq!(trace_block_by_hash, trace_block_by_number);
        }

        Ok(())
    }
}