Skip to content

Commit 9e0b0c5

Browse files
committed
dev: update
1 parent b2fbfe9 commit 9e0b0c5

18 files changed

+120
-39
lines changed

.github/workflows/tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ jobs:
2424
- name: Tests
2525
run: PYTHONPATH=. pytest --cov=data_flow --cov-report term
2626
- name: Lint
27-
run: flake8 data_flow/
27+
run: pflake8 data_flow/

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ tests::
1111
PYTHONPATH=. venv/bin/pytest --cov=data_flow --cov-report html --cov-report term -rP tests/ -vvv
1212

1313
lint::
14-
venv/bin/flake8 data_flow/
14+
venv/bin/pflake8 data_flow/

data_flow/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
from data_flow.data_flow import DataFlow
2-
from data_flow.lib.FileType import FileType
1+
from .data_flow import DataFlow

data_flow/data_flow.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from data_flow.lib import FileType
1010
from data_flow.lib.data_columns import data_get_columns, data_delete_columns, data_rename_columns, data_select_columns
1111
from data_flow.lib.data_from import (
12-
df_from_tmp_filename,
1312
from_csv_2_file,
1413
from_feather_2_file,
1514
from_parquet_2_file,
@@ -23,6 +22,9 @@
2322
to_json_from_file,
2423
to_hdf_from_file,
2524
)
25+
from data_flow.lib.fireducks import from_fireducks_2_file, to_fireducks_from_file
26+
from data_flow.lib.pandas import from_pandas_2_file
27+
from data_flow.lib.polars import from_polars_2_file, to_polars_from_file
2628
from data_flow.lib.tools import generate_temporary_filename, delete_file
2729

2830

@@ -45,25 +47,44 @@ def __del__(self):
4547
if not self.__in_memory:
4648
delete_file(self.__filename)
4749

50+
def from_fireducks(self, df: fd.DataFrame):
    """Load a FireDucks DataFrame into this object and return ``self``.

    In in-memory mode the object keeps a reference to ``df`` itself (no copy
    is made here). Otherwise ``df`` is written to the temporary file using
    the configured file type.
    """
    if not self.__in_memory:
        # File-backed mode: persist the frame instead of holding it.
        from_fireducks_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
        return self

    self.__data = df
    return self
56+
4857
def to_fireducks(self) -> fd.DataFrame:
4958
if self.__in_memory:
5059
return self.__data
5160
else:
52-
return df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type)
61+
return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type)
62+
63+
def from_pandas(self, df: pd.DataFrame):
    """Load a pandas DataFrame into this object and return ``self``.

    In in-memory mode ``df`` is converted with ``fd.from_pandas`` and the
    result is stored. Otherwise ``df`` is written to the temporary file using
    the configured file type.
    """
    if not self.__in_memory:
        # File-backed mode: the helper handles conversion and writing.
        from_pandas_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
        return self

    converted = fd.from_pandas(df)
    self.__data = converted
    return self
5369

5470
def to_pandas(self) -> pd.DataFrame:
5571
if self.__in_memory:
5672
return self.__data.to_pandas()
5773
else:
58-
return df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()
74+
return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()
75+
76+
def from_polars(self, df: pl.DataFrame):
    """Load a polars DataFrame into this object and return ``self``.

    In in-memory mode ``df`` is first turned into a pandas frame and then
    passed through ``fd.from_pandas`` before being stored. Otherwise ``df``
    is written to the temporary file using the configured file type.
    """
    if not self.__in_memory:
        # File-backed mode: delegate writing to the polars helper.
        from_polars_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
        return self

    as_pandas = df.to_pandas()
    self.__data = fd.from_pandas(as_pandas)
    return self
5982

6083
def to_polars(self) -> pl.DataFrame:
6184
if self.__in_memory:
6285
return pl.from_pandas(self.__data.to_pandas())
6386
else:
64-
return pl.from_pandas(
65-
df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()
66-
)
87+
return to_polars_from_file(tmp_filename=self.__filename, file_type=self.__file_type)
6788

6889
def from_csv(self, filename: str):
6990
if self.__in_memory:
@@ -139,14 +160,14 @@ def head(self):
139160
if self.__in_memory:
140161
print(self.__data.head())
141162
else:
142-
print(df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type).head())
163+
print(to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).head())
143164
return self
144165

145166
def stats(self):
146167
if self.__in_memory:
147168
data = self.__data
148169
else:
149-
data = df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type)
170+
data = to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type)
150171

151172
print("***** Data stats *****")
152173
print(f"Columns names : {data.columns.to_list()}")

data_flow/lib/Operator.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from enum import Enum
2+
3+
4+
class Operator(Enum):
    """Comparison operators, each member's value being its symbol as a string.

    NOTE(review): presumably consumed by filtering code elsewhere in the
    package; no caller is visible here, so confirm before relying on that.
    """

    Eq = "=="   # equal to
    Gt = ">"    # greater than
    Lt = "<"    # less than
    Gte = ">="  # greater than or equal to
    Lte = "<="  # less than or equal to
    Ne = "!="   # not equal to

data_flow/lib/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
from .FileType import FileType
2+
from .Operator import Operator

data_flow/lib/data_columns.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,5 @@ def data_select_columns(tmp_filename: str, file_type: FileType, columns: list) -
4545
case FileType.feather:
4646
data = fd.read_feather(tmp_filename)[columns]
4747
data.to_feather(tmp_filename)
48-
4948
case _:
5049
raise ValueError(f"File type not implemented: {file_type} !")
51-
52-
53-
# def __slice(dataframe, start_row, end_row, start_col, end_col):
54-
# assert len(dataframe) > end_row and start_row >= 0
55-
# assert len(dataframe.columns) > end_col and start_col >= 0
56-
# list_of_indexes = list(dataframe.columns)[start_col:end_col]
57-
# return dataframe.iloc[start_row:end_row][list_of_indexes]

data_flow/lib/data_from.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,3 @@ def from_hdf_2_file(filename: str, tmp_filename: str, file_type: FileType) -> No
5252
fd.read_hdf(filename).to_feather(tmp_filename)
5353
case _:
5454
raise ValueError(f"File type not implemented: {file_type} !")
55-
56-
57-
def df_from_tmp_filename(tmp_filename: str, file_type: FileType) -> fd.DataFrame:
58-
match file_type:
59-
case FileType.parquet:
60-
return fd.read_parquet(tmp_filename)
61-
case FileType.feather:
62-
return fd.read_feather(tmp_filename)
63-
case _:
64-
raise ValueError(f"File type not implemented: {file_type} !")

data_flow/lib/fireducks.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import fireducks.pandas as fd
2+
3+
from data_flow.lib.FileType import FileType
4+
5+
6+
def from_fireducks_2_file(df: fd.DataFrame, tmp_filename: str, file_type: FileType) -> None:
    """Write a FireDucks DataFrame to ``tmp_filename`` in the requested format.

    Args:
        df: DataFrame to write.
        tmp_filename: Path of the file to write.
        file_type: Storage format; ``FileType.parquet`` and
            ``FileType.feather`` are handled.

    Raises:
        ValueError: If ``file_type`` is any other value.
    """
    if file_type == FileType.parquet:
        df.to_parquet(tmp_filename)
    elif file_type == FileType.feather:
        df.to_feather(tmp_filename)
    else:
        raise ValueError(f"File type not implemented: {file_type} !")
14+
15+
16+
def to_fireducks_from_file(tmp_filename: str, file_type: FileType) -> fd.DataFrame:
    """Read ``tmp_filename`` with the FireDucks reader matching ``file_type``.

    Args:
        tmp_filename: Path of the file to read.
        file_type: Storage format; ``FileType.parquet`` and
            ``FileType.feather`` are handled.

    Returns:
        Whatever ``fd.read_parquet`` / ``fd.read_feather`` returns for the file.

    Raises:
        ValueError: If ``file_type`` is any other value.
    """
    if file_type == FileType.parquet:
        return fd.read_parquet(tmp_filename)
    if file_type == FileType.feather:
        return fd.read_feather(tmp_filename)
    raise ValueError(f"File type not implemented: {file_type} !")

data_flow/lib/pandas.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import fireducks.pandas as fd
2+
import pandas as pd
3+
4+
from data_flow.lib.FileType import FileType
5+
6+
7+
def from_pandas_2_file(df: pd.DataFrame, tmp_filename: str, file_type: FileType) -> None:
    """Convert a pandas DataFrame via ``fd.from_pandas`` and write it to ``tmp_filename``.

    Args:
        df: pandas DataFrame to write.
        tmp_filename: Path of the file to write.
        file_type: Storage format; ``FileType.parquet`` and
            ``FileType.feather`` are handled.

    Raises:
        ValueError: If ``file_type`` is any other value. No conversion is
            attempted in that case.
    """
    if file_type == FileType.parquet:
        converted = fd.from_pandas(df)
        converted.to_parquet(tmp_filename)
    elif file_type == FileType.feather:
        converted = fd.from_pandas(df)
        converted.to_feather(tmp_filename)
    else:
        raise ValueError(f"File type not implemented: {file_type} !")
15+
16+
17+
def to_pandas_from_file(tmp_filename: str, file_type: FileType) -> fd.DataFrame:
18+
match file_type:
19+
case FileType.parquet:
20+
return pd.read_parquet(tmp_filename)
21+
case FileType.feather:
22+
return pd.read_feather(tmp_filename)
23+
case _:
24+
raise ValueError(f"File type not implemented: {file_type} !")

0 commit comments

Comments
 (0)