Skip to content

Commit 7a5253f

Browse files
committed
support FinishEventProcessor and some Span Set Method
1 parent 374ad7d commit 7a5253f

File tree

16 files changed

+453
-70
lines changed

16 files changed

+453
-70
lines changed

client.go

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Client interface {
3535

3636
// GetWorkspaceID return workspace id
3737
GetWorkspaceID() string
38-
// Close Close the client. Should be called before program exit.
38+
// Close close the client. Should be called before program exit.
3939
Close(ctx context.Context)
4040
}
4141

@@ -46,6 +46,7 @@ type HttpClient = httpclient.HTTPClient
4646

4747
type options struct {
4848
apiBaseURL string
49+
apiBasePath *APIBasePath
4950
workspaceID string
5051
httpClient HttpClient
5152
timeout time.Duration
@@ -62,6 +63,9 @@ type options struct {
6263
promptCacheRefreshInterval time.Duration
6364
promptTrace bool
6465
exporter trace.Exporter
66+
traceFinishEventProcessor func(ctx context.Context, info *FinishEventInfo)
67+
traceTagTruncateConf *TagTruncateConf
68+
traceQueueConf *TraceQueueConf
6569
}
6670

6771
func (o *options) MD5() string {
@@ -133,10 +137,28 @@ func NewClient(opts ...Option) (Client, error) {
133137
Timeout: options.timeout,
134138
UploadTimeout: options.uploadTimeout,
135139
})
140+
traceFinishEventProcessor := trace.DefaultFinishEventProcessor
141+
if options.traceFinishEventProcessor != nil {
142+
traceFinishEventProcessor = func(ctx context.Context, info *consts.FinishEventInfo) {
143+
trace.DefaultFinishEventProcessor(ctx, info)
144+
options.traceFinishEventProcessor(ctx, (*FinishEventInfo)(info))
145+
}
146+
}
147+
var spanUploadPath string
148+
var fileUploadPath string
149+
if options.apiBasePath != nil {
150+
spanUploadPath = options.apiBasePath.TraceSpanUploadPath
151+
fileUploadPath = options.apiBasePath.TraceFileUploadPath
152+
}
136153
c.traceProvider = trace.NewTraceProvider(httpClient, trace.Options{
137-
WorkspaceID: options.workspaceID,
138-
UltraLargeReport: options.ultraLargeReport,
139-
Exporter: options.exporter,
154+
WorkspaceID: options.workspaceID,
155+
UltraLargeReport: options.ultraLargeReport,
156+
Exporter: options.exporter,
157+
FinishEventProcessor: traceFinishEventProcessor,
158+
TagTruncateConf: (*trace.TagTruncateConf)(options.traceTagTruncateConf),
159+
SpanUploadPath: spanUploadPath,
160+
FileUploadPath: fileUploadPath,
161+
QueueConf: (*trace.QueueConf)(options.traceQueueConf),
140162
})
141163
c.promptProvider = prompt.NewPromptProvider(httpClient, c.traceProvider, prompt.Options{
142164
WorkspaceID: options.workspaceID,
@@ -185,6 +207,12 @@ func WithAPIBaseURL(apiBaseURL string) Option {
185207
}
186208
}
187209

210+
func WithAPIBasePath(apiBasePath *APIBasePath) Option {
211+
return func(p *options) {
212+
p.apiBasePath = apiBasePath
213+
}
214+
}
215+
188216
// WithWorkspaceID set workspace id.
189217
func WithWorkspaceID(workspaceID string) Option {
190218
return func(p *options) {
@@ -241,12 +269,33 @@ func WithPromptTrace(enable bool) Option {
241269
}
242270
}
243271

272+
// WithExporter set custom trace exporter.
244273
func WithExporter(e trace.Exporter) Option {
245274
return func(p *options) {
246275
p.exporter = e
247276
}
248277
}
249278

279+
// WithTraceFinishEventProcessor set custom finish event processor, after span finish.
280+
func WithTraceFinishEventProcessor(f func(ctx context.Context, info *FinishEventInfo)) Option {
281+
return func(p *options) {
282+
p.traceFinishEventProcessor = f
283+
}
284+
}
285+
286+
// WithTraceTagTruncateConf set span tag truncate conf.
287+
func WithTraceTagTruncateConf(conf *TagTruncateConf) Option {
288+
return func(p *options) {
289+
p.traceTagTruncateConf = conf
290+
}
291+
}
292+
293+
func WithTraceQueueConf(conf *TraceQueueConf) Option {
294+
return func(p *options) {
295+
p.traceQueueConf = conf
296+
}
297+
}
298+
250299
// GetWorkspaceID return space id
251300
func GetWorkspaceID() string {
252301
return getDefaultClient().GetWorkspaceID()
@@ -350,13 +399,27 @@ func buildAuth(opts options) (httpclient.Auth, error) {
350399
return nil, ErrAuthInfoRequired
351400
}
352401

402+
func SetDefaultClient(client Client) {
403+
defaultClientLock.Lock()
404+
defer defaultClientLock.Unlock()
405+
defaultClient = client
406+
}
407+
353408
func getDefaultClient() Client {
409+
if defaultClient != nil {
410+
return defaultClient
411+
}
354412
once.Do(func() {
355413
var err error
356-
defaultClient, err = NewClient()
414+
client, err := NewClient()
357415
if err != nil {
416+
defaultClientLock.Lock()
358417
defaultClient = &NoopClient{newClientError: err}
418+
defaultClientLock.Unlock()
359419
} else {
420+
defaultClientLock.Lock()
421+
defaultClient = client
422+
defaultClientLock.Unlock()
360423
sigChan := make(chan os.Signal, 1)
361424
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
362425
go func() {
@@ -366,7 +429,9 @@ func getDefaultClient() Client {
366429

367430
logger.CtxInfof(ctx, "Received signal: %v, starting graceful shutdown...", sig)
368431
defaultClient.Close(ctx)
432+
defaultClientLock.Lock()
369433
defaultClient = &NoopClient{newClientError: consts.ErrClientClosed}
434+
defaultClientLock.Unlock()
370435
logger.CtxInfof(ctx, "Graceful shutdown finished.")
371436
os.Exit(0)
372437
}()
@@ -376,9 +441,10 @@ func getDefaultClient() Client {
376441
}
377442

378443
var (
379-
defaultClient Client
380-
once sync.Once
381-
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
444+
defaultClient Client
445+
defaultClientLock sync.RWMutex
446+
once sync.Once
447+
clientCache sync.Map // client cache to avoid creating multiple clients with the same options
382448
)
383449

384450
type loopClient struct {
@@ -426,7 +492,7 @@ func (c *loopClient) PromptFormat(ctx context.Context, loopPrompt *entity.Prompt
426492

427493
func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts ...StartSpanOption) (context.Context, Span) {
428494
if c.closed {
429-
return ctx, defaultNoopSpan
495+
return ctx, DefaultNoopSpan
430496
}
431497
config := trace.StartSpanOptions{}
432498
for _, opt := range opts {
@@ -438,25 +504,25 @@ func (c *loopClient) StartSpan(ctx context.Context, name, spanType string, opts
438504
ctx, span, err := c.traceProvider.StartSpan(ctx, name, spanType, config)
439505
if err != nil {
440506
logger.CtxWarnf(ctx, "start span failed, return noop span. %v", err)
441-
return ctx, defaultNoopSpan
507+
return ctx, DefaultNoopSpan
442508
}
443509
return ctx, span
444510
}
445511

446512
func (c *loopClient) GetSpanFromContext(ctx context.Context) Span {
447513
if c.closed {
448-
return defaultNoopSpan
514+
return DefaultNoopSpan
449515
}
450516
span := c.traceProvider.GetSpanFromContext(ctx)
451517
if span == nil {
452-
return defaultNoopSpan
518+
return DefaultNoopSpan
453519
}
454520
return span
455521
}
456522

457523
func (c *loopClient) GetSpanFromHeader(ctx context.Context, header map[string]string) SpanContext {
458524
if c.closed {
459-
return defaultNoopSpan
525+
return DefaultNoopSpan
460526
}
461527
return c.traceProvider.GetSpanFromHeader(ctx, header)
462528
}

const.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package cozeloop
55

66
import (
77
"github.com/coze-dev/cozeloop-go/internal/consts"
8+
"github.com/coze-dev/cozeloop-go/internal/trace"
89
)
910

1011
const (
@@ -19,3 +20,24 @@ const (
1920
// ComBaseURL = consts.ComBaseURL
2021
CnBaseURL = consts.CnBaseURL
2122
)
23+
24+
// SpanFinishEvent finish inner event
25+
type SpanFinishEvent consts.SpanFinishEvent
26+
27+
const (
28+
SpanFinishEventSpanQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventSpanQueueEntryRate)
29+
SpanFinishEventFileQueueEntryRate = SpanFinishEvent(consts.SpanFinishEventFileQueueEntryRate)
30+
SpanFinishEventFlushSpanRate = SpanFinishEvent(consts.SpanFinishEventFlushSpanRate)
31+
SpanFinishEventFlushFileRate = SpanFinishEvent(consts.SpanFinishEventFlushFileRate)
32+
)
33+
34+
type FinishEventInfo consts.FinishEventInfo
35+
36+
type TagTruncateConf trace.TagTruncateConf
37+
38+
type APIBasePath struct {
39+
TraceSpanUploadPath string
40+
TraceFileUploadPath string
41+
}
42+
43+
type TraceQueueConf trace.QueueConf

entity/export.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package entity
22

33
type UploadSpan struct {
4-
StartedATMicros int64 `json:"started_at_micros"`
4+
StartedATMicros int64 `json:"started_at_micros"` // start time in microseconds
5+
LogID string `json:"log_id"` // the custom log id, identify different query.
56
SpanID string `json:"span_id"`
67
ParentID string `json:"parent_id"`
78
TraceID string `json:"trace_id"`
8-
DurationMicros int64 `json:"duration_micros"`
9-
WorkspaceID string `json:"workspace_id"`
9+
DurationMicros int64 `json:"duration_micros"` // duration in microseconds
10+
ServiceName string `json:"service_name"` // the custom service name, identify different services.
11+
WorkspaceID string `json:"workspace_id"` // cozeloop workspace id
1012
SpanName string `json:"span_name"`
1113
SpanType string `json:"span_type"`
1214
StatusCode int32 `json:"status_code"`
1315
Input string `json:"input"`
1416
Output string `json:"output"`
15-
ObjectStorage string `json:"object_storage"`
17+
ObjectStorage string `json:"object_storage"` // file object storage
1618
SystemTagsString map[string]string `json:"system_tags_string"`
1719
SystemTagsLong map[string]int64 `json:"system_tags_long"`
1820
SystemTagsDouble map[string]float64 `json:"system_tags_double"`

internal/consts/finish_event.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package consts
2+
3+
type SpanFinishEvent string
4+
5+
const (
6+
SpanFinishEventSpanQueueEntryRate SpanFinishEvent = "queue_manager.span_entry.rate"
7+
SpanFinishEventFileQueueEntryRate SpanFinishEvent = "queue_manager.file_entry.rate"
8+
9+
SpanFinishEventFlushSpanRate SpanFinishEvent = "exporter.span_flush.rate"
10+
SpanFinishEventFlushFileRate SpanFinishEvent = "exporter.file_flush.rate"
11+
)
12+
13+
type FinishEventInfo struct {
14+
EventType SpanFinishEvent
15+
IsEventFail bool
16+
ItemNum int // maybe multiple span is processed in one event
17+
DetailMsg string
18+
ExtraParams *FinishEventInfoExtra
19+
}
20+
21+
type FinishEventInfoExtra struct {
22+
IsRootSpan bool
23+
}

internal/httpclient/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ func setAuthorizationHeader(ctx context.Context, request *http.Request, auth Aut
203203
}
204204

205205
func parseResponse(ctx context.Context, url string, response *http.Response, resp OpenAPIResponse) error {
206+
defer response.Body.Close()
207+
206208
logID := response.Header.Get(consts.LogIDHeader)
207209
respBody, err := io.ReadAll(response.Body)
208210
if err != nil {

internal/trace/exporter.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ const (
3939
var _ Exporter = (*SpanExporter)(nil)
4040

4141
type SpanExporter struct {
42-
client *httpclient.Client
42+
client *httpclient.Client
43+
uploadPath UploadPath
44+
}
45+
46+
type UploadPath struct {
47+
spanUploadPath string
48+
fileUploadPath string
4349
}
4450

4551
func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFile) error {
@@ -50,14 +56,12 @@ func (e *SpanExporter) ExportFiles(ctx context.Context, files []*entity.UploadFi
5056
}
5157
logger.CtxDebugf(ctx, "uploadFile start, file name: %s", file.Name)
5258
resp := httpclient.BaseResponse{}
53-
err := e.client.UploadFile(ctx, pathUploadFile, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
59+
err := e.client.UploadFile(ctx, e.uploadPath.fileUploadPath, file.TosKey, bytes.NewReader([]byte(file.Data)), map[string]string{"workspace_id": file.SpaceID}, &resp)
5460
if err != nil {
55-
logger.CtxErrorf(ctx, "export files[%s] fail, err:[%v], retry later", file.TosKey, err)
56-
return err
61+
return consts.NewError(fmt.Sprintf("export files[%s] fail", file.TosKey)).Wrap(err)
5762
}
5863
if resp.GetCode() != 0 { // todo: some err code do not need retry
59-
logger.CtxErrorf(ctx, "export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg())
60-
return consts.ErrRemoteService
64+
return consts.NewError(fmt.Sprintf("export files[%s] fail, code:[%v], msg:[%v] retry later", file.TosKey, resp.GetCode(), resp.GetMsg()))
6165
}
6266
logger.CtxDebugf(ctx, "uploadFile end, file name: %s", file.Name)
6367
}
@@ -69,17 +73,13 @@ func (e *SpanExporter) ExportSpans(ctx context.Context, ss []*entity.UploadSpan)
6973
if len(ss) == 0 {
7074
return
7175
}
72-
logger.CtxDebugf(ctx, "export spans, spans count: %d", len(ss))
73-
7476
resp := httpclient.BaseResponse{}
75-
err = e.client.Post(ctx, pathIngestTrace, UploadSpanData{ss}, &resp)
77+
err = e.client.Post(ctx, e.uploadPath.spanUploadPath, UploadSpanData{ss}, &resp)
7678
if err != nil {
77-
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], err:[%v]", len(ss), err)
78-
return err
79+
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d]", len(ss))).Wrap(err)
7980
}
8081
if resp.GetCode() != 0 { // todo: some err code do not need retry
81-
logger.CtxErrorf(ctx, "export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg())
82-
return consts.ErrRemoteService
82+
return consts.NewError(fmt.Sprintf("export spans fail, span count: [%d], code:[%v], msg:[%v]", len(ss), resp.GetCode(), resp.GetMsg()))
8383
}
8484

8585
return
@@ -107,10 +107,12 @@ func transferToUploadSpanAndFile(ctx context.Context, spans []*Span) ([]*entity.
107107
systemTagStrM, systemTagLongM, systemTagDoubleM, _ := parseTag(span.SystemTagMap, true)
108108
resSpan = append(resSpan, &entity.UploadSpan{
109109
StartedATMicros: span.GetStartTime().UnixMicro(),
110+
LogID: span.GetLogID(),
110111
SpanID: span.GetSpanID(),
111112
ParentID: span.GetParentID(),
112113
TraceID: span.GetTraceID(),
113114
DurationMicros: span.GetDuration(),
115+
ServiceName: span.GetServiceName(),
114116
WorkspaceID: span.GetSpaceID(),
115117
SpanName: span.GetSpanName(),
116118
SpanType: span.GetSpanType(),

internal/trace/noop_span.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ func (n noopSpan) SetInputTokens(ctx context.Context, inputTokens int)
3434
func (n noopSpan) SetOutputTokens(ctx context.Context, outputTokens int) {}
3535
func (n noopSpan) SetStartTimeFirstResp(ctx context.Context, startTimeFirstResp int64) {}
3636
func (n noopSpan) SetRuntime(ctx context.Context, runtime tracespec.Runtime) {}
37+
func (n noopSpan) SetServiceName(ctx context.Context, serviceName string) {}
38+
func (n noopSpan) SetLogID(ctx context.Context, logID string) {}
39+
func (n noopSpan) SetFinishTime(finishTime time.Time) {}
40+
func (n noopSpan) SetSystemTags(ctx context.Context, systemTags map[string]interface{}) {}
3741

3842
// implement of Span
3943
func (n noopSpan) SetTags(ctx context.Context, tagKVs map[string]interface{}) {}

0 commit comments

Comments
 (0)