Skip to content

Commit 2aef6ff

Browse files
Update middlewares.go
1 parent d03c528 commit 2aef6ff

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+3046
-1593
lines changed

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ help: ## Show this help message
3737
cleanup-enterprise: ## Clean up enterprise directories if present
3838
@echo "$(GREEN)Cleaning up enterprise...$(NC)"
3939
@if [ -d "ui/app/enterprise" ]; then rm -rf ui/app/enterprise; fi
40-
@if [ -d "ui/app/enterprise" ]; then rm -rf ui/app/enterprise; fi
4140
@echo "$(GREEN)Enterprise cleaned up$(NC)"
4241

4342
install-ui: cleanup-enterprise

core/bifrost.go

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,44 @@ func (bifrost *Bifrost) TranscriptionStreamRequest(ctx context.Context, req *sch
294294
return bifrost.handleStreamRequest(ctx, req, schemas.TranscriptionStreamRequest)
295295
}
296296

297+
// RemovePlugin removes a plugin from the server.
298+
func (bifrost *Bifrost) RemovePlugin(name string) error {
299+
var pluginToCleanup schemas.Plugin
300+
for {
301+
oldPlugins := bifrost.plugins.Load()
302+
if oldPlugins == nil {
303+
return nil
304+
}
305+
// Create new slice with replaced plugin
306+
newPlugins := make([]schemas.Plugin, len(*oldPlugins))
307+
copy(newPlugins, *oldPlugins)
308+
for i, p := range newPlugins {
309+
if p.GetName() == name {
310+
pluginToCleanup = p
311+
bifrost.logger.Debug("removing plugin %s", name)
312+
newPlugins = append(newPlugins[:i], newPlugins[i+1:]...)
313+
break
314+
}
315+
}
316+
// Atomic compare-and-swap
317+
if bifrost.plugins.CompareAndSwap(oldPlugins, &newPlugins) {
318+
// Cleanup the old plugin
319+
if pluginToCleanup != nil {
320+
err := pluginToCleanup.Cleanup()
321+
if err != nil {
322+
bifrost.logger.Warn("failed to cleanup old plugin %s: %v", pluginToCleanup.GetName(), err)
323+
}
324+
}
325+
return nil
326+
}
327+
// Retrying as swapping did not work
328+
}
329+
}
330+
297331
// ReloadPlugin reloads a plugin with new instance
298332
// During the reload - it's stop the world phase where we take a global lock on the plugin mutex
299-
func (bifrost *Bifrost) ReloadPlugin(plugin schemas.Plugin) error {
333+
func (bifrost *Bifrost) ReloadPlugin(plugin schemas.Plugin) error {
334+
var pluginToCleanup schemas.Plugin
300335
for {
301336
oldPlugins := bifrost.plugins.Load()
302337
if oldPlugins == nil {
@@ -308,22 +343,32 @@ func (bifrost *Bifrost) ReloadPlugin(plugin schemas.Plugin) error {
308343
found := false
309344
for i, p := range newPlugins {
310345
if p.GetName() == plugin.GetName() {
346+
// Cleaning up old plugin before replacing it
347+
pluginToCleanup = p
348+
bifrost.logger.Debug("replacing plugin %s with new instance", plugin.GetName())
311349
newPlugins[i] = plugin
312350
found = true
313351
break
314352
}
315353
}
316-
if !found{
354+
if !found {
317355
// This means that user is adding a new plugin
356+
bifrost.logger.Debug("adding new plugin %s", plugin.GetName())
318357
newPlugins = append(newPlugins, plugin)
319358
}
320359
// Atomic compare-and-swap
321360
if bifrost.plugins.CompareAndSwap(oldPlugins, &newPlugins) {
361+
// Cleanup the old plugin
362+
if pluginToCleanup != nil {
363+
err := pluginToCleanup.Cleanup()
364+
if err != nil {
365+
bifrost.logger.Warn("failed to cleanup old plugin %s: %v", pluginToCleanup.GetName(), err)
366+
}
367+
}
322368
return nil
323369
}
324370
// Retrying as swapping did not work
325371
}
326-
327372
}
328373

329374
// UpdateProviderConcurrency dynamically updates the queue size and concurrency for an existing provider.
@@ -1023,7 +1068,7 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
10231068

10241069
msg := bifrost.getChannelMessage(*preReq, requestType)
10251070
msg.Context = ctx
1026-
1071+
startTime := time.Now()
10271072
select {
10281073
case queue <- *msg:
10291074
// Message was sent successfully
@@ -1049,6 +1094,10 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
10491094
var resp *schemas.BifrostResponse
10501095
select {
10511096
case result = <-msg.Response:
1097+
latency := time.Since(startTime).Milliseconds()
1098+
if result.ExtraFields.Latency == nil {
1099+
result.ExtraFields.Latency = &latency
1100+
}
10521101
resp, bifrostErr := pipeline.RunPostHooks(&ctx, result, nil, len(*bifrost.plugins.Load()))
10531102
if bifrostErr != nil {
10541103
bifrost.releaseChannelMessage(msg)
@@ -1172,7 +1221,7 @@ func (bifrost *Bifrost) tryStreamRequest(req *schemas.BifrostRequest, ctx contex
11721221
// Marking final chunk
11731222
ctx = context.WithValue(ctx, schemas.BifrostContextKeyStreamEndIndicator, true)
11741223
// On error we will complete post-hooks
1175-
recoveredResp, recoveredErr := pipeline.RunPostHooks(&ctx, nil, &bifrostErrVal, len(bifrost.plugins))
1224+
recoveredResp, recoveredErr := pipeline.RunPostHooks(&ctx, nil, &bifrostErrVal, len(*bifrost.plugins.Load()))
11761225
bifrost.releaseChannelMessage(msg)
11771226
if recoveredErr != nil {
11781227
return nil, recoveredErr
@@ -1375,6 +1424,7 @@ func (p *PluginPipeline) RunPreHooks(ctx *context.Context, req *schemas.BifrostR
13751424
var shortCircuit *schemas.PluginShortCircuit
13761425
var err error
13771426
for i, plugin := range p.plugins {
1427+
p.logger.Debug("running pre-hook for plugin %s", plugin.GetName())
13781428
req, shortCircuit, err = plugin.PreHook(ctx, req)
13791429
if err != nil {
13801430
p.preHookErrors = append(p.preHookErrors, err)
@@ -1391,17 +1441,19 @@ func (p *PluginPipeline) RunPreHooks(ctx *context.Context, req *schemas.BifrostR
13911441
// RunPostHooks executes PostHooks in reverse order for the plugins whose PreHook ran.
13921442
// Accepts the response and error, and allows plugins to transform either (e.g., recover from error, or invalidate a response).
13931443
// Returns the final response and error after all hooks. If both are set, error takes precedence unless error is nil.
1394-
func (p *PluginPipeline) RunPostHooks(ctx *context.Context, resp *schemas.BifrostResponse, bifrostErr *schemas.BifrostError, count int) (*schemas.BifrostResponse, *schemas.BifrostError) {
1444+
// runFrom is the count of plugins whose PreHooks ran; PostHooks will run in reverse from index (runFrom - 1) down to 0
1445+
func (p *PluginPipeline) RunPostHooks(ctx *context.Context, resp *schemas.BifrostResponse, bifrostErr *schemas.BifrostError, runFrom int) (*schemas.BifrostResponse, *schemas.BifrostError) {
13951446
// Defensive: ensure count is within valid bounds
1396-
if count < 0 {
1397-
count = 0
1447+
if runFrom < 0 {
1448+
runFrom = 0
13981449
}
1399-
if count > len(p.plugins) {
1400-
count = len(p.plugins)
1450+
if runFrom > len(p.plugins) {
1451+
runFrom = len(p.plugins)
14011452
}
14021453
var err error
1403-
for i := count - 1; i >= 0; i-- {
1454+
for i := runFrom - 1; i >= 0; i-- {
14041455
plugin := p.plugins[i]
1456+
p.logger.Debug("running post-hook for plugin %s", plugin.GetName())
14051457
resp, bifrostErr, err = plugin.PostHook(ctx, resp, bifrostErr)
14061458
if err != nil {
14071459
p.postHookErrors = append(p.postHookErrors, err)
@@ -1618,4 +1670,5 @@ func (bifrost *Bifrost) Shutdown() {
16181670
bifrost.logger.Warn(fmt.Sprintf("Error cleaning up plugin: %s", err.Error()))
16191671
}
16201672
}
1673+
bifrost.logger.Info("all request channels closed")
16211674
}

core/changelog.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
<!-- The pattern we follow here is to keep the changelog for the latest version -->
22
<!-- Old changelogs are automatically attached to the GitHub releases -->
33

4-
- Fix: Updates token calculation for streaming responses. #520
4+
- Feature: Adds dynamic reloads for plugins. This removes requirement of restarts on updating plugins.

core/schemas/bifrost.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ type BifrostContextKey string
100100

101101
// BifrostContextKeyRequestType is a context key for the request type.
102102
const (
103+
BifrostContextKeyRequestID BifrostContextKey = "request-id"
104+
BifrostContextKeyVirtualKeyHeader BifrostContextKey = "x-bf-vk"
103105
BifrostContextKeyDirectKey BifrostContextKey = "bifrost-direct-key"
104106
BifrostContextKeyStreamEndIndicator BifrostContextKey = "bifrost-stream-end-indicator"
105107
BifrostContextKeyRequestType BifrostContextKey = "bifrost-request-type"
@@ -792,7 +794,7 @@ type TranscriptionUsage struct {
792794
type BifrostResponseExtraFields struct {
793795
Provider ModelProvider `json:"provider"`
794796
Params ModelParameters `json:"model_params"`
795-
Latency *int64 `json:"latency,omitempty"`
797+
Latency *int64 `json:"latency,omitempty"`
796798
ChatHistory *[]BifrostMessage `json:"chat_history,omitempty"`
797799
BilledUsage *BilledLLMUsage `json:"billed_usage,omitempty"`
798800
ChunkIndex int `json:"chunk_index"` // used for streaming responses to identify the chunk index, will be 0 for non-streaming responses

core/version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.38
1+
1.1.39

framework/changelog.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
<!-- The pattern we follow here is to keep the changelog for the latest version -->
22
<!-- Old changelogs are automatically attached to the GitHub releases -->
33

4-
- upgrade: core upgrades to 1.1.38
4+
- Chore: Adds ctx to each function to gracefully shutdown ongoing tasks and bring better concurrency management
5+
- Fix: Fixes pricing sync to make sure latest updates are synced at every restart.

framework/configstore/migrations.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,33 @@
11
package configstore
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/maximhq/bifrost/framework/configstore/internal/migration"
78
"gorm.io/gorm"
89
)
910

1011
// Migrate performs the necessary database migrations.
11-
func triggerMigrations(db *gorm.DB) error {
12-
if err := migrationInit(db); err != nil {
12+
func triggerMigrations(ctx context.Context, db *gorm.DB) error {
13+
if err := migrationInit(ctx, db); err != nil {
1314
return err
1415
}
15-
if err := migrationMany2ManyJoinTable(db); err != nil {
16+
if err := migrationMany2ManyJoinTable(ctx, db); err != nil {
1617
return err
1718
}
18-
if err := migrationAddCustomProviderConfigJSONColumn(db); err != nil {
19+
if err := migrationAddCustomProviderConfigJSONColumn(ctx, db); err != nil {
1920
return err
2021
}
2122
return nil
2223
}
2324

2425
// migrationInit is the first migration
25-
func migrationInit(db *gorm.DB) error {
26+
func migrationInit(ctx context.Context, db *gorm.DB) error {
2627
m := migration.New(db, migration.DefaultOptions, []*migration.Migration{{
2728
ID: "init",
2829
Migrate: func(tx *gorm.DB) error {
30+
tx = tx.WithContext(ctx)
2931
migrator := tx.Migrator()
3032
if !migrator.HasTable(&TableConfigHash{}) {
3133
if err := migrator.CreateTable(&TableConfigHash{}); err != nil {
@@ -120,6 +122,7 @@ func migrationInit(db *gorm.DB) error {
120122
return nil
121123
},
122124
Rollback: func(tx *gorm.DB) error {
125+
tx = tx.WithContext(ctx)
123126
migrator := tx.Migrator()
124127
// Drop children first, then parents (adjust if your actual FKs differ)
125128
if err := migrator.DropTable(&TableVirtualKey{}); err != nil {
@@ -184,10 +187,11 @@ func migrationInit(db *gorm.DB) error {
184187
}
185188

186189
// createMany2ManyJoinTable creates a many-to-many join table for the given tables.
187-
func migrationMany2ManyJoinTable(db *gorm.DB) error {
190+
func migrationMany2ManyJoinTable(ctx context.Context, db *gorm.DB) error {
188191
m := migration.New(db, migration.DefaultOptions, []*migration.Migration{{
189192
ID: "many2manyjoin",
190193
Migrate: func(tx *gorm.DB) error {
194+
tx = tx.WithContext(ctx)
191195
migrator := tx.Migrator()
192196

193197
// create the many-to-many join table for virtual keys and keys
@@ -222,10 +226,11 @@ func migrationMany2ManyJoinTable(db *gorm.DB) error {
222226
return nil
223227
}
224228

225-
func migrationAddCustomProviderConfigJSONColumn(db *gorm.DB) error {
229+
func migrationAddCustomProviderConfigJSONColumn(ctx context.Context, db *gorm.DB) error {
226230
m := migration.New(db, migration.DefaultOptions, []*migration.Migration{{
227231
ID: "addcustomproviderconfigjsoncolumn",
228232
Migrate: func(tx *gorm.DB) error {
233+
tx = tx.WithContext(ctx)
229234
migrator := tx.Migrator()
230235

231236
if !migrator.HasColumn(&TableProvider{}, "custom_provider_config_json") {

0 commit comments

Comments
 (0)