-
Notifications
You must be signed in to change notification settings - Fork 144
Description
Describe the bug
I am following the code here (https://github.com/pytorch/torchrec/tree/main/torchrec/datasets/scripts/nvt) to preprocess the Criteo 1TB dataset using NVTabular. The `workflow.fit` step cannot be executed: it fails with the error "RuntimeError: Failed to categorical encode column cat_0".
Steps/Code to reproduce bug
import argparse
import os
import shutil
import time

import numpy as np
import nvtabular as nvt
from merlin.io import Shuffle

from utils.dask import setup_dask
def parse_args(argv=None):
    """Parse the command-line options for the cat_0 preprocessing script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default), argparse reads the process command line.

    Returns:
        The parsed ``argparse.Namespace``; it carries a ``base_path``
        attribute.
    """
    parser = argparse.ArgumentParser(description="Preprocess cat_0 column in day_0 file")
    # base_path is joined into every path the script builds, so a missing
    # value should fail here with a usage error instead of surfacing later
    # as a TypeError from os.path.join(None, ...).
    parser.add_argument(
        "--base_path", "-b", dest="base_path", required=True, help="Base path"
    )
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Reproduction script: fit and apply a single-column (cat_0) NVTabular
    # workflow on the day_0 parquet file and report the elapsed time.
    start_time = time.time()
    args = parse_args()

    # setup_dask is a project helper (utils.dask); presumably it starts the
    # Dask cluster and returns its client -- confirm against that module.
    dask_workdir = os.path.join(args.base_path, "dask_workdir")
    client = setup_dask(dask_workdir)
    print(client)

    input_path = os.path.join(args.base_path, "criteo_parquet")
    output_path = os.path.join(args.base_path, "criteo_preproc_day0_test")
    os.makedirs(output_path, exist_ok=True)

    day_0_file = os.path.join(input_path, "day_0.parquet")
    # Raise explicitly rather than assert: assert statements are removed
    # when Python runs with -O, which would silently skip this check.
    if not os.path.exists(day_0_file):
        raise FileNotFoundError(f"Day_0 parquet file {day_0_file} does not exist")

    # Only cat_0 is processed: fill missing values, then categorify with
    # the bucket count given for this column.
    cat_features = (
        ["cat_0"]
        >> nvt.ops.FillMissing()
        >> nvt.ops.Categorify(num_buckets={"cat_0": 40000000})
    )
    features = cat_features
    workflow = nvt.Workflow(features)

    train_dataset = nvt.Dataset(day_0_file, engine="parquet", part_size="256MB")

    print("Fit workflow...")
    workflow.fit(train_dataset)  # problem occurred here

    print("Transform workflow...")
    output_train = os.path.join(output_path, "train")
    workflow.transform(train_dataset).to_parquet(
        output_path=output_train,
        shuffle=Shuffle.PER_WORKER,
        out_files_per_proc=1,
    )

    print(f"Total processing time: {time.time() - start_time:.2f} seconds")
Basically, the above code follows the script here: (https://github.com/pytorch/torchrec/blob/main/torchrec/datasets/scripts/nvt/process_criteo_parquet.py). It processes only a small part of the full Criteo 1TB dataset so that the issue can be reproduced faster.
Update: when I change cat_0 to cat_1 or cat_2 and categorify each of these features independently, the problem does not occur and the workflow executes successfully. It seems the problem only occurs for features whose NUM_EMBEDDINGS_PER_FEATURE is large.
Expected behavior
Failed to transform operator <nvtabular.ops.categorify.Categorify object at 0x7fccd061cf10>
Traceback (most recent call last):
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtabular/ops/categorify.py", line 510, in transform
encoded = _encode(
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtabular/ops/categorify.py", line 1637, in _encode
value = fetch_table_data(
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/io/worker.py", line 109, in fetch_table_data
df = reader(path, **use_kwargs)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtx/nvtx.py", line 116, in inner
result = func(*args, **kwargs)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/cudf/io/parquet.py", line 577, in read_parquet
df = _parquet_to_frame(
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtx/nvtx.py", line 116, in inner
result = func(*args, **kwargs)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/cudf/io/parquet.py", line 721, in _parquet_to_frame
return _read_parquet(
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtx/nvtx.py", line 116, in inner
result = func(*args, **kwargs)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/cudf/io/parquet.py", line 829, in _read_parquet
return libparquet.read_parquet(
File "parquet.pyx", line 118, in cudf._lib.parquet.read_parquet
File "parquet.pyx", line 174, in cudf._lib.parquet.read_parquet
RuntimeError: CUDF failure at:/opt/conda/condabld/work/cpp/src/io/parquet/reader_impl_preprocess.cu:300: Parquet header parsing failed with code(s) while counting page headers 0x5
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/dag/executors.py", line 237, in _run_node_transform
transformed_data = node.op.transform(selection, input_data)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtx/nvtx.py", line 116, in inner
result = func(*args, **kwargs)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtabular/ops/categorify.py", line 534, in transform
raise RuntimeError(f"Failed to categorical encode column {name}") from e
RuntimeError: Failed to categorical encode column cat_0
2024-12-16 01:12:09,485 - distributed.worker - WARNING - Compute Failed
Key: ('transform-8718b7810a9732535d4fc700373e7bd7', 0)
Function: subgraph_callable-bd252eab-03b4-41aa-bc38-474ab23a
args: ([<Node + output>], 'read-parquet-8fce583dc4d4c20eb9eb8706c6d00208', {'piece': ('/scratch/bcjw/wzhan/dataset/criteo_parquet/day_0.parquet', [0], [])})
kwargs: {}
Exception: "RuntimeError('Failed to categorical encode column cat_0')"
File "/u/wzhan/torchrec/torchrec/datasets/scripts/nvt/process_criteo_parquet.py", line 98, in <module>
workflow.fit(train_dataset)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtabular/workflow/workflow.py", line 213, in fit
self.executor.fit(dataset, self.graph)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/dag/executors.py", line 489, in fit
).sample_dtypes()
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/io/dataset.py", line 1262, in sample_dtypes
_real_meta = self.engine.sample_data(n=n)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/io/dataset_engine.py", line 71, in sample_data
_head = _ddf.partitions[partition_index].head(n)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/dask/dataframe/core.py", line 1590, in head
return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/dask/dataframe/core.py", line 1624, in _head
result = result.compute()
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/dask/base.py", line 379, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/dask/base.py", line 665, in compute
results = schedule(dsk, keys, **kwargs)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/dag/executors.py", line 102, in transform
transformed_data = self._execute_node(node, transformable, capture_dtypes, strict)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/dag/executors.py", line 116, in _execute_node
upstream_outputs = self._run_upstream_transforms(
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/dag/executors.py", line 130, in _run_upstream_transforms
node_output = self._execute_node(
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/dag/executors.py", line 122, in _execute_node
transform_output = self._run_node_transform(node, transform_input, capture_dtypes, strict)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/dag/executors.py", line 250, in _run_node_transform
raise exc
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/merlin/dag/executors.py", line 237, in _run_node_transform
transformed_data = node.op.transform(selection, input_data)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtx/nvtx.py", line 116, in inner
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtx/nvtx.py", line 116, in inner
result = func(*args, **kwargs)
File "/u/wzhan/.conda/envs/nvt/lib/python3.10/site-packages/nvtabular/ops/categorify.py", line 534, in transform
raise RuntimeError(f"Failed to categorical encode column {name}") from e
RuntimeError: Failed to categorical encode column cat_0
After ~10 minutes, the above error occurred.
Environment details (please complete the following information):
- Method of NVTabular install: [conda] 23.08.00
Running without docker, since docker is not available in my case.
python 3.9
cuda 11.8
cudf 24.02.02
cupy 13.3.0
dask-cuda 24.02.00
Additional context
I am happy to share any more detailed information.