-
Notifications
You must be signed in to change notification settings - Fork 19
Workflow: CSV to parquet #841
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
3a76080
5aedaca
e185e5d
95585d4
10a529b
4504020
96e66f5
0779b48
7fb6792
437eb0b
e4851df
2657885
6eb04d6
6f4c04e
222c695
fc40687
670e3cc
16b0277
b557bc5
ccedaf8
37120c3
b3cfbaa
ca7df93
cdca5de
6a93130
e2b070e
5b58925
a753aea
6e9fc7f
db73e85
ed1d9e3
e3145c6
19606da
e701cc1
530d35c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
import uuid | ||
from collections import OrderedDict | ||
|
||
import coiled | ||
import dask.dataframe as dd | ||
import pytest | ||
from distributed import Client | ||
|
||
|
||
@pytest.fixture(scope="module")
def from_csv_to_parquet_cluster(
    dask_env_variables,
    cluster_kwargs,
    github_cluster_tags,
):
    """Module-scoped Coiled cluster for the CSV-to-parquet workflow.

    Yields the live cluster; the context manager tears it down when the
    module's tests finish.
    """
    # Unique name per test session so concurrent CI runs don't collide.
    cluster_name = f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}"
    kwargs = cluster_kwargs["from_csv_to_parquet_cluster"]
    with coiled.Cluster(
        cluster_name,
        environ=dask_env_variables,
        tags=github_cluster_tags,
        **kwargs,
    ) as cluster:
        yield cluster
|
||
|
||
@pytest.fixture
def from_csv_to_parquet_client(
    from_csv_to_parquet_cluster,
    cluster_kwargs,
    upload_cluster_dump,
    benchmark_all,
):
    """Fresh client on the shared cluster, instrumented for benchmarking.

    Scales the cluster back to its configured size, waits for the workers,
    and restarts them so each test starts from a clean slate.
    """
    n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"]
    with Client(from_csv_to_parquet_cluster) as client:
        from_csv_to_parquet_cluster.scale(n_workers)
        client.wait_for_workers(n_workers)
        # Clear any state left behind by a previous test on this cluster.
        client.restart()

        with upload_cluster_dump(client), benchmark_all(client):
            yield client
|
||
|
||
def _gdelt_schema():
    """Build the column-name -> pandas-dtype mapping for the GDELT events table.

    The 58 columns fall into regular groups (two actor blocks, three geo
    blocks), so they are generated rather than listed one by one.
    """
    string = "string[pyarrow]"
    columns = [
        ("GlobalEventID", "Int64"),
        ("Day", "Int64"),
        ("MonthYear", "Int64"),
        ("Year", "Int64"),
        ("FractionDate", "Float64"),
    ]
    # Ten attribute-code columns per actor, all strings.
    for actor in ("Actor1", "Actor2"):
        columns += [
            (f"{actor}Code", string),
            (f"{actor}Name", string),
            (f"{actor}CountryCode", string),
            (f"{actor}KnownGroupCode", string),
            (f"{actor}EthnicCode", string),
            (f"{actor}Religion1Code", string),
            (f"{actor}Religion2Code", string),
            (f"{actor}Type1Code", string),
            (f"{actor}Type2Code", string),
            (f"{actor}Type3Code", string),
        ]
    columns += [
        ("IsRootEvent", "Int64"),
        ("EventCode", string),
        ("EventBaseCode", string),
        ("EventRootCode", string),
        ("QuadClass", "Int64"),
        ("GoldsteinScale", "Float64"),
        ("NumMentions", "Int64"),
        ("NumSources", "Int64"),
        ("NumArticles", "Int64"),
        ("AvgTone", "Float64"),
    ]
    # Seven geolocation columns for each of the three geo prefixes.
    for geo in ("Actor1Geo", "Actor2Geo", "ActionGeo"):
        columns += [
            (f"{geo}_Type", "Int64"),
            (f"{geo}_Fullname", string),
            (f"{geo}_CountryCode", string),
            (f"{geo}_ADM1Code", string),
            (f"{geo}_Lat", "Float64"),
            (f"{geo}_Long", "Float64"),
            (f"{geo}_FeatureID", string),
        ]
    columns += [
        ("DATEADDED", "Int64"),
        ("SOURCEURL", string),
    ]
    return OrderedDict(columns)


# Expected dtypes of the raw GDELT events CSVs, in file column order.
SCHEMA = _gdelt_schema()
|
||
|
||
def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url):
    """End-to-end workflow: read GDELT CSVs from S3, clean, filter, write parquet."""
    s3 = s3_factory(anon=True)
    # Public GDELT bucket, anonymous access; cap the run at 1000 files.
    listing = s3.ls("s3://gdelt-open-data/events/")[:1000]
    urls = [f"s3://{path}" for path in listing]

    string_columns = {
        name: kind for name, kind in SCHEMA.items() if kind == "string[pyarrow]"
    }
    numeric_columns = {
        name: kind for name, kind in SCHEMA.items() if kind != "string[pyarrow]"
    }

    def _to_float(value):
        # Some bad files have '#' in numeric values
        return float(value.replace("#", "") or "NaN")

    df = dd.read_csv(
        urls,
        sep="\t",
        names=SCHEMA.keys(),
        # 'dtype' and 'converters' cannot overlap
        dtype=string_columns,
        storage_options=s3.storage_options,
        on_bad_lines="skip",
        converters={name: _to_float for name in numeric_columns},
    )

    # Now we can safely convert the numeric columns
    df = df.astype(numeric_columns)

    # Per-partition dedup of the source URL; cheaper than a global shuffle.
    df = df.map_partitions(
        lambda part: part.drop_duplicates(subset=["SOURCEURL"], keep="first")
    )
    # Keep only events reported by the big national papers.
    df["national_paper"] = df.SOURCEURL.str.contains(
        "washingtonpost|nytimes", regex=True
    )
    df = df[df["national_paper"]]

    df = df.persist()
    assert len(df)

    df.to_parquet(f"{s3_url}/from-csv-to-parquet/", write_index=False)
milesgranger marked this conversation as resolved.
Show resolved
Hide resolved
|
Uh oh!
There was an error while loading. Please reload this page.