Skip to content

Commit eec7987

Browse files
Update middlewares.go
1 parent d03c528 commit eec7987

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+3199
-1919
lines changed

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ help: ## Show this help message
3737
cleanup-enterprise: ## Clean up enterprise directories if present
3838
@echo "$(GREEN)Cleaning up enterprise...$(NC)"
3939
@if [ -d "ui/app/enterprise" ]; then rm -rf ui/app/enterprise; fi
40-
@if [ -d "ui/app/enterprise" ]; then rm -rf ui/app/enterprise; fi
4140
@echo "$(GREEN)Enterprise cleaned up$(NC)"
4241

4342
install-ui: cleanup-enterprise

core/bifrost.go

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,44 @@ func (bifrost *Bifrost) TranscriptionStreamRequest(ctx context.Context, req *sch
294294
return bifrost.handleStreamRequest(ctx, req, schemas.TranscriptionStreamRequest)
295295
}
296296

297+
// RemovePlugin removes the plugin whose GetName() equals name from the plugin list.
// The current list is copied, the matching entry is dropped from the copy, and the copy
// is published with an atomic compare-and-swap; the loop retries until the swap succeeds.
// After a successful swap the removed plugin's Cleanup() is called; a Cleanup error is
// logged as a warning and is not returned. The function returns nil in every case,
// including when no plugin list is loaded and when no plugin matches name.
// NOTE(review): pluginToCleanup is declared outside the retry loop and is never reset, so
// a match recorded in an attempt whose swap failed is still cleaned up if a later attempt
// finds no match - confirm whether it should be cleared at the top of each iteration.
298+
func (bifrost *Bifrost) RemovePlugin(name string) error {
299+
var pluginToCleanup schemas.Plugin
300+
for {
301+
oldPlugins := bifrost.plugins.Load()
302+
if oldPlugins == nil {
303+
return nil
304+
}
305+
// Copy the current list so the loaded slice is left untouched, then drop the matching plugin from the copy
306+
newPlugins := make([]schemas.Plugin, len(*oldPlugins))
307+
copy(newPlugins, *oldPlugins)
308+
for i, p := range newPlugins {
309+
if p.GetName() == name {
310+
pluginToCleanup = p
311+
bifrost.logger.Debug("removing plugin %s", name)
312+
newPlugins = append(newPlugins[:i], newPlugins[i+1:]...)
313+
break
314+
}
315+
}
316+
// Atomic compare-and-swap: publish the new list only if the stored pointer is still oldPlugins
317+
if bifrost.plugins.CompareAndSwap(oldPlugins, &newPlugins) {
318+
// Clean up the removed plugin (nil when no plugin matched name)
319+
if pluginToCleanup != nil {
320+
err := pluginToCleanup.Cleanup()
321+
if err != nil {
322+
bifrost.logger.Warn("failed to cleanup old plugin %s: %v", pluginToCleanup.GetName(), err)
323+
}
324+
}
325+
return nil
326+
}
327+
// The swap failed because the stored list changed in the meantime; retry with a fresh load
328+
}
329+
}
330+
297331
// ReloadPlugin reloads a plugin with new instance
298332
// During the reload, it is a stop-the-world phase where we take a global lock on the plugin mutex
299-
func (bifrost *Bifrost) ReloadPlugin(plugin schemas.Plugin) error {
333+
func (bifrost *Bifrost) ReloadPlugin(plugin schemas.Plugin) error {
334+
var pluginToCleanup schemas.Plugin
300335
for {
301336
oldPlugins := bifrost.plugins.Load()
302337
if oldPlugins == nil {
@@ -308,22 +343,32 @@ func (bifrost *Bifrost) ReloadPlugin(plugin schemas.Plugin) error {
308343
found := false
309344
for i, p := range newPlugins {
310345
if p.GetName() == plugin.GetName() {
346+
// Cleaning up old plugin before replacing it
347+
pluginToCleanup = p
348+
bifrost.logger.Debug("replacing plugin %s with new instance", plugin.GetName())
311349
newPlugins[i] = plugin
312350
found = true
313351
break
314352
}
315353
}
316-
if !found{
354+
if !found {
317355
// This means that user is adding a new plugin
356+
bifrost.logger.Debug("adding new plugin %s", plugin.GetName())
318357
newPlugins = append(newPlugins, plugin)
319358
}
320359
// Atomic compare-and-swap
321360
if bifrost.plugins.CompareAndSwap(oldPlugins, &newPlugins) {
361+
// Cleanup the old plugin
362+
if pluginToCleanup != nil {
363+
err := pluginToCleanup.Cleanup()
364+
if err != nil {
365+
bifrost.logger.Warn("failed to cleanup old plugin %s: %v", pluginToCleanup.GetName(), err)
366+
}
367+
}
322368
return nil
323369
}
324370
// Retrying as swapping did not work
325371
}
326-
327372
}
328373

329374
// UpdateProviderConcurrency dynamically updates the queue size and concurrency for an existing provider.
@@ -1023,7 +1068,7 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
10231068

10241069
msg := bifrost.getChannelMessage(*preReq, requestType)
10251070
msg.Context = ctx
1026-
1071+
startTime := time.Now()
10271072
select {
10281073
case queue <- *msg:
10291074
// Message was sent successfully
@@ -1049,6 +1094,10 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
10491094
var resp *schemas.BifrostResponse
10501095
select {
10511096
case result = <-msg.Response:
1097+
latency := time.Since(startTime).Milliseconds()
1098+
if result.ExtraFields.Latency == nil {
1099+
result.ExtraFields.Latency = &latency
1100+
}
10521101
resp, bifrostErr := pipeline.RunPostHooks(&ctx, result, nil, len(*bifrost.plugins.Load()))
10531102
if bifrostErr != nil {
10541103
bifrost.releaseChannelMessage(msg)
@@ -1172,7 +1221,7 @@ func (bifrost *Bifrost) tryStreamRequest(req *schemas.BifrostRequest, ctx contex
11721221
// Marking final chunk
11731222
ctx = context.WithValue(ctx, schemas.BifrostContextKeyStreamEndIndicator, true)
11741223
// On error we will complete post-hooks
1175-
recoveredResp, recoveredErr := pipeline.RunPostHooks(&ctx, nil, &bifrostErrVal, len(bifrost.plugins))
1224+
recoveredResp, recoveredErr := pipeline.RunPostHooks(&ctx, nil, &bifrostErrVal, len(*bifrost.plugins.Load()))
11761225
bifrost.releaseChannelMessage(msg)
11771226
if recoveredErr != nil {
11781227
return nil, recoveredErr
@@ -1375,6 +1424,7 @@ func (p *PluginPipeline) RunPreHooks(ctx *context.Context, req *schemas.BifrostR
13751424
var shortCircuit *schemas.PluginShortCircuit
13761425
var err error
13771426
for i, plugin := range p.plugins {
1427+
p.logger.Debug("running pre-hook for plugin %s", plugin.GetName())
13781428
req, shortCircuit, err = plugin.PreHook(ctx, req)
13791429
if err != nil {
13801430
p.preHookErrors = append(p.preHookErrors, err)
@@ -1391,17 +1441,19 @@ func (p *PluginPipeline) RunPreHooks(ctx *context.Context, req *schemas.BifrostR
13911441
// RunPostHooks executes PostHooks in reverse order for the plugins whose PreHook ran.
13921442
// Accepts the response and error, and allows plugins to transform either (e.g., recover from error, or invalidate a response).
13931443
// Returns the final response and error after all hooks. If both are set, error takes precedence unless error is nil.
1394-
func (p *PluginPipeline) RunPostHooks(ctx *context.Context, resp *schemas.BifrostResponse, bifrostErr *schemas.BifrostError, count int) (*schemas.BifrostResponse, *schemas.BifrostError) {
1444+
// runFrom is the count of plugins whose PreHooks ran; PostHooks will run in reverse from index (runFrom - 1) down to 0
1445+
func (p *PluginPipeline) RunPostHooks(ctx *context.Context, resp *schemas.BifrostResponse, bifrostErr *schemas.BifrostError, runFrom int) (*schemas.BifrostResponse, *schemas.BifrostError) {
13951446
// Defensive: ensure runFrom is within valid bounds
1396-
if count < 0 {
1397-
count = 0
1447+
if runFrom < 0 {
1448+
runFrom = 0
13981449
}
1399-
if count > len(p.plugins) {
1400-
count = len(p.plugins)
1450+
if runFrom > len(p.plugins) {
1451+
runFrom = len(p.plugins)
14011452
}
14021453
var err error
1403-
for i := count - 1; i >= 0; i-- {
1454+
for i := runFrom - 1; i >= 0; i-- {
14041455
plugin := p.plugins[i]
1456+
p.logger.Debug("running post-hook for plugin %s", plugin.GetName())
14051457
resp, bifrostErr, err = plugin.PostHook(ctx, resp, bifrostErr)
14061458
if err != nil {
14071459
p.postHookErrors = append(p.postHookErrors, err)
@@ -1618,4 +1670,5 @@ func (bifrost *Bifrost) Shutdown() {
16181670
bifrost.logger.Warn(fmt.Sprintf("Error cleaning up plugin: %s", err.Error()))
16191671
}
16201672
}
1673+
bifrost.logger.Info("all request channels closed")
16211674
}

core/changelog.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
<!-- The pattern we follow here is to keep the changelog for the latest version -->
22
<!-- Old changelogs are automatically attached to the GitHub releases -->
33

4-
- Fix: Updates token calculation for streaming responses. #520
4+
- Feature: Adds dynamic reloads for plugins. This removes the requirement to restart when updating plugins.

core/providers/anthropic.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ func prepareAnthropicChatRequest(messages []schemas.BifrostMessage, params *sche
516516
Text: *msg.Content.ContentStr,
517517
})
518518
} else if msg.Content.ContentBlocks != nil {
519-
for _, block := range *msg.Content.ContentBlocks {
519+
for _, block := range msg.Content.ContentBlocks {
520520
if block.Text != nil {
521521
systemMessages = append(systemMessages, BedrockAnthropicSystemMessage{
522522
Text: *block.Text,
@@ -547,7 +547,7 @@ func prepareAnthropicChatRequest(messages []schemas.BifrostMessage, params *sche
547547
"text": *msg.Content.ContentStr,
548548
})
549549
} else if msg.Content.ContentBlocks != nil {
550-
for _, block := range *msg.Content.ContentBlocks {
550+
for _, block := range msg.Content.ContentBlocks {
551551
if block.Text != nil {
552552
toolCallResultContent = append(toolCallResultContent, map[string]interface{}{
553553
"type": "text",
@@ -567,7 +567,7 @@ func prepareAnthropicChatRequest(messages []schemas.BifrostMessage, params *sche
567567
"text": *msg.Content.ContentStr,
568568
})
569569
} else if msg.Content.ContentBlocks != nil {
570-
for _, block := range *msg.Content.ContentBlocks {
570+
for _, block := range msg.Content.ContentBlocks {
571571
if block.Text != nil && *block.Text != "" {
572572
content = append(content, map[string]interface{}{
573573
"type": "text",
@@ -596,7 +596,7 @@ func prepareAnthropicChatRequest(messages []schemas.BifrostMessage, params *sche
596596

597597
// Add tool calls as content if present
598598
if msg.AssistantMessage != nil && msg.AssistantMessage.ToolCalls != nil {
599-
for _, toolCall := range *msg.AssistantMessage.ToolCalls {
599+
for _, toolCall := range msg.AssistantMessage.ToolCalls {
600600
if toolCall.Function.Name != nil {
601601
var input map[string]interface{}
602602
if toolCall.Function.Arguments != "" {
@@ -639,9 +639,9 @@ func prepareAnthropicChatRequest(messages []schemas.BifrostMessage, params *sche
639639
}
640640

641641
// Transform tools if present
642-
if params != nil && params.Tools != nil && len(*params.Tools) > 0 {
642+
if params != nil && params.Tools != nil && len(params.Tools) > 0 {
643643
var tools []map[string]interface{}
644-
for _, tool := range *params.Tools {
644+
for _, tool := range params.Tools {
645645
tools = append(tools, map[string]interface{}{
646646
"name": tool.Function.Name,
647647
"description": tool.Function.Description,
@@ -791,7 +791,7 @@ func parseAnthropicResponse(response *AnthropicChatResponse, bifrostResponse *sc
791791
if len(toolCalls) > 0 || thinking != "" {
792792
assistantMessage = &schemas.AssistantMessage{}
793793
if len(toolCalls) > 0 {
794-
assistantMessage.ToolCalls = &toolCalls
794+
assistantMessage.ToolCalls = toolCalls
795795
}
796796
if thinking != "" {
797797
assistantMessage.Thought = &thinking

core/providers/bedrock.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type BedrockMistralContent struct {
8888
type BedrockMistralChatMessage struct {
8989
Role schemas.ModelChatMessageRole `json:"role"` // Role of the message sender
9090
Content []BedrockMistralContent `json:"content"` // Array of message content
91-
ToolCalls *[]BedrockAnthropicToolCall `json:"tool_calls,omitempty"` // Optional tool calls
91+
ToolCalls []BedrockAnthropicToolCall `json:"tool_calls,omitempty"` // Optional tool calls
9292
ToolCallID *string `json:"tool_call_id,omitempty"` // Optional tool call ID
9393
}
9494

@@ -514,7 +514,7 @@ func (provider *BedrockProvider) prepareChatCompletionMessages(messages []schema
514514
} else {
515515
// Bedrock wants only toolUse block on content, text blocks are not allowed when tools are called.
516516
if msg.AssistantMessage != nil && msg.AssistantMessage.ToolCalls != nil {
517-
for _, toolCall := range *msg.AssistantMessage.ToolCalls {
517+
for _, toolCall := range msg.AssistantMessage.ToolCalls {
518518
var input map[string]interface{}
519519
if toolCall.Function.Arguments != "" {
520520
if err := sonic.Unmarshal([]byte(toolCall.Function.Arguments), &input); err != nil {
@@ -685,7 +685,7 @@ func (provider *BedrockProvider) prepareChatCompletionMessages(messages []schema
685685

686686
var filteredToolCalls []BedrockAnthropicToolCall
687687
if msg.AssistantMessage != nil && msg.AssistantMessage.ToolCalls != nil {
688-
for _, toolCall := range *msg.AssistantMessage.ToolCalls {
688+
for _, toolCall := range msg.AssistantMessage.ToolCalls {
689689
if toolCall.ID != nil && toolCall.Function.Name != nil {
690690
// Parse the arguments to get parameters
691691
var params interface{}
@@ -880,7 +880,7 @@ func (provider *BedrockProvider) extractToolsFromHistory(messages []schemas.Bifr
880880
if msg.Role == schemas.ModelChatMessageRoleAssistant && msg.AssistantMessage != nil && msg.AssistantMessage.ToolCalls != nil {
881881
hasToolContent = true
882882
// Extract tool definitions from tool calls for toolConfig
883-
for _, toolCall := range *msg.AssistantMessage.ToolCalls {
883+
for _, toolCall := range msg.AssistantMessage.ToolCalls {
884884
if toolCall.Function.Name != nil {
885885
toolName := *toolCall.Function.Name
886886
if _, exists := seenTools[toolName]; !exists {
@@ -1077,7 +1077,7 @@ func (provider *BedrockProvider) ChatCompletion(ctx context.Context, model strin
10771077
// Create AssistantMessage if we have tool calls
10781078
if len(toolCalls) > 0 {
10791079
assistantMessage = &schemas.AssistantMessage{
1080-
ToolCalls: &toolCalls,
1080+
ToolCalls: toolCalls,
10811081
}
10821082
}
10831083

core/providers/cohere.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ type CohereStreamTextEvent struct {
146146
Text string `json:"text"` // Text content being generated
147147
}
148148

149-
// CohereStreamToolEvent represents the tool use event.
149+
// CohereStreamToolCallEvent represents the tool use event.
150150
type CohereStreamToolCallEvent struct {
151151
EventType string `json:"event_type"` // tool-use
152152
ToolCall struct {
@@ -338,7 +338,7 @@ func (provider *CohereProvider) ChatCompletion(ctx context.Context, model string
338338
ContentStr: &content,
339339
},
340340
AssistantMessage: &schemas.AssistantMessage{
341-
ToolCalls: &toolCalls,
341+
ToolCalls: toolCalls,
342342
},
343343
},
344344
},
@@ -391,7 +391,7 @@ func prepareCohereChatRequest(messages []schemas.BifrostMessage, params *schemas
391391
if msg.Role == schemas.ModelChatMessageRoleAssistant {
392392
if msg.AssistantMessage != nil && msg.AssistantMessage.ToolCalls != nil {
393393
var toolCalls []map[string]interface{}
394-
for _, toolCall := range *msg.AssistantMessage.ToolCalls {
394+
for _, toolCall := range msg.AssistantMessage.ToolCalls {
395395
var arguments map[string]interface{}
396396
var parsedJSON interface{}
397397
err := sonic.Unmarshal([]byte(toolCall.Function.Arguments), &parsedJSON)
@@ -424,7 +424,7 @@ func prepareCohereChatRequest(messages []schemas.BifrostMessage, params *schemas
424424
prevMsg.AssistantMessage.ToolCalls != nil {
425425

426426
// Search through tool calls in this assistant message
427-
for _, toolCall := range *prevMsg.AssistantMessage.ToolCalls {
427+
for _, toolCall := range prevMsg.AssistantMessage.ToolCalls {
428428
if toolCall.ID != nil && msg.ToolMessage != nil && msg.ToolMessage.ToolCallID != nil &&
429429
*toolCall.ID == *msg.ToolMessage.ToolCallID {
430430

@@ -588,7 +588,7 @@ func convertChatHistory(history []struct {
588588
Role schemas.ModelChatMessageRole `json:"role"`
589589
Message string `json:"message"`
590590
ToolCalls []CohereToolCall `json:"tool_calls"`
591-
}) *[]schemas.BifrostMessage {
591+
}) []schemas.BifrostMessage {
592592
converted := make([]schemas.BifrostMessage, len(history))
593593
for i, msg := range history {
594594
var toolCalls []schemas.ToolCall
@@ -617,7 +617,7 @@ func convertChatHistory(history []struct {
617617
ContentStr: &msg.Message,
618618
},
619619
AssistantMessage: &schemas.AssistantMessage{
620-
ToolCalls: &toolCalls,
620+
ToolCalls: toolCalls,
621621
},
622622
}
623623
}

core/providers/gemini.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ import (
1717
"github.com/valyala/fasthttp"
1818
)
1919

20-
// Response message for PredictionService.GenerateContent.
20+
// GenerateContentResponse represents the response message for PredictionService.GenerateContent.
2121
type GenerateContentResponse struct {
2222
// Response variations returned by the model.
2323
Candidates []*Candidate `json:"candidates,omitempty"`
2424
// Usage metadata about the response(s).
2525
UsageMetadata *GenerateContentResponseUsageMetadata `json:"usageMetadata,omitempty"`
2626
}
2727

28-
// A response candidate generated from the model.
28+
// Candidate represents a response candidate generated from the model.
2929
type Candidate struct {
3030
// Optional. Contains the multi-part content of the response.
3131
Content *Content `json:"content,omitempty"`
@@ -36,7 +36,7 @@ type Candidate struct {
3636
Index int32 `json:"index,omitempty"`
3737
}
3838

39-
// Contains the multi-part content of a message.
39+
// Content represents the multi-part content of a message.
4040
type Content struct {
4141
// Optional. List of parts that constitute a single message. Each part may have
4242
// a different IANA MIME type.
@@ -47,7 +47,7 @@ type Content struct {
4747
Role string `json:"role,omitempty"`
4848
}
4949

50-
// A datatype containing media content.
50+
// Part represents a datatype containing media content.
5151
// Exactly one field within a Part should be set, representing the specific type
5252
// of content being conveyed. Using multiple fields within the same `Part`
5353
// instance is considered invalid.
@@ -58,13 +58,13 @@ type Part struct {
5858
Text string `json:"text,omitempty"`
5959
}
6060

61-
// Content blob.
61+
// Blob represents a content blob.
6262
type Blob struct {
6363
// Required. Raw bytes.
6464
Data []byte `json:"data,omitempty"`
6565
}
6666

67-
// Usage metadata about response(s).
67+
// GenerateContentResponseUsageMetadata represents usage metadata about response(s).
6868
type GenerateContentResponseUsageMetadata struct {
6969
// Number of tokens in the response(s). This includes all the generated response candidates.
7070
CandidatesTokenCount int32 `json:"candidatesTokenCount,omitempty"`
@@ -198,10 +198,10 @@ func (provider *GeminiProvider) ChatCompletion(ctx context.Context, model string
198198
if choice.Message.AssistantMessage == nil || choice.Message.AssistantMessage.ToolCalls == nil {
199199
continue
200200
}
201-
for i, toolCall := range *choice.Message.AssistantMessage.ToolCalls {
201+
for i, toolCall := range choice.Message.AssistantMessage.ToolCalls {
202202
if (toolCall.ID == nil || *toolCall.ID == "") && toolCall.Function.Name != nil && *toolCall.Function.Name != "" {
203203
id := *toolCall.Function.Name
204-
(*choice.Message.AssistantMessage.ToolCalls)[i].ID = &id
204+
choice.Message.AssistantMessage.ToolCalls[i].ID = &id
205205
}
206206
}
207207
}

core/providers/openai.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func prepareOpenAIChatRequest(messages []schemas.BifrostMessage, params *schemas
192192
"content": msg.Content,
193193
}
194194
if msg.AssistantMessage != nil && msg.AssistantMessage.ToolCalls != nil {
195-
assistantMessage["tool_calls"] = *msg.AssistantMessage.ToolCalls
195+
assistantMessage["tool_calls"] = msg.AssistantMessage.ToolCalls
196196
}
197197
formattedMessages = append(formattedMessages, assistantMessage)
198198
} else {

0 commit comments

Comments
 (0)