## Overview

Python 3.7.5, Ubuntu
If you try to use the `flow` key instead of `run`, custom flows saved under a path stored in `DPP_PROCESSOR_PATH` are not properly loaded. Simply running `dpp` correctly shows the pipeline as valid, but when you actually run it you get:

```
ModuleNotFoundError: No module named 'bcodmo_pipeline_processors'
```
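Not a fix, but a way to confirm the module itself is loadable from such a directory: the sketch below resolves a dotted flow name to a file under a `DPP_PROCESSOR_PATH`-style directory using only stdlib `importlib`. The directory layout and the stub `flow` function are throwaway stand-ins for `bcodmo_pipeline_processors/load.py`, not code from datapackage-pipelines itself.

```python
import importlib.util
import os
import sys
import tempfile


def load_flow_module(processor_path, module_name):
    """Load a flow module from a DPP_PROCESSOR_PATH-style directory,
    mimicking the lookup the `flow` key appears to skip."""
    file_path = os.path.join(processor_path, *module_name.split('.')) + '.py'
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module  # register so later imports find it
    spec.loader.exec_module(module)
    return module


# Demo with a throwaway module standing in for bcodmo_pipeline_processors/load.py
with tempfile.TemporaryDirectory() as d:
    pkg_dir = os.path.join(d, 'bcodmo_pipeline_processors')
    os.makedirs(pkg_dir)
    with open(os.path.join(pkg_dir, 'load.py'), 'w') as f:
        f.write("def flow(parameters, datapackage, resources, stats):\n"
                "    return 'flow-object'\n")
    mod = load_flow_module(d, 'bcodmo_pipeline_processors.load')
    result = mod.flow({}, None, None, None)
```

If the module loads fine this way, the bug is in how the `flow` key resolves names, not in the flow code.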
To recreate:

```
DPP_PROCESSOR_PATH=~/path/to/processors/
```

`pipeline-spec.yaml`:

```yaml
test:
  title: test
  description: this is a test
  pipeline:
    - flow: bcodmo_pipeline_processors.load
      parameters:
        format: csv
        from: /path/to/load/from
        name: res
```
where a load flow is stored under `/path/to/processors/bcodmo_pipeline_processors/load.py`,
and the load flow looks like:

```python
from dataflows import Flow, load
from datapackage_pipelines.utilities.resources import PROP_STREAMING, PROP_STREAMED_FROM


def flow(parameters, datapackage, resources, stats):
    def count_resources():
        def func(package):
            global num_resources
            num_resources = len(package.pkg.resources)
            yield package.pkg
            yield from package
        return func

    def mark_streaming(_from):
        def func(package):
            for i in range(num_resources, len(package.pkg.resources)):
                package.pkg.descriptor['resources'][i].setdefault(PROP_STREAMING, True)
                package.pkg.descriptor['resources'][i].setdefault(PROP_STREAMED_FROM, _from)
            yield package.pkg
            yield from package
        return func

    _from = parameters.pop('from')
    return Flow(
        count_resources(),
        load(_from, parameters),
        mark_streaming(_from),
    )
```
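For anyone reviewing the flow above, the count-then-mark pattern it uses can be sketched without the dataflows dependency. This is an illustrative reduction, not the real API: packages are plain dicts here, the constant values are assumed to match datapackage-pipelines' `PROP_STREAMING`/`PROP_STREAMED_FROM`, and a shared `state` dict replaces the `global num_resources` in the original.

```python
# Assumed constant values, mirroring datapackage_pipelines.utilities.resources.
PROP_STREAMING = 'dpp:streaming'
PROP_STREAMED_FROM = 'dpp:streamedFrom'


def count_resources(state):
    """First step: record how many resources exist before the load."""
    def func(pkg):
        state['num_resources'] = len(pkg['resources'])
        return pkg
    return func


def mark_streaming(state, _from):
    """Last step: mark only the resources added after the count."""
    def func(pkg):
        for res in pkg['resources'][state['num_resources']:]:
            res.setdefault(PROP_STREAMING, True)
            res.setdefault(PROP_STREAMED_FROM, _from)
        return pkg
    return func


state = {}
pkg = {'resources': [{'name': 'old'}]}
pkg = count_resources(state)(pkg)
pkg['resources'].append({'name': 'res'})  # simulate load() adding a resource
pkg = mark_streaming(state, '/path/to/load/from')(pkg)
```

The slice `[state['num_resources']:]` is why pre-existing resources are left untouched while every newly loaded one gets the streaming properties.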
Tasks

- review
- fix if there is a quick fix, OR
- remove from BCO-DMO as it's not needed anymore