Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions spring-ai-alibaba-graph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,67 @@
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
</dependency>

<dependency>
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-reference-server</artifactId>
<version>${a2a-sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-sdk-server-common</artifactId>
<version>${a2a-sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-sdk-client</artifactId>
<version>${a2a-sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down Expand Up @@ -182,11 +229,14 @@
<scope>test</scope>
</dependency>

<!-- 移除slf4j-jdk14以避免日志框架冲突 -->
<!--
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<scope>test</scope>
</dependency>
-->

<dependency>
<groupId>org.redisson</groupId>
Expand Down Expand Up @@ -269,6 +319,13 @@
<scope>provided</scope>
</dependency>

<!-- Reactor Test for reactive testing -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<reporting>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.ai.graph.action;

import com.alibaba.cloud.ai.graph.OverAllState;
import reactor.core.publisher.Flux;

import java.util.Map;

public interface StreamingGraphNode extends NodeAction {

/**
* 执行流式节点操作,返回响应式数据流。 这是流式节点的核心方法,用于生成连续的数据流。
* @param state 图的整体状态
* @return 包含图输出数据的响应式流
* @throws Exception 执行过程中可能出现的异常
*/
Flux<Map<String, Object>> executeStreaming(OverAllState state) throws Exception;

/**
* 默认实现,通过流式方法的第一个元素来提供同步兼容性。 该方法确保现有系统的向后兼容性。
* @param state 图的整体状态
* @return 同步执行结果
* @throws Exception 执行过程中可能出现的异常
*/
@Override
default Map<String, Object> apply(OverAllState state) throws Exception {
return executeStreaming(state).blockFirst();
}

/**
* 判断是否为流式节点。 用于GraphEngine区分同步和流式节点的执行方式。
* @return 总是返回true,表示这是一个流式节点
*/
default boolean isStreaming() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.cloud.ai.graph.action.Command;
import com.alibaba.cloud.ai.graph.action.InterruptableAction;
import com.alibaba.cloud.ai.graph.action.InterruptionMetadata;
import com.alibaba.cloud.ai.graph.action.StreamingGraphNode;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.exception.RunnableErrors;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
Expand Down Expand Up @@ -93,6 +94,11 @@ private Flux<GraphResponse<NodeOutput>> executeNode(GraphRunnerContext context,
}
}

// 检查是否为流式节点
if (action instanceof StreamingGraphNode) {
return executeStreamingNode((StreamingGraphNode) action, context, resultValue);
}

context.doListeners(NODE_BEFORE, null);

CompletableFuture<Map<String, Object>> future = action.apply(context.getOverallState(),
Expand Down Expand Up @@ -404,4 +410,55 @@ private Flux<GraphResponse<NodeOutput>> handleEmbeddedGenerator(GraphRunnerConte
}));
}

/**
* 执行流式节点,处理响应式数据流。
* @param streamingNode 流式节点实例
* @param context 图运行上下文
* @param resultValue 结果值的原子引用
* @return 流式图响应的Flux
*/
private Flux<GraphResponse<NodeOutput>> executeStreamingNode(StreamingGraphNode streamingNode,
GraphRunnerContext context, AtomicReference<Object> resultValue) {
try {
context.doListeners(NODE_BEFORE, null);

// 执行流式节点
Flux<Map<String, Object>> streamingFlux = streamingNode.executeStreaming(context.getOverallState());

return streamingFlux.map(output -> {
try {
// 为每个流元素创建NodeOutput
NodeOutput nodeOutput = context.buildNodeOutput(context.getCurrentNodeId());
return GraphResponse.of(nodeOutput);
}
catch (Exception e) {
return GraphResponse.<NodeOutput>error(e);
}
}).concatWith(Flux.defer(() -> {
// 流结束后处理下一步
context.doListeners(NODE_AFTER, null);

try {
// 获取流的最后一个结果作为节点的最终输出
// 注意:这里的逻辑假设流式节点会在最后一个元素中包含完整的状态更新
Command nextCommand = context.nextNodeId(context.getCurrentNodeId(), context.getCurrentState());
context.setNextNodeId(nextCommand.gotoNode());
context.updateCurrentState(nextCommand.update());

return mainGraphExecutor.execute(context, resultValue);
}
catch (Exception e) {
return Flux.just(GraphResponse.<NodeOutput>error(e));
}
})).onErrorResume(error -> {
context.doListeners(NODE_AFTER, null);
return Flux.just(GraphResponse.<NodeOutput>error(error));
});

}
catch (Exception e) {
return Flux.just(GraphResponse.<NodeOutput>error(e));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.ai.graph.streaming;

/**
* StreamHttpNode专用异常类 提供更详细的流式HTTP处理异常信息
*/
public class StreamHttpException extends RuntimeException {

private final String nodeId;

private final int httpStatus;

private final String url;

public StreamHttpException(String nodeId, String url, String message) {
this(nodeId, url, -1, message, null);
}

public StreamHttpException(String nodeId, String url, String message, Throwable cause) {
this(nodeId, url, -1, message, cause);
}

public StreamHttpException(String nodeId, String url, int httpStatus, String message, Throwable cause) {
super(String.format("StreamHttpNode[%s] failed: %s (URL: %s, Status: %d)", nodeId, message, url, httpStatus),
cause);
this.nodeId = nodeId;
this.httpStatus = httpStatus;
this.url = url;
}

public String getNodeId() {
return nodeId;
}

public int getHttpStatus() {
return httpStatus;
}

public String getUrl() {
return url;
}

/**
* 创建网络异常
*/
public static StreamHttpException networkError(String nodeId, String url, Throwable cause) {
return new StreamHttpException(nodeId, url, "Network connection failed", cause);
}

/**
* 创建HTTP状态异常
*/
public static StreamHttpException httpError(String nodeId, String url, int status, String message) {
return new StreamHttpException(nodeId, url, status, "HTTP error: " + message, null);
}

/**
* 创建数据解析异常
*/
public static StreamHttpException parseError(String nodeId, String url, String message, Throwable cause) {
return new StreamHttpException(nodeId, url, "Data parsing failed: " + message, cause);
}

/**
* 创建超时异常
*/
public static StreamHttpException timeoutError(String nodeId, String url, String message) {
return new StreamHttpException(nodeId, url, "Request timeout: " + message, null);
}

}
Loading
Loading