Skip to content

feat: Support placeholders for input_path and output_path for all States (except Fail) and items_path for MapState #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/stepfunctions/inputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# permissions and limitations under the License.
from __future__ import absolute_import

from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput
from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, MapItemIndex, MapItemValue, StepInput
43 changes: 42 additions & 1 deletion src/stepfunctions/inputs/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class StepInput(Placeholder):
def __init__(self, schema=None, **kwargs):
super(StepInput, self).__init__(schema, **kwargs)
self.json_str_template = '${}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Step Input.
Expand All @@ -291,3 +291,44 @@ def _create_variable(self, name, parent, type=None):
return StepInput(name=name, parent=parent, type=type)
else:
return StepInput(name=name, parent=parent)


class MapItemValue(Placeholder):
"""
Top-level class for map item value placeholders.
"""

def __init__(self, schema=None, **kwargs):
super(MapItemValue, self).__init__(schema, **kwargs)
self.json_str_template = '$$.Map.Item.Value{}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Map Item Value.
A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema.
"""
if self.immutable:
raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection: "
f" {self.schema}")
if type:
return MapItemValue(name=name, parent=parent, type=type)
else:
return MapItemValue(name=name, parent=parent)


class MapItemIndex(Placeholder):
"""
Top-level class for map item index placeholders.
"""

def __init__(self, **kwargs):
if kwargs.get('schema'):
raise AttributeError("MapItemIndex does not support schema object")
super(MapItemIndex, self).__init__(**kwargs)
self.json_str_template = '$$.Map.Item.Index'

def _create_variable(self, name, parent, type=None):
raise AttributeError("MapItemIndex has no _create_variable object")

def __getitem__(self, item):
raise AttributeError("MapItemIndex has no __getitem__ object")
28 changes: 25 additions & 3 deletions src/stepfunctions/steps/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,33 @@ def to_dict(self):
k = to_pascalcase(k)
if k == to_pascalcase(Field.Parameters.value):
result[k] = self._replace_placeholders(v)
elif self._is_placeholder_compatible(k):
if isinstance(v, Placeholder):
result[k] = v.to_jsonpath()
else:
result[k] = v
else:
result[k] = v

return result

@staticmethod
def _is_placeholder_compatible(field):
"""
Check if the field is placeholder compatible
Args:
field: Field against which to verify placeholder compatibility
"""
return field in [
# Common fields
to_pascalcase(Field.InputPath.value),
to_pascalcase(Field.OutputPath.value),

# Map
to_pascalcase(Field.ItemsPath.value)
]


def to_json(self, pretty=False):
"""Serialize to a JSON formatted string.

Expand Down Expand Up @@ -541,13 +563,13 @@ def __init__(self, state_id, **kwargs):
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
iterator (State or Chain): State or chain to execute for each of the items in `items_path`.
items_path (str, optional): Path in the input for items to iterate over. (default: '$')
items_path (str or Placeholder, optional): Path in the input for items to iterate over. (default: '$')
max_concurrency (int, optional): Maximum number of iterations to have running at any given point in time. (default: 0)
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
super(Map, self).__init__(state_id, 'Map', **kwargs)

Expand Down
189 changes: 187 additions & 2 deletions tests/unit/test_placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pytest
import json

from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.inputs import ExecutionInput, MapItemIndex, MapItemValue, StepInput

def test_placeholder_creation_with_subscript_operator():
step_input = StepInput()
Expand Down Expand Up @@ -178,4 +178,189 @@ def check_immutable(placeholder):
for k, v in placeholder.store.items():
return check_immutable(v)
else:
return False
return False


def test_map_item_value_creation_with_subscript_operator():
map_item_placeholder = MapItemValue()
map_item_placeholder = map_item_placeholder["A"]
assert map_item_placeholder.name == "A"
assert map_item_placeholder.type is None


def test_map_item_index_creation_with_subscript_operator():
map_item_placeholder = MapItemIndex()
with pytest.raises(AttributeError):
map_item_placeholder["A"]
assert not map_item_placeholder.get_schema_as_dict()
assert not map_item_placeholder.immutable


def test_map_item_value_creation_with_type():
map_item_placeholder = MapItemValue()
map_item_variable = map_item_placeholder["A"]["b"].get("C", float)
assert map_item_variable.name == "C"
assert map_item_variable.type == float


def test_map_item_value_creation_with_int_key():
map_item_placeholder = MapItemValue()
map_item_variable = map_item_placeholder["A"][0]
assert map_item_variable.name == 0
assert map_item_variable.type is None


def test_map_item_value_creation_with_invalid_key():
map_item_placeholder = MapItemValue()
with pytest.raises(ValueError):
map_item_placeholder["A"][1.3]
with pytest.raises(ValueError):
map_item_placeholder["A"].get(1.2, str)


def test_map_item_value_creation_failure_with_type():
map_item_placeholder = MapItemValue()
map_item_placeholder["A"]["b"].get("C", float)
with pytest.raises(ValueError):
map_item_placeholder["A"]["b"].get("C", int)


def test_map_item_value_path():
map_item_placeholder = MapItemValue()
placeholder_variable = map_item_placeholder["A"]["b"]["C"]
expected_path = ["A", "b", "C"]
assert placeholder_variable._get_path() == expected_path


def test_map_item_value_contains():
map_item_placeholder = MapItemValue()
var_three = map_item_placeholder["Key01"]["Key04"]

map_item_placeholder_two = StepInput()
var_five = map_item_placeholder_two["Key07"]

assert map_item_placeholder.contains(var_three) == True
assert map_item_placeholder.contains(var_five) == False
assert map_item_placeholder_two.contains(var_three) == False
assert map_item_placeholder_two.contains(var_five) == True


def test_map_item_value_schema_as_dict():
map_item_placeholder = MapItemValue()
map_item_placeholder["A"]["b"].get("C", float)
map_item_placeholder["Message"]
map_item_placeholder["Key01"]["Key02"]
map_item_placeholder["Key03"]
map_item_placeholder["Key03"]["Key04"]

expected_schema = {
"A": {
"b": {
"C": float
}
},
"Message": str,
"Key01": {
"Key02": str
},
"Key03": {
"Key04": str
}
}

assert map_item_placeholder.get_schema_as_dict() == expected_schema


def test_map_item_value_schema_as_json():
map_item_placeholder = MapItemValue()
map_item_placeholder["Response"].get("StatusCode", int)
map_item_placeholder["Hello"]["World"]
map_item_placeholder["A"]
map_item_placeholder["Hello"]["World"].get("Test", str)

expected_schema = {
"Response": {
"StatusCode": "int"
},
"Hello": {
"World": {
"Test": "str"
}
},
"A": "str"
}

assert map_item_placeholder.get_schema_as_json() == json.dumps(expected_schema)


def test_map_item_value_is_empty():
workflow_input = MapItemValue()
placeholder_variable = workflow_input["A"]["B"]["C"]
assert placeholder_variable._is_empty() == True
workflow_input["A"]["B"]["C"]["D"]
assert placeholder_variable._is_empty() == False


def test_map_item_value_make_immutable():
workflow_input = MapItemValue()
workflow_input["A"]["b"].get("C", float)
workflow_input["Message"]
workflow_input["Key01"]["Key02"]
workflow_input["Key03"]
workflow_input["Key03"]["Key04"]

assert check_immutable(workflow_input) == False

workflow_input._make_immutable()
assert check_immutable(workflow_input) == True


def test_map_item_value_with_schema():
test_schema = {
"A": {
"B": {
"C": int
}
},
"Request": {
"Status": str
},
"Hello": float
}
workflow_input = MapItemValue(schema=test_schema)
assert workflow_input.get_schema_as_dict() == test_schema
assert workflow_input.immutable == True
assert workflow_input['A']['B'].get("C", int)

with pytest.raises(ValueError):
workflow_input["A"]["B"]["D"]

with pytest.raises(ValueError):
workflow_input["A"]["B"].get("C", float)


def test_map_item_index_with_schema():
test_schema = {
"A": {
"B": {
"C": int
}
},
"Request": {
"Status": str
},
"Hello": float
}
with pytest.raises(AttributeError):
workflow_input = MapItemIndex(schema=test_schema)


def test_map_item_value_jsonpath():
map_item_value = MapItemValue()
map_item_value_variable = map_item_value["A"]["b"].get(0, float)
assert map_item_value_variable.to_jsonpath() == "$$.Map.Item.Value['A']['b'][0]"


def test_map_item_index_jsonpath():
map_item_index = MapItemIndex()
assert map_item_index.to_jsonpath() == "$$.Map.Item.Index"
27 changes: 25 additions & 2 deletions tests/unit/test_placeholders_with_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pytest

from stepfunctions.steps import Pass, Succeed, Fail, Wait, Choice, ChoiceRule, Parallel, Map, Task, Retry, Catch, Chain, Graph
from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.inputs import ExecutionInput, MapItemValue, MapItemIndex

def test_workflow_input_placeholder():

Expand Down Expand Up @@ -176,8 +176,22 @@ def test_step_input_order_validation():

def test_map_state_with_placeholders():
workflow_input = ExecutionInput()
map_item_value = MapItemValue(schema={
'name': str,
'age': str
})

map_state = Map('MapState01')
map_state = Map(
'MapState01',
input_path=workflow_input['input_path'],
items_path=workflow_input['items_path'],
output_path=workflow_input['output_path'],
parameters={
"MapIndex": MapItemIndex(),
"Name": map_item_value['name'],
"Age": map_item_value['age']
}
)
iterator_state = Pass(
'TrainIterator',
parameters={
Expand All @@ -194,6 +208,9 @@ def test_map_state_with_placeholders():
"MapState01": {
"Type": "Map",
"End": True,
"InputPath": "$$.Execution.Input['input_path']",
"ItemsPath": "$$.Execution.Input['items_path']",
"OutputPath": "$$.Execution.Input['output_path']",
"Iterator": {
"StartAt": "TrainIterator",
"States": {
Expand All @@ -206,6 +223,11 @@ def test_map_state_with_placeholders():
"End": True
}
}
},
'Parameters': {
'Age.$': "$$.Map.Item.Value['age']",
'MapIndex.$': '$$.Map.Item.Index',
'Name.$': "$$.Map.Item.Value['name']"
}
}
}
Expand All @@ -214,6 +236,7 @@ def test_map_state_with_placeholders():
result = Graph(workflow_definition).to_dict()
assert result == expected_repr


def test_parallel_state_with_placeholders():
workflow_input = ExecutionInput()

Expand Down