|
1 |
| -use anyhow::Result; |
| 1 | +use std::sync::Arc; |
| 2 | + |
| 3 | +use anyhow::{Error, Result}; |
2 | 4 | use async_trait::async_trait;
|
3 | 5 | use chrono::{DateTime, Local};
|
4 | 6 | use log::{error, info};
|
5 | 7 | use reqwest::Client;
|
6 | 8 | use serde::Deserialize;
|
| 9 | +use tokio::sync::{mpsc, Semaphore}; |
7 | 10 |
|
8 | 11 | use crate::execution::ports::news_fetcher::{FetchNewsArticle, FetchNewsHandler, FetchNewsOutput, NewsFetcher};
|
9 | 12 |
|
@@ -741,7 +744,7 @@ impl YahooClient {
|
741 | 744 | }
|
742 | 745 | }
|
743 | 746 |
|
744 |
| - async fn fetch_provider(&self, provider: &str) -> Result<Vec<FetchNewsArticle>> { |
| 747 | + async fn fetch_provider(self: Arc<Self>, provider: &str) -> Result<Vec<FetchNewsArticle>> { |
745 | 748 | let mut articles = vec![];
|
746 | 749 | let url = format!("https://news.yahoo.co.jp/rss/media/{}/all.xml", provider);
|
747 | 750 | let response_text = Client::new().get(url).send().await?.text().await?;
|
@@ -791,19 +794,41 @@ impl YahooClient {
|
791 | 794 |
|
792 | 795 | #[async_trait]
|
793 | 796 | impl NewsFetcher for YahooClient {
|
794 |
| - async fn fetch_news(&self, handler: FetchNewsHandler) -> Vec<FetchNewsOutput> { |
795 |
| - let mut outputs = vec![]; |
| 797 | + async fn fetch_news(self: Arc<Self>, handler: FetchNewsHandler) -> Vec<FetchNewsOutput> { |
| 798 | + const CHANNEL_CAPACITY: usize = 100; |
| 799 | + let (sender, mut receiver) = mpsc::channel(CHANNEL_CAPACITY); |
| 800 | + let handle = tokio::spawn(async move { |
| 801 | + let mut outputs = vec![]; |
| 802 | + while let Some(handle) = receiver.recv().await { |
| 803 | + outputs.push(handle); |
| 804 | + } |
| 805 | + outputs |
| 806 | + }); |
| 807 | + const PROVIDER_PERMITS_NUM: usize = 20; |
| 808 | + let semaphore = Arc::new(Semaphore::new(PROVIDER_PERMITS_NUM)); |
796 | 809 | for provider in &self.providers {
|
797 |
| - match self.fetch_provider(provider).await { |
798 |
| - Ok(articles) => { |
799 |
| - info!("provider={}, articles.len={}", provider, articles.len()); |
800 |
| - for article in articles { |
801 |
| - outputs.push(handler(article)); |
| 810 | + let provider = provider.to_string(); |
| 811 | + let self = Arc::clone(&self); |
| 812 | + let sender = sender.clone(); |
| 813 | + let semaphore = Arc::clone(&semaphore); |
| 814 | + let handler = Arc::clone(&handler); |
| 815 | + // Use `spawn_local` to preserve the LocalSet, which is later used by the handler |
| 816 | + // Ref: `execution::cases::collect_news::CollectNewsCase` |
| 817 | + tokio::task::spawn_local(async move { |
| 818 | + let _permit = semaphore.acquire().await?; |
| 819 | + match self.fetch_provider(&provider).await { |
| 820 | + Ok(articles) => { |
| 821 | + info!("provider={}, articles.len={}", provider, articles.len()); |
| 822 | + for article in articles { |
| 823 | + sender.send(handler(article)).await?; |
| 824 | + } |
802 | 825 | }
|
| 826 | + Err(error) => error!("provider={}, error={}", provider, error), |
803 | 827 | }
|
804 |
| - Err(error) => error!("provider={}, error={}", provider, error), |
805 |
| - } |
| 828 | + Ok::<(), Error>(()) |
| 829 | + }); |
806 | 830 | }
|
807 |
| - outputs |
| 831 | + drop(sender); // Drop early (before awaiting the saving task), or the channel won't close |
| 832 | + handle.await.unwrap_or(vec![]) |
808 | 833 | }
|
809 | 834 | }
|
0 commit comments