kabu_defi_market/
logs_parser.rs

1use alloy_network::Network;
2use alloy_provider::Provider;
3use alloy_rpc_types::Log;
4use eyre::Result;
5use std::collections::HashMap;
6
7use kabu_core_actors::{run_sync, Broadcaster};
8use kabu_types_entities::PoolLoaders;
9use kabu_types_events::LoomTask;
10
11pub async fn process_log_entries<P, N>(
12    log_entries: Vec<Log>,
13    pool_loaders: &PoolLoaders<P, N>,
14    tasks_tx: Broadcaster<LoomTask>,
15) -> Result<()>
16where
17    N: Network,
18    P: Provider<N> + Send + Sync + Clone + 'static,
19{
20    let mut pool_to_fetch = Vec::new();
21    let mut processed_pools = HashMap::new();
22
23    for log_entry in log_entries.into_iter() {
24        if let Some((pool_id, pool_class)) = pool_loaders.determine_pool_class(&log_entry) {
25            // was this pool already processed?
26            if processed_pools.insert(log_entry.address(), true).is_some() {
27                continue;
28            }
29
30            pool_to_fetch.push((pool_id, pool_class));
31        }
32    }
33
34    run_sync!(tasks_tx.send(LoomTask::FetchAndAddPools(pool_to_fetch)));
35    Ok(())
36}