Skip to content

Commit 3f01d90

Browse files
committed
support FinishEventProcessor and some Span Set Method
1 parent 374ad7d commit 3f01d90

File tree

17 files changed

+490
-72
lines changed

17 files changed

+490
-72
lines changed

client.go

Lines changed: 83 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Client interface {
3535

3636
// GetWorkspaceID return workspace id
3737
GetWorkspaceID() string
38-
// Close Close the client. Should be called before program exit.
38+
// Close close the client. Should be called before program exit.
3939
Close(ctx context.Context)
4040
}
4141

@@ -46,6 +46,7 @@ type HttpClient = httpclient.HTTPClient
4646

4747
type options struct {
4848
apiBaseURL string
49+
apiBasePath *APIBasePath
4950
workspaceID string
5051
httpClient HttpClient
5152
timeout time.Duration
@@ -62,6 +63,9 @@ type options struct {
6263
promptCacheRefreshInterval time.Duration
6364
promptTrace bool
6465
exporter trace.Exporter
66+
traceFinishEventProcessor func(ctx context.Context, info *FinishEventInfo)
67+
traceTagTruncateConf *TagTruncateConf
68+
traceQueueConf *TraceQueueConf
6569
}
6670

6771
func (o *options) MD5() string {
@@ -80,6 +84,10 @@ func (o *options) MD5() string {
8084
h.Write([]byte(fmt.Sprintf("%d", o.promptCacheMaxCount) + separator))
8185
h.Write([]byte(o.promptCacheRefreshInterval.String() + separator))
8286
h.Write([]byte(fmt.Sprintf("%v", o.promptTrace) + separator))
87+
h.Write([]byte(fmt.Sprintf("%p", o.exporter) + separator))
88+
h.Write([]byte(fmt.Sprintf("%p", o.traceFinishEventProcessor) + separator))
89+
h.Write([]byte(fmt.Sprintf("%p", o.traceTagTruncateConf) + separator))
90+
h.Write([]byte(fmt.Sprintf("%p", o.traceQueueConf) + separator))
8391
return hex.EncodeToString(h.Sum(nil))
8492
}
8593

@@ -133,10 +141,28 @@ func NewClient(opts ...Option) (Client, error) {
133141
Timeout: options.timeout,
134142
UploadTimeout: options.uploadTimeout,
135143
})
144+
traceFinishEventProcessor := trace.DefaultFinishEventProcessor
145+
if options.traceFinishEventProcessor != nil {
146+
traceFinishEventProcessor = func(ctx context.Context, info *consts.FinishEventInfo) {
147+
trace.DefaultFinishEventProcessor(ctx, info)
148+
options.traceFinishEventProcessor(ctx, (*FinishEventInfo)(info))
149+
}
150+
}
151+
var spanUploadPath string
152+
var fileUploadPath string
153+
if options.apiBasePath != nil {
154+
spanUploadPath = options.apiBasePath.TraceSpanUploadPath
155+
fileUploadPath = options.apiBasePath.TraceFileUploadPath
156+
}
136157
c.traceProvider = trace.NewTraceProvider(httpClient, trace.Options{
137-
WorkspaceID: options.workspaceID,
138-
UltraLargeReport: options.ultraLargeReport,
139-
Exporter: options.exporter,
158+
WorkspaceID: options.workspaceID,
159+
UltraLargeReport: options.ultraLargeReport,
160+
Exporter: options.exporter,
161+
FinishEventProcessor: traceFinishEventProcessor,
162+
TagTruncateConf: (*trace.TagTruncateConf)(options.traceTagTruncateConf),
163+
SpanUploadPath: spanUploadPath,
164+
FileUploadPath: fileUploadPath,
165+
QueueConf: (*trace.QueueConf)(options.traceQueueConf),
140166
})
141167
c.promptProvider = prompt.NewPromptProvider(httpClient, c.traceProvider, prompt.Options{
142168
WorkspaceID: options.workspaceID,
@@ -185,6 +211,12 @@ func WithAPIBaseURL(apiBaseURL string) Option {
185211
}
186212
}
187213

214+
func WithAPIBasePath(apiBasePath *APIBasePath) Option {
215+
return func(p *options) {
216+
p.apiBasePath = apiBasePath
217+
}
218+
}
219+
188220
// WithWorkspaceID set workspace id.
189221
func WithWorkspaceID(workspaceID string) Option {
190222
return func(p *options) {
@@ -241,12 +273,33 @@ func WithPromptTrace(enable bool) Option {
241273
}
242274
}
243275

276+
// WithExporter set custom trace exporter.
244277
func WithExporter(e trace.Exporter) Option {
245278
return func(p *options) {
246279
p.exporter = e
247280
}
248281
}
249282

283+
// WithTraceFinishEventProcessor set custom finish event processor, after span finish.
284+
func WithTraceFinishEventProcessor(f func(ctx context.Context, info *FinishEventInfo)) Option {
285+
return func(p *options) {
286+
p.traceFinishEventProcessor = f
287+
}
288+
}
289+
290+
// WithTraceTagTruncateConf set span tag truncate conf.
291+
func WithTraceTagTruncateConf(conf *TagTruncateConf) Option {
292+
return func(p *options) {
293+
p.traceTagTruncateConf = conf
294+
}
295+
}
296+
297+
func WithTraceQueueConf(conf *TraceQueueConf) Option {
298+
return func(p *options) {
299+
p.traceQueueConf = conf
300+
}
301+
}
302+
250303
// GetWorkspaceID return space id
251304
func GetWorkspaceID() string {
252305
return getDefaultClient().GetWorkspaceID()
@@ -350,13 +403,27 @@ func buildAuth(opts options) (httpclient.Auth, error) {
350403
return nil, ErrAuthInfoRequired
351404
}
352405

406+
func SetDefaultClient(client Client) {
407+
defaultClientLock.Lock()
408+
defer defaultClientLock.Unlock()
409+
defaultClient = client
410+
}
411+
353412
func getDefaultClient() Client {
413+
if defaultClient != nil {
414+
return defaultClient
415+
}
354416
once.Do(func() {
355417
var err error
356-
defaultClient, err = NewClient()
418+
client, err := NewClient()
357419
if err != nil {
420+
defaultClientLock.Lock()
358421
defaultClient = &NoopClient{newClientError: err}
422+
defaultClientLock.Unlock()
359423
} else {
424+
defaultClientLock.Lock()
425+
defaultClient = client
426+
defaultClientLock.Unlock()
360427
sigChan := make(chan os.Signal, 1)
361428
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
362429
go func() {
@@ -366,7 +433,9 @@ func getDefaultClient() Client {
366433

367434
logger.CtxInfof(ctx, "Received signal: %v, starting graceful shutdown...", sig)
368435
defaultClient.Close(ctx)
436+
defaultClientLock.Lock()
369437
defaultClient = &NoopClient{newClientError: consts.ErrClientClosed}
438+
defaultClientLock.Unlock()
370439
logger.CtxInfof(ctx, "Graceful shutdown finished.")
371440
os.Exit(0)
372441
}()
@@ -376,9 +445,10 @@ func getDefaultClient() Client {
376445
}
377446

378447
var (
379-
defaultClient Client
380-
once sync.Once
381-
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
448+
defaultClient Client
449+
defaultClientLock sync.RWMutex
450+
once sync.Once
451+
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
382452
)
383453

384454
type loopClient struct {
@@ -426,7 +496,7 @@ func (c *loopClient) PromptFormat(ctx context.Context, loopPrompt *entity.Prompt
426496

427497
func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts ...StartSpanOption) (context.Context, Span) {
428498
if c.closed {
429-
return ctx, defaultNoopSpan
499+
return ctx, DefaultNoopSpan
430500
}
431501
config := trace.StartSpanOptions{}
432502
for _, opt := range opts {
@@ -438,25 +508,25 @@ func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts
438508
ctx, span, err := c.traceProvider.StartSpan(ctx, name, spanType, config)
439509
if err != nil {
440510
logger.CtxWarnf(ctx, "start span failed, return noop span. %v", err)
441-
return ctx, defaultNoopSpan
511+
return ctx, DefaultNoopSpan
442512
}
443513
return ctx, span
444514
}
445515

446516
func (c *loopClient) GetSpanFromContext(ctx context.Context) Span {
447517
if c.closed {
448-
return defaultNoopSpan
518+
return DefaultNoopSpan
449519
}
450520
span := c.traceProvider.GetSpanFromContext(ctx)
451521
if span == nil {
452-
return defaultNoopSpan
522+
return DefaultNoopSpan
453523
}
454524
return span
455525
}
456526

457527
func (c *loopClient) GetSpanFromHeader(ctx context.Context, header map[string]string) SpanContext {
458528
if c.closed {
459-
return defaultNoopSpan
529+
return DefaultNoopSpan
460530
}
461531
return c.traceProvider.GetSpanFromHeader(ctx, header)
462532
}

const.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package cozeloop
55

66
import (
77
"github.com/coze-dev/cozeloop-go/internal/consts"
8+
"github.com/coze-dev/cozeloop-go/internal/trace"
89
)
910

1011
const (
@@ -19,3 +20,24 @@ const (
1920
// ComBaseURL = consts.ComBaseURL
2021
CnBaseURL = consts.CnBaseURL
2122
)
23+
24+
// SpanFinishEvent finish inner event
25+
type SpanFinishEvent consts.SpanFinishEvent
26+
27+
const (
28+
SpanFinishEventSpanQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventSpanQueueEntryRate)
29+
SpanFinishEventFileQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventFileQueueEntryRate)
30+
SpanFinishEventFlushSpanRate = SpanFinishEvent(consts.SpanFinishEventFlushSpanRate)
31+
SpanFinishEventFlushFileRate = SpanFinishEvent(consts.SpanFinishEventFlushFileRate)
32+
)
33+
34+
type FinishEventInfo consts.FinishEventInfo
35+
36+
type TagTruncateConf trace.TagTruncateConf
37+
38+
type APIBasePath struct {
39+
TraceSpanUploadPath string
40+
TraceFileUploadPath string
41+
}
42+
43+
type TraceQueueConf trace.QueueConf

entity/export.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package entity
22

33
type UploadSpan struct {
4-
StartedATMicros int64 `json:"started_at_micros"`
4+
StartedATMicros int64 `json:"started_at_micros"` // start time in microseconds
5+
LogID string `json:"log_id"` // the custom log id, identify different query.
56
SpanID string `json:"span_id"`
67
ParentID string `json:"parent_id"`
78
TraceID string `json:"trace_id"`
8-
DurationMicros int64 `json:"duration_micros"`
9-
WorkspaceID string `json:"workspace_id"`
9+
DurationMicros int64 `json:"duration_micros"` // duration in microseconds
10+
ServiceName string `json:"service_name"` // the custom service name, identify different services.
11+
WorkspaceID string `json:"workspace_id"` // cozeloop workspace id
1012
SpanName string `json:"span_name"`
1113
SpanType string `json:"span_type"`
1214
StatusCode int32 `json:"status_code"`
1315
Input string `json:"input"`
1416
Output string `json:"output"`
15-
ObjectStorage string `json:"object_storage"`
17+
ObjectStorage string `json:"object_storage"` // file object storage
1618
SystemTagsString map[string]string `json:"system_tags_string"`
1719
SystemTagsLong map[string]int64 `json:"system_tags_long"`
1820
SystemTagsDouble map[string]float64 `json:"system_tags_double"`

internal/consts/finish_event.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package consts
2+
3+
type SpanFinishEvent string
4+
5+
const (
6+
SpanFinishEventSpanQueueEntryRate SpanFinishEvent = "queue_manager.span_entry.rate"
7+
SpanFinishEventFileQueueEntryRate SpanFinishEvent = "queue_manager.file_entry.rate"
8+
9+
SpanFinishEventFlushSpanRate SpanFinishEvent = "exporter.span_flush.rate"
10+
SpanFinishEventFlushFileRate SpanFinishEvent = "exporter.file_flush.rate"
11+
)
12+
13+
type FinishEventInfo struct {
14+
EventType SpanFinishEvent
15+
IsEventFail bool
16+
ItemNum int // maybe multiple span is processed in one event
17+
DetailMsg string
18+
ExtraParams *FinishEventInfoExtra
19+
}
20+
21+
type FinishEventInfoExtra struct {
22+
IsRootSpan bool
23+
LatencyMs int64
24+
}

internal/consts/span_key.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
ThreadID = "thread_id"
1111
StartTimeFirstResp = "start_time_first_resp"
1212
LatencyFirstResp = "latency_first_resp"
13+
DeploymentEnv = "deployment_env"
1314

1415
CutOff = "cut_off"
1516
)

internal/httpclient/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ func setAuthorizationHeader(ctx context.Context, request *http.Request, auth Aut
203203
}
204204

205205
func parseResponse(ctx context.Context, url string, response *http.Response, resp OpenAPIResponse) error {
206+
if response == nil {
207+
return nil
208+
}
209+
defer response.Body.Close()
210+
206211
logID := response.Header.Get(consts.LogIDHeader)
207212
respBody, err := io.ReadAll(response.Body)
208213
if err != nil {

internal/trace/exporter.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ const (
3939
var _ Exporter = (*SpanExporter)(nil)
4040

4141
type SpanExporter struct {
42-
client *httpclient.Client
42+
client *httpclient.Client
43+
uploadPath UploadPath
44+
}
45+
46+
type UploadPath struct {
47+
spanUploadPath string
48+
fileUploadPath string
4349
}
4450

4551
func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFile) error {
@@ -50,14 +56,12 @@ func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFi
5056
}
5157
logger.CtxDebugf(ctx, "uploadFile start, file name: %s", file.Name)
5258
resp := httpclient.BaseResponse{}
53-
err := e.client.UploadFile(ctx, pathUploadFile, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
59+
err := e.client.UploadFile(ctx, e.uploadPath.fileUploadPath, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
5460
if err != nil {
55-
logger.CtxErrorf(ctx, "export files[%s] fail, err:[%v], retry later", file.TosKey, err)
56-
return err
61+
return consts.NewError(fmt.Sprintf("export files[%s] fail", file.TosKey)).Wrap(err)
5762
}
5863
if resp.GetCode() != 0 { // todo: some err code do not need retry
59-
logger.CtxErrorf(ctx, "export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg())
60-
return consts.ErrRemoteService
64+
return consts.NewError(fmt.Sprintf("export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg()))
6165
}
6266
logger.CtxDebugf(ctx, "uploadFile end, file name: %s", file.Name)
6367
}
@@ -69,17 +73,13 @@ func (e *SpanExporter) ExportSpans(ctx context.Context, ss []*entity.UploadSpan)
6973
if len(ss) == 0 {
7074
return
7175
}
72-
logger.CtxDebugf(ctx, "export spans, spans count: %d", len(ss))
73-
7476
resp := httpclient.BaseResponse{}
75-
err = e.client.Post(ctx, pathIngestTrace, UploadSpanData{ss}, &resp)
77+
err = e.client.Post(ctx, e.uploadPath.spanUploadPath, UploadSpanData{ss}, &resp)
7678
if err != nil {
77-
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], err:[%v]", len(ss), err)
78-
return err
79+
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d]", len(ss))).Wrap(err)
7980
}
8081
if resp.GetCode() != 0 { // todo: some err code do not need retry
81-
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg())
82-
return consts.ErrRemoteService
82+
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg()))
8383
}
8484

8585
return
@@ -107,10 +107,12 @@ func transferToUploadSpanAndFile(ctx context.Context, spans []*Span) ([]*entity.
107107
systemTagStrM, systemTagLongM, systemTagDoubleM, _ := parseTag(span.SystemTagMap, true)
108108
resSpan = append(resSpan, &entity.UploadSpan{
109109
StartedATMicros: span.GetStartTime().UnixMicro(),
110+
LogID: span.GetLogID(),
110111
SpanID: span.GetSpanID(),
111112
ParentID: span.GetParentID(),
112113
TraceID: span.GetTraceID(),
113114
DurationMicros: span.GetDuration(),
115+
ServiceName: span.GetServiceName(),
114116
WorkspaceID: span.GetSpaceID(),
115117
SpanName: span.GetSpanName(),
116118
SpanType: span.GetSpanType(),

internal/trace/noop_span.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ func (n noopSpan) SetInputTokens(ctx context.Context, inputTokens int)
3434
func (n noopSpan) SetOutputTokens(ctx context.Context, outputTokens int) {}
3535
func (n noopSpan) SetStartTimeFirstResp(ctx context.Context, startTimeFirstResp int64) {}
3636
func (n noopSpan) SetRuntime(ctx context.Context, runtime tracespec.Runtime) {}
37+
func (n noopSpan) SetServiceName(ctx context.Context, serviceName string) {}
38+
func (n noopSpan) SetLogID(ctx context.Context, logID string) {}
39+
func (n noopSpan) SetFinishTime(finishTime time.Time) {}
40+
func (n noopSpan) SetSystemTags(ctx context.Context, systemTags map[string]interface{}) {}
41+
func (n noopSpan) SetDeploymentEnv(ctx context.Context, deploymentEnv string) {}
3742

3843
// implement of Span
3944
func (n noopSpan) SetTags(ctx context.Context, tagKVs map[string]interface{}) {}

0 commit comments

Comments
 (0)