kabu_defi_market/
logs_parser.rs1use alloy_network::Network;
2use alloy_provider::Provider;
3use alloy_rpc_types::Log;
4use eyre::Result;
5use std::collections::HashMap;
6
7use kabu_core_actors::{run_sync, Broadcaster};
8use kabu_types_entities::PoolLoaders;
9use kabu_types_events::LoomTask;
10
11pub async fn process_log_entries<P, N>(
12 log_entries: Vec<Log>,
13 pool_loaders: &PoolLoaders<P, N>,
14 tasks_tx: Broadcaster<LoomTask>,
15) -> Result<()>
16where
17 N: Network,
18 P: Provider<N> + Send + Sync + Clone + 'static,
19{
20 let mut pool_to_fetch = Vec::new();
21 let mut processed_pools = HashMap::new();
22
23 for log_entry in log_entries.into_iter() {
24 if let Some((pool_id, pool_class)) = pool_loaders.determine_pool_class(&log_entry) {
25 if processed_pools.insert(log_entry.address(), true).is_some() {
27 continue;
28 }
29
30 pool_to_fetch.push((pool_id, pool_class));
31 }
32 }
33
34 run_sync!(tasks_tx.send(LoomTask::FetchAndAddPools(pool_to_fetch)));
35 Ok(())
36}