
Commit b7febb3

sarahyurick and ayushdg authored
Fuse document iterate and extract stages (#1458)
* Fuse document iterate and extract stages
* ruff
* fix bug
* update docs and tutorial
* save progress
* update more tests
* ruff
* fix tests
* ruff
* update benchmark
* move class
* add missing import
* update comment

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com>
1 parent c2c626c commit b7febb3

14 files changed: +402 additions, −634 deletions

benchmarking/scripts/arxiv_e2e_pipeline_benchmark.py

Lines changed: 5 additions & 11 deletions
```diff
@@ -47,8 +47,7 @@
 from nemo_curator.stages.text.download.arxiv.extract import ArxivExtractor
 from nemo_curator.stages.text.download.arxiv.iterator import ArxivIterator
 from nemo_curator.stages.text.download.base import URLGenerator
-from nemo_curator.stages.text.download.base.extract import DocumentExtractStage
-from nemo_curator.stages.text.download.base.iterator import DocumentIterateStage
+from nemo_curator.stages.text.download.base.iterator import DocumentIterateExtractStage
 from nemo_curator.stages.text.download.base.url_generation import URLGenerationStage
 from nemo_curator.stages.text.filters import (
     FastTextLangId,
@@ -119,20 +118,15 @@ def __post_init__(self) -> None:
             limit=self.url_limit,
         )
 
-        # Iterate stage (extracts records from tar files)
-        iterate_stage = DocumentIterateStage(
+        # Iterate-extract stage (extracts records from tar files and cleans LaTeX to text)
+        iterate_extract_stage = DocumentIterateExtractStage(
             iterator=ArxivIterator(log_frequency=self.log_frequency),
-            record_limit=self.record_limit,
-            add_filename_column=self.add_filename_column,
-        )
-
-        # Extract stage (cleans LaTeX to text)
-        extract_stage = DocumentExtractStage(
             extractor=ArxivExtractor(),
+            record_limit=self.record_limit,
             add_filename_column=self.add_filename_column,
         )
 
-        self.stages = [url_stage, iterate_stage, extract_stage]
+        self.stages = [url_stage, iterate_extract_stage]
         self.name = "local_arxiv_extract"
         super().__init__()
 
```

docs/about/concepts/text/data-acquisition-concepts.md

Lines changed: 2 additions & 3 deletions
```diff
@@ -16,12 +16,11 @@ This guide covers the core concepts for acquiring and processing text data from
 
 ## Overview
 
-Data acquisition in NeMo Curator follows a four-stage architecture:
+Data acquisition in NeMo Curator follows a three-stage architecture:
 
 1. **Generate URLs**: Discover and generate download URLs from minimal input
 2. **Download**: Retrieve raw data files from remote sources
-3. **Iterate**: Extract individual records from downloaded containers
-4. **Extract**: Convert raw content to clean, structured text
+3. **Iterate** and **Extract**: Extract individual records from downloaded containers and convert raw content to clean, structured text
 
 This process transforms diverse remote data sources into a standardized `DocumentBatch` that can be used throughout the text curation pipeline.
 
```

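The three-stage flow described in the docs change above can be sketched end-to-end with plain-Python stand-ins. All names here (`generate_urls`, `download`, `iterate_and_extract`) are illustrative, not Curator's API; the point is that iteration and extraction now happen in a single pass per file instead of two separate stages:

```python
def generate_urls(seed: str) -> list[str]:
    # Stage 1: expand minimal input into concrete download URLs.
    return [f"{seed}/part-{i}.tar" for i in range(2)]

def download(url: str) -> str:
    # Stage 2: pretend to fetch the file; return a local path.
    return url.split("/")[-1]

def iterate_and_extract(path: str) -> list[dict]:
    # Stage 3: yield raw records from the container and clean them
    # in the same pass, rather than materializing raw records for a
    # separate extract stage.
    raw_records = [{"raw": f"{path}:{n}"} for n in range(2)]
    return [{"text": r["raw"].upper()} for r in raw_records]

batch = [
    rec
    for url in generate_urls("https://example.com/dump")
    for rec in iterate_and_extract(download(url))
]
```

In the real pipeline each stage exchanges tasks (`FileGroupTask`, `DocumentBatch`) instead of bare lists, but the dataflow is the same.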
docs/curate-text/load-data/custom.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -16,7 +16,7 @@ Create custom data loading pipelines using Curator. This guide shows how to buil
 
 ## How It Works
 
-Curator uses the same **4-step pipeline pattern** described in {ref}`Data Acquisition Concepts <about-concepts-text-data-acquisition>` for custom data loading. Each step uses an abstract base class with corresponding processing stages that compose into pipelines.
+Curator uses the same **3-step pipeline pattern** described in {ref}`Data Acquisition Concepts <about-concepts-text-data-acquisition>` for custom data loading. Each step uses an abstract base class with corresponding processing stages that compose into pipelines.
 
 ---
 
```

nemo_curator/stages/text/download/README.md

Lines changed: 2 additions & 3 deletions
````diff
@@ -2,13 +2,12 @@
 
 ## 📁 Structure Overview
 
-The framework follows a **4-step pipeline pattern** where each step is implemented as an abstract base class with corresponding stages:
+The framework follows a **3-step pipeline pattern** where each step is implemented as an abstract base class with corresponding stages:
 
 ```
 1. URLGenerator → URLGenerationStage (URLs from config/input)
 2. DocumentDownloader → DocumentDownloadStage (local files from URLs)
-3. DocumentIterator → DocumentIterateStage (raw records from files)
-4. DocumentExtractor → DocumentExtractStage (structured data from records)
+3. DocumentIterator and DocumentExtractor → DocumentIterateExtractStage (structured data from files)
 ```
 
 ## 🛠️ Implementation Steps
````
nemo_curator/stages/text/download/base/extract.py

Lines changed: 1 addition & 73 deletions
```diff
@@ -1,4 +1,4 @@
-# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,16 +13,8 @@
 # limitations under the License.
 
 from abc import ABC, abstractmethod
-from dataclasses import dataclass
 from typing import Any
 
-import pandas as pd
-from loguru import logger
-
-from nemo_curator.stages.base import ProcessingStage
-from nemo_curator.tasks import DocumentBatch
-from nemo_curator.utils.column_utils import resolve_filename_column
-
 
 class DocumentExtractor(ABC):
     """Abstract base class for document extractors.
@@ -45,67 +37,3 @@ def input_columns(self) -> list[str]:
     def output_columns(self) -> list[str]:
         """Define output columns - produces DocumentBatch with records."""
         ...
-
-
-@dataclass
-class DocumentExtractStage(ProcessingStage[DocumentBatch, DocumentBatch]):
-    """Stage that extracts structured content from raw records.
-
-    Takes DocumentBatch with raw content and produces DocumentBatch with extracted content.
-    This is for cases where iteration and extraction are separate steps.
-    """
-
-    extractor: DocumentExtractor
-    add_filename_column: bool | str = True
-
-    def __post_init__(self):
-        """Initialize the stage."""
-        self.filename_col = resolve_filename_column(self.add_filename_column)
-        self.name = f"extract_{self.extractor.__class__.__name__.lower()}"
-
-    def inputs(self) -> tuple[list[str], list[str]]:
-        """Define input requirements - expects DocumentBatch with dict records."""
-        return (["data"], self.extractor.input_columns() + ([self.filename_col] if self.add_filename_column else []))  # type: ignore[reportReturnType]
-
-    def outputs(self) -> tuple[list[str], list[str]]:
-        """Define output - produces DocumentBatch with processed records."""
-        return (["data"], self.extractor.output_columns() + ([self.filename_col] if self.add_filename_column else []))  # type: ignore[reportReturnType]
-
-    def process(self, task: DocumentBatch) -> DocumentBatch:
-        """Extract structured content from raw records.
-
-        Args:
-            task (DocumentBatch): Batch containing records
-
-        Returns:
-            DocumentBatch: Batch containing extracted records
-        """
-        extracted_records = []
-
-        for _, row in task.data.iterrows():
-            # Convert pandas Series to dict
-            record_dict = row.to_dict()
-
-            # Extract structured content
-            extracted = self.extractor.extract(record_dict)
-            if extracted is not None:
-                if self.add_filename_column:
-                    if self.filename_col in extracted:
-                        msg = f"Since add_filename_col is specified, we'll overwrite ({self.filename_col}) from the input data."
-                        logger.warning(msg)
-
-                    extracted[self.filename_col] = record_dict[self.filename_col]  # type: ignore[reportReturnType]
-                extracted_records.append(extracted)
-
-        # Convert to DataFrame
-        df = pd.DataFrame(extracted_records)
-
-        return DocumentBatch(
-            task_id=task.task_id,
-            dataset_name=task.dataset_name,
-            data=df,
-            _metadata={
-                **task._metadata,
-            },
-            _stage_perf=task._stage_perf,
-        )
```

nemo_curator/stages/text/download/base/iterator.py

Lines changed: 44 additions & 19 deletions
```diff
@@ -1,4 +1,4 @@
-# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@
 from nemo_curator.tasks import DocumentBatch, FileGroupTask
 from nemo_curator.utils.column_utils import resolve_filename_column
 
+from .extract import DocumentExtractor
+
 
 class DocumentIterator(ABC):
     """Abstract base class for document iterators.
@@ -45,54 +47,77 @@ def output_columns(self) -> list[str]:
 
 
 @dataclass
-class DocumentIterateStage(ProcessingStage[FileGroupTask, DocumentBatch]):
-    """Stage that iterates through downloaded files and extracts records.
+class DocumentIterateExtractStage(ProcessingStage[FileGroupTask, DocumentBatch]):
+    """Stage that iterates through downloaded files with DocumentIterator,
+    then extracts structured content from raw records with DocumentExtractor.
 
-    Takes local file paths and produces a DocumentBatch with records.
-    All iterators yield dict[str, str] records uniformly.
+    Takes local file paths and produces a DocumentBatch with extracted content.
+    If DocumentIterator produces the final format, then DocumentExtractor is not needed.
     """
 
     iterator: DocumentIterator
+    extractor: DocumentExtractor | None = None
     record_limit: int | None = None
     add_filename_column: bool | str = True
 
     def __post_init__(self):
        """Initialize the stage."""
        self.filename_col = resolve_filename_column(self.add_filename_column)
-        self.name = f"iterate_{self.iterator.__class__.__name__.lower()}"
+        if self.extractor:
+            self.name = f"iterate_extract_{self.iterator.__class__.__name__.lower()}_{self.extractor.__class__.__name__.lower()}"
+        else:
+            self.name = f"iterate_{self.iterator.__class__.__name__.lower()}"
 
     def inputs(self) -> tuple[list[str], list[str]]:
         """Define input requirements - expects FileGroupTask with local file paths."""
         return (["data"], [])
 
     def outputs(self) -> tuple[list[str], list[str]]:
-        """Define output - produces DocumentBatch with records."""
-        return (["data"], self.iterator.output_columns() + ([self.filename_col] if self.add_filename_column else []))  # type: ignore[reportReturnType]
+        """Define output - produces DocumentBatch with processed records."""
+        if self.extractor:
+            return (["data"], self.extractor.output_columns() + ([self.filename_col] if self.add_filename_column else []))
+        else:
+            return (["data"], self.iterator.output_columns() + ([self.filename_col] if self.add_filename_column else []))
 
     def process(self, task: FileGroupTask) -> DocumentBatch:
-        """Iterate through files and extract records.
+        """Iterate through files and extract structured content.
 
         Args:
             task (FileGroupTask): Task containing local file paths
 
         Returns:
-            DocumentBatch: Batch containing records
+            DocumentBatch: Batch containing extracted records
         """
         records = []
 
         for file_path in task.data:
             try:
                 record_count = 0
                 iterator_result = self.iterator.iterate(file_path)
-                if iterator_result is not None:
-                    for record_dict in iterator_result:
-                        if self.record_limit and record_count >= self.record_limit:
-                            break
-                        if self.add_filename_column:
-                            # TODO: Support cloud storage https://github.com/NVIDIA-NeMo/Curator/issues/779
-                            record_dict[self.filename_col] = os.path.basename(file_path)  # type: ignore[reportReturnType]
-                        records.append(record_dict)
-                        record_count += 1
+
+                if iterator_result is None:
+                    continue
+
+                for record_dict in iterator_result:
+                    if self.record_limit and record_count >= self.record_limit:
+                        break
+
+                    # Add filename early
+                    if self.add_filename_column:
+                        record_dict[self.filename_col] = os.path.basename(file_path)
+
+                    # Extract structured content
+                    extracted = self.extractor.extract(record_dict) if self.extractor else record_dict
+
+                    if extracted is None:
+                        continue
+
+                    # Ensure filename is preserved
+                    if self.add_filename_column:
+                        extracted[self.filename_col] = record_dict[self.filename_col]
+
+                    records.append(extracted)
+                    record_count += 1
 
             except Exception as e:  # noqa: BLE001
                 logger.error(f"Error iterating {file_path}: {e}")
```
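Stripped of the library's task types, the fused loop in `process` behaves like the sketch below. The stub iterator and extractor stand in for `DocumentIterator`/`DocumentExtractor` implementations, and the helper name `iterate_extract` is illustrative only; it mirrors the diff's behavior, including the optional extractor, `record_limit`, and an extractor that drops records by returning `None`:

```python
import os

def iterate_extract(file_paths, iterate, extract=None, record_limit=None,
                    filename_col="file_name"):
    """Fused pass: iterate raw records and extract each one immediately."""
    records = []
    for file_path in file_paths:
        record_count = 0
        result = iterate(file_path)
        if result is None:
            continue
        for record in result:
            if record_limit and record_count >= record_limit:
                break
            # Add filename early so the extractor can see it.
            record[filename_col] = os.path.basename(file_path)
            extracted = extract(record) if extract else record
            if extracted is None:
                continue  # extractor may drop a record entirely
            # Ensure filename survives extractors that build a new dict.
            extracted[filename_col] = record[filename_col]
            records.append(extracted)
            record_count += 1
    return records

# Stubs standing in for real iterator/extractor implementations.
def stub_iterate(path):
    return [{"raw": f"{path}-{i}"} for i in range(3)]

def stub_extract(rec):
    # Drop the record ending in "2"; uppercase the rest.
    return None if rec["raw"].endswith("2") else {"text": rec["raw"].upper()}

out = iterate_extract(["/tmp/a.tar"], stub_iterate, stub_extract)
```

Note that dropped records do not count toward `record_limit`, matching the new code path where `continue` runs before `record_count += 1`.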

nemo_curator/stages/text/download/base/stage.py

Lines changed: 10 additions & 18 deletions
```diff
@@ -1,4 +1,4 @@
-# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,20 +18,19 @@
 from nemo_curator.tasks import DocumentBatch, _EmptyTask
 
 from .download import DocumentDownloader, DocumentDownloadStage
-from .extract import DocumentExtractor, DocumentExtractStage
-from .iterator import DocumentIterateStage, DocumentIterator
+from .extract import DocumentExtractor
+from .iterator import DocumentIterateExtractStage, DocumentIterator
 from .url_generation import URLGenerationStage, URLGenerator
 
 
 @dataclass
 class DocumentDownloadExtractStage(CompositeStage[_EmptyTask, DocumentBatch]):
-    """Composite stage that combines URL generation, download, iterate, and extract stages.
+    """Composite stage that combines URL generation, download, and iterate-extract stages.
 
-    This supports the full 4-step pipeline pattern like Common Crawl:
+    This supports the full 3-step pipeline pattern like Common Crawl:
     1. Generate URLs from minimal input
     2. Download files from URLs
-    3. Iterate through files to extract raw records
-    4. Extract structured content from raw records
+    3. Iterate through files to extract structured content
 
     """
 
@@ -56,22 +55,15 @@ def __post_init__(self):
             downloader=self.downloader,
         )
 
-        # Iterate stage
-        iterate_stage = DocumentIterateStage(
+        # Iterate-extract stage
+        iterate_extract_stage = DocumentIterateExtractStage(
             iterator=self.iterator,
+            extractor=self.extractor,
             record_limit=self.record_limit,
             add_filename_column=self.add_filename_column,
         )
 
-        # Extract stage (if extractor provided)
-        stages = [url_stage, download_stage, iterate_stage]
-        if self.extractor:
-            extract_stage = DocumentExtractStage(
-                extractor=self.extractor,
-                add_filename_column=self.add_filename_column,
-            )
-            stages.append(extract_stage)
-
+        stages = [url_stage, download_stage, iterate_extract_stage]
         self.stages = stages
 
         url_generator_name = self.url_generator.__class__.__name__.lower()
```
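Before this commit the composite emitted three or four stages depending on whether an extractor was provided; now it always emits exactly three, with the extractor simply optional inside the fused stage. A toy stand-in for the new `__post_init__` logic (`MockStage` and `build_stages` are illustrative, not Curator's API):

```python
from dataclasses import dataclass

@dataclass
class MockStage:
    """Minimal stand-in for a Curator processing stage."""
    name: str

def build_stages(extractor=None):
    # Mirrors the composite after the change: iterate and extract
    # always share one stage, whether or not an extractor is given.
    iterate_extract = MockStage("iterate_extract" if extractor else "iterate")
    return [MockStage("url_generation"), MockStage("download"), iterate_extract]

with_extractor = build_stages(extractor=object())
without_extractor = build_stages()
```

Callers are unaffected: `DocumentDownloadExtractStage` still takes the same four components (URL generator, downloader, iterator, optional extractor); only the internal stage list shrinks.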
