kabu_node_debug_provider/
archiveprovider.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4
5use crate::anvilprovider::convert_u64;
6use alloy::eips::BlockId;
7use alloy::primitives::{Address, StorageValue};
8use alloy::rpc::json_rpc::RpcRecv;
9use alloy::{
10    network::Ethereum,
11    primitives::{BlockNumber, Bytes, U256, U64},
12    providers::{EthCall, EthGetBlock, Network, Provider, ProviderCall, RootProvider, RpcWithBlock},
13    rpc::{
14        client::{NoParams, RpcCall},
15        json_rpc::{Id, Request},
16        types::{Block, BlockNumberOrTag, FilterChanges},
17    },
18    transports::TransportResult,
19};
20use tokio::sync::RwLock;
21use tracing::debug;
22
23#[derive(Clone)]
24pub struct ArchiveHistoryProvider<P> {
25    provider: P,
26    current_block: Arc<AtomicU64>,
27    new_block_filter: Arc<RwLock<HashMap<U256, u64>>>,
28}
29
30impl<P> ArchiveHistoryProvider<P> where P: Provider<Ethereum> + Send + Sync + Clone + 'static {}
31
32impl<P> ArchiveHistoryProvider<P>
33where
34    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
35{
36    pub fn block_number(&self) -> u64 {
37        self.current_block.load(Ordering::Relaxed)
38    }
39
40    pub fn block_id(&self) -> BlockId {
41        BlockId::Number(BlockNumberOrTag::Number(self.block_number()))
42    }
43}
44
45#[allow(dead_code)]
46impl<P> ArchiveHistoryProvider<P>
47where
48    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
49{
50    pub fn new(provider: P, start_block: u64) -> Self {
51        Self { provider, current_block: Arc::new(AtomicU64::new(start_block)), new_block_filter: Arc::new(RwLock::new(HashMap::new())) }
52    }
53
54    pub fn next_block(&self) -> u64 {
55        let previous_block = self.current_block.load(Ordering::Relaxed);
56        let block = self.current_block.fetch_add(1, Ordering::Relaxed);
57        println!("Change block {previous_block} -> {block} ");
58        block
59    }
60}
61
62#[async_trait::async_trait]
63impl<P> Provider<Ethereum> for ArchiveHistoryProvider<P>
64where
65    P: Provider<Ethereum> + Send + Sync + Clone + 'static,
66{
67    fn root(&self) -> &RootProvider<Ethereum> {
68        self.provider.root()
69    }
70
71    #[allow(clippy::type_complexity)]
72    fn get_block_number(&self) -> ProviderCall<NoParams, U64, BlockNumber> {
73        let provider_call = ProviderCall::RpcCall(
74            RpcCall::new(Request::new("get_block_number", Id::None, [(); 0]), self.provider.client().transport().clone())
75                .map_resp(convert_u64 as fn(U64) -> u64),
76        );
77        provider_call
78    }
79
80    fn call(&self, tx: <Ethereum as Network>::TransactionRequest) -> EthCall<Ethereum, Bytes> {
81        let call = EthCall::new(self.weak_client(), "eth_call", tx).block(self.block_id());
82        debug!("call {:?}", self.block_id());
83        call
84    }
85
86    fn get_storage_at(&self, address: Address, key: U256) -> RpcWithBlock<(Address, U256), StorageValue> {
87        debug!("get_storage_at {:?}", self.block_id());
88        let rpc_call = RpcWithBlock::from(self.provider.client().request("eth_getStorageAt", (address, key)));
89        rpc_call.block_id(self.block_id())
90    }
91
92    fn get_block_by_number(&self, number: BlockNumberOrTag) -> EthGetBlock<Block>
93where {
94        self.provider.get_block_by_number(number)
95    }
96
97    async fn get_filter_changes<R: RpcRecv>(&self, id: U256) -> TransportResult<Vec<R>> {
98        println!("get_filter_changes");
99
100        self.provider.get_filter_changes(id).await
101    }
102
103    async fn get_filter_changes_dyn(&self, id: U256) -> TransportResult<FilterChanges> {
104        println!("get_filter_changes_dyn");
105
106        self.provider.get_filter_changes_dyn(id).await
107    }
108
109    async fn new_block_filter(&self) -> TransportResult<U256> {
110        let result = self.provider.new_block_filter().await;
111        let cur_block = self.block_number();
112        if let Ok(filter_id) = &result {
113            self.new_block_filter.write().await.insert(*filter_id, cur_block);
114        }
115        result
116    }
117}
118
#[cfg(test)]
mod test {
    // No unit tests are defined in this module yet.
}