kabu_node_debug_provider/
archiveprovider.rs1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4
5use crate::anvilprovider::convert_u64;
6use alloy::eips::BlockId;
7use alloy::primitives::{Address, StorageValue};
8use alloy::rpc::json_rpc::RpcRecv;
9use alloy::{
10 network::Ethereum,
11 primitives::{BlockNumber, Bytes, U256, U64},
12 providers::{EthCall, EthGetBlock, Network, Provider, ProviderCall, RootProvider, RpcWithBlock},
13 rpc::{
14 client::{NoParams, RpcCall},
15 json_rpc::{Id, Request},
16 types::{Block, BlockNumberOrTag, FilterChanges},
17 },
18 transports::TransportResult,
19};
20use tokio::sync::RwLock;
21use tracing::debug;
22
23#[derive(Clone)]
24pub struct ArchiveHistoryProvider<P> {
25 provider: P,
26 current_block: Arc<AtomicU64>,
27 new_block_filter: Arc<RwLock<HashMap<U256, u64>>>,
28}
29
30impl<P> ArchiveHistoryProvider<P> where P: Provider<Ethereum> + Send + Sync + Clone + 'static {}
31
32impl<P> ArchiveHistoryProvider<P>
33where
34 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
35{
36 pub fn block_number(&self) -> u64 {
37 self.current_block.load(Ordering::Relaxed)
38 }
39
40 pub fn block_id(&self) -> BlockId {
41 BlockId::Number(BlockNumberOrTag::Number(self.block_number()))
42 }
43}
44
45#[allow(dead_code)]
46impl<P> ArchiveHistoryProvider<P>
47where
48 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
49{
50 pub fn new(provider: P, start_block: u64) -> Self {
51 Self { provider, current_block: Arc::new(AtomicU64::new(start_block)), new_block_filter: Arc::new(RwLock::new(HashMap::new())) }
52 }
53
54 pub fn next_block(&self) -> u64 {
55 let previous_block = self.current_block.load(Ordering::Relaxed);
56 let block = self.current_block.fetch_add(1, Ordering::Relaxed);
57 println!("Change block {previous_block} -> {block} ");
58 block
59 }
60}
61
62#[async_trait::async_trait]
63impl<P> Provider<Ethereum> for ArchiveHistoryProvider<P>
64where
65 P: Provider<Ethereum> + Send + Sync + Clone + 'static,
66{
67 fn root(&self) -> &RootProvider<Ethereum> {
68 self.provider.root()
69 }
70
71 #[allow(clippy::type_complexity)]
72 fn get_block_number(&self) -> ProviderCall<NoParams, U64, BlockNumber> {
73 let provider_call = ProviderCall::RpcCall(
74 RpcCall::new(Request::new("get_block_number", Id::None, [(); 0]), self.provider.client().transport().clone())
75 .map_resp(convert_u64 as fn(U64) -> u64),
76 );
77 provider_call
78 }
79
80 fn call(&self, tx: <Ethereum as Network>::TransactionRequest) -> EthCall<Ethereum, Bytes> {
81 let call = EthCall::new(self.weak_client(), "eth_call", tx).block(self.block_id());
82 debug!("call {:?}", self.block_id());
83 call
84 }
85
86 fn get_storage_at(&self, address: Address, key: U256) -> RpcWithBlock<(Address, U256), StorageValue> {
87 debug!("get_storage_at {:?}", self.block_id());
88 let rpc_call = RpcWithBlock::from(self.provider.client().request("eth_getStorageAt", (address, key)));
89 rpc_call.block_id(self.block_id())
90 }
91
92 fn get_block_by_number(&self, number: BlockNumberOrTag) -> EthGetBlock<Block>
93where {
94 self.provider.get_block_by_number(number)
95 }
96
97 async fn get_filter_changes<R: RpcRecv>(&self, id: U256) -> TransportResult<Vec<R>> {
98 println!("get_filter_changes");
99
100 self.provider.get_filter_changes(id).await
101 }
102
103 async fn get_filter_changes_dyn(&self, id: U256) -> TransportResult<FilterChanges> {
104 println!("get_filter_changes_dyn");
105
106 self.provider.get_filter_changes_dyn(id).await
107 }
108
109 async fn new_block_filter(&self) -> TransportResult<U256> {
110 let result = self.provider.new_block_filter().await;
111 let cur_block = self.block_number();
112 if let Ok(filter_id) = &result {
113 self.new_block_filter.write().await.insert(*filter_id, cur_block);
114 }
115 result
116 }
117}
118
119#[cfg(test)]
120mod test {}