Skip to content

Commit 6f0bc96

Browse files
committed
add shutdown
1 parent 9633942 commit 6f0bc96

File tree

4 files changed

+130
-2
lines changed

4 files changed

+130
-2
lines changed

errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ package golangfuse
33
import "errors"
44

55
var AlreadyStartedErr = errors.New("already started")
6+
var AlreadyShutdownErr = errors.New("already shutdown")

event_buffer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (i *eventBuffer) Start(ctx context.Context, period time.Duration) {
2626
for {
2727
select {
2828
case <-ticker.C:
29-
i.flush(ctx)
29+
i.Flush(ctx)
3030
case <-ctx.Done():
3131
return
3232
}
@@ -39,7 +39,7 @@ func (i *eventBuffer) Add(event IngestionEvent) {
3939
i.bufferedEvents = append(i.bufferedEvents, event)
4040
}
4141

42-
func (i *eventBuffer) flush(ctx context.Context) {
42+
func (i *eventBuffer) Flush(ctx context.Context) {
4343
i.mu.Lock()
4444
items := i.bufferedEvents
4545
i.bufferedEvents = nil

langfuse.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414

1515
type Langfuse interface {
1616
StartSendingEvents(ctx context.Context, period time.Duration) error
17+
// Shutdown stops the event buffer and cleans up resources.
18+
// It also ensures that any pending events are sent before shutdown.
19+
Shutdown(ctx context.Context) error
1720
Trace(input, output any, options ...TraceOption)
1821
GetSystemPromptTemplate(ctx context.Context, promptName string) (string, error)
1922
}
@@ -22,6 +25,8 @@ type langfuseImpl struct {
2225
restClient *resty.Client
2326
eventBuffer *eventBuffer
2427
isSendingEventsStarted atomic.Bool
28+
isShutdown atomic.Bool
29+
cancelFunc context.CancelFunc
2530
endpoint string
2631
promptLabel string
2732
}
@@ -42,14 +47,39 @@ func NewWithHttpClient(httpClient *http.Client, endpoint, publicKey, secretKey s
4247
}
4348

4449
func (c *langfuseImpl) StartSendingEvents(ctx context.Context, period time.Duration) error {
50+
if c.isShutdown.Load() {
51+
return AlreadyShutdownErr
52+
}
4553
if c.isSendingEventsStarted.CompareAndSwap(false, true) {
54+
ctx, cancel := context.WithCancel(ctx)
55+
c.cancelFunc = cancel
4656
go c.eventBuffer.Start(ctx, period)
4757
return nil
4858
} else {
4959
return AlreadyStartedErr
5060
}
5161
}
5262

63+
func (c *langfuseImpl) Shutdown(ctx context.Context) error {
64+
// Check if already shutdown
65+
if c.isShutdown.CompareAndSwap(false, true) {
66+
// Check if events were started
67+
if !c.isSendingEventsStarted.Load() {
68+
return nil
69+
}
70+
71+
// Cancel the event buffer goroutine
72+
c.cancelFunc()
73+
74+
// Flush any remaining events before shutdown
75+
c.eventBuffer.Flush(ctx)
76+
77+
return nil
78+
} else {
79+
return AlreadyShutdownErr
80+
}
81+
}
82+
5383
func (c *langfuseImpl) GetSystemPromptTemplate(ctx context.Context, promptName string) (string, error) {
5484
promptObject := ChatPrompt{}
5585
resp, err := c.restClient.R().

langfuse_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,103 @@ func (s *ClientTest) getClientWithMockedHttpTransport(transport httpmock.RoundTr
209209
)
210210
}
211211

212+
func (s *ClientTest) TestShutdownShouldReturnErrorWhenNotStarted() {
213+
// Given
214+
c := s.getClient()
215+
216+
// When
217+
err := c.Shutdown(s.ctx)
218+
219+
// Then
220+
s.Require().ErrorContains(err, "not started")
221+
}
222+
223+
func (s *ClientTest) TestShutdownShouldReturnErrorWhenAlreadyShutdown() {
224+
// Given
225+
c := s.getClient()
226+
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
227+
s.Require().NoError(err)
228+
err = c.Shutdown(s.ctx)
229+
s.Require().NoError(err)
230+
231+
// When
232+
err = c.Shutdown(s.ctx)
233+
234+
// Then
235+
s.Require().ErrorContains(err, "already shutdown")
236+
}
237+
238+
func (s *ClientTest) TestShutdownShouldSucceedAfterStart() {
239+
// Given
240+
c := s.getClient()
241+
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
242+
s.Require().NoError(err)
243+
244+
// When
245+
err = c.Shutdown(s.ctx)
246+
247+
// Then
248+
s.Require().NoError(err)
249+
}
250+
251+
func (s *ClientTest) TestShutdownShouldFlushPendingEvents() {
252+
// Given
253+
var sentRequestBody []byte
254+
wg := &sync.WaitGroup{}
255+
wg.Add(1)
256+
c := s.getClientWithMockedHttpTransport(func(req *http.Request) (*http.Response, error) {
257+
defer wg.Done()
258+
body, err := io.ReadAll(req.Body)
259+
if err != nil {
260+
return nil, err
261+
}
262+
sentRequestBody = body
263+
return &http.Response{
264+
StatusCode: http.StatusMultiStatus,
265+
Body: io.NopCloser(strings.NewReader("{}")),
266+
}, nil
267+
})
268+
err := c.StartSendingEvents(s.ctx, 1*time.Hour) // Long period to prevent automatic Flush
269+
s.Require().NoError(err)
270+
271+
// When
272+
c.Trace("input", "output")
273+
err = c.Shutdown(s.ctx)
274+
s.Require().NoError(err)
275+
wg.Wait()
276+
277+
// Then
278+
type requestBody struct {
279+
Batch []struct {
280+
Body struct {
281+
Input string `json:"input"`
282+
Output string `json:"output"`
283+
}
284+
} `json:"batch"`
285+
}
286+
bodyObj := requestBody{}
287+
err = json.Unmarshal(sentRequestBody, &bodyObj)
288+
s.Require().NoError(err)
289+
s.Require().Len(bodyObj.Batch, 1)
290+
s.Require().Equal("input", bodyObj.Batch[0].Body.Input)
291+
s.Require().Equal("output", bodyObj.Batch[0].Body.Output)
292+
}
293+
294+
func (s *ClientTest) TestStartSendingEventsShouldReturnErrorAfterShutdown() {
295+
// Given
296+
c := s.getClient()
297+
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
298+
s.Require().NoError(err)
299+
err = c.Shutdown(s.ctx)
300+
s.Require().NoError(err)
301+
302+
// When
303+
err = c.StartSendingEvents(s.ctx, 1*time.Microsecond)
304+
305+
// Then
306+
s.Require().ErrorContains(err, "already shutdown")
307+
}
308+
212309
func TestLangfuseClient(t *testing.T) {
213310
suite.Run(t, new(ClientTest))
214311
}

0 commit comments

Comments
 (0)