Skip to content

Commit 53a87ee

Browse files
vigithyhl25
andauthored
fix: conditional forwarding in Async Dataplane (#2668)
Signed-off-by: Vigith Maurice <vigith@gmail.com> Signed-off-by: Yashash H L <yashashhl25@gmail.com> Co-authored-by: Yashash H L <yashashhl25@gmail.com>
1 parent f18d643 commit 53a87ee

File tree

4 files changed

+117
-18
lines changed

4 files changed

+117
-18
lines changed

docs/user-guide/reference/conditional-forwarding.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,31 @@ Below is a list of different logic operations that can be done on tags.
77
- **or** - forwards the message if one of the tags specified is present in Message's tags.
88
- **not** - forwards the message if all the tags specified are not present in Message's tags.
99

10-
For example, there's a UDF used to process numbers, and forward the result to different vertices based on the number is even or odd. In this case, you can set the `tag` to `even-tag` or `odd-tag` in each of the returned messages,
10+
For example, there's a UDF used to process numbers, and forward the result to different vertices based on the number is
11+
even or odd. In this case, you can set the `tag` to `even-tag` or `odd-tag` in each of the returned messages,
1112
and define the edges as below:
1213

14+
## Default Behavior
15+
16+
* If no `conditions` are specified in the spec, the message will be forwarded to all the downstream vertices (independent
17+
of the `tags` in the `Messages`).
18+
* In the code, if the `Messages` are not tagged but conditions are configured, we will still honour the edge conditions.
19+
20+
## Syntax
21+
22+
```yaml
23+
edges:
24+
- from: ...
25+
to: ...
26+
conditions:
27+
tags:
28+
operator: ...
29+
values:
30+
- ...
31+
```
32+
33+
## Example
34+
1335
```yaml
1436
edges:
1537
- from: p1

pkg/reconciler/validator/pipeline_validate.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,14 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
133133
} else {
134134
toFromEdge[e.From+e.To] = true
135135
}
136+
137+
if e.Conditions != nil {
138+
if e.Conditions.Tags != nil {
139+
if len(e.Conditions.Tags.Values) == 0 {
140+
return fmt.Errorf("invalid edge: conditional forwarding requires at least one tag value")
141+
}
142+
}
143+
}
136144
}
137145

138146
if len(namesInEdges) != len(names) {

pkg/reconciler/validator/pipeline_validate_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,17 @@ func TestValidatePipeline(t *testing.T) {
520520
assert.NoError(t, err)
521521
})
522522

523+
t.Run("no tag values conditional forwarding", func(t *testing.T) {
524+
testObj := testPipeline.DeepCopy()
525+
operatorOr := dfv1.LogicOperatorOr
526+
testObj.Spec.Edges[1].Conditions = &dfv1.ForwardConditions{Tags: &dfv1.TagConditions{
527+
Operator: &operatorOr,
528+
Values: []string{}}}
529+
err := ValidatePipeline(testObj)
530+
assert.Error(t, err)
531+
assert.Contains(t, err.Error(), "invalid edge: conditional forwarding requires at least one tag value")
532+
})
533+
523534
t.Run("allow conditional forwarding from source vertex or udf vertex", func(t *testing.T) {
524535
testObj := testPipeline.DeepCopy()
525536
operatorOr := dfv1.LogicOperatorOr

rust/numaflow-core/src/shared/forward.rs

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,29 @@ use std::sync::Arc;
44
use numaflow_models::models::ForwardConditions;
55

66
/// Checks if the message should be written to downstream vertex based on the conditions
7-
/// and message tags. If no tags are provided but there are edge conditions present, we will
8-
/// still forward to all vertices.
7+
/// and message tags. If no tags are provided, we treat it as empty tags and still perform
8+
/// the condition check.
99
pub(crate) fn should_forward(
1010
tags: Option<Arc<[String]>>,
1111
conditions: Option<Box<ForwardConditions>>,
1212
) -> bool {
13-
conditions.is_none_or(|conditions| {
14-
conditions.tags.operator.as_ref().is_none_or(|operator| {
15-
tags.as_ref().is_none_or(|tags| {
16-
!conditions.tags.values.is_empty()
17-
&& check_operator_condition(operator, &conditions.tags.values, tags)
18-
})
19-
})
20-
})
13+
// we should forward the message to downstream vertex if there are no edge conditions
14+
let Some(conditions) = conditions else {
15+
return true;
16+
};
17+
18+
// Return true if there are no tags in the edge condition
19+
if conditions.tags.values.is_empty() {
20+
return true;
21+
}
22+
23+
// Treat missing tags as empty and check the condition
24+
let tags = tags.unwrap_or_else(|| Arc::from(vec![]));
25+
// Default operator is "or", if not specified
26+
let operator = conditions.tags.operator.as_deref().unwrap_or("or");
27+
check_operator_condition(operator, &conditions.tags.values, &tags)
2128
}
29+
2230
/// Determine the partition to write the message to by hashing the message id.
2331
pub(crate) fn determine_partition(
2432
message_id: String,
@@ -72,13 +80,6 @@ mod tests {
7280
assert!(result);
7381
}
7482

75-
#[tokio::test]
76-
async fn test_evaluate_write_condition_no_tags() {
77-
let conditions = ForwardConditions::new(TagConditions::new(vec!["tag1".to_string()]));
78-
let result = should_forward(None, Some(Box::new(conditions)));
79-
assert!(result);
80-
}
81-
8283
#[tokio::test]
8384
async fn test_evaluate_write_condition_and_operator() {
8485
let mut tag_conditions = TagConditions::new(vec!["tag1".to_string(), "tag2".to_string()]);
@@ -108,4 +109,61 @@ mod tests {
108109
let result = should_forward(tags, Some(Box::new(conditions)));
109110
assert!(result);
110111
}
112+
113+
#[tokio::test]
114+
async fn test_empty_tags_with_and_operator() {
115+
let mut tag_conditions = TagConditions::new(vec!["tag1".to_string(), "tag2".to_string()]);
116+
tag_conditions.operator = Some("and".to_string());
117+
let conditions = ForwardConditions::new(tag_conditions);
118+
119+
// Empty tags array (explicit empty)
120+
let tags = Some(Arc::from(Vec::<String>::new()));
121+
let result = should_forward(tags, Some(Box::new(conditions.clone())));
122+
assert!(!result, "AND condition should fail with empty tags");
123+
124+
// None tags
125+
let result = should_forward(None, Some(Box::new(conditions)));
126+
assert!(!result, "AND condition should fail with None tags");
127+
}
128+
129+
#[tokio::test]
130+
async fn test_empty_tags_with_or_operator() {
131+
let mut tag_conditions = TagConditions::new(vec!["tag1".to_string(), "tag2".to_string()]);
132+
tag_conditions.operator = Some("or".to_string());
133+
let conditions = ForwardConditions::new(tag_conditions);
134+
135+
// Empty tags array (explicit empty)
136+
let tags = Some(Arc::from(Vec::<String>::new()));
137+
let result = should_forward(tags, Some(Box::new(conditions.clone())));
138+
assert!(!result, "OR condition should fail with empty tags");
139+
140+
// None tags
141+
let result = should_forward(None, Some(Box::new(conditions)));
142+
assert!(!result, "OR condition should fail with None tags");
143+
}
144+
145+
#[tokio::test]
146+
async fn test_empty_tags_with_not_operator() {
147+
let mut tag_conditions = TagConditions::new(vec!["tag1".to_string(), "tag2".to_string()]);
148+
tag_conditions.operator = Some("not".to_string());
149+
let conditions = ForwardConditions::new(tag_conditions);
150+
151+
// Empty tags array (explicit empty)
152+
let tags = Some(Arc::from(Vec::<String>::new()));
153+
let result = should_forward(tags, Some(Box::new(conditions.clone())));
154+
assert!(result, "NOT condition should pass with empty tags");
155+
156+
// None tags
157+
let result = should_forward(None, Some(Box::new(conditions)));
158+
assert!(result, "NOT condition should pass with None tags");
159+
}
160+
161+
#[tokio::test]
162+
async fn test_default_operator() {
163+
let tag_conditions = TagConditions::new(vec!["tag1".to_string(), "tag2".to_string()]);
164+
let conditions = ForwardConditions::new(tag_conditions);
165+
let tags = Some(Arc::from(vec!["tag1".to_string(), "tag2".to_string()]));
166+
let result = should_forward(tags, Some(Box::new(conditions)));
167+
assert!(result);
168+
}
111169
}

0 commit comments

Comments
 (0)