kabu_core_blockchain/
blockchain.rs

1use crate::blockchain_tokens::add_default_tokens_to_market;
2use alloy::primitives::BlockHash;
3use alloy::primitives::ChainId;
4use influxdb::WriteQuery;
5use kabu_core_actors::{Broadcaster, SharedState};
6use kabu_types_blockchain::{ChainParameters, Mempool};
7use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum};
8use kabu_types_entities::{AccountNonceAndBalanceState, LatestBlock, Market};
9use kabu_types_events::{
10    LoomTask, MarketEvents, MempoolEvents, MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate, MessageHealthEvent,
11    MessageMempoolDataUpdate, MessageTxCompose,
12};
13use tracing::error;
14
15#[derive(Clone)]
16pub struct Blockchain<LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
17    chain_id: ChainId,
18    chain_parameters: ChainParameters,
19    market: SharedState<Market>,
20    latest_block: SharedState<LatestBlock<LDT>>,
21    mempool: SharedState<Mempool<LDT>>,
22    account_nonce_and_balance: SharedState<AccountNonceAndBalanceState>,
23
24    new_block_headers_channel: Broadcaster<MessageBlockHeader<LDT>>,
25    new_block_with_tx_channel: Broadcaster<MessageBlock<LDT>>,
26    new_block_state_update_channel: Broadcaster<MessageBlockStateUpdate<LDT>>,
27    new_block_logs_channel: Broadcaster<MessageBlockLogs<LDT>>,
28    new_mempool_tx_channel: Broadcaster<MessageMempoolDataUpdate<LDT>>,
29    market_events_channel: Broadcaster<MarketEvents>,
30    mempool_events_channel: Broadcaster<MempoolEvents>,
31    tx_compose_channel: Broadcaster<MessageTxCompose<LDT>>,
32
33    pool_health_monitor_channel: Broadcaster<MessageHealthEvent>,
34    influxdb_write_channel: Broadcaster<WriteQuery>,
35    tasks_channel: Broadcaster<LoomTask>,
36}
37
38impl Blockchain<KabuDataTypesEthereum> {
39    pub fn new(chain_id: ChainId) -> Blockchain<KabuDataTypesEthereum> {
40        let new_block_headers_channel: Broadcaster<MessageBlockHeader> = Broadcaster::new(10);
41        let new_block_with_tx_channel: Broadcaster<MessageBlock> = Broadcaster::new(10);
42        let new_block_state_update_channel: Broadcaster<MessageBlockStateUpdate> = Broadcaster::new(10);
43        let new_block_logs_channel: Broadcaster<MessageBlockLogs> = Broadcaster::new(10);
44
45        let new_mempool_tx_channel: Broadcaster<MessageMempoolDataUpdate> = Broadcaster::new(5000);
46
47        let market_events_channel: Broadcaster<MarketEvents> = Broadcaster::new(100);
48        let mempool_events_channel: Broadcaster<MempoolEvents> = Broadcaster::new(2000);
49        let tx_compose_channel: Broadcaster<MessageTxCompose> = Broadcaster::new(2000);
50
51        let pool_health_monitor_channel: Broadcaster<MessageHealthEvent> = Broadcaster::new(1000);
52        let influx_write_channel: Broadcaster<WriteQuery> = Broadcaster::new(1000);
53        let tasks_channel: Broadcaster<LoomTask> = Broadcaster::new(1000);
54
55        let mut market_instance = Market::default();
56
57        if let Err(error) = add_default_tokens_to_market(&mut market_instance, chain_id) {
58            error!(%error, "Failed to add default tokens to market");
59        }
60
61        Blockchain {
62            chain_id,
63            chain_parameters: ChainParameters::ethereum(),
64            market: SharedState::new(market_instance),
65            mempool: SharedState::new(Mempool::<KabuDataTypesEthereum>::new()),
66            latest_block: SharedState::new(LatestBlock::new(0, BlockHash::ZERO)),
67            account_nonce_and_balance: SharedState::new(AccountNonceAndBalanceState::new()),
68            new_block_headers_channel,
69            new_block_with_tx_channel,
70            new_block_state_update_channel,
71            new_block_logs_channel,
72            new_mempool_tx_channel,
73            market_events_channel,
74            mempool_events_channel,
75            pool_health_monitor_channel,
76            tx_compose_channel,
77            influxdb_write_channel: influx_write_channel,
78            tasks_channel,
79        }
80    }
81}
82
83impl<LDT: KabuDataTypes> Blockchain<LDT> {
84    pub fn chain_id(&self) -> u64 {
85        self.chain_id
86    }
87
88    pub fn chain_parameters(&self) -> ChainParameters {
89        self.chain_parameters.clone()
90    }
91
92    pub fn market(&self) -> SharedState<Market> {
93        self.market.clone()
94    }
95
96    pub fn latest_block(&self) -> SharedState<LatestBlock<LDT>> {
97        self.latest_block.clone()
98    }
99
100    pub fn mempool(&self) -> SharedState<Mempool<LDT>> {
101        self.mempool.clone()
102    }
103
104    pub fn nonce_and_balance(&self) -> SharedState<AccountNonceAndBalanceState> {
105        self.account_nonce_and_balance.clone()
106    }
107
108    pub fn new_block_headers_channel(&self) -> Broadcaster<MessageBlockHeader<LDT>> {
109        self.new_block_headers_channel.clone()
110    }
111
112    pub fn new_block_with_tx_channel(&self) -> Broadcaster<MessageBlock<LDT>> {
113        self.new_block_with_tx_channel.clone()
114    }
115
116    pub fn new_block_state_update_channel(&self) -> Broadcaster<MessageBlockStateUpdate<LDT>> {
117        self.new_block_state_update_channel.clone()
118    }
119
120    pub fn new_block_logs_channel(&self) -> Broadcaster<MessageBlockLogs<LDT>> {
121        self.new_block_logs_channel.clone()
122    }
123
124    pub fn new_mempool_tx_channel(&self) -> Broadcaster<MessageMempoolDataUpdate<LDT>> {
125        self.new_mempool_tx_channel.clone()
126    }
127
128    pub fn market_events_channel(&self) -> Broadcaster<MarketEvents> {
129        self.market_events_channel.clone()
130    }
131
132    pub fn mempool_events_channel(&self) -> Broadcaster<MempoolEvents> {
133        self.mempool_events_channel.clone()
134    }
135
136    pub fn tx_compose_channel(&self) -> Broadcaster<MessageTxCompose<LDT>> {
137        self.tx_compose_channel.clone()
138    }
139
140    pub fn health_monitor_channel(&self) -> Broadcaster<MessageHealthEvent> {
141        self.pool_health_monitor_channel.clone()
142    }
143
144    pub fn influxdb_write_channel(&self) -> Broadcaster<WriteQuery> {
145        self.influxdb_write_channel.clone()
146    }
147
148    pub fn tasks_channel(&self) -> Broadcaster<LoomTask> {
149        self.tasks_channel.clone()
150    }
151}