kabu_defi_price/
price_actor.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3use std::ops::{Div, Mul};
4use std::time::Duration;
5
6use alloy_network::Network;
7use alloy_primitives::{address, Address, U256};
8use alloy_provider::Provider;
9use kabu_core_actors::{Accessor, Actor, ActorResult, SharedState, WorkerResult};
10use kabu_core_actors_macros::Accessor;
11use kabu_core_blockchain::Blockchain;
12use kabu_defi_address_book::TokenAddressEth;
13use kabu_defi_pools::protocols::CurveProtocol;
14use kabu_defi_pools::CurvePool;
15use kabu_types_entities::{Market, Pool};
16use tracing::{debug, error, info};
17
18async fn price_worker<N: Network, P: Provider<N> + Clone + 'static>(client: P, market: SharedState<Market>, once: bool) -> WorkerResult {
19    let curve_tricrypto_usdc = CurveProtocol::new_u256_3_eth_to(client.clone(), address!("7F86Bf177Dd4F3494b841a37e810A34dD56c829B"));
20    let curve_tricrypto_usdt = CurveProtocol::new_u256_3_eth_to(client.clone(), address!("f5f5b97624542d72a9e06f04804bf81baa15e2b4"));
21
22    let mut coins_hash_map: HashMap<Address, CurvePool<P, N>> = HashMap::new();
23
24    let curve_tricrypto_usdc_pool = CurvePool::fetch_pool_data(client.clone(), curve_tricrypto_usdc).await?;
25
26    let curve_tricrypto_usdt_pool = CurvePool::fetch_pool_data(client.clone(), curve_tricrypto_usdt).await?;
27
28    coins_hash_map.insert(TokenAddressEth::USDC, curve_tricrypto_usdc_pool.clone());
29    coins_hash_map.insert(TokenAddressEth::WBTC, curve_tricrypto_usdc_pool.clone());
30    coins_hash_map.insert(TokenAddressEth::USDT, curve_tricrypto_usdt_pool.clone());
31
32    let one_ether = U256::from(10).pow(U256::from(18));
33    let weth_amount = one_ether.mul(U256::from(5));
34
35    match market.read().await.get_token(&TokenAddressEth::WETH) {
36        Some(token) => {
37            token.set_eth_price(Some(one_ether));
38        }
39        _ => {
40            error!("WETH_NOT_FOUND")
41        }
42    }
43
44    loop {
45        for (token_address, curve_pool) in coins_hash_map.iter() {
46            debug!("Fetching price of {} at {}", token_address, curve_pool.get_address());
47
48            match curve_pool.fetch_out_amount(TokenAddressEth::WETH, *token_address, weth_amount).await {
49                Ok(out_amount) => {
50                    let price = out_amount.mul(one_ether).div(weth_amount);
51                    info!("Price of ETH in {token_address:#20x} is {price}");
52                    match market.read().await.get_token(token_address) {
53                        Some(tkn) => {
54                            tkn.set_eth_price(Some(price));
55                            debug!("Price is set");
56                        }
57                        _ => {
58                            error!(address=%token_address, "Token not found");
59                        }
60                    }
61                }
62                Err(error) => {
63                    error!(%error, "fetch_out_amount")
64                }
65            }
66        }
67
68        let usdt_price = market.read().await.get_token_or_default(&TokenAddressEth::USDT).get_eth_price();
69        let usdc_price = market.read().await.get_token_or_default(&TokenAddressEth::USDC).get_eth_price();
70
71        let mut usd_price: Option<U256> = None;
72        if let Some(usdc_price) = usdc_price {
73            if let Some(usdt_price) = usdt_price {
74                usd_price = Some((usdc_price + usdt_price) >> 1);
75            }
76        }
77
78        if let Some(usd_price) = usd_price {
79            match market.read().await.get_token(&TokenAddressEth::DAI) {
80                Some(tkn) => {
81                    tkn.set_eth_price(Some(U256::from(10).pow(U256::from(12)).mul(usd_price)));
82                }
83                _ => {
84                    error!("Token {:#20x} not found", TokenAddressEth::DAI);
85                }
86            }
87        }
88        if once {
89            break;
90        }
91
92        let _ = tokio::time::sleep(Duration::new(60, 0)).await;
93    }
94    Ok("PriceWorker finished".to_string())
95}
96
97#[derive(Accessor)]
98pub struct PriceActor<P, N> {
99    client: P,
100    only_once: bool,
101    #[accessor]
102    market: Option<SharedState<Market>>,
103    _n: PhantomData<N>,
104}
105
106impl<P, N> PriceActor<P, N>
107where
108    N: Network,
109    P: Provider<N> + Send + Sync + Clone + 'static,
110{
111    pub fn new(client: P) -> Self {
112        Self { client, only_once: false, market: None, _n: PhantomData }
113    }
114
115    pub fn only_once(self) -> Self {
116        Self { only_once: true, ..self }
117    }
118
119    pub fn on_bc(self, bc: &Blockchain) -> Self {
120        Self { market: Some(bc.market()), ..self }
121    }
122}
123
124impl<P, N> Actor for PriceActor<P, N>
125where
126    N: Network,
127    P: Provider<N> + Send + Sync + Clone + 'static,
128{
129    fn start(&self) -> ActorResult {
130        let task = tokio::task::spawn(price_worker(self.client.clone(), self.market.clone().unwrap(), self.only_once));
131        Ok(vec![task])
132    }
133
134    fn name(&self) -> &'static str {
135        "PriceActor"
136    }
137}