Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 84 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Client interface {

// GetWorkspaceID returns the workspace ID.
GetWorkspaceID() string
// Close Close the client. Should be called before program exit.
// Close closes the client. It should be called before the program exits.
Close(ctx context.Context)
}

Expand All @@ -46,6 +46,7 @@ type HttpClient = httpclient.HTTPClient

type options struct {
apiBaseURL string
apiBasePath *APIBasePath
workspaceID string
httpClient HttpClient
timeout time.Duration
Expand All @@ -62,12 +63,16 @@ type options struct {
promptCacheRefreshInterval time.Duration
promptTrace bool
exporter trace.Exporter
traceFinishEventProcessor func(ctx context.Context, info *FinishEventInfo)
traceTagTruncateConf *TagTruncateConf
traceQueueConf *TraceQueueConf
}

func (o *options) MD5() string {
h := md5.New()
separator := "\t"
h.Write([]byte(o.apiBaseURL + separator))
h.Write([]byte(fmt.Sprintf("%p", o.apiBasePath) + separator))
h.Write([]byte(o.workspaceID + separator))
h.Write([]byte(fmt.Sprintf("%p", o.httpClient) + separator))
h.Write([]byte(o.timeout.String() + separator))
Expand All @@ -80,6 +85,10 @@ func (o *options) MD5() string {
h.Write([]byte(fmt.Sprintf("%d", o.promptCacheMaxCount) + separator))
h.Write([]byte(o.promptCacheRefreshInterval.String() + separator))
h.Write([]byte(fmt.Sprintf("%v", o.promptTrace) + separator))
h.Write([]byte(fmt.Sprintf("%p", o.exporter) + separator))
h.Write([]byte(fmt.Sprintf("%p", o.traceFinishEventProcessor) + separator))
h.Write([]byte(fmt.Sprintf("%p", o.traceTagTruncateConf) + separator))
h.Write([]byte(fmt.Sprintf("%p", o.traceQueueConf) + separator))
return hex.EncodeToString(h.Sum(nil))
}

Expand Down Expand Up @@ -133,10 +142,28 @@ func NewClient(opts ...Option) (Client, error) {
Timeout: options.timeout,
UploadTimeout: options.uploadTimeout,
})
traceFinishEventProcessor := trace.DefaultFinishEventProcessor
if options.traceFinishEventProcessor != nil {
traceFinishEventProcessor = func(ctx context.Context, info *consts.FinishEventInfo) {
trace.DefaultFinishEventProcessor(ctx, info)
options.traceFinishEventProcessor(ctx, (*FinishEventInfo)(info))
}
}
var spanUploadPath string
var fileUploadPath string
if options.apiBasePath != nil {
spanUploadPath = options.apiBasePath.TraceSpanUploadPath
fileUploadPath = options.apiBasePath.TraceFileUploadPath
}
c.traceProvider = trace.NewTraceProvider(httpClient, trace.Options{
WorkspaceID: options.workspaceID,
UltraLargeReport: options.ultraLargeReport,
Exporter: options.exporter,
WorkspaceID: options.workspaceID,
UltraLargeReport: options.ultraLargeReport,
Exporter: options.exporter,
FinishEventProcessor: traceFinishEventProcessor,
TagTruncateConf: (*trace.TagTruncateConf)(options.traceTagTruncateConf),
SpanUploadPath: spanUploadPath,
FileUploadPath: fileUploadPath,
QueueConf: (*trace.QueueConf)(options.traceQueueConf),
})
c.promptProvider = prompt.NewPromptProvider(httpClient, c.traceProvider, prompt.Options{
WorkspaceID: options.workspaceID,
Expand Down Expand Up @@ -185,6 +212,12 @@ func WithAPIBaseURL(apiBaseURL string) Option {
}
}

// WithAPIBasePath sets custom API request paths. When the client is built, the
// TraceSpanUploadPath and TraceFileUploadPath fields are forwarded to the trace
// provider; if apiBasePath is nil, empty paths are forwarded instead.
func WithAPIBasePath(apiBasePath *APIBasePath) Option {
	return func(o *options) {
		o.apiBasePath = apiBasePath
	}
}

// WithWorkspaceID set workspace id.
func WithWorkspaceID(workspaceID string) Option {
return func(p *options) {
Expand Down Expand Up @@ -241,12 +274,33 @@ func WithPromptTrace(enable bool) Option {
}
}

// WithExporter sets a custom trace exporter. The exporter is handed to the
// trace provider when the client is created.
func WithExporter(e trace.Exporter) Option {
	return func(o *options) {
		o.exporter = e
	}
}

// WithTraceFinishEventProcessor registers a custom finish event processor.
// The default processor is not replaced: when a client is created with this
// option, the default finish event processor runs first and f is then called
// with the same event info. A nil f leaves only the default processor active.
func WithTraceFinishEventProcessor(f func(ctx context.Context, info *FinishEventInfo)) Option {
	return func(o *options) {
		o.traceFinishEventProcessor = f
	}
}

// WithTraceTagTruncateConf sets the span tag truncation configuration. The
// value is forwarded to the trace provider when the client is created.
func WithTraceTagTruncateConf(conf *TagTruncateConf) Option {
	return func(o *options) {
		o.traceTagTruncateConf = conf
	}
}

// WithTraceQueueConf sets the trace queue configuration. The value is
// forwarded to the trace provider when the client is created.
func WithTraceQueueConf(conf *TraceQueueConf) Option {
	return func(o *options) {
		o.traceQueueConf = conf
	}
}

// GetWorkspaceID returns the workspace ID of the default client.
func GetWorkspaceID() string {
return getDefaultClient().GetWorkspaceID()
Expand Down Expand Up @@ -350,13 +404,27 @@ func buildAuth(opts options) (httpclient.Auth, error) {
return nil, ErrAuthInfoRequired
}

// SetDefaultClient replaces the package-level default client with client.
// The assignment is performed while holding defaultClientLock.
func SetDefaultClient(client Client) {
	defaultClientLock.Lock()
	defaultClient = client
	defaultClientLock.Unlock()
}

func getDefaultClient() Client {
if defaultClient != nil {
return defaultClient
}
once.Do(func() {
var err error
defaultClient, err = NewClient()
client, err := NewClient()
if err != nil {
defaultClientLock.Lock()
defaultClient = &NoopClient{newClientError: err}
defaultClientLock.Unlock()
} else {
defaultClientLock.Lock()
defaultClient = client
defaultClientLock.Unlock()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
Expand All @@ -366,7 +434,9 @@ func getDefaultClient() Client {

logger.CtxInfof(ctx, "Received signal: %v, starting graceful shutdown...", sig)
defaultClient.Close(ctx)
defaultClientLock.Lock()
defaultClient = &NoopClient{newClientError: consts.ErrClientClosed}
defaultClientLock.Unlock()
logger.CtxInfof(ctx, "Graceful shutdown finished.")
os.Exit(0)
}()
Expand All @@ -376,9 +446,10 @@ func getDefaultClient() Client {
}

var (
defaultClient Client
once sync.Once
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
defaultClient Client
defaultClientLock sync.RWMutex
once sync.Once
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
)

type loopClient struct {
Expand Down Expand Up @@ -426,7 +497,7 @@ func (c *loopClient) PromptFormat(ctx context.Context, loopPrompt *entity.Prompt

func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts ...StartSpanOption) (context.Context, Span) {
if c.closed {
return ctx, defaultNoopSpan
return ctx, DefaultNoopSpan
}
config := trace.StartSpanOptions{}
for _, opt := range opts {
Expand All @@ -438,25 +509,25 @@ func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts
ctx, span, err := c.traceProvider.StartSpan(ctx, name, spanType, config)
if err != nil {
logger.CtxWarnf(ctx, "start span failed, return noop span. %v", err)
return ctx, defaultNoopSpan
return ctx, DefaultNoopSpan
}
return ctx, span
}

func (c *loopClient) GetSpanFromContext(ctx context.Context) Span {
if c.closed {
return defaultNoopSpan
return DefaultNoopSpan
}
span := c.traceProvider.GetSpanFromContext(ctx)
if span == nil {
return defaultNoopSpan
return DefaultNoopSpan
}
return span
}

func (c *loopClient) GetSpanFromHeader(ctx context.Context, header map[string]string) SpanContext {
if c.closed {
return defaultNoopSpan
return DefaultNoopSpan
}
return c.traceProvider.GetSpanFromHeader(ctx, header)
}
Expand Down
22 changes: 22 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package cozeloop

import (
"github.com/coze-dev/cozeloop-go/internal/consts"
"github.com/coze-dev/cozeloop-go/internal/trace"
)

const (
Expand All @@ -19,3 +20,24 @@ const (
// ComBaseURL = consts.ComBaseURL
CnBaseURL = consts.CnBaseURL
)

// SpanFinishEvent identifies an internal finish event. It is the public
// counterpart of consts.SpanFinishEvent.
type SpanFinishEvent consts.SpanFinishEvent

// Public re-exports of the internal finish event identifiers, each converted
// to the public SpanFinishEvent type.
const (
SpanFinishEventSpanQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventSpanQueueEntryRate)
SpanFinishEventFileQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventFileQueueEntryRate)
SpanFinishEventFlushSpanRate = SpanFinishEvent(consts.SpanFinishEventFlushSpanRate)
SpanFinishEventFlushFileRate = SpanFinishEvent(consts.SpanFinishEventFlushFileRate)
)

// FinishEventInfo is the public counterpart of consts.FinishEventInfo. A
// pointer to it is passed to the callback registered with
// WithTraceFinishEventProcessor.
type FinishEventInfo consts.FinishEventInfo

// TagTruncateConf is the public counterpart of trace.TagTruncateConf; it is
// supplied through WithTraceTagTruncateConf.
type TagTruncateConf trace.TagTruncateConf

// APIBasePath carries custom request paths for trace uploads; it is supplied
// through WithAPIBasePath.
type APIBasePath struct {
// TraceSpanUploadPath is forwarded to the trace provider as its span upload path.
TraceSpanUploadPath string
// TraceFileUploadPath is forwarded to the trace provider as its file upload path.
TraceFileUploadPath string
}

// TraceQueueConf is the public counterpart of trace.QueueConf; it is supplied
// through WithTraceQueueConf.
type TraceQueueConf trace.QueueConf
10 changes: 6 additions & 4 deletions entity/export.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package entity

type UploadSpan struct {
StartedATMicros int64 `json:"started_at_micros"`
StartedATMicros int64 `json:"started_at_micros"` // start time in microseconds
LogID string `json:"log_id"` // the custom log id, identify different query.
SpanID string `json:"span_id"`
ParentID string `json:"parent_id"`
TraceID string `json:"trace_id"`
DurationMicros int64 `json:"duration_micros"`
WorkspaceID string `json:"workspace_id"`
DurationMicros int64 `json:"duration_micros"` // duration in microseconds
ServiceName string `json:"service_name"` // the custom service name, identify different services.
WorkspaceID string `json:"workspace_id"` // cozeloop workspace id
SpanName string `json:"span_name"`
SpanType string `json:"span_type"`
StatusCode int32 `json:"status_code"`
Input string `json:"input"`
Output string `json:"output"`
ObjectStorage string `json:"object_storage"`
ObjectStorage string `json:"object_storage"` // file object storage
SystemTagsString map[string]string `json:"system_tags_string"`
SystemTagsLong map[string]int64 `json:"system_tags_long"`
SystemTagsDouble map[string]float64 `json:"system_tags_double"`
Expand Down
24 changes: 24 additions & 0 deletions internal/consts/finish_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package consts

// SpanFinishEvent is the string identifier of an internal finish event.
type SpanFinishEvent string

const (
// Identifiers in the "queue_manager." namespace, one for span entries and one
// for file entries.
SpanFinishEventSpanQueueEntryRate SpanFinishEvent = "queue_manager.span_entry.rate"
SpanFinishEventFileQueueEntryRate SpanFinishEvent = "queue_manager.file_entry.rate"

// Identifiers in the "exporter." namespace, one for span flushes and one for
// file flushes.
SpanFinishEventFlushSpanRate SpanFinishEvent = "exporter.span_flush.rate"
SpanFinishEventFlushFileRate SpanFinishEvent = "exporter.file_flush.rate"
)

// FinishEventInfo describes a single finish event; a pointer to it is handed
// to the finish event processor.
type FinishEventInfo struct {
// EventType is the identifier of the event being reported.
EventType SpanFinishEvent
// NOTE(review): presumably true when the reported operation failed — confirm.
IsEventFail bool
ItemNum int // number of items in this event; several spans may be processed in one event
// NOTE(review): presumably a free-form detail or error message — confirm.
DetailMsg string
// ExtraParams holds additional data; it is a pointer, so check for nil before use.
ExtraParams *FinishEventInfoExtra
}

// FinishEventInfoExtra holds the additional data referenced by
// FinishEventInfo.ExtraParams.
type FinishEventInfoExtra struct {
// NOTE(review): presumably reports whether the related span is a root span — confirm.
IsRootSpan bool
// NOTE(review): presumably a latency in milliseconds, per the name — confirm what is measured.
LatencyMs int64
}
1 change: 1 addition & 0 deletions internal/consts/span_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
ThreadID = "thread_id"
StartTimeFirstResp = "start_time_first_resp"
LatencyFirstResp = "latency_first_resp"
DeploymentEnv = "deployment_env"

CutOff = "cut_off"
)
5 changes: 5 additions & 0 deletions internal/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ func setAuthorizationHeader(ctx context.Context, request *http.Request, auth Aut
}

func parseResponse(ctx context.Context, url string, response *http.Response, resp OpenAPIResponse) error {
if response == nil {
return nil
}
defer response.Body.Close()

logID := response.Header.Get(consts.LogIDHeader)
respBody, err := io.ReadAll(response.Body)
if err != nil {
Expand Down
28 changes: 15 additions & 13 deletions internal/trace/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ const (
var _ Exporter = (*SpanExporter)(nil)

type SpanExporter struct {
client *httpclient.Client
client *httpclient.Client
uploadPath UploadPath
}

// UploadPath groups the request paths used by SpanExporter.
type UploadPath struct {
// spanUploadPath is the path ExportSpans posts span data to.
spanUploadPath string
// fileUploadPath is the path ExportFiles uploads files to.
fileUploadPath string
}

func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFile) error {
Expand All @@ -50,14 +56,12 @@ func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFi
}
logger.CtxDebugf(ctx, "uploadFile start, file name: %s", file.Name)
resp := httpclient.BaseResponse{}
err := e.client.UploadFile(ctx, pathUploadFile, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
err := e.client.UploadFile(ctx, e.uploadPath.fileUploadPath, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
if err != nil {
logger.CtxErrorf(ctx, "export files[%s] fail, err:[%v], retry later", file.TosKey, err)
return err
return consts.NewError(fmt.Sprintf("export files[%s] fail", file.TosKey)).Wrap(err)
}
if resp.GetCode() != 0 { // todo: some err code do not need retry
logger.CtxErrorf(ctx, "export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg())
return consts.ErrRemoteService
return consts.NewError(fmt.Sprintf("export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg()))
}
logger.CtxDebugf(ctx, "uploadFile end, file name: %s", file.Name)
}
Expand All @@ -69,17 +73,13 @@ func (e *SpanExporter) ExportSpans(ctx context.Context, ss []*entity.UploadSpan)
if len(ss) == 0 {
return
}
logger.CtxDebugf(ctx, "export spans, spans count: %d", len(ss))

resp := httpclient.BaseResponse{}
err = e.client.Post(ctx, pathIngestTrace, UploadSpanData{ss}, &resp)
err = e.client.Post(ctx, e.uploadPath.spanUploadPath, UploadSpanData{ss}, &resp)
if err != nil {
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], err:[%v]", len(ss), err)
return err
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d]", len(ss))).Wrap(err)
}
if resp.GetCode() != 0 { // todo: some err code do not need retry
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg())
return consts.ErrRemoteService
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg()))
}

return
Expand Down Expand Up @@ -107,10 +107,12 @@ func transferToUploadSpanAndFile(ctx context.Context, spans []*Span) ([]*entity.
systemTagStrM, systemTagLongM, systemTagDoubleM, _ := parseTag(span.SystemTagMap, true)
resSpan = append(resSpan, &entity.UploadSpan{
StartedATMicros: span.GetStartTime().UnixMicro(),
LogID: span.GetLogID(),
SpanID: span.GetSpanID(),
ParentID: span.GetParentID(),
TraceID: span.GetTraceID(),
DurationMicros: span.GetDuration(),
ServiceName: span.GetServiceName(),
WorkspaceID: span.GetSpaceID(),
SpanName: span.GetSpanName(),
SpanType: span.GetSpanType(),
Expand Down
5 changes: 5 additions & 0 deletions internal/trace/noop_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func (n noopSpan) SetInputTokens(ctx context.Context, inputTokens int)
// No-op setters: each method below accepts its arguments and does nothing.
func (n noopSpan) SetOutputTokens(ctx context.Context, outputTokens int) {}
func (n noopSpan) SetStartTimeFirstResp(ctx context.Context, startTimeFirstResp int64) {}
func (n noopSpan) SetRuntime(ctx context.Context, runtime tracespec.Runtime) {}
func (n noopSpan) SetServiceName(ctx context.Context, serviceName string) {}
func (n noopSpan) SetLogID(ctx context.Context, logID string) {}
func (n noopSpan) SetFinishTime(finishTime time.Time) {}
func (n noopSpan) SetSystemTags(ctx context.Context, systemTags map[string]interface{}) {}
func (n noopSpan) SetDeploymentEnv(ctx context.Context, deploymentEnv string) {}

// implement of Span
func (n noopSpan) SetTags(ctx context.Context, tagKVs map[string]interface{}) {}
Expand Down
Loading