From 899f403f7e1714dbf9e467bcae5589384322de05 Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Thu, 12 Jun 2025 22:19:12 +0200 Subject: [PATCH 01/20] add `skip_publishers_disallowing_training` --- README.md | 5 ++ docs/5_advanced_topics.md | 8 +++ src/fundus/publishers/base_objects.py | 76 +++++++++++++++++++++++---- src/fundus/scraping/crawler.py | 45 +++++++++++++++- src/fundus/scraping/html.py | 15 +++--- src/fundus/scraping/session.py | 22 +++++--- src/fundus/scraping/url.py | 14 ++++- 7 files changed, 160 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index ee4e7c37c..b6c0804b5 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,11 @@ Developed at None: + def read(self) -> None: """Reads the robots.txt URL and feeds it to the parser.""" try: # noinspection PyUnresolvedReferences session = session_handler.get_session() - response = session.get_with_interrupt(self.url, headers=headers) # type: ignore[attr-defined] + response = session.get_with_interrupt(self.url, headers=self.headers) # type: ignore[attr-defined] except HTTPError as err: if err.response.status_code in (401, 403): + logger.warning( + f"Robots {self.url!r} disallowed access with status code {err.response.status_code}." # type: ignore[attr-defined] + " Defaulting to disallow all." + ) self.disallow_all = True elif 400 <= err.response.status_code < 500: self.allow_all = True + except RequestInterruptedError as err: + logger.warning(f"Request for robots {self.url!r} interrupted: {err!r}. Defaulting to disallow all.") # type: ignore[attr-defined] + self.disallow_all = True else: self.parse(response.text.splitlines()) + def parse(self, lines: Iterable[str]) -> None: + for line in lines: + if line.strip().startswith("#") and set(line.split(" ")) & self._disallow_training_keywords: + self.disallow_training = True + break + super().parse(lines) + class Robots: - def __init__(self, url: str): + def __init__(self, url: str, headers: Optional[Dict[str, str]] = None): self.url = url - self.robots_file_parser = CustomRobotFileParser(url) + self.robots_file_parser = CustomRobotFileParser(url, headers=headers) self.ready: bool = False - def read(self, headers: Optional[Dict[str, str]] = None) -> None: + def _read(self) -> None: try: - self.robots_file_parser.read(headers=headers) + self.robots_file_parser.read() except (ConnectionError, ReadTimeout): logger.warning(f"Could not load robots {self.url!r}. 
Ignoring robots and continuing.") self.robots_file_parser.allow_all = True self.ready = True + def ensure_ready(self) -> None: + """Ensure that the robots.txt file is read and parsed.""" + if not self.ready: + self._read() + def can_fetch(self, useragent: str, url: str) -> bool: + self.ensure_ready() return self.robots_file_parser.can_fetch(useragent, url) def crawl_delay(self, useragent: str) -> Optional[float]: + self.ensure_ready() delay = self.robots_file_parser.crawl_delay(useragent) return delay if delay is None else float(delay) + def disallows_training(self) -> bool: + self.ensure_ready() + return self.robots_file_parser.disallow_training + + def disallow_all(self) -> bool: + self.ensure_ready() + return self.robots_file_parser.disallow_all + class Publisher: __name__: str @@ -85,6 +134,7 @@ def __init__( url_filter: Optional[URLFilter] = None, request_header: Optional[Dict[str, str]] = None, deprecated: bool = False, + disallows_training: bool = False, ): """Initialization of a new Publisher object @@ -97,6 +147,10 @@ def __init__( appended to crawled URLs url_filter (Optional[URLFilter]): Regex filter to apply determining URLs to be skipped request_header (Optional[Dict[str, str]]): Request header to be used for the GET-request + deprecated (bool): If True, the publisher is deprecated and skipped by default + disallows_training (bool): If True, the publisher disallows training on its articles in it's robots.txt file. + Note that this is only an indicator and users should verify the terms of use of the publisher before + using the articles for training purposes. """ if not (name and domain and parser and sources): @@ -108,7 +162,11 @@ def __init__( self.url_filter = url_filter self.request_header = request_header self.deprecated = deprecated - self.robots = Robots(self.domain + "robots.txt" if self.domain.endswith("/") else self.domain + "/robots.txt") + self.robots = Robots( + url=self.domain + "robots.txt" if self.domain.endswith("/") else self.domain + "/robots.txt", + headers=self.request_header, + ) + self.disallows_training = disallows_training # we define the dict here manually instead of using default dict so that we can control # the order in which sources are proceeded. diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index 236b61a05..9abe2c455 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -37,6 +37,7 @@ TypeVar, Union, cast, + overload, ) import dill @@ -222,6 +223,7 @@ def _build_article_iterator( extraction_filter: Optional[ExtractionFilter], url_filter: Optional[URLFilter], language_filter: Optional[List[str]], + skip_publishers_disallowing_training: bool = False, ) -> Iterator[Article]: raise NotImplementedError @@ -236,6 +238,7 @@ def crawl( language_filter: Optional[List[str]] = None, only_unique: bool = True, save_to_file: Union[None, str, Path] = None, + skip_publishers_disallowing_training: bool = False, ) -> Iterator[Article]: """Yields articles from initialized scrapers @@ -267,6 +270,9 @@ def crawl( Always returns the first encountered article. Defaults to True. save_to_file (Union[None, str, Path]): If set, the crawled articles will be collected saved to the specified file as a JSON list. + skip_publishers_disallowing_training (bool): If set to True, publishers that disallow training + are skipped. Note that this is an indicator only and users with the intention of using Fundus to gather + training data should always check the publisher's terms of use beforehand. 
Returns: Iterator[Article]: An iterator yielding objects of type Article. @@ -364,7 +370,12 @@ def callback() -> None: try: with Timeout(seconds=timeout, silent=True, callback=callback, disable=timeout <= 0) as timer: for article in self._build_article_iterator( - tuple(fitting_publishers), error_handling, build_extraction_filter(), url_filter, language_filter + tuple(fitting_publishers), + error_handling, + build_extraction_filter(), + url_filter, + language_filter, + skip_publishers_disallowing_training, ): if max_articles_per_publisher and article_count[article.publisher] == max_articles_per_publisher: if isinstance(self, Crawler) and not __EVENTS__.is_event_set("stop", article.publisher): @@ -465,6 +476,7 @@ def _fetch_articles( extraction_filter: Optional[ExtractionFilter] = None, url_filter: Optional[URLFilter] = None, language_filter: Optional[List[str]] = None, + skip_publisher_disallowing_training: bool = False, ) -> Iterator[Article]: def build_delay() -> Optional[Delay]: if isinstance(self.delay, float): @@ -481,6 +493,24 @@ def constant_delay() -> float: else: raise TypeError("param of ") + # register default events + __EVENTS__.register_event("stop") + + if skip_publisher_disallowing_training and ( + publisher.disallows_training or publisher.robots.disallows_training() + ): + logger.warning( + f"Skipping publisher {publisher.name!r} because it disallows training. " + f"Set to False to include it." + ) + return + if publisher.robots.disallow_all(): + logger.warning( + f"Skipping publisher {publisher.name!r} because it disallows all crawling in robots.txt. " + f"Set to True to include it." + ) + return + scraper = WebScraper( publisher, self.restrict_sources_to, @@ -523,6 +553,7 @@ def _build_article_iterator( extraction_filter: Optional[ExtractionFilter], url_filter: Optional[URLFilter], language_filter: Optional[List[str]], + skip_publisher_disallowing_training: bool = False, ) -> Iterator[Article]: article_task = partial( self._fetch_articles, @@ -530,6 +561,7 @@ def _build_article_iterator( extraction_filter=extraction_filter, url_filter=url_filter, language_filter=language_filter, + skip_publisher_disallowing_training=skip_publisher_disallowing_training, ) if self.threading: @@ -597,10 +629,19 @@ def _fetch_articles( extraction_filter: Optional[ExtractionFilter] = None, url_filter: Optional[URLFilter] = None, language_filter: Optional[List[str]] = None, + skip_publishers_disallowing_training: bool = False, bar: Optional[tqdm] = None, ) -> Iterator[Article]: retries: int = 0 while True: + if skip_publishers_disallowing_training: + publishers = tuple( + [ + publisher + for publisher in publishers + if not (publisher.disallows_training or publisher.robots.disallows_training()) + ] + ) source = CCNewsSource(*publishers, warc_path=warc_path) scraper = CCNewsScraper(source) try: @@ -731,6 +772,7 @@ def _build_article_iterator( extraction_filter: Optional[ExtractionFilter], url_filter: Optional[URLFilter], language_filter: Optional[List[str]], + skip_publishers_disallowing_training: bool = False, **kwargs, ) -> Iterator[Article]: warc_paths = tuple(self._get_warc_paths()) @@ -743,6 +785,7 @@ def _build_article_iterator( extraction_filter=extraction_filter, url_filter=url_filter, language_filter=language_filter, + skip_publishers_disallowing_training=skip_publishers_disallowing_training, bar=bar, ) diff --git a/src/fundus/scraping/html.py b/src/fundus/scraping/html.py index ba0c91709..fad3f126a 100644 --- a/src/fundus/scraping/html.py +++ b/src/fundus/scraping/html.py @@ -17,7 
+17,11 @@ from fundus.publishers.base_objects import Publisher, Robots from fundus.scraping.delay import Delay from fundus.scraping.filter import URLFilter -from fundus.scraping.session import _default_header, session_handler +from fundus.scraping.session import ( + RequestInterruptedError, + _default_header, + session_handler, +) from fundus.scraping.url import URLSource from fundus.utils.events import __EVENTS__ @@ -116,15 +120,10 @@ def __init__( self.delay = delay - # register default events - __EVENTS__.register_event("stop") - # parse robots: self.robots: Optional[Robots] = None if not ignore_robots: self.robots = self.publisher.robots - if not self.robots.ready: - self.publisher.robots.read(headers=self.request_header) if not ignore_crawl_delay: if robots_delay := self.robots.crawl_delay(self.request_header.get("user-agent") or "*"): @@ -191,7 +190,9 @@ def filter_url(u: str) -> bool: if isinstance(error, HTTPError) and error.response.status_code >= 500: logger.warning(f"Skipped {self.publisher.name!r} due to server errors: {error!r}") continue - + except RequestInterruptedError as error: + logger.debug(f"Interrupt request for {url!r} executed. Stopping further requests.") + break except Exception as error: logger.error(f"Warning! Skipped requested URL {url!r} because of an unexpected error {error!r}") continue diff --git a/src/fundus/scraping/session.py b/src/fundus/scraping/session.py index 7d3f392d1..e79abefd1 100644 --- a/src/fundus/scraping/session.py +++ b/src/fundus/scraping/session.py @@ -15,6 +15,12 @@ _default_header = {"user-agent": "Fundus"} +class RequestInterruptedError(Exception): + """Raised when a request is interrupted by a stop event.""" + + pass + + class InterruptableSession(requests.Session): def __init__(self, timeout: Optional[int] = None): super().__init__() @@ -25,7 +31,7 @@ def get_with_interrupt(self, *args, **kwargs) -> requests.Response: This function hands over the request to another thread and checks every second for an interrupt event. If there was an interrupt event, this function raises - a requests.exceptions.Timeout error. + a RequestInterruptedError exception. Args: *args: requests.Session.get(*) arguments. @@ -33,6 +39,9 @@ def get_with_interrupt(self, *args, **kwargs) -> requests.Response: Returns: The response. + + Raises: + RequestInterruptedError: If the request is interrupted by a stop event. 
""" def _req(): @@ -53,15 +62,14 @@ def _req(): while True: try: response = response_queue.get(timeout=1) - except Empty: - if __EVENTS__.is_event_set("stop"): - logger.debug(f"Interrupt request for {url!r}") - response_queue.task_done() - exit(1) - else: if isinstance(response, Exception): raise response return response + except Empty: + if __EVENTS__.is_event_set("stop"): + logger.debug(f"Interrupt request for {url!r}") + # Raise an exception instead of calling exit() to avoid deadlocks + raise RequestInterruptedError(f"Request to {url} was interrupted by stop event") @dataclass diff --git a/src/fundus/scraping/url.py b/src/fundus/scraping/url.py index 6bd42416a..631c0c9b7 100644 --- a/src/fundus/scraping/url.py +++ b/src/fundus/scraping/url.py @@ -18,7 +18,11 @@ from fundus.logging import create_logger from fundus.parser.utility import generic_nodes_to_text from fundus.scraping.filter import URLFilter, inverse -from fundus.scraping.session import _default_header, session_handler +from fundus.scraping.session import ( + RequestInterruptedError, + _default_header, + session_handler, +) logger = create_logger(__name__) @@ -142,6 +146,10 @@ def __iter__(self) -> Iterator[str]: logger.warning(f"Warning! Couldn't parse rss feed {self.url!r} because of {err}") return + except RequestInterruptedError: + logger.debug(f"Interrupt request for RSS feed {self.url!r} was executed") + return + except Exception as error: logger.error(f"Warning! Couldn't parse rss feed {self.url!r} because of an unexpected error {error!r}") return @@ -179,6 +187,10 @@ def yield_recursive(sitemap_url: str) -> Iterator[str]: logger.warning(f"Warning! Couldn't reach sitemap {sitemap_url!r} because of {error!r}") return + except RequestInterruptedError: + logger.debug(f"Interrupt request for sitemap {sitemap_url!r} was executed") + return + except Exception as error: logger.error( f"Warning! Couldn't reach sitemap {sitemap_url!r} because of an unexpected error {error!r}" From a30892febfbb0d52f01230d0559c23cba2f7c91d Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Thu, 12 Jun 2025 22:28:01 +0200 Subject: [PATCH 02/20] minor fixes --- src/fundus/publishers/base_objects.py | 15 ++++++++------- src/fundus/scraping/crawler.py | 1 - src/fundus/scraping/session.py | 1 - 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/fundus/publishers/base_objects.py b/src/fundus/publishers/base_objects.py index 4adca2e17..adf61e4d1 100644 --- a/src/fundus/publishers/base_objects.py +++ b/src/fundus/publishers/base_objects.py @@ -43,9 +43,10 @@ class CustomRobotFileParser(RobotFileParser): } def __init__(self, url: str, headers: Optional[Dict[str, str]] = None): - super().__init__(url) self.headers = headers - self.disallow_training: bool = False + self.disallows_training: bool = False + self.url = url + super().__init__(url) # noinspection PyAttributeOutsideInit def read(self) -> None: @@ -53,18 +54,18 @@ def read(self) -> None: try: # noinspection PyUnresolvedReferences session = session_handler.get_session() - response = session.get_with_interrupt(self.url, headers=self.headers) # type: ignore[attr-defined] + response = session.get_with_interrupt(self.url, headers=self.headers) except HTTPError as err: if err.response.status_code in (401, 403): logger.warning( - f"Robots {self.url!r} disallowed access with status code {err.response.status_code}." # type: ignore[attr-defined] + f"Robots {self.url!r} disallowed access with status code {err.response.status_code}." " Defaulting to disallow all." 
) self.disallow_all = True elif 400 <= err.response.status_code < 500: self.allow_all = True except RequestInterruptedError as err: - logger.warning(f"Request for robots {self.url!r} interrupted: {err!r}. Defaulting to disallow all.") # type: ignore[attr-defined] + logger.warning(f"Request for robots {self.url!r} interrupted: {err!r}. Defaulting to disallow all.") # self.disallow_all = True else: self.parse(response.text.splitlines()) @@ -72,7 +73,7 @@ def read(self) -> None: def parse(self, lines: Iterable[str]) -> None: for line in lines: if line.strip().startswith("#") and set(line.split(" ")) & self._disallow_training_keywords: - self.disallow_training = True + self.disallows_training = True break super().parse(lines) @@ -107,7 +108,7 @@ def crawl_delay(self, useragent: str) -> Optional[float]: def disallows_training(self) -> bool: self.ensure_ready() - return self.robots_file_parser.disallow_training + return self.robots_file_parser.disallows_training def disallow_all(self) -> bool: self.ensure_ready() diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index 9abe2c455..ece8f1ba1 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -37,7 +37,6 @@ TypeVar, Union, cast, - overload, ) import dill diff --git a/src/fundus/scraping/session.py b/src/fundus/scraping/session.py index e79abefd1..1f6f43c40 100644 --- a/src/fundus/scraping/session.py +++ b/src/fundus/scraping/session.py @@ -68,7 +68,6 @@ def _req(): except Empty: if __EVENTS__.is_event_set("stop"): logger.debug(f"Interrupt request for {url!r}") - # Raise an exception instead of calling exit() to avoid deadlocks raise RequestInterruptedError(f"Request to {url} was interrupted by stop event") From 3d836288258e60d587e068d4273ada330c9f9fb8 Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Thu, 31 Jul 2025 18:04:32 +0200 Subject: [PATCH 03/20] remove code from try --- src/fundus/scraping/session.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/fundus/scraping/session.py b/src/fundus/scraping/session.py index 1f6f43c40..27b584462 100644 --- a/src/fundus/scraping/session.py +++ b/src/fundus/scraping/session.py @@ -62,13 +62,14 @@ def _req(): while True: try: response = response_queue.get(timeout=1) - if isinstance(response, Exception): - raise response - return response except Empty: if __EVENTS__.is_event_set("stop"): logger.debug(f"Interrupt request for {url!r}") raise RequestInterruptedError(f"Request to {url} was interrupted by stop event") + else: + if isinstance(response, Exception): + raise response + return response @dataclass From 583b021fbec5e5f676ba08a157e8e492980b0c33 Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Thu, 31 Jul 2025 18:07:08 +0200 Subject: [PATCH 04/20] black --- src/fundus/publishers/base_objects.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fundus/publishers/base_objects.py b/src/fundus/publishers/base_objects.py index 92facaa70..cd1e88c98 100644 --- a/src/fundus/publishers/base_objects.py +++ b/src/fundus/publishers/base_objects.py @@ -175,7 +175,7 @@ def __init__( # implementation with https://github.com/seomoz/reppy if suppress_robots: self.robots.robots_file_parser.allow_all = True - + # we define the dict here manually instead of using default dict so that we can control # the order in which sources are proceeded. 
source_mapping: Dict[Type[URLSource], List[URLSource]] = defaultdict(list) From 8a789129052aff20754fe6b8bddb7566bcbd2267 Mon Sep 17 00:00:00 2001 From: Max Dallabetta Date: Mon, 4 Aug 2025 12:38:21 +0200 Subject: [PATCH 05/20] use default request header --- src/fundus/publishers/base_objects.py | 8 ++++++-- src/fundus/scraping/html.py | 6 +++--- src/fundus/scraping/session.py | 2 +- src/fundus/scraping/url.py | 4 ++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/fundus/publishers/base_objects.py b/src/fundus/publishers/base_objects.py index cd1e88c98..9a4e3dd84 100644 --- a/src/fundus/publishers/base_objects.py +++ b/src/fundus/publishers/base_objects.py @@ -11,7 +11,11 @@ from fundus.logging import create_logger from fundus.parser.base_parser import ParserProxy from fundus.scraping.filter import URLFilter -from fundus.scraping.session import RequestInterruptedError, session_handler +from fundus.scraping.session import ( + RequestInterruptedError, + default_header, + session_handler, +) from fundus.scraping.url import NewsMap, RSSFeed, Sitemap, URLSource from fundus.utils.iteration import iterate_all_subclasses @@ -133,7 +137,7 @@ def __init__( sources: List[URLSource], query_parameter: Optional[Dict[str, str]] = None, url_filter: Optional[URLFilter] = None, - request_header: Optional[Dict[str, str]] = None, + request_header: Optional[Dict[str, str]] = default_header, deprecated: bool = False, disallows_training: bool = False, suppress_robots: bool = False, diff --git a/src/fundus/scraping/html.py b/src/fundus/scraping/html.py index fad3f126a..d77d71a47 100644 --- a/src/fundus/scraping/html.py +++ b/src/fundus/scraping/html.py @@ -19,7 +19,7 @@ from fundus.scraping.filter import URLFilter from fundus.scraping.session import ( RequestInterruptedError, - _default_header, + default_header, session_handler, ) from fundus.scraping.url import URLSource @@ -113,7 +113,7 @@ def __init__( self.url_source = url_source self.publisher = publisher self.url_filter = url_filter - self.request_header = request_header or _default_header + self.request_header = request_header or default_header self.query_parameters = query_parameters or {} if isinstance(url_source, URLSource): url_source.set_header(self.request_header) @@ -231,7 +231,7 @@ class CCNewsSource: def __init__(self, *publishers: Publisher, warc_path: str, headers: Optional[Dict[str, str]] = None): self.publishers = publishers self.warc_path = warc_path - self.headers = headers or _default_header + self.headers = headers or default_header self._publisher_mapping: Dict[str, Publisher] = { urlparse(publisher.domain).netloc: publisher for publisher in self.publishers } diff --git a/src/fundus/scraping/session.py b/src/fundus/scraping/session.py index 27b584462..3013c1349 100644 --- a/src/fundus/scraping/session.py +++ b/src/fundus/scraping/session.py @@ -12,7 +12,7 @@ logger = create_logger(__name__) -_default_header = {"user-agent": "Fundus"} +default_header = {"user-agent": "Fundus"} class RequestInterruptedError(Exception): diff --git a/src/fundus/scraping/url.py b/src/fundus/scraping/url.py index 631c0c9b7..82574b61d 100644 --- a/src/fundus/scraping/url.py +++ b/src/fundus/scraping/url.py @@ -20,7 +20,7 @@ from fundus.scraping.filter import URLFilter, inverse from fundus.scraping.session import ( RequestInterruptedError, - _default_header, + default_header, session_handler, ) @@ -108,7 +108,7 @@ class URLSource(Iterable[str], ABC): def __post_init__(self): if not self._request_header: - self._request_header = _default_header 
+ self._request_header = default_header if not validators.url(self.url, strict_query=False): logger.error(f"{type(self).__name__} initialized with invalid URL {self.url}") From 2896c6158f3b3b8572c5be88c472495aa6c8cdd9 Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Mon, 25 Aug 2025 19:34:56 +0200 Subject: [PATCH 06/20] introduce `CrashThread` exception --- src/fundus/publishers/base_objects.py | 9 +-------- src/fundus/scraping/crawler.py | 8 +++----- src/fundus/scraping/html.py | 9 +-------- src/fundus/scraping/session.py | 10 +++++----- src/fundus/scraping/url.py | 16 +--------------- src/fundus/utils/events.py | 4 ++++ 6 files changed, 15 insertions(+), 41 deletions(-) diff --git a/src/fundus/publishers/base_objects.py b/src/fundus/publishers/base_objects.py index 9a4e3dd84..e84b7d0fa 100644 --- a/src/fundus/publishers/base_objects.py +++ b/src/fundus/publishers/base_objects.py @@ -11,11 +11,7 @@ from fundus.logging import create_logger from fundus.parser.base_parser import ParserProxy from fundus.scraping.filter import URLFilter -from fundus.scraping.session import ( - RequestInterruptedError, - default_header, - session_handler, -) +from fundus.scraping.session import default_header, session_handler from fundus.scraping.url import NewsMap, RSSFeed, Sitemap, URLSource from fundus.utils.iteration import iterate_all_subclasses @@ -68,9 +64,6 @@ def read(self) -> None: self.disallow_all = True elif 400 <= err.response.status_code < 500: self.allow_all = True - except RequestInterruptedError as err: - logger.warning(f"Request for robots {self.url!r} interrupted: {err!r}. Defaulting to disallow all.") # - self.disallow_all = True else: self.parse(response.text.splitlines()) diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index 15a3903f0..8a5e21b30 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -57,7 +57,7 @@ from fundus.scraping.filter import ExtractionFilter, Requires, RequiresAll, URLFilter from fundus.scraping.html import CCNewsSource from fundus.scraping.scraper import CCNewsScraper, WebScraper -from fundus.scraping.session import session_handler +from fundus.scraping.session import CrashThread, session_handler from fundus.scraping.url import URLSource from fundus.utils.events import __EVENTS__ from fundus.utils.timeout import Timeout @@ -183,6 +183,8 @@ def pool_queue_iter(handle: MapResult[Any], queue: Queue[Union[_T, Exception]]) while True: try: if isinstance(nxt := queue.get_nowait(), Exception): + if isinstance(nxt, CrashThread): + return raise Exception("There was an exception occurring in a remote thread/process") from nxt yield nxt except Empty: @@ -212,7 +214,6 @@ def __init__(self, *publishers: PublisherType): raise ValueError("param of must include at least one publisher.") __EVENTS__.alias("main-thread") - __EVENTS__.register_event("stop") @abstractmethod def _build_article_iterator( @@ -492,9 +493,6 @@ def constant_delay() -> float: else: raise TypeError("param of ") - # register default events - __EVENTS__.register_event("stop") - if skip_publisher_disallowing_training and ( publisher.disallows_training or publisher.robots.disallows_training() ): diff --git a/src/fundus/scraping/html.py b/src/fundus/scraping/html.py index d77d71a47..4ccf2dcf8 100644 --- a/src/fundus/scraping/html.py +++ b/src/fundus/scraping/html.py @@ -17,11 +17,7 @@ from fundus.publishers.base_objects import Publisher, Robots from fundus.scraping.delay import Delay from fundus.scraping.filter import URLFilter -from 
fundus.scraping.session import ( - RequestInterruptedError, - default_header, - session_handler, -) +from fundus.scraping.session import default_header, session_handler from fundus.scraping.url import URLSource from fundus.utils.events import __EVENTS__ @@ -190,9 +186,6 @@ def filter_url(u: str) -> bool: if isinstance(error, HTTPError) and error.response.status_code >= 500: logger.warning(f"Skipped {self.publisher.name!r} due to server errors: {error!r}") continue - except RequestInterruptedError as error: - logger.debug(f"Interrupt request for {url!r} executed. Stopping further requests.") - break except Exception as error: logger.error(f"Warning! Skipped requested URL {url!r} because of an unexpected error {error!r}") continue diff --git a/src/fundus/scraping/session.py b/src/fundus/scraping/session.py index 3013c1349..e3bd7ecad 100644 --- a/src/fundus/scraping/session.py +++ b/src/fundus/scraping/session.py @@ -15,8 +15,8 @@ default_header = {"user-agent": "Fundus"} -class RequestInterruptedError(Exception): - """Raised when a request is interrupted by a stop event.""" +class CrashThread(Exception): + """Is raised to end a thread without relying on the thread ending naturally""" pass @@ -31,7 +31,7 @@ def get_with_interrupt(self, *args, **kwargs) -> requests.Response: This function hands over the request to another thread and checks every second for an interrupt event. If there was an interrupt event, this function raises - a RequestInterruptedError exception. + a CrashThread exception. Args: *args: requests.Session.get(*) arguments. @@ -41,7 +41,7 @@ def get_with_interrupt(self, *args, **kwargs) -> requests.Response: The response. Raises: - RequestInterruptedError: If the request is interrupted by a stop event. + CrashThread: If the request is interrupted by a stop event. """ def _req(): @@ -65,7 +65,7 @@ def _req(): except Empty: if __EVENTS__.is_event_set("stop"): logger.debug(f"Interrupt request for {url!r}") - raise RequestInterruptedError(f"Request to {url} was interrupted by stop event") + raise CrashThread(f"Request to {url} was interrupted by stop event") else: if isinstance(response, Exception): raise response diff --git a/src/fundus/scraping/url.py b/src/fundus/scraping/url.py index 82574b61d..2a03ca8c2 100644 --- a/src/fundus/scraping/url.py +++ b/src/fundus/scraping/url.py @@ -18,11 +18,7 @@ from fundus.logging import create_logger from fundus.parser.utility import generic_nodes_to_text from fundus.scraping.filter import URLFilter, inverse -from fundus.scraping.session import ( - RequestInterruptedError, - default_header, - session_handler, -) +from fundus.scraping.session import default_header, session_handler logger = create_logger(__name__) @@ -145,11 +141,6 @@ def __iter__(self) -> Iterator[str]: except (HTTPError, ConnectionError, ReadTimeout) as err: logger.warning(f"Warning! Couldn't parse rss feed {self.url!r} because of {err}") return - - except RequestInterruptedError: - logger.debug(f"Interrupt request for RSS feed {self.url!r} was executed") - return - except Exception as error: logger.error(f"Warning! Couldn't parse rss feed {self.url!r} because of an unexpected error {error!r}") return @@ -186,11 +177,6 @@ def yield_recursive(sitemap_url: str) -> Iterator[str]: except (HTTPError, ConnectionError, ReadTimeout) as error: logger.warning(f"Warning! 
Couldn't reach sitemap {sitemap_url!r} because of {error!r}") return - - except RequestInterruptedError: - logger.debug(f"Interrupt request for sitemap {sitemap_url!r} was executed") - return - except Exception as error: logger.error( f"Warning! Couldn't reach sitemap {sitemap_url!r} because of an unexpected error {error!r}" diff --git a/src/fundus/utils/events.py b/src/fundus/utils/events.py index cfff2867c..154b20624 100644 --- a/src/fundus/utils/events.py +++ b/src/fundus/utils/events.py @@ -6,6 +6,8 @@ logger = create_logger(__name__) +_default_events = ["stop"] + class EventDict: """A thread-safe event dictionary. @@ -24,6 +26,8 @@ def __init__(self): self._events: Dict[int, Dict[str, threading.Event]] = defaultdict(dict) self._aliases: Dict[Any, int] = {} self._lock = threading.Lock() + for event in _default_events: + self.register_event(event) @staticmethod def _get_identifier() -> int: From 3f781455896d28b0fefc33e02247eca3f0cd1f36 Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Tue, 26 Aug 2025 14:05:25 +0200 Subject: [PATCH 07/20] introduce `ThreadEventDict` --- src/fundus/utils/events.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/fundus/utils/events.py b/src/fundus/utils/events.py index 154b20624..7b2caec4a 100644 --- a/src/fundus/utils/events.py +++ b/src/fundus/utils/events.py @@ -6,7 +6,22 @@ logger = create_logger(__name__) -_default_events = ["stop"] + +class ThreadEventDict(dict[str, threading.Event]): + """A dictionary that creates threading.Event() objects on demand for certain keys. + This essentially mocks the behavior of defaultdict, but only for certain keys.""" + + _default_events: list[str] = ["stop"] + + def __getitem__(self, item: str) -> threading.Event: + try: + return super().__getitem__(item) + except KeyError as e: + if item in self._default_events: + event = threading.Event() + self[item] = event + return event + raise e class EventDict: @@ -23,11 +38,9 @@ class EventDict: """ def __init__(self): - self._events: Dict[int, Dict[str, threading.Event]] = defaultdict(dict) + self._events: Dict[int, ThreadEventDict] = defaultdict(ThreadEventDict) self._aliases: Dict[Any, int] = {} self._lock = threading.Lock() - for event in _default_events: - self.register_event(event) @staticmethod def _get_identifier() -> int: From d09d9daf71f56fab7ea194fbce5dfe542a9a6a38 Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Tue, 26 Aug 2025 14:05:56 +0200 Subject: [PATCH 08/20] move publisher robots verification to background --- src/fundus/scraping/crawler.py | 106 +++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 31 deletions(-) diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index 8a5e21b30..09c98b780 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -12,10 +12,13 @@ import traceback from abc import ABC, abstractmethod from collections import defaultdict +from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import TimeoutError as FuturesTimeoutError +from concurrent.futures import as_completed from datetime import datetime from functools import lru_cache, partial, wraps from multiprocessing import Manager -from multiprocessing.context import TimeoutError +from multiprocessing.context import TimeoutError as MPTimeoutError from multiprocessing.managers import BaseManager from multiprocessing.pool import MapResult, Pool, ThreadPool from pathlib import Path @@ -69,6 +72,8 @@ PublisherType: TypeAlias = 
Union[Publisher, PublisherGroup] +_shared_executor = ThreadPoolExecutor(max_workers=10) + class RemoteException(Exception): pass @@ -190,7 +195,7 @@ def pool_queue_iter(handle: MapResult[Any], queue: Queue[Union[_T, Exception]]) except Empty: try: handle.get(timeout=0.1) - except TimeoutError: + except MPTimeoutError: if __EVENTS__.is_event_set("stop"): __EVENTS__.clear_event("stop") break @@ -207,6 +212,45 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: return wrapper +def verify_publishers( + publishers: Tuple["Publisher", ...], + max_workers: Optional[int] = None, +) -> Tuple["Publisher", ...]: + publishers = tuple(publishers) + if not publishers: + return tuple() + + max_workers = max_workers if max_workers is not None else min(len(publishers), 5) + + verified: List["Publisher"] = [] + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_publisher = { + executor.submit(publisher.robots.disallows_training): publisher for publisher in publishers + } + + for future in as_completed(future_to_publisher.keys()): + publisher = future_to_publisher[future] + try: + disallows = future.result() + if not disallows: + verified.append(publisher) + else: + logger.warning(f"Skipping publisher {publisher.name!r} because it disallows training.") + except FuturesTimeoutError: + logger.warning(f"Robots.txt check timed out for {publisher.name!r}", exc_info=False) + except Exception as exc: + logger.warning(f"Could not verify training policy for {publisher.name!r}: {exc}", exc_info=True) + + return tuple(verified) + + +def _async_publisher_verification( + publishers: Tuple["Publisher", ...], + max_workers: Optional[int] = None, +) -> Future[Tuple["Publisher", ...]]: + return _shared_executor.submit(verify_publishers, publishers, max_workers) + + class CrawlerBase(ABC): def __init__(self, *publishers: PublisherType): self.publishers: List[Union[Publisher, FilteredPublisher]] = list(set(more_itertools.collapse(publishers))) @@ -476,7 +520,6 @@ def _fetch_articles( extraction_filter: Optional[ExtractionFilter] = None, url_filter: Optional[URLFilter] = None, language_filter: Optional[List[str]] = None, - skip_publisher_disallowing_training: bool = False, ) -> Iterator[Article]: def build_delay() -> Optional[Delay]: if isinstance(self.delay, float): @@ -493,21 +536,6 @@ def constant_delay() -> float: else: raise TypeError("param of ") - if skip_publisher_disallowing_training and ( - publisher.disallows_training or publisher.robots.disallows_training() - ): - logger.warning( - f"Skipping publisher {publisher.name!r} because it disallows training. " - f"Set to False to include it." - ) - return - if publisher.robots.disallow_all(): - logger.warning( - f"Skipping publisher {publisher.name!r} because it disallows all crawling in robots.txt. " - f"Set to True to include it." 
- ) - return - scraper = WebScraper( publisher, self.restrict_sources_to, @@ -556,15 +584,27 @@ def _build_article_iterator( extraction_filter: Optional[ExtractionFilter], url_filter: Optional[URLFilter], language_filter: Optional[List[str]], - skip_publisher_disallowing_training: bool = False, + skip_publishers_disallowing_training: bool = False, ) -> Iterator[Article]: + if skip_publishers_disallowing_training: + verified_publishers_future = _async_publisher_verification( + publishers, max_workers=1 if not self.threading else None + ) + + try: + verified_publishers = verified_publishers_future.result(timeout=30) + except FuturesTimeoutError: + logger.warning("Publisher verification timed out, proceeding with all publishers") + verified_publishers = publishers + + publishers = verified_publishers + _shared_executor.shutdown(wait=False) article_task = partial( self._fetch_articles, error_handling=error_handling, extraction_filter=extraction_filter, url_filter=url_filter, language_filter=language_filter, - skip_publisher_disallowing_training=skip_publisher_disallowing_training, ) if self.threading: @@ -632,19 +672,10 @@ def _fetch_articles( extraction_filter: Optional[ExtractionFilter] = None, url_filter: Optional[URLFilter] = None, language_filter: Optional[List[str]] = None, - skip_publishers_disallowing_training: bool = False, bar: Optional[tqdm] = None, ) -> Iterator[Article]: retries: int = 0 while True: - if skip_publishers_disallowing_training: - publishers = tuple( - [ - publisher - for publisher in publishers - if not (publisher.disallows_training or publisher.robots.disallows_training()) - ] - ) source = CCNewsSource(*publishers, warc_path=warc_path) scraper = CCNewsScraper(source) try: @@ -778,7 +809,21 @@ def _build_article_iterator( skip_publishers_disallowing_training: bool = False, **kwargs, ) -> Iterator[Article]: - warc_paths = tuple(self._get_warc_paths()) + if skip_publishers_disallowing_training: + verified_publishers_future = _async_publisher_verification(publishers, max_workers=self.processes) + + warc_paths = tuple(self._get_warc_paths()) + + try: + verified_publishers = verified_publishers_future.result(timeout=30) + except FuturesTimeoutError: + logger.warning("Publisher verification timed out, proceeding with all publishers") + verified_publishers = publishers + + publishers = verified_publishers + else: + warc_paths = tuple(self._get_warc_paths()) + _shared_executor.shutdown(wait=False) with get_proxy_tqdm(total=len(warc_paths), desc="Process WARC files", disable=self.disable_tqdm) as bar: article_task = partial( @@ -788,7 +833,6 @@ def _build_article_iterator( extraction_filter=extraction_filter, url_filter=url_filter, language_filter=language_filter, - skip_publishers_disallowing_training=skip_publishers_disallowing_training, bar=bar, ) From 3ad9a63e9851daf80d406a5db775f078032d8244 Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Tue, 26 Aug 2025 16:27:17 +0200 Subject: [PATCH 09/20] fix typing --- src/fundus/utils/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fundus/utils/events.py b/src/fundus/utils/events.py index 7b2caec4a..a2c0254a7 100644 --- a/src/fundus/utils/events.py +++ b/src/fundus/utils/events.py @@ -7,7 +7,7 @@ logger = create_logger(__name__) -class ThreadEventDict(dict[str, threading.Event]): +class ThreadEventDict(Dict[str, threading.Event]): """A dictionary that creates threading.Event() objects on demand for certain keys. 
This essentially mocks the behavior of defaultdict, but only for certain keys."""

From ca44d3347cb6fb31885f4e7359a066bac14ec1e1 Mon Sep 17 00:00:00 2001
From: Adrian Breiding
Date: Tue, 26 Aug 2025 16:29:41 +0200
Subject: [PATCH 10/20] fix typing

---
 src/fundus/utils/events.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/fundus/utils/events.py b/src/fundus/utils/events.py
index a2c0254a7..9e1ed1f4a 100644
--- a/src/fundus/utils/events.py
+++ b/src/fundus/utils/events.py
@@ -1,6 +1,6 @@
 import threading
 from collections import defaultdict
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, List, Optional, Union

 from fundus.logging import create_logger

@@ -11,7 +11,7 @@ class ThreadEventDict(Dict[str, threading.Event]):
     """A dictionary that creates threading.Event() objects on demand for certain keys.
     This essentially mocks the behavior of defaultdict, but only for certain keys."""

-    _default_events: list[str] = ["stop"]
+    _default_events: List[str] = ["stop"]

     def __getitem__(self, item: str) -> threading.Event:
         try:
             return super().__getitem__(item)
         except KeyError as e:
             if item in self._default_events:
                 event = threading.Event()
                 self[item] = event
                 return event
             raise e

From f52c6c484514862d5976d71200d50bce7988bf44 Mon Sep 17 00:00:00 2001
From: Adrian Breiding
Date: Wed, 24 Sep 2025 18:55:59 +0200
Subject: [PATCH 11/20] rename `default_header` to match master

---
 src/fundus/publishers/base_objects.py | 4 ++--
 src/fundus/scraping/html.py           | 6 +++---
 src/fundus/scraping/session.py        | 2 +-
 src/fundus/scraping/url.py            | 4 ++--
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/fundus/publishers/base_objects.py b/src/fundus/publishers/base_objects.py
index e00459a5e..03830f4be 100644
--- a/src/fundus/publishers/base_objects.py
+++ b/src/fundus/publishers/base_objects.py
@@ -11,7 +11,7 @@
 from fundus.logging import create_logger
 from fundus.parser.base_parser import ParserProxy
 from fundus.scraping.filter import URLFilter
-from fundus.scraping.session import default_header, session_handler
+from fundus.scraping.session import _default_header, session_handler
 from fundus.scraping.url import NewsMap, RSSFeed, Sitemap, URLSource
 from fundus.utils.iteration import iterate_all_subclasses

@@ -130,7 +130,7 @@ def __init__(
         sources: List[URLSource],
         query_parameter: Optional[Dict[str, str]] = None,
         url_filter: Optional[URLFilter] = None,
-        request_header: Optional[Dict[str, str]] = default_header,
+        request_header: Optional[Dict[str, str]] = _default_header,
         deprecated: bool = False,
         disallows_training: bool = False,
         suppress_robots: bool = False,
diff --git a/src/fundus/scraping/html.py b/src/fundus/scraping/html.py
index 4ccf2dcf8..6a17fdccb 100644
--- a/src/fundus/scraping/html.py
+++ b/src/fundus/scraping/html.py
@@ -17,7 +17,7 @@
 from fundus.publishers.base_objects import Publisher, Robots
 from fundus.scraping.delay import Delay
 from fundus.scraping.filter import URLFilter
-from fundus.scraping.session import default_header, session_handler
+from fundus.scraping.session import _default_header, session_handler
 from fundus.scraping.url import URLSource
 from fundus.utils.events import __EVENTS__

@@ -109,7 +109,7 @@ def __init__(
         self.url_source = url_source
         self.publisher = publisher
         self.url_filter = url_filter
-        self.request_header = request_header or default_header
+        self.request_header = request_header or _default_header
         self.query_parameters = query_parameters or {}
         if isinstance(url_source, URLSource):
             url_source.set_header(self.request_header)
@@ -224,7 +224,7 @@ class CCNewsSource:
     def __init__(self, *publishers: Publisher, warc_path: str, headers: Optional[Dict[str, str]] = None):
         self.publishers = publishers
         self.warc_path = warc_path
-        self.headers = headers or default_header
+        self.headers = headers or _default_header
         self._publisher_mapping: Dict[str, Publisher] = {
             urlparse(publisher.domain).netloc: publisher for publisher in self.publishers
         }
diff --git a/src/fundus/scraping/session.py b/src/fundus/scraping/session.py
index c5000441e..f2b377beb 100644
--- a/src/fundus/scraping/session.py
+++ b/src/fundus/scraping/session.py
@@ -12,7 +12,7 @@
 logger = create_logger(__name__)

-default_header = {"user-agent": "Fundus/2.0 (contact: github.com/flairnlp/fundus)"}
+_default_header = {"user-agent": "Fundus/2.0 (contact: github.com/flairnlp/fundus)"}

 class CrashThread(Exception):
diff --git a/src/fundus/scraping/url.py b/src/fundus/scraping/url.py
index 1e2da2ed7..9989bf01b 100644
--- a/src/fundus/scraping/url.py
+++ b/src/fundus/scraping/url.py
@@ -16,7 +16,7 @@
 from fundus.logging import create_logger
 from fundus.scraping.filter import URLFilter, inverse
-from fundus.scraping.session import default_header, session_handler
+from fundus.scraping.session import _default_header, session_handler

 logger = create_logger(__name__)

@@ -102,7 +102,7 @@ class URLSource(Iterable[str], ABC):
     def __post_init__(self):
         if not self._request_header:
-            self._request_header = default_header
+            self._request_header = _default_header
         if not validators.url(self.url, strict_query=False):
             logger.error(f"{type(self).__name__} initialized with invalid URL {self.url}")

From 7516db50fafdef00918361054ccfcdc525dbc207 Mon Sep 17 00:00:00 2001
From: Max Dallabetta
Date: Wed, 15 Oct 2025 14:28:27 +0200
Subject: [PATCH 12/20] improve exception handling by inheriting from
 `BaseException` add functionality to silence `Exceptions`

---
 src/fundus/scraping/crawler.py | 11 +++++++++--
 src/fundus/scraping/session.py |  2 +-
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py
index 09c98b780..3a0a248db 100644
--- a/src/fundus/scraping/crawler.py
+++ b/src/fundus/scraping/crawler.py
@@ -139,12 +139,17 @@ def get_execution_context():
     return thread.name, thread.ident


-def queue_wrapper(queue: Queue[Union[_T, Exception]], target: Callable[_P, Iterator[_T]]) -> Callable[_P, None]:
+def queue_wrapper(
+    queue: Queue[Union[_T, Exception]],
+    target: Callable[_P, Iterator[_T]],
+    silenced_exceptions: Tuple[Type[BaseException], ...] = (),
+) -> Callable[_P, None]:
     """Wraps the target callable to add its results to the queue instead of returning them directly.

     Args:
         queue: The buffer queue.
         target: A target callable.
+        silenced_exceptions: Exception types that should be silenced

     Returns:
         (Callable[_P, None]) The wrapped target.
@@ -155,6 +160,8 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
         try:
             for obj in target(*args, **kwargs):
                 queue.put(obj)
+        except silenced_exceptions:
+            pass
         except Exception as err:
             tb_str = "".join(traceback.TracebackException.from_exception(err).format())
             context, ident = get_execution_context()
             queue.put(
                 RemoteException(
@@ -562,7 +569,7 @@ def _threaded_crawl(
         self, publishers: Tuple[Publisher, ...], article_task: Callable[[Publisher], Iterator[Article]]
     ) -> Iterator[Article]:
         result_queue: Queue[Union[Article, Exception]] = Queue(len(publishers))
-        wrapped_article_task = queue_wrapper(result_queue, article_task)
+        wrapped_article_task = queue_wrapper(result_queue, article_task, silenced_exceptions=(CrashThread,))
         pool = ThreadPool(processes=len(publishers) or None)
         try:
             with session_handler.context(
diff --git a/src/fundus/scraping/session.py b/src/fundus/scraping/session.py
index f2b377beb..c889da5cf 100644
--- a/src/fundus/scraping/session.py
+++ b/src/fundus/scraping/session.py
@@ -15,7 +15,7 @@
 _default_header = {"user-agent": "Fundus/2.0 (contact: github.com/flairnlp/fundus)"}


-class CrashThread(Exception):
+class CrashThread(BaseException):
     """Is raised to end a thread without relying on the thread ending naturally"""

     pass

From df9aeadeebab1492caef8bfe3d1118e5f7fd81ee Mon Sep 17 00:00:00 2001
From: Max Dallabetta
Date: Wed, 15 Oct 2025 14:31:09 +0200
Subject: [PATCH 13/20] fix a race condition where events were aliased after
 the main-thread crashed

---
 src/fundus/scraping/crawler.py | 9 ++++++++-
 src/fundus/scraping/scraper.py | 2 --
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py
index 3a0a248db..9ddf60678 100644
--- a/src/fundus/scraping/crawler.py
+++ b/src/fundus/scraping/crawler.py
@@ -429,7 +429,9 @@ def callback() -> None:
                     skip_publishers_disallowing_training,
                 ):
                     if max_articles_per_publisher and article_count[article.publisher] == max_articles_per_publisher:
-                        if isinstance(self, Crawler) and not __EVENTS__.is_event_set("stop", article.publisher):
+                        if (isinstance(self, Crawler) and self.threading) and not __EVENTS__.is_event_set(
+                            "stop", article.publisher
+                        ):
                             __EVENTS__.set_event("stop", article.publisher)
                         if sum(article_count.values()) == len(self.publishers) * max_articles_per_publisher:
                             break
@@ -543,6 +545,11 @@ def constant_delay() -> float:
         else:
             raise TypeError("param of ")

+        # we "register" the thread in the event dict as soon as possible to avoid that a
+        # thread crashes before
+        if self.threading:
+            __EVENTS__.alias(publisher.name)
+
         scraper = WebScraper(
             publisher,
             self.restrict_sources_to,
diff --git a/src/fundus/scraping/scraper.py b/src/fundus/scraping/scraper.py
index e903b0c29..6dfb973fb 100644
--- a/src/fundus/scraping/scraper.py
+++ b/src/fundus/scraping/scraper.py
@@ -107,8 +107,6 @@ def __init__(
         parser_mapping: Dict[str, ParserProxy] = {publisher.name: publisher.parser}
         super().__init__(*html_sources, parser_mapping=parser_mapping)

-        __EVENTS__.alias(publisher.name)
-

 class CCNewsScraper(BaseScraper):
     def __init__(self, source: CCNewsSource):

From 17a286e47129cda29653cd5d93bb9bceb28bec8b Mon Sep 17 00:00:00 2001
From: Max Dallabetta
Date: Wed, 15 Oct 2025 14:38:21 +0200
Subject: [PATCH 14/20] fix a bug where no events were registered when
 aliasing; use bidirectional events mapping

---
 src/fundus/scraping/crawler.py |  7 ++--
 src/fundus/utils/events.py     | 65 ++++++++++++++++++++++------------
 2 files changed, 46 insertions(+), 26 deletions(-)

diff --git
a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index 9ddf60678..7e854618d 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -167,11 +167,12 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None: context, ident = get_execution_context() queue.put( RemoteException( - f"There was a(n) {type(err).__name__!r} occurring in {context} with ident {ident}\n{tb_str}" + f"There was a(n) {type(err).__name__!r} occurring in {context} " + f"with ident {ident} ({__EVENTS__.get_alias(ident)})\n{tb_str}" ) ) - logger.debug(f"Encountered remote exception: {err!r}") + logger.debug(f"Encountered remote exception in thread {ident} ({__EVENTS__.get_alias(ident)}): {err!r}") # prevents a race condition where the ThreadPool shuts down before the exception is pulled from the queue time.sleep(0.2) @@ -195,8 +196,6 @@ def pool_queue_iter(handle: MapResult[Any], queue: Queue[Union[_T, Exception]]) while True: try: if isinstance(nxt := queue.get_nowait(), Exception): - if isinstance(nxt, CrashThread): - return raise Exception("There was an exception occurring in a remote thread/process") from nxt yield nxt except Empty: diff --git a/src/fundus/utils/events.py b/src/fundus/utils/events.py index 9e1ed1f4a..35cafbad9 100644 --- a/src/fundus/utils/events.py +++ b/src/fundus/utils/events.py @@ -1,17 +1,23 @@ import threading from collections import defaultdict -from typing import Any, Dict, List, Optional, Union +from typing import Dict, List, Optional, Union + +from bidict import bidict from fundus.logging import create_logger logger = create_logger(__name__) +__DEFAULT_EVENTS__: List[str] = ["stop"] + class ThreadEventDict(Dict[str, threading.Event]): """A dictionary that creates threading.Event() objects on demand for certain keys. This essentially mocks the behavior of defaultdict, but only for certain keys.""" - _default_events: List[str] = ["stop"] + def __init__(self, default_events: Optional[List[str]] = None): + super().__init__() + self._default_events = default_events or [] def __getitem__(self, item: str) -> threading.Event: try: @@ -37,10 +43,10 @@ class EventDict: "BR" to the thread's identifier. """ - def __init__(self): - self._events: Dict[int, ThreadEventDict] = defaultdict(ThreadEventDict) - self._aliases: Dict[Any, int] = {} - self._lock = threading.Lock() + def __init__(self, default_events: Optional[List[str]] = None): + self._events: Dict[int, ThreadEventDict] = defaultdict(lambda: ThreadEventDict(default_events)) + self._aliases: bidict[str, int] = bidict() + self._lock = threading.RLock() @staticmethod def _get_identifier() -> int: @@ -63,8 +69,18 @@ def _resolve(self, key: Union[int, str, None]) -> int: return key return self._aliases[key] + def _pretty_resolve(self, key: Union[int, str, None]) -> str: + resolved = self._resolve(key) + alias = f" ({self._aliases.inv[resolved]})" if resolved in self._aliases.values() else "" + return f"{resolved:<6}{alias}" + def _alias(self, alias: str, key: Optional[int] = None): self._aliases[alias] = key if key else self._get_identifier() + if (ident := self._resolve(alias)) not in self._events: + # noinspection PyStatementEffect + # Since defaultdict doesn't provide a direct way to create defaults, + # we simulate it by accessing the key. 
+ self._events[ident] logger.debug(f"Registered alias {alias} -> {self._aliases[alias]}") def register_event(self, event: str, key: Union[int, str, None] = None): @@ -73,17 +89,17 @@ def register_event(self, event: str, key: Union[int, str, None] = None): self._alias(key) if (resolved := self._resolve(key)) not in self._events: self._events[resolved][event] = threading.Event() - logger.debug(f"Registered event {event!r} for {resolved}") + logger.debug(f"Registered event {event!r} for {self._pretty_resolve(key)}") def set_event(self, event: str, key: Union[int, str, None] = None): with self._lock: self._events[self._resolve(key)][event].set() - logger.debug(f"Set event {event!r} for {self._resolve(key)}") + logger.debug(f"Set event {event!r} for {self._pretty_resolve(key)}") def clear_event(self, event: str, key: Union[int, str, None] = None): with self._lock: self._events[self._resolve(key)][event].clear() - logger.debug(f"Cleared event {event!r} for {self._resolve(key)}") + logger.debug(f"Cleared event {event!r} for {self._pretty_resolve(key)}") def set_for_all(self, event: Optional[str] = None): """Set for all registered keys @@ -96,12 +112,13 @@ def set_for_all(self, event: Optional[str] = None): None """ with self._lock: - for events in self._events.values(): - if event is not None and event in events: - events[event].set() - else: - for flag in events.values(): - flag.set() + if event is None: + for ident, events in self._events.items(): + for name in events: + self.set_event(name, ident) + else: + for ident in self._events: + self.set_event(event, ident) def clear_for_all(self, event: Optional[str] = None): """Clear for all registered keys @@ -114,12 +131,13 @@ def clear_for_all(self, event: Optional[str] = None): None """ with self._lock: - for events in self._events.values(): - if event is not None and event in events: - events[event].clear() - else: - for flag in events.values(): - flag.clear() + if event is None: + for ident, events in self._events.items(): + for name in events: + self.clear_event(name, ident) + else: + for ident in self._events: + self.clear_event(event, ident) def is_event_set(self, event: str, key: Union[int, str, None] = None) -> bool: with self._lock: @@ -129,6 +147,9 @@ def alias(self, alias: str, key: Optional[int] = None): with self._lock: self._alias(alias, key) + def get_alias(self, ident: int) -> str: + return self._aliases.inv[ident] + def remove_alias(self, alias: str): with self._lock: self._aliases.pop(alias, None) @@ -138,4 +159,4 @@ def get(self, event: str, key: Optional[Union[int, str, None]] = None) -> thread return self._events[self._resolve(key)][event] -__EVENTS__: EventDict = EventDict() +__EVENTS__: EventDict = EventDict(default_events=__DEFAULT_EVENTS__) From d3c79f70b2e28ef22470d098d8fcae14c36fb975 Mon Sep 17 00:00:00 2001 From: Max Dallabetta Date: Wed, 15 Oct 2025 14:42:24 +0200 Subject: [PATCH 15/20] add autogenerated doc strings --- src/fundus/utils/events.py | 179 +++++++++++++++++++++++++++++++------ 1 file changed, 154 insertions(+), 25 deletions(-) diff --git a/src/fundus/utils/events.py b/src/fundus/utils/events.py index 35cafbad9..f82993c2d 100644 --- a/src/fundus/utils/events.py +++ b/src/fundus/utils/events.py @@ -13,13 +13,43 @@ class ThreadEventDict(Dict[str, threading.Event]): """A dictionary that creates threading.Event() objects on demand for certain keys. 
- This essentially mocks the behavior of defaultdict, but only for certain keys.""" + + This class behaves like a standard dictionary but automatically creates + `threading.Event` objects when specific keys (provided via `default_events`) + are accessed. This is similar to `defaultdict`, but the auto-creation only + applies to those specific keys. + + Attributes: + _default_events (List[str]): List of event names for which Events will be auto-created. + """ def __init__(self, default_events: Optional[List[str]] = None): + """ + Initialize a new ThreadEventDict. + + Args: + default_events: A list of event names for which Event objects + should be automatically created when accessed. + """ super().__init__() self._default_events = default_events or [] def __getitem__(self, item: str) -> threading.Event: + """ + Get the Event associated with the given item. + + If the key does not exist and is in `_default_events`, a new + `threading.Event` is created, stored, and returned. + + Args: + item: The event name to retrieve. + + Returns: + threading.Event: The event associated with the key. + + Raises: + KeyError: If the key is not present and not in `_default_events`. + """ try: return super().__getitem__(item) except KeyError as e: @@ -31,37 +61,53 @@ def __getitem__(self, item: str) -> threading.Event: class EventDict: - """A thread-safe event dictionary. + """A thread-safe event registry for managing thread-local events with optional aliases. + + This class maintains per-thread event dictionaries, allowing threads to + register, set, and clear named `threading.Event` objects in an isolated + and synchronized manner. - Events are registered by name and stored per thread in a dictionary, using the - thread's identifier as the key. For example, calling `register_event("stop")` - registers a "stop" event for the current thread's identifier. + Aliases can be assigned to thread identifiers for convenience. Each alias + maps uniquely to a thread ID, allowing event access via human-readable names. - To enhance usability, threads can be assigned aliases. Calling - `register_event("stop", "BR")` registers the "stop" event (if it is not already - registered) for the current thread and automatically creates an alias mapping - "BR" to the thread's identifier. + Attributes: + _events (Dict[int, ThreadEventDict]): Mapping of thread IDs to their events. + _aliases (bidict[str, int]): Bidirectional mapping of aliases to thread IDs. + _lock (threading.RLock): A re-entrant lock to ensure thread safety. """ def __init__(self, default_events: Optional[List[str]] = None): + """ + Initialize a new EventDict. + + Args: + default_events: A list of event names that are automatically available + for all threads (e.g., ["stop"]). + """ self._events: Dict[int, ThreadEventDict] = defaultdict(lambda: ThreadEventDict(default_events)) self._aliases: bidict[str, int] = bidict() self._lock = threading.RLock() @staticmethod def _get_identifier() -> int: + """ + Get the current thread's unique identifier. + + Returns: + int: The current thread's identifier. + """ return threading.get_ident() def _resolve(self, key: Union[int, str, None]) -> int: - """Resolves a given key to a thread identifier + """Resolve a key (thread ID, alias, or None) to a thread identifier. - Should only be used within a Lock! + Should only be called while holding the internal lock. Args: - key: Key to resolve + key: The key to resolve. May be a thread ID, alias, or None. Returns: - Resolved thread identifier + int: The resolved thread identifier. 
""" if key is None: return self._get_identifier() @@ -70,11 +116,32 @@ def _resolve(self, key: Union[int, str, None]) -> int: return self._aliases[key] def _pretty_resolve(self, key: Union[int, str, None]) -> str: + """ + Resolve a key to a human-readable identifier string, including alias if available. + + Should only be called while holding the internal lock. + + Args: + key: Thread ID, alias, or None. + + Returns: + str: A formatted string of the form " (alias)". + """ resolved = self._resolve(key) alias = f" ({self._aliases.inv[resolved]})" if resolved in self._aliases.values() else "" return f"{resolved:<6}{alias}" def _alias(self, alias: str, key: Optional[int] = None): + """ + Register an alias for a given thread identifier. + + Should only be called while holding the internal lock. + + Args: + alias: The alias to assign. + key: The thread identifier to associate with this alias. + If None, the current thread's identifier is used. + """ self._aliases[alias] = key if key else self._get_identifier() if (ident := self._resolve(alias)) not in self._events: # noinspection PyStatementEffect @@ -84,6 +151,15 @@ def _alias(self, alias: str, key: Optional[int] = None): logger.debug(f"Registered alias {alias} -> {self._aliases[alias]}") def register_event(self, event: str, key: Union[int, str, None] = None): + """ + Register a new event for the specified thread or alias. + + If the alias does not exist, it is automatically created. + + Args: + event: The name of the event to register. + key: Thread ID, alias, or None (defaults to the current thread). + """ with self._lock: if isinstance(key, str) and key not in self._aliases: self._alias(key) @@ -92,24 +168,36 @@ def register_event(self, event: str, key: Union[int, str, None] = None): logger.debug(f"Registered event {event!r} for {self._pretty_resolve(key)}") def set_event(self, event: str, key: Union[int, str, None] = None): + """ + Set (trigger) an event for the specified thread. + + Args: + event: The name of the event to set. + key: Thread ID, alias, or None (defaults to the current thread). + """ with self._lock: self._events[self._resolve(key)][event].set() logger.debug(f"Set event {event!r} for {self._pretty_resolve(key)}") def clear_event(self, event: str, key: Union[int, str, None] = None): + """ + Clear (reset) an event for the specified thread. + + Args: + event: The name of the event to clear. + key: Thread ID, alias, or None (defaults to the current thread). + """ with self._lock: self._events[self._resolve(key)][event].clear() logger.debug(f"Cleared event {event!r} for {self._pretty_resolve(key)}") def set_for_all(self, event: Optional[str] = None): - """Set for all registered keys + """Set an event for all registered threads. - If is None, all events for every registered key will be set. - Args: - event: The event to set. Defaults to None. + If `event` is None, all events for every registered thread are set. - Returns: - None + Args: + event: The event name to set. If None, all events are set. """ with self._lock: if event is None: @@ -121,14 +209,12 @@ def set_for_all(self, event: Optional[str] = None): self.set_event(event, ident) def clear_for_all(self, event: Optional[str] = None): - """Clear for all registered keys + """Clear an event for all registered threads. - If is None, all events for every registered key will be cleared. - Args: - event: The event to clear. Defaults to None. + If `event` is None, all events for every registered thread are cleared. - Returns: - None + Args: + event: The event name to clear. 
If None, all events are cleared. """ with self._lock: if event is None: @@ -140,21 +226,64 @@ def clear_for_all(self, event: Optional[str] = None): self.clear_event(event, ident) def is_event_set(self, event: str, key: Union[int, str, None] = None) -> bool: + """ + Check if a specific event is set for a given thread. + + Args: + event: The name of the event to check. + key: Thread ID, alias, or None (defaults to the current thread). + + Returns: + bool: True if the event is set, False otherwise. + """ with self._lock: return self._events[self._resolve(key)][event].is_set() def alias(self, alias: str, key: Optional[int] = None): + """ + Public wrapper to register an alias for a thread. + + Args: + alias: The alias name to register. + key: Optional thread identifier to associate with the alias. + Defaults to the current thread if not provided. + """ with self._lock: self._alias(alias, key) def get_alias(self, ident: int) -> str: + """ + Get the alias associated with a thread identifier. + + Args: + ident: The thread identifier. + + Returns: + str: The alias associated with the identifier. + """ return self._aliases.inv[ident] def remove_alias(self, alias: str): + """ + Remove an alias from the alias mapping. + + Args: + alias: The alias to remove. + """ with self._lock: self._aliases.pop(alias, None) def get(self, event: str, key: Optional[Union[int, str, None]] = None) -> threading.Event: + """ + Get the event object associated with the given event name and thread. + + Args: + event: The name of the event to retrieve. + key: Thread ID, alias, or None (defaults to the current thread). + + Returns: + threading.Event: The event object. + """ with self._lock: return self._events[self._resolve(key)][event] From f6a9c8b0154986b887cf0bf8c56316fcb52dedab Mon Sep 17 00:00:00 2001 From: Max Dallabetta Date: Wed, 15 Oct 2025 15:21:48 +0200 Subject: [PATCH 16/20] add missing `bidict` dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 004e979df..5265b6fa1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "dill>=0.3, <1", "dict2xml>=1.7.6, <2", "xmltodict>=0.13.0, <1", + "bidict>=0.23, <1" ] [project.urls] From d3d7a64da02938ac3311c3eea5dab2427cc12ee1 Mon Sep 17 00:00:00 2001 From: Adrian Breiding Date: Thu, 16 Oct 2025 00:24:37 +0200 Subject: [PATCH 17/20] simplify ThreadPoolExecutor Usage --- src/fundus/scraping/crawler.py | 92 ++++++++++++---------------------- 1 file changed, 31 insertions(+), 61 deletions(-) diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index 09c98b780..434f0141f 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -12,7 +12,7 @@ import traceback from abc import ABC, abstractmethod from collections import defaultdict -from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor from concurrent.futures import TimeoutError as FuturesTimeoutError from concurrent.futures import as_completed from datetime import datetime @@ -72,8 +72,6 @@ PublisherType: TypeAlias = Union[Publisher, PublisherGroup] -_shared_executor = ThreadPoolExecutor(max_workers=10) - class RemoteException(Exception): pass @@ -212,45 +210,6 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: return wrapper -def verify_publishers( - publishers: Tuple["Publisher", ...], - max_workers: Optional[int] = None, -) -> Tuple["Publisher", ...]: - publishers = tuple(publishers) - if not publishers: - return tuple() 
- - max_workers = max_workers if max_workers is not None else min(len(publishers), 5) - - verified: List["Publisher"] = [] - with ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_publisher = { - executor.submit(publisher.robots.disallows_training): publisher for publisher in publishers - } - - for future in as_completed(future_to_publisher.keys()): - publisher = future_to_publisher[future] - try: - disallows = future.result() - if not disallows: - verified.append(publisher) - else: - logger.warning(f"Skipping publisher {publisher.name!r} because it disallows training.") - except FuturesTimeoutError: - logger.warning(f"Robots.txt check timed out for {publisher.name!r}", exc_info=False) - except Exception as exc: - logger.warning(f"Could not verify training policy for {publisher.name!r}: {exc}", exc_info=True) - - return tuple(verified) - - -def _async_publisher_verification( - publishers: Tuple["Publisher", ...], - max_workers: Optional[int] = None, -) -> Future[Tuple["Publisher", ...]]: - return _shared_executor.submit(verify_publishers, publishers, max_workers) - - class CrawlerBase(ABC): def __init__(self, *publishers: PublisherType): self.publishers: List[Union[Publisher, FilteredPublisher]] = list(set(more_itertools.collapse(publishers))) @@ -520,7 +479,14 @@ def _fetch_articles( extraction_filter: Optional[ExtractionFilter] = None, url_filter: Optional[URLFilter] = None, language_filter: Optional[List[str]] = None, + skip_publishers_disallowing_training: bool = False, ) -> Iterator[Article]: + if skip_publishers_disallowing_training and ( + publisher.disallows_training or publisher.robots.disallows_training() + ): + logger.info(f"Skipping publisher {publisher.name} because it disallows training.") + return + def build_delay() -> Optional[Delay]: if isinstance(self.delay, float): delay = self.delay @@ -586,25 +552,13 @@ def _build_article_iterator( language_filter: Optional[List[str]], skip_publishers_disallowing_training: bool = False, ) -> Iterator[Article]: - if skip_publishers_disallowing_training: - verified_publishers_future = _async_publisher_verification( - publishers, max_workers=1 if not self.threading else None - ) - - try: - verified_publishers = verified_publishers_future.result(timeout=30) - except FuturesTimeoutError: - logger.warning("Publisher verification timed out, proceeding with all publishers") - verified_publishers = publishers - - publishers = verified_publishers - _shared_executor.shutdown(wait=False) article_task = partial( self._fetch_articles, error_handling=error_handling, extraction_filter=extraction_filter, url_filter=url_filter, language_filter=language_filter, + skip_publishers_disallowing_training=skip_publishers_disallowing_training, ) if self.threading: @@ -810,20 +764,36 @@ def _build_article_iterator( **kwargs, ) -> Iterator[Article]: if skip_publishers_disallowing_training: - verified_publishers_future = _async_publisher_verification(publishers, max_workers=self.processes) - + max_workers = self.processes if self.processes > 0 else min(len(publishers), 5) + verified_publishers: List["Publisher"] = [] + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_publisher = { + executor.submit( + lambda: publisher.disallows_training or publisher.robots.disallows_training() + ): publisher + for publisher in publishers + } warc_paths = tuple(self._get_warc_paths()) try: - verified_publishers = verified_publishers_future.result(timeout=30) + for future in as_completed(future_to_publisher.keys(), timeout=30): + 
publisher = future_to_publisher[future] + try: + if not future.result(): + verified_publishers.append(publisher) + else: + logger.warning(f"Skipping publisher {publisher.name!r} because it disallows training.") + except FuturesTimeoutError: + logger.warning(f"Robots.txt check timed out for {publisher.name!r}", exc_info=False) + except Exception as exc: + logger.warning(f"Could not verify training policy for {publisher.name!r}: {exc}", exc_info=True) + publishers = tuple(verified_publishers) except FuturesTimeoutError: logger.warning("Publisher verification timed out, proceeding with all publishers") - verified_publishers = publishers - publishers = verified_publishers else: warc_paths = tuple(self._get_warc_paths()) - _shared_executor.shutdown(wait=False) with get_proxy_tqdm(total=len(warc_paths), desc="Process WARC files", disable=self.disable_tqdm) as bar: article_task = partial( From 847e0500ff68b8a5ef8c32db8c2199f1408ae7ca Mon Sep 17 00:00:00 2001 From: Max Dallabetta Date: Tue, 21 Oct 2025 18:06:52 +0200 Subject: [PATCH 18/20] fix lambda expression and indentation --- src/fundus/scraping/crawler.py | 43 ++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index 434f0141f..97155229b 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -767,30 +767,33 @@ def _build_article_iterator( max_workers = self.processes if self.processes > 0 else min(len(publishers), 5) verified_publishers: List["Publisher"] = [] + def run_disallow_training(publisher: Publisher) -> bool: + return publisher.disallows_training or publisher.robots.disallows_training() + with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_publisher = { - executor.submit( - lambda: publisher.disallows_training or publisher.robots.disallows_training() - ): publisher - for publisher in publishers + executor.submit(run_disallow_training, publisher=publisher): publisher for publisher in publishers } - warc_paths = tuple(self._get_warc_paths()) - try: - for future in as_completed(future_to_publisher.keys(), timeout=30): - publisher = future_to_publisher[future] - try: - if not future.result(): - verified_publishers.append(publisher) - else: - logger.warning(f"Skipping publisher {publisher.name!r} because it disallows training.") - except FuturesTimeoutError: - logger.warning(f"Robots.txt check timed out for {publisher.name!r}", exc_info=False) - except Exception as exc: - logger.warning(f"Could not verify training policy for {publisher.name!r}: {exc}", exc_info=True) - publishers = tuple(verified_publishers) - except FuturesTimeoutError: - logger.warning("Publisher verification timed out, proceeding with all publishers") + warc_paths = tuple(self._get_warc_paths()) + + try: + for future in as_completed(future_to_publisher.keys(), timeout=30): + publisher = future_to_publisher[future] + try: + if not future.result(): + verified_publishers.append(publisher) + else: + logger.warning(f"Skipping publisher {publisher.name!r} because it disallows training.") + except FuturesTimeoutError: + logger.warning(f"Robots.txt check timed out for {publisher.name!r}", exc_info=False) + except Exception as exc: + logger.warning( + f"Could not verify training policy for {publisher.name!r}: {exc}", exc_info=True + ) + publishers = tuple(verified_publishers) + except FuturesTimeoutError: + logger.warning("Publisher verification timed out, proceeding with all publishers") else: warc_paths = 
tuple(self._get_warc_paths()) From b0cc0fa3a26b8c53395431aa690c5807be17fb30 Mon Sep 17 00:00:00 2001 From: Max Dallabetta Date: Tue, 21 Oct 2025 18:23:09 +0200 Subject: [PATCH 19/20] add `disallow_training` as property --- src/fundus/publishers/base_objects.py | 6 +++++- src/fundus/scraping/crawler.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/fundus/publishers/base_objects.py b/src/fundus/publishers/base_objects.py index 03830f4be..404ca6927 100644 --- a/src/fundus/publishers/base_objects.py +++ b/src/fundus/publishers/base_objects.py @@ -165,7 +165,7 @@ def __init__( url=self.domain + "robots.txt" if self.domain.endswith("/") else self.domain + "/robots.txt", headers=self.request_header, ) - self.disallows_training = disallows_training + self._disallows_training = disallows_training # Temporary fix to compensate for a bug in the RobotsFileParser treating rule lines # like /? as / disallowing the entire site. we could think about replacing the urllib @@ -187,6 +187,10 @@ def __init__( self._source_mapping = dict(sorted(source_mapping.items(), key=lambda item: self.__SOURCE_ORDER__[item[0]])) + @property + def disallows_training(self) -> bool: + return self._disallows_training or self.robots.disallows_training() + @property def source_mapping(self) -> Dict[Type[URLSource], List[URLSource]]: return self._source_mapping diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index 97155229b..b23d0af90 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -768,7 +768,7 @@ def _build_article_iterator( verified_publishers: List["Publisher"] = [] def run_disallow_training(publisher: Publisher) -> bool: - return publisher.disallows_training or publisher.robots.disallows_training() + return publisher.disallows_training with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_publisher = { From 941e8169509844b728605376d574c4772a8fc3cc Mon Sep 17 00:00:00 2001 From: Max Dallabetta Date: Fri, 24 Oct 2025 10:30:27 +0200 Subject: [PATCH 20/20] add missing documentation and replace condition with `disallows_training` property --- src/fundus/scraping/crawler.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/fundus/scraping/crawler.py b/src/fundus/scraping/crawler.py index fa0d6c739..63a37d348 100644 --- a/src/fundus/scraping/crawler.py +++ b/src/fundus/scraping/crawler.py @@ -489,9 +489,7 @@ def _fetch_articles( language_filter: Optional[List[str]] = None, skip_publishers_disallowing_training: bool = False, ) -> Iterator[Article]: - if skip_publishers_disallowing_training and ( - publisher.disallows_training or publisher.robots.disallows_training() - ): + if skip_publishers_disallowing_training and publisher.disallows_training: logger.info(f"Skipping publisher {publisher.name} because it disallows training.") return @@ -510,8 +508,8 @@ def constant_delay() -> float: else: raise TypeError("param of ") - # we "register" the thread in the event dict as soon as possible to avoid that a - # thread crashes before + # we "register" the thread in the event dict as soon as possible to avoid that a thread is registered + # after the pool already is shutting down if self.threading: __EVENTS__.alias(publisher.name)
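
Taken together, patches 17 through 20 route the `skip_publishers_disallowing_training` flag from the public crawl call down to the per-publisher check backed by the `Publisher.disallows_training` property and the robots.txt parser. A minimal usage sketch, assuming the top-level `Crawler` and `PublisherCollection` entry points Fundus already exposes (the flag itself is the one introduced earlier in this series):

```python
from fundus import Crawler, PublisherCollection

# Crawl a handful of articles while skipping any publisher whose robots.txt
# keywords (or explicit `disallows_training` marker) signal a training opt-out.
crawler = Crawler(PublisherCollection.us)

for article in crawler.crawl(max_articles=5, skip_publishers_disallowing_training=True):
    print(article.title)
```

In the CommonCrawl-based crawler the same flag triggers the thread-pooled robots.txt checks shown in patches 17 and 18, which run while the WARC paths are being resolved, so skipped publishers never reach the WARC processing stage.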