Skip to content

Commit ec5561c

Browse files
metalwhalesontdhust
authored andcommitted
Yahoo client fetches providers simultaneously.
1 parent 84bfe91 commit ec5561c

File tree

4 files changed

+46
-20
lines changed

4 files changed

+46
-20
lines changed

chloria-backend/chloria-job/src/execution/cases/collect_news.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl LocalCase for CollectNewsCase {
9090
let sender = sender.clone();
9191
let semaphore = Arc::clone(&semaphore);
9292
let handles = new_fetcher
93-
.fetch_news(Box::new(move |article| {
93+
.fetch_news(Arc::new(move |article| {
9494
let news = NewsEntity::new(article.id);
9595
let http_helper = Arc::clone(&http_helper);
9696
let file_storage = Arc::clone(&file_storage);

chloria-backend/chloria-job/src/execution/ports/news_fetcher.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{future::Future, pin::Pin};
1+
use std::{future::Future, pin::Pin, sync::Arc};
22

33
use anyhow::Result;
44
use async_trait::async_trait;
@@ -18,23 +18,22 @@ pub(crate) struct FetchNewsArticle {
1818
}
1919

2020
pub(crate) type FetchNewsOutput = JoinHandle<Result<()>>;
21-
pub(crate) type FetchNewsHandler = Box<dyn Fn(FetchNewsArticle) -> FetchNewsOutput + Send>;
21+
pub(crate) type FetchNewsHandler = Arc<dyn Fn(FetchNewsArticle) -> FetchNewsOutput + Send + Sync>;
2222

2323
#[async_trait]
2424
pub(crate) trait NewsFetcher: Send + Sync {
25-
async fn fetch_news(&self, handler: FetchNewsHandler) -> Vec<FetchNewsOutput>;
25+
async fn fetch_news(self: Arc<Self>, handler: FetchNewsHandler) -> Vec<FetchNewsOutput>;
2626
}
2727

2828
mock! {
2929
pub(in super::super) NewsFetcher {}
3030

3131
impl NewsFetcher for NewsFetcher {
32-
fn fetch_news<'life0, 'async_trait>(
33-
&'life0 self,
32+
fn fetch_news<'async_trait>(
33+
self: Arc<Self>,
3434
handler: FetchNewsHandler,
3535
) -> Pin<Box<dyn Future<Output = Vec<FetchNewsOutput>> + Send + 'async_trait>>
3636
where
37-
'life0: 'async_trait,
3837
Self: 'async_trait;
3938
}
4039
}

chloria-backend/chloria-job/src/infrastructure/news_fetcher/newsdata.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use anyhow::{Context, Result};
24
use async_trait::async_trait;
35
use chrono::NaiveDateTime;
@@ -88,7 +90,7 @@ impl NewsdataClient {
8890

8991
#[async_trait]
9092
impl NewsFetcher for NewsdataClient {
91-
async fn fetch_news(&self, handler: FetchNewsHandler) -> Vec<FetchNewsOutput> {
93+
async fn fetch_news(self: Arc<Self>, handler: FetchNewsHandler) -> Vec<FetchNewsOutput> {
9294
let mut outputs = vec![];
9395
let mut remaining_results_num = None;
9496
let mut page = None;

chloria-backend/chloria-job/src/infrastructure/news_fetcher/yahoo.rs

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
use anyhow::Result;
1+
use std::sync::Arc;
2+
3+
use anyhow::{Error, Result};
24
use async_trait::async_trait;
35
use chrono::{DateTime, Local};
46
use log::{error, info};
57
use regex::Regex;
68
use reqwest::Client;
79
use serde::Deserialize;
10+
use tokio::sync::{mpsc, Semaphore};
811

912
use crate::execution::ports::news_fetcher::{FetchNewsArticle, FetchNewsHandler, FetchNewsOutput, NewsFetcher};
1013

@@ -742,7 +745,7 @@ impl YahooClient {
742745
}
743746
}
744747

745-
async fn fetch_provider(&self, provider: &str) -> Result<Vec<FetchNewsArticle>> {
748+
async fn fetch_provider(self: Arc<Self>, provider: &str) -> Result<Vec<FetchNewsArticle>> {
746749
let mut articles = vec![];
747750
let url = format!("https://news.yahoo.co.jp/rss/media/{}/all.xml", provider);
748751
let response_text = Client::new().get(url).send().await?.text().await?;
@@ -790,19 +793,41 @@ impl YahooClient {
790793

791794
#[async_trait]
792795
impl NewsFetcher for YahooClient {
793-
async fn fetch_news(&self, handler: FetchNewsHandler) -> Vec<FetchNewsOutput> {
794-
let mut outputs = vec![];
796+
async fn fetch_news(self: Arc<Self>, handler: FetchNewsHandler) -> Vec<FetchNewsOutput> {
797+
const CHANNEL_CAPACITY: usize = 100;
798+
let (sender, mut receiver) = mpsc::channel(CHANNEL_CAPACITY);
799+
let receiver_handle = tokio::spawn(async move {
800+
let mut handles = vec![];
801+
while let Some(handle) = receiver.recv().await {
802+
handles.push(handle);
803+
}
804+
handles
805+
});
806+
const PROVIDER_PERMITS_NUM: usize = 20;
807+
let semaphore = Arc::new(Semaphore::new(PROVIDER_PERMITS_NUM));
795808
for provider in &self.providers {
796-
match self.fetch_provider(provider).await {
797-
Ok(articles) => {
798-
info!("provider={}, articles.len={}", provider, articles.len());
799-
for article in articles {
800-
outputs.push(handler(article));
809+
let provider = provider.to_string();
810+
let self = Arc::clone(&self);
811+
let sender = sender.clone();
812+
let semaphore = Arc::clone(&semaphore);
813+
let handler = Arc::clone(&handler);
814+
// Use `spawn_local` to preserve the LocalSet, which is later used by the handler
815+
// Ref: `execution::cases::collect_news::CollectNewsCase`
816+
tokio::task::spawn_local(async move {
817+
let _permit = semaphore.acquire().await?;
818+
match self.fetch_provider(&provider).await {
819+
Ok(articles) => {
820+
info!("provider={}, articles.len={}", provider, articles.len());
821+
for article in articles {
822+
sender.send(handler(article)).await?;
823+
}
801824
}
825+
Err(error) => error!("provider={}, error={}", provider, error),
802826
}
803-
Err(error) => error!("provider={}, error={}", provider, error),
804-
}
827+
Ok::<(), Error>(())
828+
});
805829
}
806-
outputs
830+
drop(sender); // Drop early (before awaiting the receiver) to prevent the sender from blocking the channel from closing
831+
receiver_handle.await.unwrap_or(vec![])
807832
}
808833
}

0 commit comments

Comments
 (0)