kabu_core_blockchain/
blockchain.rs1use crate::blockchain_tokens::add_default_tokens_to_market;
2use alloy::primitives::BlockHash;
3use alloy::primitives::ChainId;
4use influxdb::WriteQuery;
5use kabu_core_actors::{Broadcaster, SharedState};
6use kabu_types_blockchain::{ChainParameters, Mempool};
7use kabu_types_blockchain::{KabuDataTypes, KabuDataTypesEthereum};
8use kabu_types_entities::{AccountNonceAndBalanceState, LatestBlock, Market};
9use kabu_types_events::{
10 LoomTask, MarketEvents, MempoolEvents, MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate, MessageHealthEvent,
11 MessageMempoolDataUpdate, MessageTxCompose,
12};
13use tracing::error;
14
15#[derive(Clone)]
16pub struct Blockchain<LDT: KabuDataTypes + 'static = KabuDataTypesEthereum> {
17 chain_id: ChainId,
18 chain_parameters: ChainParameters,
19 market: SharedState<Market>,
20 latest_block: SharedState<LatestBlock<LDT>>,
21 mempool: SharedState<Mempool<LDT>>,
22 account_nonce_and_balance: SharedState<AccountNonceAndBalanceState>,
23
24 new_block_headers_channel: Broadcaster<MessageBlockHeader<LDT>>,
25 new_block_with_tx_channel: Broadcaster<MessageBlock<LDT>>,
26 new_block_state_update_channel: Broadcaster<MessageBlockStateUpdate<LDT>>,
27 new_block_logs_channel: Broadcaster<MessageBlockLogs<LDT>>,
28 new_mempool_tx_channel: Broadcaster<MessageMempoolDataUpdate<LDT>>,
29 market_events_channel: Broadcaster<MarketEvents>,
30 mempool_events_channel: Broadcaster<MempoolEvents>,
31 tx_compose_channel: Broadcaster<MessageTxCompose<LDT>>,
32
33 pool_health_monitor_channel: Broadcaster<MessageHealthEvent>,
34 influxdb_write_channel: Broadcaster<WriteQuery>,
35 tasks_channel: Broadcaster<LoomTask>,
36}
37
38impl Blockchain<KabuDataTypesEthereum> {
39 pub fn new(chain_id: ChainId) -> Blockchain<KabuDataTypesEthereum> {
40 let new_block_headers_channel: Broadcaster<MessageBlockHeader> = Broadcaster::new(10);
41 let new_block_with_tx_channel: Broadcaster<MessageBlock> = Broadcaster::new(10);
42 let new_block_state_update_channel: Broadcaster<MessageBlockStateUpdate> = Broadcaster::new(10);
43 let new_block_logs_channel: Broadcaster<MessageBlockLogs> = Broadcaster::new(10);
44
45 let new_mempool_tx_channel: Broadcaster<MessageMempoolDataUpdate> = Broadcaster::new(5000);
46
47 let market_events_channel: Broadcaster<MarketEvents> = Broadcaster::new(100);
48 let mempool_events_channel: Broadcaster<MempoolEvents> = Broadcaster::new(2000);
49 let tx_compose_channel: Broadcaster<MessageTxCompose> = Broadcaster::new(2000);
50
51 let pool_health_monitor_channel: Broadcaster<MessageHealthEvent> = Broadcaster::new(1000);
52 let influx_write_channel: Broadcaster<WriteQuery> = Broadcaster::new(1000);
53 let tasks_channel: Broadcaster<LoomTask> = Broadcaster::new(1000);
54
55 let mut market_instance = Market::default();
56
57 if let Err(error) = add_default_tokens_to_market(&mut market_instance, chain_id) {
58 error!(%error, "Failed to add default tokens to market");
59 }
60
61 Blockchain {
62 chain_id,
63 chain_parameters: ChainParameters::ethereum(),
64 market: SharedState::new(market_instance),
65 mempool: SharedState::new(Mempool::<KabuDataTypesEthereum>::new()),
66 latest_block: SharedState::new(LatestBlock::new(0, BlockHash::ZERO)),
67 account_nonce_and_balance: SharedState::new(AccountNonceAndBalanceState::new()),
68 new_block_headers_channel,
69 new_block_with_tx_channel,
70 new_block_state_update_channel,
71 new_block_logs_channel,
72 new_mempool_tx_channel,
73 market_events_channel,
74 mempool_events_channel,
75 pool_health_monitor_channel,
76 tx_compose_channel,
77 influxdb_write_channel: influx_write_channel,
78 tasks_channel,
79 }
80 }
81}
82
83impl<LDT: KabuDataTypes> Blockchain<LDT> {
84 pub fn chain_id(&self) -> u64 {
85 self.chain_id
86 }
87
88 pub fn chain_parameters(&self) -> ChainParameters {
89 self.chain_parameters.clone()
90 }
91
92 pub fn market(&self) -> SharedState<Market> {
93 self.market.clone()
94 }
95
96 pub fn latest_block(&self) -> SharedState<LatestBlock<LDT>> {
97 self.latest_block.clone()
98 }
99
100 pub fn mempool(&self) -> SharedState<Mempool<LDT>> {
101 self.mempool.clone()
102 }
103
104 pub fn nonce_and_balance(&self) -> SharedState<AccountNonceAndBalanceState> {
105 self.account_nonce_and_balance.clone()
106 }
107
108 pub fn new_block_headers_channel(&self) -> Broadcaster<MessageBlockHeader<LDT>> {
109 self.new_block_headers_channel.clone()
110 }
111
112 pub fn new_block_with_tx_channel(&self) -> Broadcaster<MessageBlock<LDT>> {
113 self.new_block_with_tx_channel.clone()
114 }
115
116 pub fn new_block_state_update_channel(&self) -> Broadcaster<MessageBlockStateUpdate<LDT>> {
117 self.new_block_state_update_channel.clone()
118 }
119
120 pub fn new_block_logs_channel(&self) -> Broadcaster<MessageBlockLogs<LDT>> {
121 self.new_block_logs_channel.clone()
122 }
123
124 pub fn new_mempool_tx_channel(&self) -> Broadcaster<MessageMempoolDataUpdate<LDT>> {
125 self.new_mempool_tx_channel.clone()
126 }
127
128 pub fn market_events_channel(&self) -> Broadcaster<MarketEvents> {
129 self.market_events_channel.clone()
130 }
131
132 pub fn mempool_events_channel(&self) -> Broadcaster<MempoolEvents> {
133 self.mempool_events_channel.clone()
134 }
135
136 pub fn tx_compose_channel(&self) -> Broadcaster<MessageTxCompose<LDT>> {
137 self.tx_compose_channel.clone()
138 }
139
140 pub fn health_monitor_channel(&self) -> Broadcaster<MessageHealthEvent> {
141 self.pool_health_monitor_channel.clone()
142 }
143
144 pub fn influxdb_write_channel(&self) -> Broadcaster<WriteQuery> {
145 self.influxdb_write_channel.clone()
146 }
147
148 pub fn tasks_channel(&self) -> Broadcaster<LoomTask> {
149 self.tasks_channel.clone()
150 }
151}