Skip to content

Commit a569214

Browse files
committed
Support FinishEventProcessor and some Span setter methods
1 parent 374ad7d commit a569214

File tree

14 files changed

+332
-58
lines changed

14 files changed

+332
-58
lines changed

client.go

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Client interface {
3535

3636
// GetWorkspaceID returns the workspace ID.
3737
GetWorkspaceID() string
38-
// Close Close the client. Should be called before program exit.
38+
// Close closes the client. It should be called before the program exits.
3939
Close(ctx context.Context)
4040
}
4141

@@ -62,6 +62,8 @@ type options struct {
6262
promptCacheRefreshInterval time.Duration
6363
promptTrace bool
6464
exporter trace.Exporter
65+
traceFinishEventProcessor func(ctx context.Context, info *FinishEventInfo)
66+
traceTagTruncateConf *TagTruncateConf
6567
}
6668

6769
func (o *options) MD5() string {
@@ -133,10 +135,19 @@ func NewClient(opts ...Option) (Client, error) {
133135
Timeout: options.timeout,
134136
UploadTimeout: options.uploadTimeout,
135137
})
138+
traceFinishEventProcessor := trace.DefaultFinishEventProcessor
139+
if options.traceFinishEventProcessor != nil {
140+
traceFinishEventProcessor = func(ctx context.Context, info *consts.FinishEventInfo) {
141+
trace.DefaultFinishEventProcessor(ctx, info)
142+
options.traceFinishEventProcessor(ctx, (*FinishEventInfo)(info))
143+
}
144+
}
136145
c.traceProvider = trace.NewTraceProvider(httpClient, trace.Options{
137-
WorkspaceID: options.workspaceID,
138-
UltraLargeReport: options.ultraLargeReport,
139-
Exporter: options.exporter,
146+
WorkspaceID: options.workspaceID,
147+
UltraLargeReport: options.ultraLargeReport,
148+
Exporter: options.exporter,
149+
TraceFinishEventProcessor: traceFinishEventProcessor,
150+
TraceTagTruncateConf: (*trace.TagTruncateConf)(options.traceTagTruncateConf),
140151
})
141152
c.promptProvider = prompt.NewPromptProvider(httpClient, c.traceProvider, prompt.Options{
142153
WorkspaceID: options.workspaceID,
@@ -247,6 +258,18 @@ func WithExporter(e trace.Exporter) Option {
247258
}
248259
}
249260

261+
// WithTraceFinishEventProcessor returns an Option that stores f as the
// client's trace finish-event processor. When f is non-nil, NewClient wraps it
// so that trace.DefaultFinishEventProcessor runs first and f runs second.
func WithTraceFinishEventProcessor(f func(ctx context.Context, info *FinishEventInfo)) Option {
262+
return func(p *options) {
263+
p.traceFinishEventProcessor = f
264+
}
265+
}
266+
267+
// WithTraceTagTruncateConf returns an Option that stores conf as the tag
// truncation configuration. NewClient converts it to *trace.TagTruncateConf
// and hands it to the trace provider as TraceTagTruncateConf.
func WithTraceTagTruncateConf(conf *TagTruncateConf) Option {
268+
return func(p *options) {
269+
p.traceTagTruncateConf = conf
270+
}
271+
}
272+
250273
// GetWorkspaceID returns the workspace ID of the default client.
251274
func GetWorkspaceID() string {
252275
return getDefaultClient().GetWorkspaceID()
@@ -350,13 +373,27 @@ func buildAuth(opts options) (httpclient.Auth, error) {
350373
return nil, ErrAuthInfoRequired
351374
}
352375

376+
// SetDefaultClient replaces the package-level default client with client.
// The assignment is made while holding defaultClientLock for writing.
//
// NOTE(review): getDefaultClient checks and returns defaultClient without
// taking defaultClientLock — confirm whether that unguarded read is intended.
func SetDefaultClient(client Client) {
377+
defaultClientLock.Lock()
378+
defer defaultClientLock.Unlock()
379+
defaultClient = client
380+
}
381+
353382
func getDefaultClient() Client {
383+
if defaultClient != nil {
384+
return defaultClient
385+
}
354386
once.Do(func() {
355387
var err error
356-
defaultClient, err = NewClient()
388+
client, err := NewClient()
357389
if err != nil {
390+
defaultClientLock.Lock()
358391
defaultClient = &NoopClient{newClientError: err}
392+
defaultClientLock.Unlock()
359393
} else {
394+
defaultClientLock.Lock()
395+
defaultClient = client
396+
defaultClientLock.Unlock()
360397
sigChan := make(chan os.Signal, 1)
361398
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
362399
go func() {
@@ -366,7 +403,9 @@ func getDefaultClient() Client {
366403

367404
logger.CtxInfof(ctx, "Received signal: %v, starting graceful shutdown...", sig)
368405
defaultClient.Close(ctx)
406+
defaultClientLock.Lock()
369407
defaultClient = &NoopClient{newClientError: consts.ErrClientClosed}
408+
defaultClientLock.Unlock()
370409
logger.CtxInfof(ctx, "Graceful shutdown finished.")
371410
os.Exit(0)
372411
}()
@@ -376,9 +415,10 @@ func getDefaultClient() Client {
376415
}
377416

378417
var (
379-
defaultClient Client
380-
once sync.Once
381-
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
418+
defaultClient Client
419+
defaultClientLock sync.RWMutex
420+
once sync.Once
421+
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
382422
)
383423

384424
type loopClient struct {
@@ -426,7 +466,7 @@ func (c *loopClient) PromptFormat(ctx context.Context, loopPrompt *entity.Prompt
426466

427467
func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts ...StartSpanOption) (context.Context, Span) {
428468
if c.closed {
429-
return ctx, defaultNoopSpan
469+
return ctx, DefaultNoopSpan
430470
}
431471
config := trace.StartSpanOptions{}
432472
for _, opt := range opts {
@@ -438,25 +478,25 @@ func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts
438478
ctx, span, err := c.traceProvider.StartSpan(ctx, name, spanType, config)
439479
if err != nil {
440480
logger.CtxWarnf(ctx, "start span failed, return noop span. %v", err)
441-
return ctx, defaultNoopSpan
481+
return ctx, DefaultNoopSpan
442482
}
443483
return ctx, span
444484
}
445485

446486
// GetSpanFromContext returns the span that the trace provider finds in ctx.
// It returns the no-op span instead when the client is closed or when the
// provider yields nil, so the result is never nil.
func (c *loopClient) GetSpanFromContext(ctx context.Context) Span {
447487
if c.closed {
448-
return defaultNoopSpan
488+
return DefaultNoopSpan
449489
}
450490
span := c.traceProvider.GetSpanFromContext(ctx)
451491
if span == nil {
452-
return defaultNoopSpan
492+
return DefaultNoopSpan
453493
}
454494
return span
455495
}
456496

457497
// GetSpanFromHeader returns the SpanContext that the trace provider builds
// from header. When the client is closed it returns the no-op span without
// consulting the provider.
func (c *loopClient) GetSpanFromHeader(ctx context.Context, header map[string]string) SpanContext {
458498
if c.closed {
459-
return defaultNoopSpan
499+
return DefaultNoopSpan
460500
}
461501
return c.traceProvider.GetSpanFromHeader(ctx, header)
462502
}

const.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package cozeloop
55

66
import (
77
"github.com/coze-dev/cozeloop-go/internal/consts"
8+
"github.com/coze-dev/cozeloop-go/internal/trace"
89
)
910

1011
const (
@@ -19,3 +20,17 @@ const (
1920
// ComBaseURL = consts.ComBaseURL
2021
CnBaseURL = consts.CnBaseURL
2122
)
23+
24+
// SpanFinishEvent is the public counterpart of consts.SpanFinishEvent. It
// names the kind of internal event carried in FinishEventInfo.EventType.
25+
type SpanFinishEvent consts.SpanFinishEvent
26+
27+
// Public SpanFinishEvent values, each converted from the internal constant of
// the same name in package consts.
const (
28+
SpanFinishEventSpanQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventSpanQueueEntryRate)
29+
SpanFinishEventFileQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventFileQueueEntryRate)
30+
SpanFinishEventFlushSpanRate = SpanFinishEvent(consts.SpanFinishEventFlushSpanRate)
31+
SpanFinishEventFlushFileRate = SpanFinishEvent(consts.SpanFinishEventFlushFileRate)
32+
)
33+
34+
// FinishEventInfo is the public counterpart of consts.FinishEventInfo.
// NewClient converts the internal value to this type before calling a
// processor registered with WithTraceFinishEventProcessor.
type FinishEventInfo consts.FinishEventInfo
35+
36+
// TagTruncateConf is the public counterpart of trace.TagTruncateConf. A value
// passed to WithTraceTagTruncateConf is converted back to
// *trace.TagTruncateConf in NewClient.
type TagTruncateConf trace.TagTruncateConf

entity/export.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package entity
22

33
type UploadSpan struct {
4-
StartedATMicros int64 `json:"started_at_micros"`
4+
StartedATMicros int64 `json:"started_at_micros"` // start time in microseconds
5+
LogID string `json:"log_id"` // the custom log id, identify different query.
56
SpanID string `json:"span_id"`
67
ParentID string `json:"parent_id"`
78
TraceID string `json:"trace_id"`
8-
DurationMicros int64 `json:"duration_micros"`
9-
WorkspaceID string `json:"workspace_id"`
9+
DurationMicros int64 `json:"duration_micros"` // duration in microseconds
10+
ServiceName string `json:"service_name"` // the custom service name, identify different services.
11+
WorkspaceID string `json:"workspace_id"` // cozeloop workspace id
1012
SpanName string `json:"span_name"`
1113
SpanType string `json:"span_type"`
1214
StatusCode int32 `json:"status_code"`
1315
Input string `json:"input"`
1416
Output string `json:"output"`
15-
ObjectStorage string `json:"object_storage"`
17+
ObjectStorage string `json:"object_storage"` // file object storage
1618
SystemTagsString map[string]string `json:"system_tags_string"`
1719
SystemTagsLong map[string]int64 `json:"system_tags_long"`
1820
SystemTagsDouble map[string]float64 `json:"system_tags_double"`

internal/consts/finish_event.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package consts
2+
3+
// SpanFinishEvent identifies the kind of internal event reported through
// FinishEventInfo.EventType.
type SpanFinishEvent string
4+
5+
// Known SpanFinishEvent values.
const (
6+
// Queue-entry events reported by BatchQueueManager.Enqueue: the span variant
// for the span and span-retry queues, the file variant for every other queue.
SpanFinishEventSpanQueueEntryRate SpanFinishEvent = "queue_manager.span_entry.rate"
7+
SpanFinishEventFileQueueEntryRate SpanFinishEvent = "queue_manager.file_entry.rate"
8+
9+
// NOTE(review): presumably reported when the exporter flushes spans/files;
// the emitting code is not visible here — confirm.
SpanFinishEventFlushSpanRate SpanFinishEvent = "exporter.span_flush.rate"
10+
SpanFinishEventFlushFileRate SpanFinishEvent = "exporter.file_flush.rate"
11+
)
12+
13+
// FinishEventInfo describes one internal event handed to a finish-event
// processor.
type FinishEventInfo struct {
14+
EventType SpanFinishEvent // kind of event being reported
15+
IsEventFail bool // true when the event reports a failure (Enqueue sets it when the queue is full and the item is dropped)
16+
ItemNum int // number of items covered by the event; one event may cover multiple spans
17+
DetailMsg string // failure detail; Enqueue leaves it empty on success
18+
ExtraParams *FinishEventInfoExtra // optional; Enqueue sets it only for *Span items on the span queues
19+
}
20+
21+
// FinishEventInfoExtra carries optional, event-specific details for a
// FinishEventInfo.
type FinishEventInfoExtra struct {
22+
IsRootSpan bool // Enqueue fills this from span.IsRootSpan()
23+
}

internal/httpclient/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ func setAuthorizationHeader(ctx context.Context, request *http.Request, auth Aut
203203
}
204204

205205
func parseResponse(ctx context.Context, url string, response *http.Response, resp OpenAPIResponse) error {
206+
defer response.Body.Close()
207+
206208
logID := response.Header.Get(consts.LogIDHeader)
207209
respBody, err := io.ReadAll(response.Body)
208210
if err != nil {

internal/trace/exporter.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,10 @@ func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFi
5252
resp := httpclient.BaseResponse{}
5353
err := e.client.UploadFile(ctx, pathUploadFile, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
5454
if err != nil {
55-
logger.CtxErrorf(ctx, "export files[%s] fail, err:[%v], retry later", file.TosKey, err)
56-
return err
55+
return consts.NewError(fmt.Sprintf("export files[%s] fail", file.TosKey)).Wrap(err)
5756
}
5857
if resp.GetCode() != 0 { // todo: some err code do not need retry
59-
logger.CtxErrorf(ctx, "export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg())
60-
return consts.ErrRemoteService
58+
return consts.NewError(fmt.Sprintf("export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg()))
6159
}
6260
logger.CtxDebugf(ctx, "uploadFile end, file name: %s", file.Name)
6361
}
@@ -74,12 +72,10 @@ func (e *SpanExporter) ExportSpans(ctx context.Context, ss []*entity.UploadSpan)
7472
resp := httpclient.BaseResponse{}
7573
err = e.client.Post(ctx, pathIngestTrace, UploadSpanData{ss}, &resp)
7674
if err != nil {
77-
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], err:[%v]", len(ss), err)
78-
return err
75+
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d]", len(ss))).Wrap(err)
7976
}
8077
if resp.GetCode() != 0 { // todo: some err code do not need retry
81-
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg())
82-
return consts.ErrRemoteService
78+
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg()))
8379
}
8480

8581
return
@@ -107,10 +103,12 @@ func transferToUploadSpanAndFile(ctx context.Context, spans []*Span) ([]*entity.
107103
systemTagStrM, systemTagLongM, systemTagDoubleM, _ := parseTag(span.SystemTagMap, true)
108104
resSpan = append(resSpan, &entity.UploadSpan{
109105
StartedATMicros: span.GetStartTime().UnixMicro(),
106+
LogID: span.GetLogID(),
110107
SpanID: span.GetSpanID(),
111108
ParentID: span.GetParentID(),
112109
TraceID: span.GetTraceID(),
113110
DurationMicros: span.GetDuration(),
111+
ServiceName: span.GetServiceName(),
114112
WorkspaceID: span.GetSpaceID(),
115113
SpanName: span.GetSpanName(),
116114
SpanType: span.GetSpanType(),

internal/trace/noop_span.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ func (n noopSpan) SetInputTokens(ctx context.Context, inputTokens int)
3434
func (n noopSpan) SetOutputTokens(ctx context.Context, outputTokens int) {}
3535
func (n noopSpan) SetStartTimeFirstResp(ctx context.Context, startTimeFirstResp int64) {}
3636
func (n noopSpan) SetRuntime(ctx context.Context, runtime tracespec.Runtime) {}
37+
func (n noopSpan) SetServiceName(ctx context.Context, serviceName string) {}
38+
func (n noopSpan) SetLogID(ctx context.Context, logID string) {}
39+
func (n noopSpan) SetFinishTime(finishTime time.Time) {}
3740

3841
// implement of Span
3942
func (n noopSpan) SetTags(ctx context.Context, tagKVs map[string]interface{}) {}

internal/trace/queue_manager.go

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,23 @@ package trace
1313

1414
import (
1515
"context"
16+
"fmt"
1617
"sync"
1718
"sync/atomic"
1819
"time"
1920

21+
"github.com/coze-dev/cozeloop-go/internal/consts"
2022
"github.com/coze-dev/cozeloop-go/internal/logger"
2123
"github.com/coze-dev/cozeloop-go/internal/util"
2224
)
2325

26+
// Queue names. Enqueue compares b.o.queueName against queueNameSpan and
// queueNameSpanRetry to report a span queue-entry event; any other name is
// reported as a file queue-entry event.
const (
27+
queueNameSpan = "span"
28+
queueNameSpanRetry = "span_retry"
29+
queueNameFile = "file"
30+
queueNameFileRetry = "file_retry"
31+
)
32+
2433
type exportFunc func(ctx context.Context, s []interface{})
2534

2635
// QueueManager is a queue that batches spans and exports them
@@ -37,7 +46,8 @@ type batchQueueManagerOptions struct {
3746
maxExportBatchLength int
3847
maxExportBatchByteSize int
3948

40-
exportFunc exportFunc
49+
exportFunc exportFunc
50+
finishEventProcessor func(ctx context.Context, info *consts.FinishEventInfo)
4151
}
4252

4353
func newBatchQueueManager(o batchQueueManagerOptions) *BatchQueueManager {
@@ -181,18 +191,41 @@ func (b *BatchQueueManager) Enqueue(ctx context.Context, sd interface{}, byteSiz
181191
if atomic.LoadInt32(&b.stopped) != 0 {
182192
return
183193
}
184-
194+
var extraParams *consts.FinishEventInfoExtra
195+
var eventType = consts.SpanFinishEventFileQueueEntryRate
196+
var errMsg string
197+
var isFail bool
185198
select {
186199
case b.queue <- sd:
187200
b.batchMutex.Lock()
188201
b.batchByteSize += byteSize
189202
b.batchMutex.Unlock()
190-
//logger.CtxDebugf(ctx, "%s queue length: %d", b.o.queueName, len(b.queue))
191-
return
192203
default: // queue is full, not block, drop
193-
logger.CtxErrorf(ctx, "%s queue is full, dropped item", b.o.queueName)
204+
errMsg = fmt.Sprintf("%s queue is full, dropped item", b.o.queueName)
205+
isFail = true
194206
atomic.AddUint32(&b.dropped, 1)
195207
}
208+
209+
switch b.o.queueName {
210+
case queueNameSpan, queueNameSpanRetry:
211+
eventType = consts.SpanFinishEventSpanQueueEntryRate
212+
span, ok := sd.(*Span)
213+
if ok {
214+
extraParams = &consts.FinishEventInfoExtra{
215+
IsRootSpan: span.IsRootSpan(),
216+
}
217+
}
218+
default:
219+
}
220+
if b.o.finishEventProcessor != nil {
221+
b.o.finishEventProcessor(ctx, &consts.FinishEventInfo{
222+
EventType: eventType,
223+
IsEventFail: isFail,
224+
ItemNum: 1,
225+
DetailMsg: errMsg,
226+
ExtraParams: extraParams,
227+
})
228+
}
196229
return
197230
}
198231

0 commit comments

Comments
 (0)