1
1
import uuid
2
- import pandas as pd
3
- import yaml
4
2
from abc import ABC , abstractmethod
3
+ from collections .abc import Iterable
5
4
from copy import deepcopy
6
5
from dataclasses import dataclass
7
6
from pathlib import Path
8
7
from typing import Any
9
- from collections .abc import Iterable
8
+
9
+ import pandas as pd
10
+ import yaml
11
+
10
12
from dplutils .pipeline .graph import PipelineGraph
11
13
from dplutils .pipeline .utils import dict_from_coord
12
14
@@ -27,6 +29,7 @@ class PipelineExecutor(ABC):
27
29
to execute the pipeline and return and generator of dataframes of the final
28
30
tasks in the graph.
29
31
"""
32
+
30
33
def __init__ (self , graph : PipelineGraph ):
31
34
if isinstance (graph , list ):
32
35
self .graph = PipelineGraph (deepcopy (graph ))
@@ -36,21 +39,21 @@ def __init__(self, graph: PipelineGraph):
36
39
self ._run_id = None
37
40
38
41
@classmethod
39
- def from_graph (cls , graph : PipelineGraph ) -> ' PipelineExecutor' :
42
+ def from_graph (cls , graph : PipelineGraph ) -> " PipelineExecutor" :
40
43
return cls (graph )
41
44
42
45
@property
43
46
def tasks_idx (self ): # for back compat
44
47
return self .graph .task_map
45
48
46
- def set_context (self , key , value ) -> ' PipelineExecutor' :
49
+ def set_context (self , key , value ) -> " PipelineExecutor" :
47
50
self .ctx [key ] = value
48
51
return self
49
52
50
- def set_config_from_dict (self , config ) -> ' PipelineExecutor' :
53
+ def set_config_from_dict (self , config ) -> " PipelineExecutor" :
51
54
for task_name , confs in config .items ():
52
55
if task_name not in self .tasks_idx :
53
- raise ValueError (f' no such task: { task_name } ' )
56
+ raise ValueError (f" no such task: { task_name } " )
54
57
for key , value in confs .items ():
55
58
task = self .tasks_idx [task_name ]
56
59
task_val = getattr (task , key )
@@ -61,11 +64,11 @@ def set_config_from_dict(self, config) -> 'PipelineExecutor':
61
64
return self
62
65
63
66
def set_config (
64
- self ,
65
- coord : str | dict | None = None ,
66
- value : Any | None = None ,
67
- from_yaml : str | Path | None = None ,
68
- ) -> ' PipelineExecutor' :
67
+ self ,
68
+ coord : str | dict | None = None ,
69
+ value : Any | None = None ,
70
+ from_yaml : str | Path | None = None ,
71
+ ) -> " PipelineExecutor" :
69
72
"""Set task configuration options for this instance.
70
73
71
74
This applies configurations to :class:`PipelineTask
@@ -90,8 +93,8 @@ def set_config(
90
93
"""
91
94
if coord is None :
92
95
if from_yaml is None :
93
- raise ValueError (' one of dict/string coordinate and value/file input is required' )
94
- with open (from_yaml , 'r' ) as f :
96
+ raise ValueError (" one of dict/string coordinate and value/file input is required" )
97
+ with open (from_yaml , "r" ) as f :
95
98
return self .set_config_from_dict (yaml .load (f , yaml .SafeLoader ))
96
99
if isinstance (coord , dict ):
97
100
return self .set_config_from_dict (coord )
@@ -106,7 +109,7 @@ def validate(self) -> None:
106
109
except ValueError as e :
107
110
excs .append (str (e ))
108
111
if len (excs ) > 0 :
109
- raise ValueError (' Errors in validation:\n - ' + ' \n - ' .join (excs ))
112
+ raise ValueError (" Errors in validation:\n - " + " \n - " .join (excs ))
110
113
111
114
@property
112
115
def run_id (self ) -> str :
@@ -147,7 +150,9 @@ def run(self) -> Iterable[OutputBatch]:
147
150
self ._run_id = None # force reallocation
148
151
return self .execute ()
149
152
150
- def writeto (self , outdir : Path | str , partition_by_task : bool | None = None , task_partition_name : str = 'task' ) -> None :
153
+ def writeto (
154
+ self , outdir : Path | str , partition_by_task : bool | None = None , task_partition_name : str = "task"
155
+ ) -> None :
151
156
"""Run pipeline, writing results to parquet table.
152
157
153
158
args:
@@ -166,10 +171,10 @@ def writeto(self, outdir: Path|str, partition_by_task: bool|None = None, task_pa
166
171
Path (outdir ).mkdir (parents = True , exist_ok = True )
167
172
for c , batch in enumerate (self .run ()):
168
173
if partition_by_task :
169
- part_name = batch .task or ' __HIVE_DEFAULT_PARTITION__'
170
- part_path = Path (outdir ) / f' { task_partition_name } ={ part_name } '
174
+ part_name = batch .task or " __HIVE_DEFAULT_PARTITION__"
175
+ part_path = Path (outdir ) / f" { task_partition_name } ={ part_name } "
171
176
part_path .mkdir (exist_ok = True )
172
- outfile = part_path / f' { self .run_id } -{ c } .parquet'
177
+ outfile = part_path / f" { self .run_id } -{ c } .parquet"
173
178
else :
174
- outfile = Path (outdir ) / f' { self .run_id } -{ c } .parquet'
179
+ outfile = Path (outdir ) / f" { self .run_id } -{ c } .parquet"
175
180
batch .data .to_parquet (outfile , index = False )
0 commit comments