-
Notifications
You must be signed in to change notification settings - Fork 140
Description
把NewTool改成了NewStreamTool
`userInfoTool := utils.NewStreamTool(
&schema.ToolInfo{
Name: "user_info",
Desc: "根据用户的姓名和邮箱,查询用户的公司、职位、薪酬信息",
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"name": {
Type: "string",
Desc: "用户的姓名",
},
"email": {
Type: "string",
Desc: "用户的邮箱",
},
}),
},
//func(ctx context.Context, input *userInfoRequest) (output *userInfoResponse, err error) {
// return &userInfoResponse{
// Name: input.Name,
// Email: input.Email,
// Company: "Awesome company",
// Position: "CEO",
// Salary: "9999",
// }, nil
//})
func(ctx context.Context, input *userInfoRequest) (output *schema.StreamReader[*userInfoResponse], err error) {
sr, sw := schema.Pipe[*userInfoResponse](1)
go func() {
defer sw.Close()
sw.Send(&userInfoResponse{
Name: input.Name,
Email: input.Email,
Company: "Awesome company",
Position: "CEO",
Salary: "9999",
}, nil)
}()
return sr, nil
})`
最终执行r.Stream报错
`sr, err := r.Stream(ctx, map[string]any{
"message_histories": []*schema.Message{},
"user_query": "我叫 zhangsan, 邮箱是 zhangsan@bytedance.com, 帮我推荐一处房产",
}, compose.WithCallbacks(&loggerCallbacks{}))
if err != nil {
logs.Errorf("failed to stream: %v", err)
return
}
defer sr.Close() // remember to close the stream
logs.Infof("\n\n===== start streaming =====\n\n")
for {
msg, err := sr.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// finish
break
}
// error
logs.Infof("failed to recv: %v", err)
return
}
// 打字机打印
logs.Tokenf("%v", msg)
}`
报错信息为execute node[tools] fail: default implementation: 'TransformByStream' got error, when try to concat stream items, err:
stream reader is empty, concat fail。但是我的tool func明确是返回了一个mock的userInfoResponse对象