
Commit 76753fd

fix: reduce number of edge cases where lazy variadic components wait for inputs that can't arrive anymore (#8907)
* wip
* fix: running order with lazy variadic components
* fix: tests
* format
* comment
* fix: alternative approach to fixing running order
* unused imports
* revert fix
* remove unneeded return
* remove data based approach to tie breaking
* release note
* trailing spaces
* newline eof
* unused import
* add more explanations to release note
1 parent a902af1 commit 76753fd

File tree

7 files changed: +381 −18 lines changed

haystack/core/pipeline/async_pipeline.py

Lines changed: 10 additions & 0 deletions
@@ -144,6 +144,7 @@ async def process_results():
         ordered_names = sorted(self.graph.nodes.keys())
         cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
         component_visits = {component_name: 0 for component_name in ordered_names}
+        cached_topological_sort = None

         # We fill the queue once and raise if all components are BLOCKED
         self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))

@@ -385,6 +386,15 @@ async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:

             # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
             elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
+                if len(priority_queue) > 0:
+                    component_name, topological_sort = self._tiebreak_waiting_components(
+                        component_name=component_name,
+                        priority=priority,
+                        priority_queue=priority_queue,
+                        topological_sort=cached_topological_sort,
+                    )
+                    cached_topological_sort = topological_sort
                 await _schedule_task(component_name)

             # To make progress, we wait for one task to complete before re-starting the loop

haystack/core/pipeline/base.py

Lines changed: 54 additions & 3 deletions
@@ -8,7 +8,7 @@
 from datetime import datetime
 from enum import IntEnum
 from pathlib import Path
-from typing import Any, Dict, Iterator, List, Optional, TextIO, Tuple, Type, TypeVar, Union
+from typing import Any, Dict, Iterator, List, Optional, Set, TextIO, Tuple, Type, TypeVar, Union

 import networkx  # type:ignore

@@ -1042,24 +1042,75 @@ def _add_missing_input_defaults(component_inputs: Dict[str, Any], component_inpu

         return component_inputs

+    def _tiebreak_waiting_components(
+        self,
+        component_name: str,
+        priority: ComponentPriority,
+        priority_queue: FIFOPriorityQueue,
+        topological_sort: Union[Dict[str, int], None],
+    ):
+        """
+        Decides which component to run when multiple components are waiting for inputs with the same priority.
+
+        :param component_name: The name of the component.
+        :param priority: Priority of the component.
+        :param priority_queue: Priority queue of component names.
+        :param topological_sort: Cached topological sort of all components in the pipeline.
+        """
+        components_with_same_priority = [component_name]
+
+        while len(priority_queue) > 0:
+            next_priority, next_component_name = priority_queue.peek()
+            if next_priority == priority:
+                priority_queue.pop()  # actually remove the component
+                components_with_same_priority.append(next_component_name)
+            else:
+                break
+
+        if len(components_with_same_priority) > 1:
+            if topological_sort is None:
+                if networkx.is_directed_acyclic_graph(self.graph):
+                    topological_sort = networkx.lexicographical_topological_sort(self.graph)
+                    topological_sort = {node: idx for idx, node in enumerate(topological_sort)}
+                else:
+                    condensed = networkx.condensation(self.graph)
+                    condensed_sorted = {node: idx for idx, node in enumerate(networkx.topological_sort(condensed))}
+                    topological_sort = {
+                        component_name: condensed_sorted[node]
+                        for component_name, node in condensed.graph["mapping"].items()
+                    }
+
+            components_with_same_priority = sorted(
+                components_with_same_priority, key=lambda comp_name: (topological_sort[comp_name], comp_name.lower())
+            )
+
+            component_name = components_with_same_priority[0]
+
+        return component_name, topological_sort
+
     @staticmethod
     def _write_component_outputs(
-        component_name, component_outputs, inputs, receivers, include_outputs_from
+        component_name: str,
+        component_outputs: Dict[str, Any],
+        inputs: Dict[str, Any],
+        receivers: List[Tuple],
+        include_outputs_from: Set[str],
     ) -> Dict[str, Any]:
         """
         Distributes the outputs of a component to the input sockets that it is connected to.

         :param component_name: The name of the component.
         :param component_outputs: The outputs of the component.
         :param inputs: The current global input state.
-        :param receivers: List of receiver_name, sender_socket, receiver_socket for connected components.
+        :param receivers: List of components that receive inputs from the component.
         :param include_outputs_from: List of component names that should always return an output from the pipeline.
         """
         for receiver_name, sender_socket, receiver_socket in receivers:
             # We either get the value that was produced by the actor or we use the _NO_OUTPUT_PRODUCED class to indicate
             # that the sender did not produce an output for this socket.
             # This allows us to track if a predecessor already ran but did not produce an output.
             value = component_outputs.get(sender_socket.name, _NO_OUTPUT_PRODUCED)
+
             if receiver_name not in inputs:
                 inputs[receiver_name] = {}

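For readers skimming the diff, here is a minimal sketch of the "drain equal-priority peers" step that _tiebreak_waiting_components performs above. It is not part of the commit: _ToyQueue and all component names are hypothetical, and the class only approximates the peek()/pop()/len() surface that FIFOPriorityQueue appears to expose in this diff.

from typing import List, Tuple

class _ToyQueue:
    """Hypothetical stand-in mimicking FIFOPriorityQueue's peek/pop/len surface."""

    def __init__(self, items: List[Tuple[int, str]]) -> None:
        self._items = sorted(items)  # (priority, name): lower value runs earlier

    def __len__(self) -> int:
        return len(self._items)

    def peek(self) -> Tuple[int, str]:
        return self._items[0]  # inspect the head without removing it

    def pop(self) -> Tuple[int, str]:
        return self._items.pop(0)  # actually remove the head

# "joiner_a" was already popped as the current candidate with priority 4.
queue = _ToyQueue([(4, "joiner_b"), (4, "joiner_c"), (6, "cleanup")])
priority, component_name = 4, "joiner_a"

# Collect every queued component waiting with the same priority; they all become
# candidates for the topological tiebreak, while "cleanup" (priority 6) stays queued.
components_with_same_priority = [component_name]
while len(queue) > 0:
    next_priority, next_component_name = queue.peek()
    if next_priority != priority:
        break
    queue.pop()
    components_with_same_priority.append(next_component_name)

print(components_with_same_priority)  # ['joiner_a', 'joiner_b', 'joiner_c']
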
haystack/core/pipeline/pipeline.py

Lines changed: 14 additions & 14 deletions
@@ -2,7 +2,6 @@
 #
 # SPDX-License-Identifier: Apache-2.0

-import warnings
 from copy import deepcopy
 from typing import Any, Dict, Mapping, Optional, Set, cast

@@ -209,6 +208,8 @@ def run(  # noqa: PLR0915, PLR0912
         # We store them here for easy access.
         cached_receivers = {name: self._find_receivers_from(name) for name in ordered_component_names}

+        cached_topological_sort = None
+
         pipeline_outputs: Dict[str, Any] = {}
         with tracing.tracer.trace(
             "haystack.pipeline.run",

@@ -231,24 +232,23 @@ def run(  # noqa: PLR0915, PLR0912
                     break

                 priority, component_name, component = candidate
-                if len(priority_queue) > 0:
-                    next_priority, next_name = priority_queue.peek()
-
-                    if (
-                        priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]
-                        and next_priority == priority
-                    ):
-                        msg = (
-                            f"Components '{component_name}' and '{next_name}' are waiting for "
-                            f"optional inputs at the same time. The pipeline will execute '{component_name}' "
-                            f"first based on lexicographical ordering."
-                        )
-                        warnings.warn(msg)
+                if len(priority_queue) > 0 and priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]:
+                    component_name, topological_sort = self._tiebreak_waiting_components(
+                        component_name=component_name,
+                        priority=priority,
+                        priority_queue=priority_queue,
+                        topological_sort=cached_topological_sort,
+                    )
+                    cached_topological_sort = topological_sort
+                    component = self._get_component_with_graph_metadata_and_visits(
+                        component_name, component_visits[component_name]
+                    )

                 component_outputs = self._run_component(component, inputs, component_visits, parent_span=span)

                 # Updates global input state with component outputs and returns outputs that should go to
                 # pipeline outputs.
+
                 component_pipeline_outputs = self._write_component_outputs(
                     component_name=component_name,
                     component_outputs=component_outputs,
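
To see why the replacement above changes behavior and not just messaging, the following sketch (toy graph and names, not from the commit) contrasts the removed lexicographical tiebreak with the new topological one when a downstream component happens to sort first alphabetically:

import networkx

# Hypothetical two-joiner pipeline: "b_joiner" feeds "a_joiner"; both wait with DEFER.
graph = networkx.DiGraph([("b_joiner", "a_joiner")])
waiting = ["a_joiner", "b_joiner"]

# Old behavior: pure lexicographical ordering runs the downstream component first.
print(sorted(waiting)[0])  # -> a_joiner

# New behavior: topological rank decides; the name is only a tiebreak within a rank.
ranks = {node: idx for idx, node in enumerate(networkx.lexicographical_topological_sort(graph))}
print(sorted(waiting, key=lambda name: (ranks[name], name.lower()))[0])  # -> b_joiner

Running the upstream joiner first gives it the chance to produce the input its peer is still waiting for, which is exactly the class of edge case this commit targets.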
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+---
+fixes:
+  - |
+    Added a fix to the pipeline's component scheduling algorithm to reduce edge cases where the execution order of
+    components that are simultaneously waiting for inputs has an impact on a pipeline's output. We look at topological
+    order first to see which of the waiting components should run first and fall back to lexicographical order when both
+    components are on the same topological level. In cyclic pipelines, if the waiting components are in the same cycle,
+    we fall back to lexicographical order immediately.
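
As a hedged illustration of the cyclic case this note describes (component names made up), networkx's condensation collapses each cycle into a single node, so every member of a cycle gets the same rank and the lexicographical fallback decides immediately:

import networkx

# Toy cyclic pipeline: "joiner_a" and "joiner_b" form a loop fed by "fetcher".
graph = networkx.DiGraph(
    [("fetcher", "joiner_b"), ("joiner_b", "joiner_a"), ("joiner_a", "joiner_b")]
)
condensed = networkx.condensation(graph)  # one node per strongly connected component
scc_rank = {node: idx for idx, node in enumerate(networkx.topological_sort(condensed))}
ranks = {name: scc_rank[scc] for name, scc in condensed.graph["mapping"].items()}

waiting = ["joiner_b", "joiner_a"]  # same cycle, hence equal ranks
print(sorted(waiting, key=lambda name: (ranks[name], name.lower()))[0])  # -> joiner_a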

test/core/pipeline/features/pipeline_run.feature

Lines changed: 2 additions & 0 deletions
@@ -52,6 +52,8 @@ Feature: Pipeline running
       | with a component that has dynamic default inputs |
       | with a component that has variadic dynamic default inputs |
       | that is a file conversion pipeline with two joiners |
+      | that is a file conversion pipeline with three joiners |
+      | that is a file conversion pipeline with three joiners and a loop |
       | that has components returning dataframes |

   Scenario Outline: Running a bad Pipeline
