Skip to content

Commit a902af1

Browse files
authored
fix: set tags before context manager closes (#8911)
1 parent af3c89a commit a902af1

File tree

1 file changed

+21
-21
lines changed

1 file changed

+21
-21
lines changed

haystack/core/pipeline/async_pipeline.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -212,29 +212,29 @@ async def _run_component_async(component_name: str, component_inputs: Dict[str,
212212
loop = asyncio.get_running_loop()
213213
outputs = await loop.run_in_executor(None, lambda: instance.run(**component_inputs))
214214

215-
component_visits[component_name] += 1
216-
217-
if not isinstance(outputs, dict):
218-
raise PipelineRuntimeError(
219-
f"Component '{component_name}' returned an invalid output type. "
220-
f"Expected a dict, but got {type(outputs).__name__} instead. "
215+
component_visits[component_name] += 1
216+
217+
if not isinstance(outputs, dict):
218+
raise PipelineRuntimeError(
219+
f"Component '{component_name}' returned an invalid output type. "
220+
f"Expected a dict, but got {type(outputs).__name__} instead. "
221+
)
222+
223+
span.set_tag("haystack.component.visits", component_visits[component_name])
224+
span.set_content_tag("haystack.component.outputs", deepcopy(outputs))
225+
226+
# Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
227+
pruned = self._write_component_outputs(
228+
component_name=component_name,
229+
component_outputs=outputs,
230+
inputs=inputs_state,
231+
receivers=cached_receivers[component_name],
232+
include_outputs_from=include_outputs_from,
221233
)
234+
if pruned:
235+
pipeline_outputs[component_name] = pruned
222236

223-
span.set_tag("haystack.component.visits", component_visits[component_name])
224-
span.set_content_tag("haystack.component.outputs", deepcopy(outputs))
225-
226-
# Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
227-
pruned = self._write_component_outputs(
228-
component_name=component_name,
229-
component_outputs=outputs,
230-
inputs=inputs_state,
231-
receivers=cached_receivers[component_name],
232-
include_outputs_from=include_outputs_from,
233-
)
234-
if pruned:
235-
pipeline_outputs[component_name] = pruned
236-
237-
return pruned
237+
return pruned
238238

239239
async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[str, Any]]:
240240
"""

0 commit comments

Comments
 (0)