-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(graph): add streamhttpnode #2438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
mengnankkkk
wants to merge
9
commits into
alibaba:main
Choose a base branch
from
mengnankkkk:feat-streamhttpnode
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+1,699
−0
Open
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
4e47724
feat(graph): add streamhttpnode
mengnankkkk cdaf12c
Update StreamHttpNode.java
mengnankkkk c445dc4
fix(graph):remove studio streamhttpnode
mengnankkkk 9ce538f
fix(graph): ci bug
mengnankkkk f1659ae
feat(graph): fix ci bug
mengnankkkk 0776691
Merge branch 'main' into feat-streamhttpnode
mengnankkkk 29003bd
fix ci bug
mengnankkkk cbfdec9
fix: use AsyncNodeAction
mengnankkkk d6a112f
del ara
mengnankkkk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
52 changes: 52 additions & 0 deletions
52
...libaba-graph-core/src/main/java/com/alibaba/cloud/ai/graph/action/StreamingGraphNode.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Copyright 2024-2025 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.alibaba.cloud.ai.graph.action; | ||
|
||
import com.alibaba.cloud.ai.graph.OverAllState; | ||
import reactor.core.publisher.Flux; | ||
|
||
import java.util.Map; | ||
|
||
public interface StreamingGraphNode extends NodeAction { | ||
|
||
/** | ||
* 执行流式节点操作,返回响应式数据流。 这是流式节点的核心方法,用于生成连续的数据流。 | ||
* @param state 图的整体状态 | ||
* @return 包含图输出数据的响应式流 | ||
* @throws Exception 执行过程中可能出现的异常 | ||
*/ | ||
Flux<Map<String, Object>> executeStreaming(OverAllState state) throws Exception; | ||
|
||
/** | ||
* 默认实现,通过流式方法的第一个元素来提供同步兼容性。 该方法确保现有系统的向后兼容性。 | ||
* @param state 图的整体状态 | ||
* @return 同步执行结果 | ||
* @throws Exception 执行过程中可能出现的异常 | ||
*/ | ||
@Override | ||
default Map<String, Object> apply(OverAllState state) throws Exception { | ||
return executeStreaming(state).blockFirst(); | ||
} | ||
|
||
/** | ||
* 判断是否为流式节点。 用于GraphEngine区分同步和流式节点的执行方式。 | ||
* @return 总是返回true,表示这是一个流式节点 | ||
*/ | ||
default boolean isStreaming() { | ||
return true; | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
85 changes: 85 additions & 0 deletions
85
...ba-graph-core/src/main/java/com/alibaba/cloud/ai/graph/streaming/StreamHttpException.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright 2024-2025 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.alibaba.cloud.ai.graph.streaming; | ||
|
||
/** | ||
* StreamHttpNode专用异常类 提供更详细的流式HTTP处理异常信息 | ||
*/ | ||
public class StreamHttpException extends RuntimeException { | ||
|
||
private final String nodeId; | ||
|
||
private final int httpStatus; | ||
|
||
private final String url; | ||
|
||
public StreamHttpException(String nodeId, String url, String message) { | ||
this(nodeId, url, -1, message, null); | ||
} | ||
|
||
public StreamHttpException(String nodeId, String url, String message, Throwable cause) { | ||
this(nodeId, url, -1, message, cause); | ||
} | ||
|
||
public StreamHttpException(String nodeId, String url, int httpStatus, String message, Throwable cause) { | ||
super(String.format("StreamHttpNode[%s] failed: %s (URL: %s, Status: %d)", nodeId, message, url, httpStatus), | ||
cause); | ||
this.nodeId = nodeId; | ||
this.httpStatus = httpStatus; | ||
this.url = url; | ||
} | ||
|
||
public String getNodeId() { | ||
return nodeId; | ||
} | ||
|
||
public int getHttpStatus() { | ||
return httpStatus; | ||
} | ||
|
||
public String getUrl() { | ||
return url; | ||
} | ||
|
||
/** | ||
* 创建网络异常 | ||
*/ | ||
public static StreamHttpException networkError(String nodeId, String url, Throwable cause) { | ||
return new StreamHttpException(nodeId, url, "Network connection failed", cause); | ||
} | ||
|
||
/** | ||
* 创建HTTP状态异常 | ||
*/ | ||
public static StreamHttpException httpError(String nodeId, String url, int status, String message) { | ||
return new StreamHttpException(nodeId, url, status, "HTTP error: " + message, null); | ||
} | ||
|
||
/** | ||
* 创建数据解析异常 | ||
*/ | ||
public static StreamHttpException parseError(String nodeId, String url, String message, Throwable cause) { | ||
return new StreamHttpException(nodeId, url, "Data parsing failed: " + message, cause); | ||
} | ||
|
||
/** | ||
* 创建超时异常 | ||
*/ | ||
public static StreamHttpException timeoutError(String nodeId, String url, String message) { | ||
return new StreamHttpException(nodeId, url, "Request timeout: " + message, null); | ||
} | ||
|
||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.