Skip to content

Commit 8362705

Browse files
committed
make start events private
1 parent 86a39f8 commit 8362705

File tree

4 files changed

+78
-90
lines changed

4 files changed

+78
-90
lines changed

errors.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,4 @@ package golangfuse
22

33
import "errors"
44

5-
var AlreadyStartedErr = errors.New("already started")
65
var AlreadyShutdownErr = errors.New("already shutdown")

langfuse.go

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package golangfuse
33
import (
44
"context"
55
"fmt"
6-
"net/http"
76
"sync/atomic"
87
"time"
98

@@ -13,7 +12,6 @@ import (
1312
)
1413

1514
type Langfuse interface {
16-
StartSendingEvents(ctx context.Context, period time.Duration) error
1715
// Shutdown stops the event buffer and cleans up resources.
1816
// It also ensures that any pending events are sent before shutdown.
1917
Shutdown(ctx context.Context) error
@@ -22,52 +20,46 @@ type Langfuse interface {
2220
}
2321

2422
type langfuseImpl struct {
25-
restClient *resty.Client
26-
eventBuffer *eventBuffer
27-
isSendingEventsStarted atomic.Bool
28-
isShutdown atomic.Bool
29-
cancelFunc context.CancelFunc
30-
endpoint string
31-
promptLabel string
23+
restClient *resty.Client
24+
eventBuffer *eventBuffer
25+
isShutdown atomic.Bool
26+
cancelFunc context.CancelFunc
27+
endpoint string
28+
promptLabel string
29+
flushPeriod time.Duration
3230
}
3331

34-
func New(endpoint, publicKey, secretKey string) Langfuse {
35-
return NewWithHttpClient(http.DefaultClient, endpoint, publicKey, secretKey)
36-
}
37-
38-
func NewWithHttpClient(httpClient *http.Client, endpoint, publicKey, secretKey string) Langfuse {
39-
client := resty.NewWithClient(httpClient).SetBasicAuth(publicKey, secretKey)
32+
func New(
33+
ctx context.Context,
34+
endpoint,
35+
publicKey,
36+
secretKey string,
37+
opts ...Option,
38+
) Langfuse {
4039
c := &langfuseImpl{
41-
restClient: client,
40+
restClient: resty.New(),
4241
endpoint: endpoint,
43-
promptLabel: "production", // TODO: use option pattern to override this default if needed
42+
promptLabel: "production",
43+
flushPeriod: 1 * time.Second,
44+
}
45+
for _, opt := range opts {
46+
opt(c)
4447
}
48+
c.restClient = c.restClient.SetBasicAuth(publicKey, secretKey)
4549
c.eventBuffer = newEventBufferer(c.sendEvents)
50+
c.startSendingEvents(ctx, c.flushPeriod)
4651
return c
4752
}
4853

49-
func (c *langfuseImpl) StartSendingEvents(ctx context.Context, period time.Duration) error {
50-
if c.isShutdown.Load() {
51-
return AlreadyShutdownErr
52-
}
53-
if c.isSendingEventsStarted.CompareAndSwap(false, true) {
54-
ctx, cancel := context.WithCancel(ctx)
55-
c.cancelFunc = cancel
56-
go c.eventBuffer.Start(ctx, period)
57-
return nil
58-
} else {
59-
return AlreadyStartedErr
60-
}
54+
func (c *langfuseImpl) startSendingEvents(ctx context.Context, period time.Duration) {
55+
ctx, cancel := context.WithCancel(ctx)
56+
c.cancelFunc = cancel
57+
go c.eventBuffer.Start(ctx, period)
6158
}
6259

6360
func (c *langfuseImpl) Shutdown(ctx context.Context) error {
6461
// Check if already shutdown
6562
if c.isShutdown.CompareAndSwap(false, true) {
66-
// Check if events were started
67-
if !c.isSendingEventsStarted.Load() {
68-
return nil
69-
}
70-
7163
// Cancel the event buffer goroutine
7264
c.cancelFunc()
7365

langfuse_test.go

Lines changed: 21 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,8 @@ func (s *ClientTest) getLangfuseClientForTest(promptName, promptContent string)
6767
"commitMessage" : null,
6868
"resolutionGraph" : null
6969
}`, promptContent, promptName)
70-
return golangfuse.NewWithHttpClient(
71-
httpmock.NewMockClient(http.StatusOK, apiResponse),
72-
"https://langfuse3.data.divar.cloud", "pk", "sk")
70+
return golangfuse.New(s.ctx, "https://langfuse3.data.divar.cloud", "pk", "sk",
71+
golangfuse.WithHTTPClient(httpmock.NewMockClient(http.StatusOK, apiResponse)))
7372
}
7473

7574
func (s *ClientTest) TestShouldSetBasicAuth() {
@@ -91,19 +90,6 @@ func (s *ClientTest) TestShouldSetBasicAuth() {
9190
)
9291
}
9392

94-
func (s *ClientTest) TestStartSendingShouldReturnErrorIfAlreadyStarted() {
95-
// Given
96-
c := s.getClient()
97-
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
98-
s.Require().NoError(err)
99-
100-
// When
101-
err = c.StartSendingEvents(s.ctx, 1*time.Microsecond)
102-
103-
// Then
104-
s.Require().ErrorContains(err, "already started")
105-
}
106-
10793
func (s *ClientTest) TestShouldSendEvent() {
10894
// Given
10995
var sentRequestBody []byte
@@ -118,8 +104,6 @@ func (s *ClientTest) TestShouldSendEvent() {
118104
sentRequestBody = body
119105
return &http.Response{}, nil
120106
})
121-
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
122-
s.Require().NoError(err)
123107

124108
// When
125109
c.Trace("input", "output")
@@ -135,7 +119,7 @@ func (s *ClientTest) TestShouldSendEvent() {
135119
} `json:"batch"`
136120
}
137121
bodyObj := requestBody{}
138-
err = json.Unmarshal(sentRequestBody, &bodyObj)
122+
err := json.Unmarshal(sentRequestBody, &bodyObj)
139123
s.Require().NoError(err)
140124
s.Require().Len(bodyObj.Batch, 1)
141125
s.Require().Equal("input", bodyObj.Batch[0].Body.Input)
@@ -145,15 +129,13 @@ func (s *ClientTest) TestShouldSendEvent() {
145129
func (s *ClientTest) TestShouldNotSendAnythingWhenNoEventIsReported() {
146130
// Given
147131
httpCallHappened := false
148-
c := s.getClientWithMockedHttpTransport(func(req *http.Request) (*http.Response, error) {
132+
s.getClientWithMockedHttpTransport(func(req *http.Request) (*http.Response, error) {
149133
httpCallHappened = true
150134
return &http.Response{}, nil
151135
})
152136

153137
// When
154-
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
155-
s.Require().NoError(err)
156-
time.Sleep(1 * time.Millisecond)
138+
time.Sleep(2 * time.Millisecond)
157139

158140
// Then
159141
s.Require().False(httpCallHappened, "http call happened, unexpectedly")
@@ -173,8 +155,6 @@ func (s *ClientTest) TestShouldSendEventsInBatch() {
173155
sentRequestBody = body
174156
return &http.Response{}, nil
175157
})
176-
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
177-
s.Require().NoError(err)
178158

179159
// When
180160
c.Trace("input", "output")
@@ -186,7 +166,7 @@ func (s *ClientTest) TestShouldSendEventsInBatch() {
186166
Batch []struct{} `json:"batch"`
187167
}
188168
bodyObj := requestBody{}
189-
err = json.Unmarshal(sentRequestBody, &bodyObj)
169+
err := json.Unmarshal(sentRequestBody, &bodyObj)
190170
s.Require().NoError(err)
191171
s.Require().Len(bodyObj.Batch, 2)
192172
}
@@ -200,21 +180,24 @@ func (s *ClientTest) getClient() golangfuse.Langfuse {
200180
})
201181
}
202182

203-
func (s *ClientTest) getClientWithMockedHttpTransport(transport httpmock.RoundTripFunc) golangfuse.Langfuse {
204-
return golangfuse.NewWithHttpClient(
205-
&http.Client{Transport: transport},
183+
func (s *ClientTest) getClientWithMockedHttpTransport(
184+
transport httpmock.RoundTripFunc,
185+
opts ...golangfuse.Option,
186+
) golangfuse.Langfuse {
187+
opts = append(opts, golangfuse.WithHTTPClient(&http.Client{Transport: transport}))
188+
return golangfuse.New(
189+
s.ctx,
206190
"https://test.com",
207191
"test-pk",
208192
"test-sk",
193+
opts...,
209194
)
210195
}
211196

212197
func (s *ClientTest) TestShutdownShouldReturnErrorWhenAlreadyShutdown() {
213198
// Given
214199
c := s.getClient()
215-
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
216-
s.Require().NoError(err)
217-
err = c.Shutdown(s.ctx)
200+
err := c.Shutdown(s.ctx)
218201
s.Require().NoError(err)
219202

220203
// When
@@ -224,14 +207,12 @@ func (s *ClientTest) TestShutdownShouldReturnErrorWhenAlreadyShutdown() {
224207
s.Require().ErrorContains(err, "already shutdown")
225208
}
226209

227-
func (s *ClientTest) TestShutdownShouldSucceedAfterStart() {
210+
func (s *ClientTest) TestShutdownShouldSucceedInHappyPath() {
228211
// Given
229212
c := s.getClient()
230-
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
231-
s.Require().NoError(err)
232213

233214
// When
234-
err = c.Shutdown(s.ctx)
215+
err := c.Shutdown(s.ctx)
235216

236217
// Then
237218
s.Require().NoError(err)
@@ -253,13 +234,13 @@ func (s *ClientTest) TestShutdownShouldFlushPendingEvents() {
253234
StatusCode: http.StatusMultiStatus,
254235
Body: io.NopCloser(strings.NewReader("{}")),
255236
}, nil
256-
})
257-
err := c.StartSendingEvents(s.ctx, 1*time.Hour) // Long period to prevent automatic Flush
258-
s.Require().NoError(err)
237+
},
238+
golangfuse.WithFlushPeriod(1*time.Hour), // Long period to prevent automatic Flush
239+
)
259240

260241
// When
261242
c.Trace("input", "output")
262-
err = c.Shutdown(s.ctx)
243+
err := c.Shutdown(s.ctx)
263244
s.Require().NoError(err)
264245
wg.Wait()
265246

@@ -280,21 +261,6 @@ func (s *ClientTest) TestShutdownShouldFlushPendingEvents() {
280261
s.Require().Equal("output", bodyObj.Batch[0].Body.Output)
281262
}
282263

283-
func (s *ClientTest) TestStartSendingEventsShouldReturnErrorAfterShutdown() {
284-
// Given
285-
c := s.getClient()
286-
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
287-
s.Require().NoError(err)
288-
err = c.Shutdown(s.ctx)
289-
s.Require().NoError(err)
290-
291-
// When
292-
err = c.StartSendingEvents(s.ctx, 1*time.Microsecond)
293-
294-
// Then
295-
s.Require().ErrorContains(err, "already shutdown")
296-
}
297-
298264
func TestLangfuseClient(t *testing.T) {
299265
suite.Run(t, new(ClientTest))
300266
}

options.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package golangfuse
2+
3+
import (
4+
"net/http"
5+
"time"
6+
7+
"resty.dev/v3"
8+
)
9+
10+
type Option func(l *langfuseImpl)
11+
12+
func WithDefaultPromptLabel(label string) Option {
13+
return func(l *langfuseImpl) {
14+
l.promptLabel = label
15+
}
16+
}
17+
18+
func WithHTTPClient(httpClient *http.Client) Option {
19+
return func(l *langfuseImpl) {
20+
l.restClient = resty.NewWithClient(httpClient)
21+
}
22+
}
23+
24+
func WithFlushPeriod(period time.Duration) Option {
25+
return func(l *langfuseImpl) {
26+
if period <= 0 {
27+
period = 1 * time.Second // Default to 1 second if invalid period is provided
28+
}
29+
l.flushPeriod = period
30+
}
31+
}

0 commit comments

Comments
 (0)