The Bug
Despite its name, ParallelAgent executes its sub-agents sequentially on a single thread rather than in parallel. This defeats the purpose of using a parallel agent, as total execution time equals the sum of all individual agent execution times rather than the longest single execution time.
To Reproduce
- Configure a ParallelAgent with multiple sub-agents.
- Trigger the agent execution using Runner.runAsync(...) and subscribe to the resulting Flowable.
- Observe the logs or execution times: the sub-agents process their requests one after the other, not simultaneously.
Root Cause Analysis
The bug is located in ParallelAgent.java inside the runAsyncImpl method.
- The method maps over currentSubAgents and calls runAsync on each to create a list of Flowable<Event> streams.
- It then passes this list directly into Flowable.merge(...).
- Because no concurrent scheduler (such as .subscribeOn(Schedulers.io())) is applied to the individual sub-agent flows before merging, each "cold" stream runs on the thread that subscribes to it. Flowable.merge(...) subscribes to its sources one after another on the caller's thread, so when a sub-agent does its work synchronously during subscription, the next sub-agent does not start until the previous one has finished.
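The behavior described above can be reproduced with plain RxJava, independent of the agent framework. The following is a minimal sketch, assuming RxJava 3 package names; fakeAgent is a hypothetical stand-in for a sub-agent's runAsync call and is not part of ParallelAgent.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.List;

public class SequentialMergeDemo {

    // Hypothetical stand-in for a sub-agent's runAsync(): a cold Flowable whose
    // blocking work runs on whichever thread subscribes to it.
    static Flowable<String> fakeAgent(String name, List<String> log) {
        return Flowable.fromCallable(() -> {
            log.add(name + " start on " + Thread.currentThread().getName());
            Thread.sleep(200); // simulated blocking model or tool call
            log.add(name + " end");
            return name;
        });
    }

    // Merges three fake agents with no scheduler applied and returns the log.
    static List<String> runWithoutScheduler() {
        // Everything runs on the calling thread here, so a plain list is safe.
        List<String> log = new ArrayList<>();
        List<Flowable<String>> flows = List.of(
            fakeAgent("a", log), fakeAgent("b", log), fakeAgent("c", log));
        Flowable.merge(flows).blockingSubscribe();
        return log;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        runWithoutScheduler().forEach(System.out::println);
        System.out.printf("elapsed ms: %d%n", (System.nanoTime() - start) / 1_000_000);
    }
}
```

In this sketch each fake agent logs its end before the next one logs its start, and all of them report the same thread name, which matches the sequential behavior reported for ParallelAgent.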
Expected behavior
ParallelAgent should internally handle thread scheduling for its sub-agents (e.g., dispatching them to an I/O thread pool before the merge step) so that Flowable.merge() can actually process their streams concurrently.
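One possible shape for this, shown as a sketch rather than as the actual ParallelAgent change: apply subscribeOn to each flow before the merge. The helper mergeConcurrently and the peakConcurrency check are hypothetical names introduced for illustration, and the code assumes RxJava 3.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ConcurrentMergeSketch {

    // Hypothetical helper: moves each flow's subscription onto the given
    // scheduler, then merges, so blocking sources no longer run back to back.
    static <T> Flowable<T> mergeConcurrently(List<Flowable<T>> flows, Scheduler scheduler) {
        List<Flowable<T>> scheduled = flows.stream()
            .map(f -> f.subscribeOn(scheduler))
            .collect(Collectors.toList());
        return Flowable.merge(scheduled);
    }

    // Runs `count` blocking fake agents through the helper and returns the
    // highest number that were in flight at the same moment.
    static int peakConcurrency(int count) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Flowable<Integer>> flows = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int id = i;
            flows.add(Flowable.fromCallable(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // record the high-water mark
                Thread.sleep(200); // simulated blocking work
                running.decrementAndGet();
                return id;
            }));
        }
        mergeConcurrently(flows, Schedulers.io()).blockingSubscribe();
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent agents: " + peakConcurrency(3));
    }
}
```

Taking the Scheduler as a parameter is a design choice in this sketch: Schedulers.io() is unbounded, so a real fix might prefer a scheduler that callers can configure or bound.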