From 48fe852a5773e9683d3604bfdff6fb8ff0052913 Mon Sep 17 00:00:00 2001 From: "jiangqi.rrt" Date: Tue, 29 Jul 2025 10:42:47 +0800 Subject: [PATCH] support FinishEventProcessor and some Span Set Method --- client.go | 97 +++++++++++++++++--- const.go | 22 +++++ entity/export.go | 10 +- internal/consts/finish_event.go | 24 +++++ internal/consts/span_key.go | 1 + internal/httpclient/client.go | 5 + internal/trace/exporter.go | 28 +++--- internal/trace/noop_span.go | 5 + internal/trace/queue_manager.go | 55 +++++++++-- internal/trace/queue_manager_test.go | 2 +- internal/trace/span.go | 105 ++++++++++++++++++++- internal/trace/span_processor.go | 131 +++++++++++++++++++++++---- internal/trace/span_test.go | 2 +- internal/trace/trace.go | 50 ++++++++-- internal/version.go | 2 +- noop.go | 8 +- span.go | 20 ++++ spec/tracespec/runtime.go | 3 + 18 files changed, 495 insertions(+), 75 deletions(-) create mode 100644 internal/consts/finish_event.go diff --git a/client.go b/client.go index 7a2dbf0..b04d448 100644 --- a/client.go +++ b/client.go @@ -35,7 +35,7 @@ type Client interface { // GetWorkspaceID return workspace id GetWorkspaceID() string - // Close Close the client. Should be called before program exit. + // Close close the client. Should be called before program exit. Close(ctx context.Context) } @@ -46,6 +46,7 @@ type HttpClient = httpclient.HTTPClient type options struct { apiBaseURL string + apiBasePath *APIBasePath workspaceID string httpClient HttpClient timeout time.Duration @@ -62,12 +63,16 @@ type options struct { promptCacheRefreshInterval time.Duration promptTrace bool exporter trace.Exporter + traceFinishEventProcessor func(ctx context.Context, info *FinishEventInfo) + traceTagTruncateConf *TagTruncateConf + traceQueueConf *TraceQueueConf } func (o *options) MD5() string { h := md5.New() separator := "\t" h.Write([]byte(o.apiBaseURL + separator)) + h.Write([]byte(fmt.Sprintf("%p", o.apiBasePath) + separator)) h.Write([]byte(o.workspaceID + separator)) h.Write([]byte(fmt.Sprintf("%p", o.httpClient) + separator)) h.Write([]byte(o.timeout.String() + separator)) @@ -80,6 +85,10 @@ func (o *options) MD5() string { h.Write([]byte(fmt.Sprintf("%d", o.promptCacheMaxCount) + separator)) h.Write([]byte(o.promptCacheRefreshInterval.String() + separator)) h.Write([]byte(fmt.Sprintf("%v", o.promptTrace) + separator)) + h.Write([]byte(fmt.Sprintf("%p", o.exporter) + separator)) + h.Write([]byte(fmt.Sprintf("%p", o.traceFinishEventProcessor) + separator)) + h.Write([]byte(fmt.Sprintf("%p", o.traceTagTruncateConf) + separator)) + h.Write([]byte(fmt.Sprintf("%p", o.traceQueueConf) + separator)) return hex.EncodeToString(h.Sum(nil)) } @@ -133,10 +142,28 @@ func NewClient(opts ...Option) (Client, error) { Timeout: options.timeout, UploadTimeout: options.uploadTimeout, }) + traceFinishEventProcessor := trace.DefaultFinishEventProcessor + if options.traceFinishEventProcessor != nil { + traceFinishEventProcessor = func(ctx context.Context, info *consts.FinishEventInfo) { + trace.DefaultFinishEventProcessor(ctx, info) + options.traceFinishEventProcessor(ctx, (*FinishEventInfo)(info)) + } + } + var spanUploadPath string + var fileUploadPath string + if options.apiBasePath != nil { + spanUploadPath = options.apiBasePath.TraceSpanUploadPath + fileUploadPath = options.apiBasePath.TraceFileUploadPath + } c.traceProvider = trace.NewTraceProvider(httpClient, trace.Options{ - WorkspaceID: options.workspaceID, - UltraLargeReport: options.ultraLargeReport, - Exporter: options.exporter, + WorkspaceID: options.workspaceID, + UltraLargeReport: options.ultraLargeReport, + Exporter: options.exporter, + FinishEventProcessor: traceFinishEventProcessor, + TagTruncateConf: (*trace.TagTruncateConf)(options.traceTagTruncateConf), + SpanUploadPath: spanUploadPath, + FileUploadPath: fileUploadPath, + QueueConf: (*trace.QueueConf)(options.traceQueueConf), }) c.promptProvider = prompt.NewPromptProvider(httpClient, c.traceProvider, prompt.Options{ WorkspaceID: options.workspaceID, @@ -185,6 +212,12 @@ func WithAPIBaseURL(apiBaseURL string) Option { } } +func WithAPIBasePath(apiBasePath *APIBasePath) Option { + return func(p *options) { + p.apiBasePath = apiBasePath + } +} + // WithWorkspaceID set workspace id. func WithWorkspaceID(workspaceID string) Option { return func(p *options) { @@ -241,12 +274,33 @@ func WithPromptTrace(enable bool) Option { } } +// WithExporter set custom trace exporter. func WithExporter(e trace.Exporter) Option { return func(p *options) { p.exporter = e } } +// WithTraceFinishEventProcessor set custom finish event processor, after span finish. +func WithTraceFinishEventProcessor(f func(ctx context.Context, info *FinishEventInfo)) Option { + return func(p *options) { + p.traceFinishEventProcessor = f + } +} + +// WithTraceTagTruncateConf set span tag truncate conf. +func WithTraceTagTruncateConf(conf *TagTruncateConf) Option { + return func(p *options) { + p.traceTagTruncateConf = conf + } +} + +func WithTraceQueueConf(conf *TraceQueueConf) Option { + return func(p *options) { + p.traceQueueConf = conf + } +} + // GetWorkspaceID return space id func GetWorkspaceID() string { return getDefaultClient().GetWorkspaceID() @@ -350,13 +404,27 @@ func buildAuth(opts options) (httpclient.Auth, error) { return nil, ErrAuthInfoRequired } +func SetDefaultClient(client Client) { + defaultClientLock.Lock() + defer defaultClientLock.Unlock() + defaultClient = client +} + func getDefaultClient() Client { + if defaultClient != nil { + return defaultClient + } once.Do(func() { var err error - defaultClient, err = NewClient() + client, err := NewClient() if err != nil { + defaultClientLock.Lock() defaultClient = &NoopClient{newClientError: err} + defaultClientLock.Unlock() } else { + defaultClientLock.Lock() + defaultClient = client + defaultClientLock.Unlock() sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { @@ -366,7 +434,9 @@ func getDefaultClient() Client { logger.CtxInfof(ctx, "Received signal: %v, starting graceful shutdown...", sig) defaultClient.Close(ctx) + defaultClientLock.Lock() defaultClient = &NoopClient{newClientError: consts.ErrClientClosed} + defaultClientLock.Unlock() logger.CtxInfof(ctx, "Graceful shutdown finished.") os.Exit(0) }() @@ -376,9 +446,10 @@ func getDefaultClient() Client { } var ( - defaultClient Client - once sync.Once - clientCache sync.Map // client cache to avoid creating multiple clients with the same options + defaultClient Client + defaultClientLock sync.RWMutex + once sync.Once + clientCache sync.Map // client cache to avoid creating multiple clients with the same options ) type loopClient struct { @@ -426,7 +497,7 @@ func (c *loopClient) PromptFormat(ctx context.Context, loopPrompt *entity.Prompt func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts ...StartSpanOption) (context.Context, Span) { if c.closed { - return ctx, defaultNoopSpan + return ctx, DefaultNoopSpan } config := trace.StartSpanOptions{} for _, opt := range opts { @@ -438,25 +509,25 @@ func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts ctx, span, err := c.traceProvider.StartSpan(ctx, name, spanType, config) if err != nil { logger.CtxWarnf(ctx, "start span failed, return noop span. %v", err) - return ctx, defaultNoopSpan + return ctx, DefaultNoopSpan } return ctx, span } func (c *loopClient) GetSpanFromContext(ctx context.Context) Span { if c.closed { - return defaultNoopSpan + return DefaultNoopSpan } span := c.traceProvider.GetSpanFromContext(ctx) if span == nil { - return defaultNoopSpan + return DefaultNoopSpan } return span } func (c *loopClient) GetSpanFromHeader(ctx context.Context, header map[string]string) SpanContext { if c.closed { - return defaultNoopSpan + return DefaultNoopSpan } return c.traceProvider.GetSpanFromHeader(ctx, header) } diff --git a/const.go b/const.go index 9b69a49..fe65a07 100644 --- a/const.go +++ b/const.go @@ -5,6 +5,7 @@ package cozeloop import ( "github.com/coze-dev/cozeloop-go/internal/consts" + "github.com/coze-dev/cozeloop-go/internal/trace" ) const ( @@ -19,3 +20,24 @@ const ( // ComBaseURL = consts.ComBaseURL CnBaseURL = consts.CnBaseURL ) + +// SpanFinishEvent finish inner event +type SpanFinishEvent consts.SpanFinishEvent + +const ( + SpanFinishEventSpanQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventSpanQueueEntryRate) + SpanFinishEventFileQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventFileQueueEntryRate) + SpanFinishEventFlushSpanRate = SpanFinishEvent(consts.SpanFinishEventFlushSpanRate) + SpanFinishEventFlushFileRate = SpanFinishEvent(consts.SpanFinishEventFlushFileRate) +) + +type FinishEventInfo consts.FinishEventInfo + +type TagTruncateConf trace.TagTruncateConf + +type APIBasePath struct { + TraceSpanUploadPath string + TraceFileUploadPath string +} + +type TraceQueueConf trace.QueueConf diff --git a/entity/export.go b/entity/export.go index 970569a..e76bc43 100644 --- a/entity/export.go +++ b/entity/export.go @@ -1,18 +1,20 @@ package entity type UploadSpan struct { - StartedATMicros int64 `json:"started_at_micros"` + StartedATMicros int64 `json:"started_at_micros"` // start time in microseconds + LogID string `json:"log_id"` // the custom log id, identify different query. SpanID string `json:"span_id"` ParentID string `json:"parent_id"` TraceID string `json:"trace_id"` - DurationMicros int64 `json:"duration_micros"` - WorkspaceID string `json:"workspace_id"` + DurationMicros int64 `json:"duration_micros"` // duration in microseconds + ServiceName string `json:"service_name"` // the custom service name, identify different services. + WorkspaceID string `json:"workspace_id"` // cozeloop workspace id SpanName string `json:"span_name"` SpanType string `json:"span_type"` StatusCode int32 `json:"status_code"` Input string `json:"input"` Output string `json:"output"` - ObjectStorage string `json:"object_storage"` + ObjectStorage string `json:"object_storage"` // file object storage SystemTagsString map[string]string `json:"system_tags_string"` SystemTagsLong map[string]int64 `json:"system_tags_long"` SystemTagsDouble map[string]float64 `json:"system_tags_double"` diff --git a/internal/consts/finish_event.go b/internal/consts/finish_event.go new file mode 100644 index 0000000..cee7930 --- /dev/null +++ b/internal/consts/finish_event.go @@ -0,0 +1,24 @@ +package consts + +type SpanFinishEvent string + +const ( + SpanFinishEventSpanQueueEntryRate SpanFinishEvent = "queue_manager.span_entry.rate" + SpanFinishEventFileQueueEntryRate SpanFinishEvent = "queue_manager.file_entry.rate" + + SpanFinishEventFlushSpanRate SpanFinishEvent = "exporter.span_flush.rate" + SpanFinishEventFlushFileRate SpanFinishEvent = "exporter.file_flush.rate" +) + +type FinishEventInfo struct { + EventType SpanFinishEvent + IsEventFail bool + ItemNum int // maybe multiple span is processed in one event + DetailMsg string + ExtraParams *FinishEventInfoExtra +} + +type FinishEventInfoExtra struct { + IsRootSpan bool + LatencyMs int64 +} diff --git a/internal/consts/span_key.go b/internal/consts/span_key.go index a1fd956..be0e9a1 100644 --- a/internal/consts/span_key.go +++ b/internal/consts/span_key.go @@ -10,6 +10,7 @@ const ( ThreadID = "thread_id" StartTimeFirstResp = "start_time_first_resp" LatencyFirstResp = "latency_first_resp" + DeploymentEnv = "deployment_env" CutOff = "cut_off" ) diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 0bf2728..d005611 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -203,6 +203,11 @@ func setAuthorizationHeader(ctx context.Context, request *http.Request, auth Aut } func parseResponse(ctx context.Context, url string, response *http.Response, resp OpenAPIResponse) error { + if response == nil { + return nil + } + defer response.Body.Close() + logID := response.Header.Get(consts.LogIDHeader) respBody, err := io.ReadAll(response.Body) if err != nil { diff --git a/internal/trace/exporter.go b/internal/trace/exporter.go index 317eac3..a8bb9a5 100644 --- a/internal/trace/exporter.go +++ b/internal/trace/exporter.go @@ -39,7 +39,13 @@ const ( var _ Exporter = (*SpanExporter)(nil) type SpanExporter struct { - client *httpclient.Client + client *httpclient.Client + uploadPath UploadPath +} + +type UploadPath struct { + spanUploadPath string + fileUploadPath string } func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFile) error { @@ -50,14 +56,12 @@ func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFi } logger.CtxDebugf(ctx, "uploadFile start, file name: %s", file.Name) resp := httpclient.BaseResponse{} - err := e.client.UploadFile(ctx, pathUploadFile, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp) + err := e.client.UploadFile(ctx, e.uploadPath.fileUploadPath, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp) if err != nil { - logger.CtxErrorf(ctx, "export files[%s] fail, err:[%v], retry later", file.TosKey, err) - return err + return consts.NewError(fmt.Sprintf("export files[%s] fail", file.TosKey)).Wrap(err) } if resp.GetCode() != 0 { // todo: some err code do not need retry - logger.CtxErrorf(ctx, "export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg()) - return consts.ErrRemoteService + return consts.NewError(fmt.Sprintf("export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg())) } logger.CtxDebugf(ctx, "uploadFile end, file name: %s", file.Name) } @@ -69,17 +73,13 @@ func (e *SpanExporter) ExportSpans(ctx context.Context, ss []*entity.UploadSpan) if len(ss) == 0 { return } - logger.CtxDebugf(ctx, "export spans, spans count: %d", len(ss)) - resp := httpclient.BaseResponse{} - err = e.client.Post(ctx, pathIngestTrace, UploadSpanData{ss}, &resp) + err = e.client.Post(ctx, e.uploadPath.spanUploadPath, UploadSpanData{ss}, &resp) if err != nil { - logger.CtxErrorf(ctx, "export spans fail, span count: [%d], err:[%v]", len(ss), err) - return err + return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d]", len(ss))).Wrap(err) } if resp.GetCode() != 0 { // todo: some err code do not need retry - logger.CtxErrorf(ctx, "export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg()) - return consts.ErrRemoteService + return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg())) } return @@ -107,10 +107,12 @@ func transferToUploadSpanAndFile(ctx context.Context, spans []*Span) ([]*entity. systemTagStrM, systemTagLongM, systemTagDoubleM, _ := parseTag(span.SystemTagMap, true) resSpan = append(resSpan, &entity.UploadSpan{ StartedATMicros: span.GetStartTime().UnixMicro(), + LogID: span.GetLogID(), SpanID: span.GetSpanID(), ParentID: span.GetParentID(), TraceID: span.GetTraceID(), DurationMicros: span.GetDuration(), + ServiceName: span.GetServiceName(), WorkspaceID: span.GetSpaceID(), SpanName: span.GetSpanName(), SpanType: span.GetSpanType(), diff --git a/internal/trace/noop_span.go b/internal/trace/noop_span.go index b13bb3a..3b0ff73 100644 --- a/internal/trace/noop_span.go +++ b/internal/trace/noop_span.go @@ -34,6 +34,11 @@ func (n noopSpan) SetInputTokens(ctx context.Context, inputTokens int) func (n noopSpan) SetOutputTokens(ctx context.Context, outputTokens int) {} func (n noopSpan) SetStartTimeFirstResp(ctx context.Context, startTimeFirstResp int64) {} func (n noopSpan) SetRuntime(ctx context.Context, runtime tracespec.Runtime) {} +func (n noopSpan) SetServiceName(ctx context.Context, serviceName string) {} +func (n noopSpan) SetLogID(ctx context.Context, logID string) {} +func (n noopSpan) SetFinishTime(finishTime time.Time) {} +func (n noopSpan) SetSystemTags(ctx context.Context, systemTags map[string]interface{}) {} +func (n noopSpan) SetDeploymentEnv(ctx context.Context, deploymentEnv string) {} // implement of Span func (n noopSpan) SetTags(ctx context.Context, tagKVs map[string]interface{}) {} diff --git a/internal/trace/queue_manager.go b/internal/trace/queue_manager.go index 6aecde7..9247059 100644 --- a/internal/trace/queue_manager.go +++ b/internal/trace/queue_manager.go @@ -13,14 +13,23 @@ package trace import ( "context" + "fmt" "sync" "sync/atomic" "time" + "github.com/coze-dev/cozeloop-go/internal/consts" "github.com/coze-dev/cozeloop-go/internal/logger" "github.com/coze-dev/cozeloop-go/internal/util" ) +const ( + queueNameSpan = "span" + queueNameSpanRetry = "span_retry" + queueNameFile = "file" + queueNameFileRetry = "file_retry" +) + type exportFunc func(ctx context.Context, s []interface{}) // QueueManager is a queue that batches spans and exports them @@ -37,7 +46,8 @@ type batchQueueManagerOptions struct { maxExportBatchLength int maxExportBatchByteSize int - exportFunc exportFunc + exportFunc exportFunc + finishEventProcessor func(ctx context.Context, info *consts.FinishEventInfo) } func newBatchQueueManager(o batchQueueManagerOptions) *BatchQueueManager { @@ -47,6 +57,7 @@ func newBatchQueueManager(o batchQueueManagerOptions) *BatchQueueManager { dropped: 0, batch: make([]interface{}, 0, o.maxExportBatchLength), batchMutex: sync.Mutex{}, + sizeMutex: sync.RWMutex{}, timer: time.NewTimer(o.batchTimeout), exportFunc: o.exportFunc, stopWait: sync.WaitGroup{}, @@ -75,6 +86,7 @@ type BatchQueueManager struct { batch []interface{} batchByteSize int64 batchMutex sync.Mutex + sizeMutex sync.RWMutex timer *time.Timer exportFunc func(ctx context.Context, s []interface{}) @@ -127,6 +139,9 @@ func (b *BatchQueueManager) isShouldExport() bool { if len(b.batch) >= b.o.maxExportBatchLength { return true } + + b.sizeMutex.RLock() + defer b.sizeMutex.RUnlock() if b.batchByteSize >= int64(b.o.maxExportBatchByteSize) { return true } @@ -172,7 +187,9 @@ func (b *BatchQueueManager) doExport(ctx context.Context) { } // delete the batch b.batch = b.batch[:0] + b.sizeMutex.Lock() b.batchByteSize = 0 + b.sizeMutex.Unlock() } } @@ -181,18 +198,42 @@ func (b *BatchQueueManager) Enqueue(ctx context.Context, sd interface{}, byteSiz if atomic.LoadInt32(&b.stopped) != 0 { return } - + var extraParams *consts.FinishEventInfoExtra + var eventType = consts.SpanFinishEventFileQueueEntryRate + var detailMsg string + var isFail bool select { case b.queue <- sd: - b.batchMutex.Lock() + b.sizeMutex.Lock() b.batchByteSize += byteSize - b.batchMutex.Unlock() - //logger.CtxDebugf(ctx, "%s queue length: %d", b.o.queueName, len(b.queue)) - return + b.sizeMutex.Unlock() + detailMsg = fmt.Sprintf("%s enqueue, queue length: %d", b.o.queueName, len(b.queue)) default: // queue is full, not block, drop - logger.CtxErrorf(ctx, "%s queue is full, dropped item", b.o.queueName) + detailMsg = fmt.Sprintf("%s queue is full, dropped item", b.o.queueName) + isFail = true atomic.AddUint32(&b.dropped, 1) } + + switch b.o.queueName { + case queueNameSpan, queueNameSpanRetry: + eventType = consts.SpanFinishEventSpanQueueEntryRate + span, ok := sd.(*Span) + if ok { + extraParams = &consts.FinishEventInfoExtra{ + IsRootSpan: span.IsRootSpan(), + } + } + default: + } + if b.o.finishEventProcessor != nil { + b.o.finishEventProcessor(ctx, &consts.FinishEventInfo{ + EventType: eventType, + IsEventFail: isFail, + ItemNum: 1, + DetailMsg: detailMsg, + ExtraParams: extraParams, + }) + } return } diff --git a/internal/trace/queue_manager_test.go b/internal/trace/queue_manager_test.go index 91053cf..7fff147 100644 --- a/internal/trace/queue_manager_test.go +++ b/internal/trace/queue_manager_test.go @@ -15,7 +15,7 @@ import ( func Test_GetBatchSpanProcessor(t *testing.T) { ctx := context.Background() httpClient := &httpclient.Client{} - spanQM := NewBatchSpanProcessor(httpClient) + spanQM := NewBatchSpanProcessor(nil, httpClient, nil, nil) PatchConvey("Test GetBatchSpanProcessor", t, func() { PatchConvey("Test with valid inputs", func() { diff --git a/internal/trace/span.go b/internal/trace/span.go index b71eea9..162e8d1 100644 --- a/internal/trace/span.go +++ b/internal/trace/span.go @@ -55,9 +55,12 @@ type Span struct { Name string // These params can be changed, but remember locking when changed + ServiceName string + LogID string WorkspaceID string ParentSpanID string StartTime time.Time + FinishTime time.Time Duration time.Duration TagMap map[string]interface{} // custom tags SystemTagMap map[string]interface{} // system tags @@ -71,7 +74,13 @@ type Span struct { flags byte // for W3C, useless now isFinished int32 // avoid executing finish repeatedly. lock sync.RWMutex - bytesSize int64 // bytes size of span, note: it is an estimated value, may not be accurate. + bytesSize int64 // bytes size of span, note: it is an estimated value, may not be accurate. + tagTruncateConf *TagTruncateConf // tag truncate byte conf +} + +type TagTruncateConf struct { + NormalFieldMaxByte int + InputOutputFieldMaxByte int } func (s *Span) setCutOffTag(cutOffKeys []string) { @@ -639,7 +648,7 @@ func (s *Span) GetRectifiedMap(ctx context.Context, inputMap map[string]interfac value = valueStr } // Truncate the value if a single tag's value is too large - tagValueLengthLimit := util.GetTagValueSizeLimit(key) + tagValueLengthLimit := s.getTagValueSizeLimit(key) isUltraLargeReport := false v, isTruncate := util.TruncateStringByByte(valueStr, tagValueLengthLimit) if isTruncate { @@ -672,6 +681,23 @@ func (s *Span) GetRectifiedMap(ctx context.Context, inputMap map[string]interfac return validateMap, cutOffKeys, bytesSize } +func (s *Span) getTagValueSizeLimit(tagKey string) int { + limit := util.GetTagValueSizeLimit(tagKey) + + switch tagKey { + case tracespec.Input, tracespec.Output: + if s.tagTruncateConf != nil && s.tagTruncateConf.InputOutputFieldMaxByte > 0 { + limit = s.tagTruncateConf.InputOutputFieldMaxByte + } + default: + if s.tagTruncateConf != nil && s.tagTruncateConf.NormalFieldMaxByte > 0 { + limit = s.tagTruncateConf.NormalFieldMaxByte + } + } + + return limit +} + func isTagValidDataType(key string, value interface{}) bool { types, ok := consts.ReserveFieldTypes[key] if !ok { @@ -828,8 +854,12 @@ func (s *Span) setStatInfo(ctx context.Context) { s.SetTags(ctx, map[string]interface{}{tracespec.Tokens: util.GetValueOfInt(inputTokens) + util.GetValueOfInt(outputTokens)}) } - // Duration = now - start_time, unit: microseconds - duration := time.Now().UnixNano()/1000 - s.GetStartTime().UnixNano()/1000 + // Duration = finish_time - start_time, unit: microseconds + finishTime := time.Now() + if !s.GetFinishTime().IsZero() { + finishTime = s.GetFinishTime() + } + duration := finishTime.UnixNano()/1000 - s.GetStartTime().UnixNano()/1000 s.lock.Lock() s.Duration = time.Duration(duration) s.lock.Unlock() @@ -843,6 +873,22 @@ func (s *Span) GetStartTime() time.Time { return s.StartTime } +func (s *Span) GetLogID() string { + if s == nil { + return "" + } + + return s.LogID +} + +func (s *Span) GetServiceName() string { + if s == nil { + return "" + } + + return s.ServiceName +} + func (s *Span) ToHeader() (map[string]string, error) { if s == nil { return nil, nil @@ -885,7 +931,58 @@ func (s *Span) SetRuntime(ctx context.Context, runtime tracespec.Runtime) { if s.SystemTagMap == nil { s.SystemTagMap = make(map[string]interface{}) } + runtime.Scene = tracespec.VSceneCustom s.lock.Lock() s.SystemTagMap[tracespec.Runtime_] = runtime s.lock.Unlock() } + +func (s *Span) SetServiceName(ctx context.Context, serviceName string) { + s.lock.Lock() + defer s.lock.Unlock() + s.ServiceName = serviceName +} + +func (s *Span) SetLogID(ctx context.Context, logID string) { + s.lock.Lock() + defer s.lock.Unlock() + s.LogID = logID +} + +func (s *Span) IsRootSpan() bool { + return s.ParentSpanID == "" || s.ParentSpanID == "0" +} + +// SetFinishTime +// Default is time.Now() when span Finish(). DO NOT set unless you do not use default time. +func (s *Span) SetFinishTime(finishTime time.Time) { + s.lock.Lock() + defer s.lock.Unlock() + s.FinishTime = finishTime +} + +func (s *Span) GetFinishTime() time.Time { + if s == nil { + return time.Time{} + } + + return s.FinishTime +} + +func (s *Span) SetSystemTags(ctx context.Context, systemTags map[string]interface{}) { + if s == nil { + return + } + s.lock.Lock() + defer s.lock.Unlock() + for key, value := range systemTags { + s.SystemTagMap[key] = value + } +} + +func (s *Span) SetDeploymentEnv(ctx context.Context, deploymentEnv string) { + if s == nil { + return + } + s.SetTags(ctx, oneTag(consts.DeploymentEnv, deploymentEnv)) +} diff --git a/internal/trace/span_processor.go b/internal/trace/span_processor.go index e5e6780..57a6cab 100644 --- a/internal/trace/span_processor.go +++ b/internal/trace/span_processor.go @@ -14,17 +14,19 @@ package trace import ( "context" + "fmt" "sync/atomic" "time" "github.com/coze-dev/cozeloop-go/entity" + "github.com/coze-dev/cozeloop-go/internal/consts" "github.com/coze-dev/cozeloop-go/internal/httpclient" - "github.com/coze-dev/cozeloop-go/internal/logger" ) // Defaults for batchQueueManagerOptions. const ( - DefaultMaxQueueLength = 2048 + DefaultMaxQueueLength = 1024 + DefaultMaxRetryQueueLength = 512 DefaultMaxExportBatchLength = 100 DefaultMaxExportBatchByteSize = 4 * 1024 * 1024 // 4MB MaxRetryExportBatchLength = 50 @@ -36,6 +38,11 @@ const ( FileScheduleDelay = 5000 // millisecond ) +type QueueConf struct { + SpanQueueLength int + SpanMaxExportBatchLength int +} + var _ SpanProcessor = (*BatchSpanProcessor)(nil) type SpanProcessor interface { @@ -44,50 +51,86 @@ type SpanProcessor interface { ForceFlush(ctx context.Context) error } -func NewBatchSpanProcessor(ex Exporter, client *httpclient.Client) SpanProcessor { +func NewBatchSpanProcessor( + ex Exporter, + client *httpclient.Client, + uploadPath *UploadPath, + finishEventProcessor func(ctx context.Context, info *consts.FinishEventInfo), + queueConf *QueueConf, +) SpanProcessor { var exporter Exporter - exporter = &SpanExporter{client: client} + spanPath := pathIngestTrace + filePath := pathUploadFile + if uploadPath != nil { + if uploadPath.spanUploadPath != "" { + spanPath = uploadPath.spanUploadPath + } + if uploadPath.fileUploadPath != "" { + filePath = uploadPath.fileUploadPath + } + } + exporter = &SpanExporter{ + client: client, + uploadPath: UploadPath{ + spanUploadPath: spanPath, + fileUploadPath: filePath, + }, + } if ex != nil { exporter = ex } + var spanQueueLength = DefaultMaxQueueLength + var spanMaxExportBatchLength = DefaultMaxExportBatchLength + if queueConf != nil { + if queueConf.SpanQueueLength > 0 { + spanQueueLength = queueConf.SpanQueueLength + } + if queueConf.SpanMaxExportBatchLength > 0 { // todo: need max limit + spanMaxExportBatchLength = queueConf.SpanMaxExportBatchLength + } + } fileRetryQM := newBatchQueueManager( batchQueueManagerOptions{ - queueName: "file retry", + queueName: queueNameFileRetry, batchTimeout: time.Duration(FileScheduleDelay) * time.Millisecond, maxQueueLength: MaxFileQueueLength, maxExportBatchLength: MaxFileExportBatchLength, maxExportBatchByteSize: MaxFileExportBatchByteSize, - exportFunc: newExportFilesFunc(exporter, nil), + exportFunc: newExportFilesFunc(exporter, nil, finishEventProcessor), + finishEventProcessor: finishEventProcessor, }) fileQM := newBatchQueueManager( batchQueueManagerOptions{ - queueName: "file", + queueName: queueNameFile, batchTimeout: time.Duration(FileScheduleDelay) * time.Millisecond, maxQueueLength: MaxFileQueueLength, maxExportBatchLength: MaxFileExportBatchLength, maxExportBatchByteSize: MaxFileExportBatchByteSize, - exportFunc: newExportFilesFunc(exporter, fileRetryQM), + exportFunc: newExportFilesFunc(exporter, fileRetryQM, finishEventProcessor), + finishEventProcessor: finishEventProcessor, }) spanRetryQM := newBatchQueueManager( batchQueueManagerOptions{ - queueName: "span retry", + queueName: queueNameSpanRetry, batchTimeout: time.Duration(DefaultScheduleDelay) * time.Millisecond, - maxQueueLength: DefaultMaxQueueLength, + maxQueueLength: DefaultMaxRetryQueueLength, maxExportBatchLength: MaxRetryExportBatchLength, maxExportBatchByteSize: DefaultMaxExportBatchByteSize, - exportFunc: newExportSpansFunc(exporter, nil, fileQM), + exportFunc: newExportSpansFunc(exporter, nil, fileQM, finishEventProcessor), + finishEventProcessor: finishEventProcessor, }) spanQM := newBatchQueueManager( batchQueueManagerOptions{ - queueName: "span", + queueName: queueNameSpan, batchTimeout: time.Duration(DefaultScheduleDelay) * time.Millisecond, - maxQueueLength: DefaultMaxQueueLength, - maxExportBatchLength: DefaultMaxExportBatchLength, + maxQueueLength: spanQueueLength, + maxExportBatchLength: spanMaxExportBatchLength, maxExportBatchByteSize: DefaultMaxExportBatchByteSize, - exportFunc: newExportSpansFunc(exporter, spanRetryQM, fileQM), + exportFunc: newExportSpansFunc(exporter, spanRetryQM, fileQM, finishEventProcessor), + finishEventProcessor: finishEventProcessor, }) return &BatchSpanProcessor{ @@ -153,7 +196,12 @@ func (b *BatchSpanProcessor) ForceFlush(ctx context.Context) error { return nil } -func newExportSpansFunc(exporter Exporter, spanRetryQueue QueueManager, fileQueue QueueManager) exportFunc { +func newExportSpansFunc( + exporter Exporter, + spanRetryQueue QueueManager, + fileQueue QueueManager, + finishEventProcessor func(ctx context.Context, info *consts.FinishEventInfo), +) exportFunc { return func(ctx context.Context, l []interface{}) { spans := make([]*Span, 0, len(l)) for _, s := range l { @@ -161,13 +209,22 @@ func newExportSpansFunc(exporter Exporter, spanRetryQueue QueueManager, fileQueu spans = append(spans, span) } } + var errMsg string + var isFail bool uploadSpans, uploadFiles := transferToUploadSpanAndFile(ctx, spans) - if err := exporter.ExportSpans(ctx, uploadSpans); err != nil { // fail, send to retry queue. + before := time.Now() + err := exporter.ExportSpans(ctx, uploadSpans) + tsMs := time.Now().Sub(before).Milliseconds() + if err != nil { // fail, send to retry queue. if spanRetryQueue != nil { for _, span := range spans { spanRetryQueue.Enqueue(ctx, span, span.bytesSize) } + errMsg = fmt.Sprintf("%v, retry later", err.Error()) + } else { + errMsg = fmt.Sprintf("%v, retry second time failed", err.Error()) } + isFail = true } else { // success, send to file queue. for _, file := range uploadFiles { if file == nil { @@ -178,10 +235,25 @@ func newExportSpansFunc(exporter Exporter, spanRetryQueue QueueManager, fileQueu } } } + if finishEventProcessor != nil { + finishEventProcessor(ctx, &consts.FinishEventInfo{ + EventType: consts.SpanFinishEventFlushSpanRate, + IsEventFail: isFail, + ItemNum: len(uploadSpans), + DetailMsg: errMsg, + ExtraParams: &consts.FinishEventInfoExtra{ + LatencyMs: tsMs, + }, + }) + } } } -func newExportFilesFunc(exporter Exporter, fileRetryQueue QueueManager) exportFunc { +func newExportFilesFunc( + exporter Exporter, + fileRetryQueue QueueManager, + finishEventProcessor func(ctx context.Context, info *consts.FinishEventInfo), +) exportFunc { return func(ctx context.Context, l []interface{}) { files := make([]*entity.UploadFile, 0, len(l)) for _, f := range l { @@ -189,13 +261,32 @@ func newExportFilesFunc(exporter Exporter, fileRetryQueue QueueManager) exportFu files = append(files, file) } } - if err := exporter.ExportFiles(ctx, files); err != nil { - logger.CtxWarnf(ctx, "exporter export failed, err: %v", err) + var errMsg string + var isFail bool + before := time.Now() + err := exporter.ExportFiles(ctx, files) + tsMs := time.Now().Sub(before).Milliseconds() + if err != nil { if fileRetryQueue != nil { for _, bat := range files { fileRetryQueue.Enqueue(ctx, bat, int64(len(bat.Data))) } + errMsg = fmt.Sprintf("%v, retry later", err.Error()) + } else { + errMsg = fmt.Sprintf("%v, retry second time failed", err.Error()) } + isFail = true + } + if finishEventProcessor != nil { + finishEventProcessor(ctx, &consts.FinishEventInfo{ + EventType: consts.SpanFinishEventFlushFileRate, + IsEventFail: isFail, + ItemNum: len(files), + DetailMsg: errMsg, + ExtraParams: &consts.FinishEventInfoExtra{ + LatencyMs: tsMs, + }, + }) } } } diff --git a/internal/trace/span_test.go b/internal/trace/span_test.go index 8c59c32..0a59834 100644 --- a/internal/trace/span_test.go +++ b/internal/trace/span_test.go @@ -129,7 +129,7 @@ func Test_Finish(t *testing.T) { httpClient := httpclient.NewClient("", nil, nil, nil) s := &Span{ isFinished: 0, - spanProcessor: NewBatchSpanProcessor(httpClient), + spanProcessor: NewBatchSpanProcessor(nil, httpClient, nil, nil), lock: sync.RWMutex{}, TagMap: make(map[string]interface{}), } diff --git a/internal/trace/trace.go b/internal/trace/trace.go index 8f95c09..6d782c7 100644 --- a/internal/trace/trace.go +++ b/internal/trace/trace.go @@ -22,9 +22,14 @@ type Provider struct { } type Options struct { - WorkspaceID string - UltraLargeReport bool - Exporter Exporter + WorkspaceID string + UltraLargeReport bool + Exporter Exporter + FinishEventProcessor func(ctx context.Context, info *consts.FinishEventInfo) + TagTruncateConf *TagTruncateConf + SpanUploadPath string + FileUploadPath string + QueueConf *QueueConf } type StartSpanOptions struct { @@ -40,10 +45,23 @@ type StartSpanOptions struct { type loopSpanKey struct{} func NewTraceProvider(httpClient *httpclient.Client, options Options) *Provider { + var uploadPath *UploadPath + if options.SpanUploadPath != "" || options.FileUploadPath != "" { + uploadPath = &UploadPath{ + spanUploadPath: options.SpanUploadPath, + fileUploadPath: options.FileUploadPath, + } + } c := &Provider{ - httpClient: httpClient, - opt: &options, - spanProcessor: NewBatchSpanProcessor(options.Exporter, httpClient), + httpClient: httpClient, + opt: &options, + spanProcessor: NewBatchSpanProcessor( + options.Exporter, + httpClient, + uploadPath, + options.FinishEventProcessor, + options.QueueConf, + ), } return c } @@ -54,6 +72,12 @@ func (t *Provider) GetOpts() *Options { func (t *Provider) StartSpan(ctx context.Context, name, spanType string, opts StartSpanOptions) (context.Context, *Span, error) { // 0. check param + if name == "" { + name = "unknown" + } + if spanType == "" { + spanType = "unknown" + } if len(name) > consts.MaxBytesOfOneTagValueDefault { logger.CtxWarnf(ctx, "Name is too long, will be truncated to %d bytes, original name: %s", consts.MaxBytesOfOneTagValueDefault, name) name = name[:consts.MaxBytesOfOneTagValueDefault] @@ -152,10 +176,11 @@ func (t *Provider) startSpan(ctx context.Context, spanName string, spanType stri ultraLargeReport: t.opt.UltraLargeReport, multiModalityKeyMap: make(map[string]struct{}), spanProcessor: t.spanProcessor, - flags: 0, + flags: 1, // for W3C, sampled by default isFinished: 0, lock: sync.RWMutex{}, bytesSize: 0, // The initial value is 0. Default fields do not count towards the size. + tagTruncateConf: t.opt.TagTruncateConf, } // 3. set Baggage from parent span @@ -171,3 +196,14 @@ func (t *Provider) Flush(ctx context.Context) { func (t *Provider) CloseTrace(ctx context.Context) { _ = t.spanProcessor.Shutdown(ctx) } + +func DefaultFinishEventProcessor(ctx context.Context, info *consts.FinishEventInfo) { + if info == nil { + return + } + if info.IsEventFail { + logger.CtxErrorf(ctx, "finish_event[%s] fail, msg: %s", info.EventType, info.DetailMsg) + } else { + logger.CtxDebugf(ctx, "finish_event[%s] success, item_num: %d, msg: %s", info.EventType, info.ItemNum, info.DetailMsg) + } +} diff --git a/internal/version.go b/internal/version.go index 7a3fbb1..1c3d024 100644 --- a/internal/version.go +++ b/internal/version.go @@ -5,5 +5,5 @@ package internal // Version returns the version of the loop package. func Version() string { - return "v0.1.8" + return "v0.1.9" } diff --git a/noop.go b/noop.go index 8b18911..dbee28b 100644 --- a/noop.go +++ b/noop.go @@ -11,7 +11,7 @@ import ( "github.com/coze-dev/cozeloop-go/internal/trace" ) -var defaultNoopSpan = trace.DefaultNoopSpan +var DefaultNoopSpan = trace.DefaultNoopSpan // NoopClient a noop client type NoopClient struct { @@ -39,17 +39,17 @@ func (c *NoopClient) PromptFormat(ctx context.Context, prompt *entity.Prompt, va func (c *NoopClient) StartSpan(ctx context.Context, name, spanType string, opts ...StartSpanOption) (context.Context, Span) { logger.CtxWarnf(context.Background(), "Noop client not supported. %v", c.newClientError) - return ctx, defaultNoopSpan + return ctx, DefaultNoopSpan } func (c *NoopClient) GetSpanFromContext(ctx context.Context) Span { logger.CtxWarnf(context.Background(), "Noop client not supported. %v", c.newClientError) - return defaultNoopSpan + return DefaultNoopSpan } func (c *NoopClient) GetSpanFromHeader(ctx context.Context, header map[string]string) SpanContext { logger.CtxWarnf(context.Background(), "Noop client not supported. %v", c.newClientError) - return defaultNoopSpan + return DefaultNoopSpan } func (c *NoopClient) Flush(ctx context.Context) { diff --git a/span.go b/span.go index 4b35ac2..34ee338 100644 --- a/span.go +++ b/span.go @@ -110,6 +110,26 @@ type commonSpanSetter interface { // The runtime of the LLM, such as language, library, scene, etc. // The recommended standard format is Runtime of spec package SetRuntime(ctx context.Context, runtime tracespec.Runtime) + + // SetServiceName + // set the custom service name, identify different services. + SetServiceName(ctx context.Context, serviceName string) + + // SetLogID + // set the custom log id, identify different query. + SetLogID(ctx context.Context, logID string) + + // SetFinishTime + // Default is time.Now() when span Finish(). DO NOT set unless you do not use default time. + SetFinishTime(finishTime time.Time) + + // SetSystemTags + // set the system tags. DO NOT set unless you know what you are doing. + SetSystemTags(ctx context.Context, systemTags map[string]interface{}) + + // SetDeploymentEnv + // set the deployment env, identify custom env. + SetDeploymentEnv(ctx context.Context, deploymentEnv string) } // SpanContext is the interface for span Baggage transfer. diff --git a/spec/tracespec/runtime.go b/spec/tracespec/runtime.go index 14d2c76..2df3ec8 100644 --- a/spec/tracespec/runtime.go +++ b/spec/tracespec/runtime.go @@ -12,4 +12,7 @@ type Runtime struct { // Dependency Versions. LibraryVersion string `json:"library_version,omitempty"` LoopSDKVersion string `json:"loop_sdk_version,omitempty"` + + // Extra info. + Extra map[string]interface{} `json:"extra,omitempty"` }