kabu_defi_price/
price_actor.rs1use std::collections::HashMap;
2use std::marker::PhantomData;
3use std::ops::{Div, Mul};
4use std::time::Duration;
5
6use alloy_network::Network;
7use alloy_primitives::{address, Address, U256};
8use alloy_provider::Provider;
9use kabu_core_actors::{Accessor, Actor, ActorResult, SharedState, WorkerResult};
10use kabu_core_actors_macros::Accessor;
11use kabu_core_blockchain::Blockchain;
12use kabu_defi_address_book::TokenAddressEth;
13use kabu_defi_pools::protocols::CurveProtocol;
14use kabu_defi_pools::CurvePool;
15use kabu_types_entities::{Market, Pool};
16use tracing::{debug, error, info};
17
18async fn price_worker<N: Network, P: Provider<N> + Clone + 'static>(client: P, market: SharedState<Market>, once: bool) -> WorkerResult {
19 let curve_tricrypto_usdc = CurveProtocol::new_u256_3_eth_to(client.clone(), address!("7F86Bf177Dd4F3494b841a37e810A34dD56c829B"));
20 let curve_tricrypto_usdt = CurveProtocol::new_u256_3_eth_to(client.clone(), address!("f5f5b97624542d72a9e06f04804bf81baa15e2b4"));
21
22 let mut coins_hash_map: HashMap<Address, CurvePool<P, N>> = HashMap::new();
23
24 let curve_tricrypto_usdc_pool = CurvePool::fetch_pool_data(client.clone(), curve_tricrypto_usdc).await?;
25
26 let curve_tricrypto_usdt_pool = CurvePool::fetch_pool_data(client.clone(), curve_tricrypto_usdt).await?;
27
28 coins_hash_map.insert(TokenAddressEth::USDC, curve_tricrypto_usdc_pool.clone());
29 coins_hash_map.insert(TokenAddressEth::WBTC, curve_tricrypto_usdc_pool.clone());
30 coins_hash_map.insert(TokenAddressEth::USDT, curve_tricrypto_usdt_pool.clone());
31
32 let one_ether = U256::from(10).pow(U256::from(18));
33 let weth_amount = one_ether.mul(U256::from(5));
34
35 match market.read().await.get_token(&TokenAddressEth::WETH) {
36 Some(token) => {
37 token.set_eth_price(Some(one_ether));
38 }
39 _ => {
40 error!("WETH_NOT_FOUND")
41 }
42 }
43
44 loop {
45 for (token_address, curve_pool) in coins_hash_map.iter() {
46 debug!("Fetching price of {} at {}", token_address, curve_pool.get_address());
47
48 match curve_pool.fetch_out_amount(TokenAddressEth::WETH, *token_address, weth_amount).await {
49 Ok(out_amount) => {
50 let price = out_amount.mul(one_ether).div(weth_amount);
51 info!("Price of ETH in {token_address:#20x} is {price}");
52 match market.read().await.get_token(token_address) {
53 Some(tkn) => {
54 tkn.set_eth_price(Some(price));
55 debug!("Price is set");
56 }
57 _ => {
58 error!(address=%token_address, "Token not found");
59 }
60 }
61 }
62 Err(error) => {
63 error!(%error, "fetch_out_amount")
64 }
65 }
66 }
67
68 let usdt_price = market.read().await.get_token_or_default(&TokenAddressEth::USDT).get_eth_price();
69 let usdc_price = market.read().await.get_token_or_default(&TokenAddressEth::USDC).get_eth_price();
70
71 let mut usd_price: Option<U256> = None;
72 if let Some(usdc_price) = usdc_price {
73 if let Some(usdt_price) = usdt_price {
74 usd_price = Some((usdc_price + usdt_price) >> 1);
75 }
76 }
77
78 if let Some(usd_price) = usd_price {
79 match market.read().await.get_token(&TokenAddressEth::DAI) {
80 Some(tkn) => {
81 tkn.set_eth_price(Some(U256::from(10).pow(U256::from(12)).mul(usd_price)));
82 }
83 _ => {
84 error!("Token {:#20x} not found", TokenAddressEth::DAI);
85 }
86 }
87 }
88 if once {
89 break;
90 }
91
92 let _ = tokio::time::sleep(Duration::new(60, 0)).await;
93 }
94 Ok("PriceWorker finished".to_string())
95}
96
97#[derive(Accessor)]
98pub struct PriceActor<P, N> {
99 client: P,
100 only_once: bool,
101 #[accessor]
102 market: Option<SharedState<Market>>,
103 _n: PhantomData<N>,
104}
105
106impl<P, N> PriceActor<P, N>
107where
108 N: Network,
109 P: Provider<N> + Send + Sync + Clone + 'static,
110{
111 pub fn new(client: P) -> Self {
112 Self { client, only_once: false, market: None, _n: PhantomData }
113 }
114
115 pub fn only_once(self) -> Self {
116 Self { only_once: true, ..self }
117 }
118
119 pub fn on_bc(self, bc: &Blockchain) -> Self {
120 Self { market: Some(bc.market()), ..self }
121 }
122}
123
124impl<P, N> Actor for PriceActor<P, N>
125where
126 N: Network,
127 P: Provider<N> + Send + Sync + Clone + 'static,
128{
129 fn start(&self) -> ActorResult {
130 let task = tokio::task::spawn(price_worker(self.client.clone(), self.market.clone().unwrap(), self.only_once));
131 Ok(vec![task])
132 }
133
134 fn name(&self) -> &'static str {
135 "PriceActor"
136 }
137}