Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public class CodeExecutorProperties {
/**
* Container network mode
*/
String networkMode = "bridge";
String networkMode = "none";

public CodePoolExecutorEnum getCodePoolExecutor() {
return codePoolExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,25 @@ public Map<String, Object> apply(OverAllState state) throws Exception {
log.error(errorMsg);
throw new RuntimeException(errorMsg);
}
log.info("Python Execute Success! StdOut: {}", taskResponse.stdOut());

// Python输出的JSON字符串可能有Unicode转义形式,需要解析回汉字
String stdout = taskResponse.stdOut();
try {
Object value = objectMapper.readValue(stdout, Object.class);
stdout = objectMapper.writeValueAsString(value);
}
catch (Exception e) {
stdout = taskResponse.stdOut();
}
String finalStdout = stdout;

log.info("Python Execute Success! StdOut: {}", finalStdout);

// Create display flux for user experience only
Flux<ChatResponse> displayFlux = Flux.create(emitter -> {
emitter.next(ChatResponseUtil.createStatusResponse("开始执行Python代码..."));
emitter.next(ChatResponseUtil.createStatusResponse("标准输出:\n```"));
emitter.next(ChatResponseUtil.createStatusResponse(taskResponse.stdOut()));
emitter.next(ChatResponseUtil.createStatusResponse(finalStdout));
emitter.next(ChatResponseUtil.createStatusResponse("\n```"));
emitter.next(ChatResponseUtil.createStatusResponse("Python代码执行成功!"));
emitter.complete();
Expand All @@ -91,14 +103,14 @@ public Map<String, Object> apply(OverAllState state) throws Exception {
// Create generator using utility class, returning pre-computed business logic
// result
var generator = StreamingChatGeneratorUtil.createStreamingGeneratorWithMessages(this.getClass(), state,
v -> Map.of(PYTHON_EXECUTE_NODE_OUTPUT, taskResponse.stdOut(), PYTHON_IS_SUCCESS, true),
displayFlux, StreamResponseType.PYTHON_EXECUTE);
v -> Map.of(PYTHON_EXECUTE_NODE_OUTPUT, finalStdout, PYTHON_IS_SUCCESS, true), displayFlux,
StreamResponseType.PYTHON_EXECUTE);

return Map.of(PYTHON_EXECUTE_NODE_OUTPUT, generator);
}
catch (Exception e) {
String errorMessage = e.getMessage();
log.error("Python Execute Exception: {}", errorMessage, e);
log.error("Python Execute Exception: {}", errorMessage);

// Prepare error result
Map<String, Object> errorResult = Map.of(PYTHON_EXECUTE_NODE_OUTPUT, errorMessage, PYTHON_IS_SUCCESS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ public Map<String, Object> apply(OverAllState state) throws Exception {
.stream()
.chatResponse();

var generator = StreamingChatGeneratorUtil.createStreamingGeneratorWithMessages(this.getClass(), state,
"正在生成Python代码...", "Python代码生成完成。", aiResponse -> {
var generator = StreamingChatGeneratorUtil.createStreamingGeneratorWithMessages(this.getClass(), state, "", "",
aiResponse -> {
// Some AI models still output Markdown markup (even though Prompt has
// emphasized this)
aiResponse = MarkdownParser.extractRawText(aiResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@
*/
public enum CodePoolExecutorEnum {

DOCKER, CONTAINERD, KATA, AI_SIMULATION;
DOCKER, CONTAINERD, KATA, AI_SIMULATION, LOCAL;

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,22 @@ record TaskRequest(String code, String input, String requirement) {

}

record TaskResponse(boolean isSuccess, String stdOut, String stdErr, String exceptionMsg) {
public static TaskResponse error(String msg) {
return new TaskResponse(false, null, null, "An exception occurred while executing the task: " + msg);
record TaskResponse(boolean isSuccess, boolean executionSuccessButResultFailed, String stdOut, String stdErr,
String exceptionMsg) {

// 执行运行代码任务时发生异常
public static TaskResponse exception(String msg) {
return new TaskResponse(false, false, null, null, "An exception occurred while executing the task: " + msg);
}

// 执行运行代码任务成功,并且代码正常返回
public static TaskResponse success(String stdOut) {
return new TaskResponse(true, false, stdOut, null, null);
}

// 执行运行代码任务成功,但是代码异常返回
public static TaskResponse failure(String stdOut, String stdErr) {
return new TaskResponse(false, true, stdOut, stdErr, "StdErr: " + stdErr);
}

@Override
Expand All @@ -44,7 +57,7 @@ public String toString() {

enum State {

READY, RUNNING
READY, RUNNING, REMOVING

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.cloud.ai.config.CodeExecutorProperties;
import com.alibaba.cloud.ai.service.code.impl.AiSimulationCodeExecutorService;
import com.alibaba.cloud.ai.service.code.impl.DockerCodePoolExecutorService;
import com.alibaba.cloud.ai.service.code.impl.LocalCodePoolExecutorService;
import org.springframework.ai.chat.client.ChatClient;

/**
Expand All @@ -35,15 +36,13 @@ private CodePoolExecutorServiceFactory() {

public static CodePoolExecutorService newInstance(CodeExecutorProperties properties,
ChatClient.Builder chatClientBuilder) {
if (properties.getCodePoolExecutor().equals(CodePoolExecutorEnum.DOCKER)) {
return new DockerCodePoolExecutorService(properties);
}
else if (properties.getCodePoolExecutor().equals(CodePoolExecutorEnum.AI_SIMULATION)) {
return new AiSimulationCodeExecutorService(chatClientBuilder);
}
else {
throw new IllegalArgumentException("Unknown container impl: " + properties.getCodePoolExecutor());
}
return switch (properties.getCodePoolExecutor()) {
case DOCKER -> new DockerCodePoolExecutorService(properties);
case LOCAL -> new LocalCodePoolExecutorService(properties);
case AI_SIMULATION -> new AiSimulationCodeExecutorService(chatClientBuilder);
default -> throw new UnsupportedOperationException(
"This option does not have a corresponding implementation class yet.");
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,36 +104,38 @@ public AbstractCodePoolExecutorService(CodeExecutorProperties properties) {
}));
}

/**
* 创建新的容器
* @return 容器ID
*/
protected abstract String createNewContainer() throws Exception;

protected abstract TaskResponse execTaskInContainer(TaskRequest request, String containerId) throws Exception;
/**
* 在指定容器ID的容器运行任务
* @param request 任务请求对象
* @param containerId 容器ID
* @return 运行结果对象
*/
protected abstract TaskResponse execTaskInContainer(TaskRequest request, String containerId);

/**
* 停止指定容器
* @param containerId 容器ID
*/
protected abstract void stopContainer(String containerId) throws Exception;

/**
* 删除指定容器
* @param containerId 容器ID
*/
protected abstract void removeContainer(String containerId) throws Exception;

protected void shutdownPool() throws Exception {
// Shutdown thread pool
this.consumerThreadPool.shutdownNow();
// Stop and delete all containers
for (String containerId : this.tempContainerState.keySet()) {
try {
this.stopContainer(containerId);
this.removeContainer(containerId);
}
catch (Exception ignored) {

}
}
for (String containerId : this.coreContainerState.keySet()) {
try {
this.stopContainer(containerId);
this.removeContainer(containerId);
}
catch (Exception ignored) {

}
}
this.tempContainerState.keySet().forEach(id -> this.removeContainerAndState(id, false, true));
this.coreContainerState.keySet().forEach(id -> this.removeContainerAndState(id, true, true));
this.tempContainerState.clear();
this.coreContainerState.clear();
this.tempContainerRemoveFuture.clear();
Expand All @@ -142,6 +144,48 @@ protected void shutdownPool() throws Exception {
this.taskQueue.clear();
}

private void removeContainerAndState(String containerId, boolean isCore, boolean isForce) {
try {
if (isCore) {
// Remove core container
State state = this.coreContainerState.replace(containerId, State.REMOVING);
if (state == State.RUNNING) {
if (isForce) {
this.stopContainer(containerId);
}
else {
throw new RuntimeException("Container is still Running!");
}
}
this.removeContainer(containerId);
this.coreContainerState.remove(containerId);
this.currentCoreContainerSize.decrementAndGet();
log.info("Core Container {} has been removed successfully", containerId);
}
else {
// Remove temporary container
State state = this.tempContainerState.replace(containerId, State.REMOVING);
if (state == State.RUNNING) {
if (isForce) {
this.stopContainer(containerId);
}
else {
throw new RuntimeException("Container is still Running!");
}
}
this.removeContainer(containerId);
this.tempContainerState.remove(containerId);
this.tempContainerRemoveFuture.remove(containerId);
this.currentTempContainerSize.decrementAndGet();
log.info("Temp Container {} has been removed successfully", containerId);
}
}
catch (Exception e) {
log.error("Error when trying to remove a container, containerId: {}, info: {}", containerId, e.getMessage(),
e);
}
}

// Create thread to delete temporary containers
private Future<?> registerRemoveTempContainer(String containerId) {
return consumerThreadPool.submit(() -> {
Expand All @@ -156,17 +200,7 @@ private Future<?> registerRemoveTempContainer(String containerId) {
log.debug("Interrupted while waiting for temp container to be removed, info: {}", e.getMessage());
return;
}
try {
// Remove temporary container
this.tempContainerState.remove(containerId);
this.tempContainerRemoveFuture.remove(containerId);
this.removeContainer(containerId);
log.debug("Container {} has been removed successfully", containerId);
}
catch (Exception e) {
log.error("Error when trying to register temp container to be removed, containerId: {}, info: {}",
containerId, e.getMessage(), e);
}
this.removeContainerAndState(containerId, false, false);
});
}

Expand All @@ -176,6 +210,13 @@ private TaskResponse useCoreContainer(String containerId, TaskRequest request) {
// Execute task
this.coreContainerState.replace(containerId, State.RUNNING);
TaskResponse resp = this.execTaskInContainer(request, containerId);
// 如果运行代码任务时出现了异常,认为容器损坏,执行容器清除,并将当前任务放进队列里重新执行
if (!resp.isSuccess() && !resp.executionSuccessButResultFailed()) {
log.error("use core container failed, {}", resp.exceptionMsg());
this.coreContainerState.replace(containerId, State.REMOVING);
this.removeContainerAndState(containerId, true, true);
return this.pushTaskQueue(request);
}
this.coreContainerState.replace(containerId, State.READY);
// Put back into blocking queue
this.readyCoreContainer.add(containerId);
Expand All @@ -185,7 +226,7 @@ private TaskResponse useCoreContainer(String containerId, TaskRequest request) {
}
catch (Exception e) {
log.error("use core container failed, {}", e.getMessage(), e);
return TaskResponse.error(e.getMessage());
return TaskResponse.exception(e.getMessage());
}
}

Expand All @@ -205,6 +246,13 @@ private TaskResponse useTempContainer(String containerId, TaskRequest request) {
// Execute task
this.tempContainerState.replace(containerId, State.RUNNING);
TaskResponse resp = this.execTaskInContainer(request, containerId);
// 如果运行代码任务时出现了异常,认为容器损坏,执行容器清除,并将当前任务放进队列里重新执行
if (!resp.isSuccess() && !resp.executionSuccessButResultFailed()) {
log.error("use temp container failed, {}", resp.exceptionMsg());
this.tempContainerState.replace(containerId, State.REMOVING);
this.removeContainerAndState(containerId, false, true);
return this.pushTaskQueue(request);
}
this.tempContainerState.replace(containerId, State.READY);
// Put back into blocking queue
this.readyTempContainer.add(containerId);
Expand All @@ -216,7 +264,7 @@ private TaskResponse useTempContainer(String containerId, TaskRequest request) {
}
catch (Exception e) {
log.error("use temp container failed, {}", e.getMessage(), e);
return TaskResponse.error(e.getMessage());
return TaskResponse.exception(e.getMessage());
}
}

Expand All @@ -228,7 +276,7 @@ private TaskResponse createAndUseCoreContainer(TaskRequest request) {
}
catch (Exception e) {
log.error("create new container failed, {}", e.getMessage(), e);
return TaskResponse.error(e.getMessage());
return TaskResponse.exception(e.getMessage());
}
// Record newly added container
this.coreContainerState.put(containerId, State.READY);
Expand All @@ -244,7 +292,7 @@ private TaskResponse createAndUseTempContainer(TaskRequest request) {
}
catch (Exception e) {
log.error("create new container failed, {}", e.getMessage(), e);
return TaskResponse.error(e.getMessage());
return TaskResponse.exception(e.getMessage());
}
// Record newly added container
this.tempContainerState.put(containerId, State.READY);
Expand Down Expand Up @@ -326,7 +374,7 @@ public TaskResponse runTask(TaskRequest request) {
}
catch (Exception e) {
log.error("An exception occurred while executing the task: {}", e.getMessage(), e);
return TaskResponse.error(e.getMessage());
return TaskResponse.exception(e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public TaskResponse runTask(TaskRequest request) {
```
""", request.code(), request.input());
String output = chatClient.prompt().user(userPrompt).call().content();
return new TaskResponse(true, output, null, null);
return TaskResponse.success(output);
}

}
Loading
Loading