Skip to content
57 changes: 57 additions & 0 deletions spring-ai-alibaba-graph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,67 @@
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
</dependency>

<dependency>
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-reference-server</artifactId>
<version>${a2a-sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-sdk-server-common</artifactId>
<version>${a2a-sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-sdk-client</artifactId>
<version>${a2a-sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down Expand Up @@ -182,11 +229,14 @@
<scope>test</scope>
</dependency>

<!-- 移除slf4j-jdk14以避免日志框架冲突 -->
<!--
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<scope>test</scope>
</dependency>
-->

<dependency>
<groupId>org.redisson</groupId>
Expand Down Expand Up @@ -269,6 +319,13 @@
<scope>provided</scope>
</dependency>

<!-- Reactor Test for reactive testing -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<reporting>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.ai.graph.action;

import com.alibaba.cloud.ai.graph.OverAllState;
import reactor.core.publisher.Flux;

import java.util.Map;

public interface StreamingGraphNode extends NodeAction {

/**
* 执行流式节点操作,返回响应式数据流。 这是流式节点的核心方法,用于生成连续的数据流。
* @param state 图的整体状态
* @return 包含图输出数据的响应式流
* @throws Exception 执行过程中可能出现的异常
*/
Flux<Map<String, Object>> executeStreaming(OverAllState state) throws Exception;

/**
* 默认实现,通过流式方法的第一个元素来提供同步兼容性。 该方法确保现有系统的向后兼容性。
* @param state 图的整体状态
* @return 同步执行结果
* @throws Exception 执行过程中可能出现的异常
*/
@Override
default Map<String, Object> apply(OverAllState state) throws Exception {
return executeStreaming(state).blockFirst();
}

/**
* 判断是否为流式节点。 用于GraphEngine区分同步和流式节点的执行方式。
* @return 总是返回true,表示这是一个流式节点
*/
default boolean isStreaming() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.cloud.ai.graph.action.Command;
import com.alibaba.cloud.ai.graph.action.InterruptableAction;
import com.alibaba.cloud.ai.graph.action.InterruptionMetadata;
import com.alibaba.cloud.ai.graph.action.StreamingGraphNode;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.exception.RunnableErrors;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
Expand Down Expand Up @@ -92,6 +93,11 @@ private Flux<GraphResponse<NodeOutput>> executeNode(GraphRunnerContext context,
}
}

// 检查是否为流式节点
if (action instanceof StreamingGraphNode) {
return executeStreamingNode((StreamingGraphNode) action, context, resultValue);
}

context.doListeners(NODE_BEFORE, null);

CompletableFuture<Map<String, Object>> future = action.apply(context.getOverallState(),
Expand Down Expand Up @@ -403,4 +409,55 @@ private Flux<GraphResponse<NodeOutput>> handleEmbeddedGenerator(GraphRunnerConte
}));
}

/**
* 执行流式节点,处理响应式数据流。
* @param streamingNode 流式节点实例
* @param context 图运行上下文
* @param resultValue 结果值的原子引用
* @return 流式图响应的Flux
*/
private Flux<GraphResponse<NodeOutput>> executeStreamingNode(StreamingGraphNode streamingNode,
GraphRunnerContext context, AtomicReference<Object> resultValue) {
try {
context.doListeners(NODE_BEFORE, null);

// 执行流式节点
Flux<Map<String, Object>> streamingFlux = streamingNode.executeStreaming(context.getOverallState());

return streamingFlux.map(output -> {
try {
// 为每个流元素创建NodeOutput
NodeOutput nodeOutput = context.buildNodeOutput(context.getCurrentNodeId());
return GraphResponse.of(nodeOutput);
}
catch (Exception e) {
return GraphResponse.<NodeOutput>error(e);
}
}).concatWith(Flux.defer(() -> {
// 流结束后处理下一步
context.doListeners(NODE_AFTER, null);

try {
// 获取流的最后一个结果作为节点的最终输出
// 注意:这里的逻辑假设流式节点会在最后一个元素中包含完整的状态更新
Command nextCommand = context.nextNodeId(context.getCurrentNodeId(), context.getCurrentState());
context.setNextNodeId(nextCommand.gotoNode());
context.updateCurrentState(nextCommand.update());

return mainGraphExecutor.execute(context, resultValue);
}
catch (Exception e) {
return Flux.just(GraphResponse.<NodeOutput>error(e));
}
})).onErrorResume(error -> {
context.doListeners(NODE_AFTER, null);
return Flux.just(GraphResponse.<NodeOutput>error(error));
});

}
catch (Exception e) {
return Flux.just(GraphResponse.<NodeOutput>error(e));
}
}

}
Loading
Loading