diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 973f7f8ab3..6d7adfdef1 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -31,44 +31,45 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         python-version: ["3.9"]
-        pytest_args: [tests]
+        # pytest_args: [tests]
+        pytest_args: [tests/workflows/test_from_csv_to_parquet.py]
         runtime-version: [upstream, latest, "0.2.1"]
-        include:
-          # Run stability tests on Python 3.8
-          - pytest_args: tests/stability
-            python-version: "3.8"
-            runtime-version: upstream
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.8"
-            runtime-version: latest
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.8"
-            runtime-version: "0.2.1"
-            os: ubuntu-latest
-          # Run stability tests on Python 3.10
-          - pytest_args: tests/stability
-            python-version: "3.10"
-            runtime-version: upstream
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.10"
-            runtime-version: latest
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.10"
-            runtime-version: "0.2.1"
-            os: ubuntu-latest
-          # Run stability tests on Python Windows and MacOS (latest py39 only)
-          - pytest_args: tests/stability
-            python-version: "3.9"
-            runtime-version: latest
-            os: windows-latest
-          - pytest_args: tests/stability
-            python-version: "3.9"
-            runtime-version: latest
-            os: macos-latest
+        # include:
+        #   # Run stability tests on Python 3.8
+        #   - pytest_args: tests/stability
+        #     python-version: "3.8"
+        #     runtime-version: upstream
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.8"
+        #     runtime-version: latest
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.8"
+        #     runtime-version: "0.2.1"
+        #     os: ubuntu-latest
+        #   # Run stability tests on Python 3.10
+        #   - pytest_args: tests/stability
+        #     python-version: "3.10"
+        #     runtime-version: upstream
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.10"
+        #     runtime-version: latest
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.10"
+        #     runtime-version: "0.2.1"
+        #     os: ubuntu-latest
+        #   # Run stability tests on Windows and macOS (latest py39 only)
+        #   - pytest_args: tests/stability
+        #     python-version: "3.9"
+        #     runtime-version: latest
+        #     os: windows-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.9"
+        #     runtime-version: latest
+        #     os: macos-latest
 
     steps:
       - name: Checkout
diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml
index f4a5d1a3dc..98c7b2fe21 100644
--- a/cluster_kwargs.yaml
+++ b/cluster_kwargs.yaml
@@ -58,3 +58,12 @@ test_work_stealing_on_straggling_worker:
 test_repeated_merge_spill:
   n_workers: 20
   worker_vm_types: [m6i.large]
+
+# For tests/workflows/test_from_csv_to_parquet.py
+from_csv_to_parquet_cluster:
+  n_workers: 30
+  # TODO: Remove the `t3.medium` worker specification below
+  # once it's the default worker instance type
+  worker_vm_types: [t3.medium]  # 2 CPU, 4 GiB
+  backend_options:
+    region: "us-east-1"  # Same region as dataset
diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py
new file mode 100644
index 0000000000..b2c8e861a1
--- /dev/null
+++ b/tests/workflows/test_from_csv_to_parquet.py
@@ -0,0 +1,141 @@
+import uuid
+
+import coiled
+import dask.dataframe as dd
+import pytest
+from distributed import Client
+
+
+@pytest.fixture(scope="module")
+def from_csv_to_parquet_cluster(
+    dask_env_variables,
+    cluster_kwargs,
+    github_cluster_tags,
+):
+    with coiled.Cluster(
+        f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}",
+        environ=dask_env_variables,
+        tags=github_cluster_tags,
+        **cluster_kwargs["from_csv_to_parquet_cluster"],
+    ) as cluster:
+        yield cluster
+
+
+@pytest.fixture
+def from_csv_to_parquet_client(
+    from_csv_to_parquet_cluster,
+    cluster_kwargs,
+    upload_cluster_dump,
+    benchmark_all,
+):
+    n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"]
+    with Client(from_csv_to_parquet_cluster) as client:
+        from_csv_to_parquet_cluster.scale(n_workers)
+        client.wait_for_workers(n_workers)
+        client.restart()
+        with upload_cluster_dump(client), benchmark_all(client):
+            yield client
+
+
+COLUMNSV1 = {
+    "GlobalEventID": "Int64",
+    "Day": "Int64",
+    "MonthYear": "Int64",
+    "Year": "Int64",
+    "FractionDate": "Float64",
+    "Actor1Code": "string[pyarrow]",
+    "Actor1Name": "string[pyarrow]",
+    "Actor1CountryCode": "string[pyarrow]",
+    "Actor1KnownGroupCode": "string[pyarrow]",
+    "Actor1EthnicCode": "string[pyarrow]",
+    "Actor1Religion1Code": "string[pyarrow]",
+    "Actor1Religion2Code": "string[pyarrow]",
+    "Actor1Type1Code": "string[pyarrow]",
+    "Actor1Type2Code": "string[pyarrow]",
+    "Actor1Type3Code": "string[pyarrow]",
+    "Actor2Code": "string[pyarrow]",
+    "Actor2Name": "string[pyarrow]",
+    "Actor2CountryCode": "string[pyarrow]",
+    "Actor2KnownGroupCode": "string[pyarrow]",
+    "Actor2EthnicCode": "string[pyarrow]",
+    "Actor2Religion1Code": "string[pyarrow]",
+    "Actor2Religion2Code": "string[pyarrow]",
+    "Actor2Type1Code": "string[pyarrow]",
+    "Actor2Type2Code": "string[pyarrow]",
+    "Actor2Type3Code": "string[pyarrow]",
+    "IsRootEvent": "Int64",
+    "EventCode": "string[pyarrow]",
+    "EventBaseCode": "string[pyarrow]",
+    "EventRootCode": "string[pyarrow]",
+    "QuadClass": "Int64",
+    "GoldsteinScale": "Float64",
+    "NumMentions": "Int64",
+    "NumSources": "Int64",
+    "NumArticles": "Int64",
+    "AvgTone": "Float64",
+    "Actor1Geo_Type": "Int64",
+    "Actor1Geo_Fullname": "string[pyarrow]",
+    "Actor1Geo_CountryCode": "string[pyarrow]",
+    "Actor1Geo_ADM1Code": "string[pyarrow]",
+    "Actor1Geo_Lat": "Float64",
+    "Actor1Geo_Long": "Float64",
+    "Actor1Geo_FeatureID": "string[pyarrow]",
+    "Actor2Geo_Type": "Int64",
+    "Actor2Geo_Fullname": "string[pyarrow]",
+    "Actor2Geo_CountryCode": "string[pyarrow]",
+    "Actor2Geo_ADM1Code": "string[pyarrow]",
+    "Actor2Geo_Lat": "Float64",
+    "Actor2Geo_Long": "Float64",
+    "Actor2Geo_FeatureID": "string[pyarrow]",
+    "ActionGeo_Type": "Int64",
+    "ActionGeo_Fullname": "string[pyarrow]",
+    "ActionGeo_CountryCode": "string[pyarrow]",
+    "ActionGeo_ADM1Code": "string[pyarrow]",
+    "ActionGeo_Lat": "Float64",
+    "ActionGeo_Long": "Float64",
+    "ActionGeo_FeatureID": "string[pyarrow]",
+    "DATEADDED": "Int64",
+    "SOURCEURL": "string[pyarrow]",
+}
+
+
+def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url):
+    s3 = s3_factory(anon=True)
+
+    bad_files = [
+        "gdelt-open-data/events/20161004.export.csv",
+        "gdelt-open-data/events/20170106.export.csv",
+        "gdelt-open-data/events/20170422.export.csv",
+        "gdelt-open-data/events/20170802.export.csv",
+        "gdelt-open-data/events/20170920.export.csv",
+        "gdelt-open-data/events/20171021.export.csv",
+        "gdelt-open-data/events/20180415.export.csv",
+        "gdelt-open-data/events/20180416.export.csv",
+        "gdelt-open-data/events/20180613.export.csv",
+        "gdelt-open-data/events/20180806.export.csv",
+        "gdelt-open-data/events/20190217.export.csv",
+        "gdelt-open-data/events/20190613.export.csv",
+    ]
+    files = s3.ls("s3://gdelt-open-data/events/")[120:]
+    files = [f"s3://{f}" for f in files if f not in bad_files]
+
+    df = dd.read_csv(
+        files,
+        names=list(COLUMNSV1.keys()),
+        sep="\t",
+        dtype=COLUMNSV1,
+        storage_options=s3.storage_options,
+        on_bad_lines="skip",
+    )
+
+    df = df.map_partitions(
+        lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first")
+    )
+    df["national_paper"] = df.SOURCEURL.str.contains(
+        "washingtonpost|nytimes", regex=True
+    )
+    df = df[df["national_paper"]]
+
+    output = s3_url + "/from-csv-to-parquet/"
+
+    df.to_parquet(output)