Skip to content

Commit 48fe852

Browse files
committed
support FinishEventProcessor and some Span Set Method
1 parent 374ad7d commit 48fe852

File tree

18 files changed

+495
-75
lines changed

18 files changed

+495
-75
lines changed

client.go

Lines changed: 84 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Client interface {
3535

3636
// GetWorkspaceID return workspace id
3737
GetWorkspaceID() string
38-
// Close Close the client. Should be called before program exit.
38+
// Close close the client. Should be called before program exit.
3939
Close(ctx context.Context)
4040
}
4141

@@ -46,6 +46,7 @@ type HttpClient = httpclient.HTTPClient
4646

4747
type options struct {
4848
apiBaseURL string
49+
apiBasePath *APIBasePath
4950
workspaceID string
5051
httpClient HttpClient
5152
timeout time.Duration
@@ -62,12 +63,16 @@ type options struct {
6263
promptCacheRefreshInterval time.Duration
6364
promptTrace bool
6465
exporter trace.Exporter
66+
traceFinishEventProcessor func(ctx context.Context, info *FinishEventInfo)
67+
traceTagTruncateConf *TagTruncateConf
68+
traceQueueConf *TraceQueueConf
6569
}
6670

6771
func (o *options) MD5() string {
6872
h := md5.New()
6973
separator := "\t"
7074
h.Write([]byte(o.apiBaseURL + separator))
75+
h.Write([]byte(fmt.Sprintf("%p", o.apiBasePath) + separator))
7176
h.Write([]byte(o.workspaceID + separator))
7277
h.Write([]byte(fmt.Sprintf("%p", o.httpClient) + separator))
7378
h.Write([]byte(o.timeout.String() + separator))
@@ -80,6 +85,10 @@ func (o *options) MD5() string {
8085
h.Write([]byte(fmt.Sprintf("%d", o.promptCacheMaxCount) + separator))
8186
h.Write([]byte(o.promptCacheRefreshInterval.String() + separator))
8287
h.Write([]byte(fmt.Sprintf("%v", o.promptTrace) + separator))
88+
h.Write([]byte(fmt.Sprintf("%p", o.exporter) + separator))
89+
h.Write([]byte(fmt.Sprintf("%p", o.traceFinishEventProcessor) + separator))
90+
h.Write([]byte(fmt.Sprintf("%p", o.traceTagTruncateConf) + separator))
91+
h.Write([]byte(fmt.Sprintf("%p", o.traceQueueConf) + separator))
8392
return hex.EncodeToString(h.Sum(nil))
8493
}
8594

@@ -133,10 +142,28 @@ func NewClient(opts ...Option) (Client, error) {
133142
Timeout: options.timeout,
134143
UploadTimeout: options.uploadTimeout,
135144
})
145+
traceFinishEventProcessor := trace.DefaultFinishEventProcessor
146+
if options.traceFinishEventProcessor != nil {
147+
traceFinishEventProcessor = func(ctx context.Context, info *consts.FinishEventInfo) {
148+
trace.DefaultFinishEventProcessor(ctx, info)
149+
options.traceFinishEventProcessor(ctx, (*FinishEventInfo)(info))
150+
}
151+
}
152+
var spanUploadPath string
153+
var fileUploadPath string
154+
if options.apiBasePath != nil {
155+
spanUploadPath = options.apiBasePath.TraceSpanUploadPath
156+
fileUploadPath = options.apiBasePath.TraceFileUploadPath
157+
}
136158
c.traceProvider = trace.NewTraceProvider(httpClient, trace.Options{
137-
WorkspaceID: options.workspaceID,
138-
UltraLargeReport: options.ultraLargeReport,
139-
Exporter: options.exporter,
159+
WorkspaceID: options.workspaceID,
160+
UltraLargeReport: options.ultraLargeReport,
161+
Exporter: options.exporter,
162+
FinishEventProcessor: traceFinishEventProcessor,
163+
TagTruncateConf: (*trace.TagTruncateConf)(options.traceTagTruncateConf),
164+
SpanUploadPath: spanUploadPath,
165+
FileUploadPath: fileUploadPath,
166+
QueueConf: (*trace.QueueConf)(options.traceQueueConf),
140167
})
141168
c.promptProvider = prompt.NewPromptProvider(httpClient, c.traceProvider, prompt.Options{
142169
WorkspaceID: options.workspaceID,
@@ -185,6 +212,12 @@ func WithAPIBaseURL(apiBaseURL string) Option {
185212
}
186213
}
187214

215+
func WithAPIBasePath(apiBasePath *APIBasePath) Option {
216+
return func(p *options) {
217+
p.apiBasePath = apiBasePath
218+
}
219+
}
220+
188221
// WithWorkspaceID set workspace id.
189222
func WithWorkspaceID(workspaceID string) Option {
190223
return func(p *options) {
@@ -241,12 +274,33 @@ func WithPromptTrace(enable bool) Option {
241274
}
242275
}
243276

277+
// WithExporter set custom trace exporter.
244278
func WithExporter(e trace.Exporter) Option {
245279
return func(p *options) {
246280
p.exporter = e
247281
}
248282
}
249283

284+
// WithTraceFinishEventProcessor set custom finish event processor, after span finish.
285+
func WithTraceFinishEventProcessor(f func(ctx context.Context, info *FinishEventInfo)) Option {
286+
return func(p *options) {
287+
p.traceFinishEventProcessor = f
288+
}
289+
}
290+
291+
// WithTraceTagTruncateConf set span tag truncate conf.
292+
func WithTraceTagTruncateConf(conf *TagTruncateConf) Option {
293+
return func(p *options) {
294+
p.traceTagTruncateConf = conf
295+
}
296+
}
297+
298+
func WithTraceQueueConf(conf *TraceQueueConf) Option {
299+
return func(p *options) {
300+
p.traceQueueConf = conf
301+
}
302+
}
303+
250304
// GetWorkspaceID return space id
251305
func GetWorkspaceID() string {
252306
return getDefaultClient().GetWorkspaceID()
@@ -350,13 +404,27 @@ func buildAuth(opts options) (httpclient.Auth, error) {
350404
return nil, ErrAuthInfoRequired
351405
}
352406

407+
func SetDefaultClient(client Client) {
408+
defaultClientLock.Lock()
409+
defer defaultClientLock.Unlock()
410+
defaultClient = client
411+
}
412+
353413
func getDefaultClient() Client {
414+
if defaultClient != nil {
415+
return defaultClient
416+
}
354417
once.Do(func() {
355418
var err error
356-
defaultClient, err = NewClient()
419+
client, err := NewClient()
357420
if err != nil {
421+
defaultClientLock.Lock()
358422
defaultClient = &NoopClient{newClientError: err}
423+
defaultClientLock.Unlock()
359424
} else {
425+
defaultClientLock.Lock()
426+
defaultClient = client
427+
defaultClientLock.Unlock()
360428
sigChan := make(chan os.Signal, 1)
361429
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
362430
go func() {
@@ -366,7 +434,9 @@ func getDefaultClient() Client {
366434

367435
logger.CtxInfof(ctx, "Received signal: %v, starting graceful shutdown...", sig)
368436
defaultClient.Close(ctx)
437+
defaultClientLock.Lock()
369438
defaultClient = &NoopClient{newClientError: consts.ErrClientClosed}
439+
defaultClientLock.Unlock()
370440
logger.CtxInfof(ctx, "Graceful shutdown finished.")
371441
os.Exit(0)
372442
}()
@@ -376,9 +446,10 @@ func getDefaultClient() Client {
376446
}
377447

378448
var (
379-
defaultClient Client
380-
once sync.Once
381-
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
449+
defaultClient Client
450+
defaultClientLock sync.RWMutex
451+
once sync.Once
452+
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
382453
)
383454

384455
type loopClient struct {
@@ -426,7 +497,7 @@ func (c *loopClient) PromptFormat(ctx context.Context, loopPrompt *entity.Prompt
426497

427498
func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts ...StartSpanOption) (context.Context, Span) {
428499
if c.closed {
429-
return ctx, defaultNoopSpan
500+
return ctx, DefaultNoopSpan
430501
}
431502
config := trace.StartSpanOptions{}
432503
for _, opt := range opts {
@@ -438,25 +509,25 @@ func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts
438509
ctx, span, err := c.traceProvider.StartSpan(ctx, name, spanType, config)
439510
if err != nil {
440511
logger.CtxWarnf(ctx, "start span failed, return noop span. %v", err)
441-
return ctx, defaultNoopSpan
512+
return ctx, DefaultNoopSpan
442513
}
443514
return ctx, span
444515
}
445516

446517
func (c *loopClient) GetSpanFromContext(ctx context.Context) Span {
447518
if c.closed {
448-
return defaultNoopSpan
519+
return DefaultNoopSpan
449520
}
450521
span := c.traceProvider.GetSpanFromContext(ctx)
451522
if span == nil {
452-
return defaultNoopSpan
523+
return DefaultNoopSpan
453524
}
454525
return span
455526
}
456527

457528
func (c *loopClient) GetSpanFromHeader(ctx context.Context, header map[string]string) SpanContext {
458529
if c.closed {
459-
return defaultNoopSpan
530+
return DefaultNoopSpan
460531
}
461532
return c.traceProvider.GetSpanFromHeader(ctx, header)
462533
}

const.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package cozeloop
55

66
import (
77
"github.com/coze-dev/cozeloop-go/internal/consts"
8+
"github.com/coze-dev/cozeloop-go/internal/trace"
89
)
910

1011
const (
@@ -19,3 +20,24 @@ const (
1920
// ComBaseURL = consts.ComBaseURL
2021
CnBaseURL = consts.CnBaseURL
2122
)
23+
24+
// SpanFinishEvent finish inner event
25+
type SpanFinishEvent consts.SpanFinishEvent
26+
27+
const (
28+
SpanFinishEventSpanQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventSpanQueueEntryRate)
29+
SpanFinishEventFileQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventFileQueueEntryRate)
30+
SpanFinishEventFlushSpanRate = SpanFinishEvent(consts.SpanFinishEventFlushSpanRate)
31+
SpanFinishEventFlushFileRate = SpanFinishEvent(consts.SpanFinishEventFlushFileRate)
32+
)
33+
34+
type FinishEventInfo consts.FinishEventInfo
35+
36+
type TagTruncateConf trace.TagTruncateConf
37+
38+
type APIBasePath struct {
39+
TraceSpanUploadPath string
40+
TraceFileUploadPath string
41+
}
42+
43+
type TraceQueueConf trace.QueueConf

entity/export.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package entity
22

33
type UploadSpan struct {
4-
StartedATMicros int64 `json:"started_at_micros"`
4+
StartedATMicros int64 `json:"started_at_micros"` // start time in microseconds
5+
LogID string `json:"log_id"` // the custom log id, identify different query.
56
SpanID string `json:"span_id"`
67
ParentID string `json:"parent_id"`
78
TraceID string `json:"trace_id"`
8-
DurationMicros int64 `json:"duration_micros"`
9-
WorkspaceID string `json:"workspace_id"`
9+
DurationMicros int64 `json:"duration_micros"` // duration in microseconds
10+
ServiceName string `json:"service_name"` // the custom service name, identify different services.
11+
WorkspaceID string `json:"workspace_id"` // cozeloop workspace id
1012
SpanName string `json:"span_name"`
1113
SpanType string `json:"span_type"`
1214
StatusCode int32 `json:"status_code"`
1315
Input string `json:"input"`
1416
Output string `json:"output"`
15-
ObjectStorage string `json:"object_storage"`
17+
ObjectStorage string `json:"object_storage"` // file object storage
1618
SystemTagsString map[string]string `json:"system_tags_string"`
1719
SystemTagsLong map[string]int64 `json:"system_tags_long"`
1820
SystemTagsDouble map[string]float64 `json:"system_tags_double"`

internal/consts/finish_event.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package consts
2+
3+
type SpanFinishEvent string
4+
5+
const (
6+
SpanFinishEventSpanQueueEntryRate SpanFinishEvent = "queue_manager.span_entry.rate"
7+
SpanFinishEventFileQueueEntryRate SpanFinishEvent = "queue_manager.file_entry.rate"
8+
9+
SpanFinishEventFlushSpanRate SpanFinishEvent = "exporter.span_flush.rate"
10+
SpanFinishEventFlushFileRate SpanFinishEvent = "exporter.file_flush.rate"
11+
)
12+
13+
type FinishEventInfo struct {
14+
EventType SpanFinishEvent
15+
IsEventFail bool
16+
ItemNum int // maybe multiple span is processed in one event
17+
DetailMsg string
18+
ExtraParams *FinishEventInfoExtra
19+
}
20+
21+
type FinishEventInfoExtra struct {
22+
IsRootSpan bool
23+
LatencyMs int64
24+
}

internal/consts/span_key.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
ThreadID = "thread_id"
1111
StartTimeFirstResp = "start_time_first_resp"
1212
LatencyFirstResp = "latency_first_resp"
13+
DeploymentEnv = "deployment_env"
1314

1415
CutOff = "cut_off"
1516
)

internal/httpclient/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ func setAuthorizationHeader(ctx context.Context, request *http.Request, auth Aut
203203
}
204204

205205
func parseResponse(ctx context.Context, url string, response *http.Response, resp OpenAPIResponse) error {
206+
if response == nil {
207+
return nil
208+
}
209+
defer response.Body.Close()
210+
206211
logID := response.Header.Get(consts.LogIDHeader)
207212
respBody, err := io.ReadAll(response.Body)
208213
if err != nil {

internal/trace/exporter.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ const (
3939
var _ Exporter = (*SpanExporter)(nil)
4040

4141
type SpanExporter struct {
42-
client *httpclient.Client
42+
client *httpclient.Client
43+
uploadPath UploadPath
44+
}
45+
46+
type UploadPath struct {
47+
spanUploadPath string
48+
fileUploadPath string
4349
}
4450

4551
func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFile) error {
@@ -50,14 +56,12 @@ func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFi
5056
}
5157
logger.CtxDebugf(ctx, "uploadFile start, file name: %s", file.Name)
5258
resp := httpclient.BaseResponse{}
53-
err := e.client.UploadFile(ctx, pathUploadFile, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
59+
err := e.client.UploadFile(ctx, e.uploadPath.fileUploadPath, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
5460
if err != nil {
55-
logger.CtxErrorf(ctx, "export files[%s] fail, err:[%v], retry later", file.TosKey, err)
56-
return err
61+
return consts.NewError(fmt.Sprintf("export files[%s] fail", file.TosKey)).Wrap(err)
5762
}
5863
if resp.GetCode() != 0 { // todo: some err code do not need retry
59-
logger.CtxErrorf(ctx, "export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg())
60-
return consts.ErrRemoteService
64+
return consts.NewError(fmt.Sprintf("export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg()))
6165
}
6266
logger.CtxDebugf(ctx, "uploadFile end, file name: %s", file.Name)
6367
}
@@ -69,17 +73,13 @@ func (e *SpanExporter) ExportSpans(ctx context.Context, ss []*entity.UploadSpan)
6973
if len(ss) == 0 {
7074
return
7175
}
72-
logger.CtxDebugf(ctx, "export spans, spans count: %d", len(ss))
73-
7476
resp := httpclient.BaseResponse{}
75-
err = e.client.Post(ctx, pathIngestTrace, UploadSpanData{ss}, &resp)
77+
err = e.client.Post(ctx, e.uploadPath.spanUploadPath, UploadSpanData{ss}, &resp)
7678
if err != nil {
77-
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], err:[%v]", len(ss), err)
78-
return err
79+
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d]", len(ss))).Wrap(err)
7980
}
8081
if resp.GetCode() != 0 { // todo: some err code do not need retry
81-
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg())
82-
return consts.ErrRemoteService
82+
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg()))
8383
}
8484

8585
return
@@ -107,10 +107,12 @@ func transferToUploadSpanAndFile(ctx context.Context, spans []*Span) ([]*entity.
107107
systemTagStrM, systemTagLongM, systemTagDoubleM, _ := parseTag(span.SystemTagMap, true)
108108
resSpan = append(resSpan, &entity.UploadSpan{
109109
StartedATMicros: span.GetStartTime().UnixMicro(),
110+
LogID: span.GetLogID(),
110111
SpanID: span.GetSpanID(),
111112
ParentID: span.GetParentID(),
112113
TraceID: span.GetTraceID(),
113114
DurationMicros: span.GetDuration(),
115+
ServiceName: span.GetServiceName(),
114116
WorkspaceID: span.GetSpaceID(),
115117
SpanName: span.GetSpanName(),
116118
SpanType: span.GetSpanType(),

internal/trace/noop_span.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ func (n noopSpan) SetInputTokens(ctx context.Context, inputTokens int)
3434
func (n noopSpan) SetOutputTokens(ctx context.Context, outputTokens int) {}
3535
func (n noopSpan) SetStartTimeFirstResp(ctx context.Context, startTimeFirstResp int64) {}
3636
func (n noopSpan) SetRuntime(ctx context.Context, runtime tracespec.Runtime) {}
37+
func (n noopSpan) SetServiceName(ctx context.Context, serviceName string) {}
38+
func (n noopSpan) SetLogID(ctx context.Context, logID string) {}
39+
func (n noopSpan) SetFinishTime(finishTime time.Time) {}
40+
func (n noopSpan) SetSystemTags(ctx context.Context, systemTags map[string]interface{}) {}
41+
func (n noopSpan) SetDeploymentEnv(ctx context.Context, deploymentEnv string) {}
3742

3843
// implement of Span
3944
func (n noopSpan) SetTags(ctx context.Context, tagKVs map[string]interface{}) {}

0 commit comments

Comments
 (0)