@@ -294,9 +294,44 @@ func (bifrost *Bifrost) TranscriptionStreamRequest(ctx context.Context, req *sch
294294 return bifrost .handleStreamRequest (ctx , req , schemas .TranscriptionStreamRequest )
295295}
296296
297+ // RemovePlugin removes a plugin from the server.
298+ func (bifrost * Bifrost ) RemovePlugin (name string ) error {
299+ var pluginToCleanup schemas.Plugin
300+ for {
301+ oldPlugins := bifrost .plugins .Load ()
302+ if oldPlugins == nil {
303+ return nil
304+ }
305+ // Create new slice with replaced plugin
306+ newPlugins := make ([]schemas.Plugin , len (* oldPlugins ))
307+ copy (newPlugins , * oldPlugins )
308+ for i , p := range newPlugins {
309+ if p .GetName () == name {
310+ pluginToCleanup = p
311+ bifrost .logger .Debug ("removing plugin %s" , name )
312+ newPlugins = append (newPlugins [:i ], newPlugins [i + 1 :]... )
313+ break
314+ }
315+ }
316+ // Atomic compare-and-swap
317+ if bifrost .plugins .CompareAndSwap (oldPlugins , & newPlugins ) {
318+ // Cleanup the old plugin
319+ if pluginToCleanup != nil {
320+ err := pluginToCleanup .Cleanup ()
321+ if err != nil {
322+ bifrost .logger .Warn ("failed to cleanup old plugin %s: %v" , pluginToCleanup .GetName (), err )
323+ }
324+ }
325+ return nil
326+ }
327+ // Retrying as swapping did not work
328+ }
329+ }
330+
297331// ReloadPlugin reloads a plugin with new instance
298332// During the reload - it's stop the world phase where we take a global lock on the plugin mutex
299- func (bifrost * Bifrost ) ReloadPlugin (plugin schemas.Plugin ) error {
333+ func (bifrost * Bifrost ) ReloadPlugin (plugin schemas.Plugin ) error {
334+ var pluginToCleanup schemas.Plugin
300335 for {
301336 oldPlugins := bifrost .plugins .Load ()
302337 if oldPlugins == nil {
@@ -308,22 +343,32 @@ func (bifrost *Bifrost) ReloadPlugin(plugin schemas.Plugin) error {
308343 found := false
309344 for i , p := range newPlugins {
310345 if p .GetName () == plugin .GetName () {
346+ // Cleaning up old plugin before replacing it
347+ pluginToCleanup = p
348+ bifrost .logger .Debug ("replacing plugin %s with new instance" , plugin .GetName ())
311349 newPlugins [i ] = plugin
312350 found = true
313351 break
314352 }
315353 }
316- if ! found {
354+ if ! found {
317355 // This means that user is adding a new plugin
356+ bifrost .logger .Debug ("adding new plugin %s" , plugin .GetName ())
318357 newPlugins = append (newPlugins , plugin )
319358 }
320359 // Atomic compare-and-swap
321360 if bifrost .plugins .CompareAndSwap (oldPlugins , & newPlugins ) {
361+ // Cleanup the old plugin
362+ if pluginToCleanup != nil {
363+ err := pluginToCleanup .Cleanup ()
364+ if err != nil {
365+ bifrost .logger .Warn ("failed to cleanup old plugin %s: %v" , pluginToCleanup .GetName (), err )
366+ }
367+ }
322368 return nil
323369 }
324370 // Retrying as swapping did not work
325371 }
326-
327372}
328373
329374// UpdateProviderConcurrency dynamically updates the queue size and concurrency for an existing provider.
@@ -1023,7 +1068,7 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
10231068
10241069 msg := bifrost .getChannelMessage (* preReq , requestType )
10251070 msg .Context = ctx
1026-
1071+ startTime := time . Now ()
10271072 select {
10281073 case queue <- * msg :
10291074 // Message was sent successfully
@@ -1049,6 +1094,10 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
10491094 var resp * schemas.BifrostResponse
10501095 select {
10511096 case result = <- msg .Response :
1097+ latency := time .Since (startTime ).Milliseconds ()
1098+ if result .ExtraFields .Latency == nil {
1099+ result .ExtraFields .Latency = & latency
1100+ }
10521101 resp , bifrostErr := pipeline .RunPostHooks (& ctx , result , nil , len (* bifrost .plugins .Load ()))
10531102 if bifrostErr != nil {
10541103 bifrost .releaseChannelMessage (msg )
@@ -1172,7 +1221,7 @@ func (bifrost *Bifrost) tryStreamRequest(req *schemas.BifrostRequest, ctx contex
11721221 // Marking final chunk
11731222 ctx = context .WithValue (ctx , schemas .BifrostContextKeyStreamEndIndicator , true )
11741223 // On error we will complete post-hooks
1175- recoveredResp , recoveredErr := pipeline .RunPostHooks (& ctx , nil , & bifrostErrVal , len (bifrost .plugins ))
1224+ recoveredResp , recoveredErr := pipeline .RunPostHooks (& ctx , nil , & bifrostErrVal , len (* bifrost .plugins . Load () ))
11761225 bifrost .releaseChannelMessage (msg )
11771226 if recoveredErr != nil {
11781227 return nil , recoveredErr
@@ -1332,7 +1381,7 @@ func handleProviderRequest(provider schemas.Provider, req *ChannelMessage, key s
13321381 case schemas .TextCompletionRequest :
13331382 return provider .TextCompletion (req .Context , req .Model , key , * req .Input .TextCompletionInput , req .Params )
13341383 case schemas .ChatCompletionRequest :
1335- return provider .ChatCompletion (req .Context , req .Model , key , * req .Input .ChatCompletionInput , req .Params )
1384+ return provider .ChatCompletion (req .Context , req .Model , key , req .Input .ChatCompletionInput , req .Params )
13361385 case schemas .EmbeddingRequest :
13371386 return provider .Embedding (req .Context , req .Model , key , req .Input .EmbeddingInput , req .Params )
13381387 case schemas .SpeechRequest :
@@ -1353,7 +1402,7 @@ func handleProviderRequest(provider schemas.Provider, req *ChannelMessage, key s
13531402func handleProviderStreamRequest (provider schemas.Provider , req * ChannelMessage , key schemas.Key , postHookRunner schemas.PostHookRunner , reqType schemas.RequestType ) (chan * schemas.BifrostStream , * schemas.BifrostError ) {
13541403 switch reqType {
13551404 case schemas .ChatCompletionStreamRequest :
1356- return provider .ChatCompletionStream (req .Context , postHookRunner , req .Model , key , * req .Input .ChatCompletionInput , req .Params )
1405+ return provider .ChatCompletionStream (req .Context , postHookRunner , req .Model , key , req .Input .ChatCompletionInput , req .Params )
13571406 case schemas .SpeechStreamRequest :
13581407 return provider .SpeechStream (req .Context , postHookRunner , req .Model , key , req .Input .SpeechInput , req .Params )
13591408 case schemas .TranscriptionStreamRequest :
@@ -1375,6 +1424,7 @@ func (p *PluginPipeline) RunPreHooks(ctx *context.Context, req *schemas.BifrostR
13751424 var shortCircuit * schemas.PluginShortCircuit
13761425 var err error
13771426 for i , plugin := range p .plugins {
1427+ p .logger .Debug ("running pre-hook for plugin %s" , plugin .GetName ())
13781428 req , shortCircuit , err = plugin .PreHook (ctx , req )
13791429 if err != nil {
13801430 p .preHookErrors = append (p .preHookErrors , err )
@@ -1391,17 +1441,19 @@ func (p *PluginPipeline) RunPreHooks(ctx *context.Context, req *schemas.BifrostR
13911441// RunPostHooks executes PostHooks in reverse order for the plugins whose PreHook ran.
13921442// Accepts the response and error, and allows plugins to transform either (e.g., recover from error, or invalidate a response).
13931443// Returns the final response and error after all hooks. If both are set, error takes precedence unless error is nil.
1394- func (p * PluginPipeline ) RunPostHooks (ctx * context.Context , resp * schemas.BifrostResponse , bifrostErr * schemas.BifrostError , count int ) (* schemas.BifrostResponse , * schemas.BifrostError ) {
1444+ // runFrom is the count of plugins whose PreHooks ran; PostHooks will run in reverse from index (runFrom - 1) down to 0
1445+ func (p * PluginPipeline ) RunPostHooks (ctx * context.Context , resp * schemas.BifrostResponse , bifrostErr * schemas.BifrostError , runFrom int ) (* schemas.BifrostResponse , * schemas.BifrostError ) {
13951446 // Defensive: ensure count is within valid bounds
1396- if count < 0 {
1397- count = 0
1447+ if runFrom < 0 {
1448+ runFrom = 0
13981449 }
1399- if count > len (p .plugins ) {
1400- count = len (p .plugins )
1450+ if runFrom > len (p .plugins ) {
1451+ runFrom = len (p .plugins )
14011452 }
14021453 var err error
1403- for i := count - 1 ; i >= 0 ; i -- {
1454+ for i := runFrom - 1 ; i >= 0 ; i -- {
14041455 plugin := p .plugins [i ]
1456+ p .logger .Debug ("running post-hook for plugin %s" , plugin .GetName ())
14051457 resp , bifrostErr , err = plugin .PostHook (ctx , resp , bifrostErr )
14061458 if err != nil {
14071459 p .postHookErrors = append (p .postHookErrors , err )
@@ -1618,4 +1670,5 @@ func (bifrost *Bifrost) Shutdown() {
16181670 bifrost .logger .Warn (fmt .Sprintf ("Error cleaning up plugin: %s" , err .Error ()))
16191671 }
16201672 }
1673+ bifrost .logger .Info ("all request channels closed" )
16211674}
0 commit comments