From 1e6bd15ad94f3f30e0c719e19274881463007806 Mon Sep 17 00:00:00 2001 From: Assaf Date: Tue, 24 Jun 2025 12:55:06 +0300 Subject: [PATCH 1/3] Update opik_filter_pipeline.py --- examples/filters/opik_filter_pipeline.py | 264 ++++++++++++++--------- 1 file changed, 167 insertions(+), 97 deletions(-) diff --git a/examples/filters/opik_filter_pipeline.py b/examples/filters/opik_filter_pipeline.py index ab768b0a..5494488d 100644 --- a/examples/filters/opik_filter_pipeline.py +++ b/examples/filters/opik_filter_pipeline.py @@ -2,9 +2,9 @@ title: Opik Filter Pipeline author: open-webui date: 2025-03-12 -version: 1.0 +version: 1.1 license: MIT -description: A filter pipeline that uses Opik for LLM observability. +description: A filter pipeline that uses Opik for LLM observability with improved error handling. requirements: opik """ @@ -12,6 +12,7 @@ import os import uuid import json +import time from pydantic import BaseModel from opik import Opik @@ -97,11 +98,50 @@ def set_opik(self): f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings." ) + def cleanup_existing_trace(self, chat_id: str): + """Safely cleanup existing trace and span for a chat_id""" + try: + if chat_id in self.chat_spans: + existing_span = self.chat_spans[chat_id] + try: + existing_span.end(output={"status": "interrupted", "reason": "new_request_received"}) + self.log(f"Ended existing span for chat_id: {chat_id}") + except Exception as e: + self.log(f"Warning: Could not end existing span: {e}") + + if chat_id in self.chat_traces: + existing_trace = self.chat_traces[chat_id] + try: + existing_trace.end(output={"status": "interrupted", "reason": "new_request_received"}) + self.log(f"Ended existing trace for chat_id: {chat_id}") + except Exception as e: + self.log(f"Warning: Could not end existing trace: {e}") + + # Clean up the dictionaries + self.chat_traces.pop(chat_id, None) + self.chat_spans.pop(chat_id, None) + self.log(f"Cleaned up existing trace/span for chat_id: {chat_id}") + + except Exception as e: + self.log(f"Error during cleanup for chat_id {chat_id}: {e}") + # Force cleanup even if there are errors + self.chat_traces.pop(chat_id, None) + self.chat_spans.pop(chat_id, None) + + def cleanup_stale_traces(self, max_count: int = 100): + """Clean up traces if we have too many active ones""" + if len(self.chat_traces) > max_count: + self.log(f"Too many active traces ({len(self.chat_traces)}), cleaning up oldest ones") + # Clean up oldest traces (simple FIFO approach) + chat_ids_to_remove = list(self.chat_traces.keys())[:len(self.chat_traces) - max_count + 10] + for chat_id in chat_ids_to_remove: + self.cleanup_existing_trace(chat_id) + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: """ Inlet handles the incoming request (usually a user message). - If no trace exists yet for this chat_id, we create a new trace. - - If a trace does exist, we simply create a new span for the new user message. + - If a trace does exist, we clean it up and create a new one. 
""" if self.valves.debug: print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") @@ -117,7 +157,8 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: return body if "chat_id" not in metadata: - chat_id = str(uuid.uuid4()) # Regular chat messages + # Generate unique chat_id with timestamp for extra uniqueness + chat_id = f"{uuid.uuid4()}-{int(time.time() * 1000)}" self.log(f"Assigned normal chat_id: {chat_id}") metadata["chat_id"] = chat_id @@ -136,65 +177,78 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: user_email = user.get("email") if user else None - assert chat_id not in self.chat_traces, ( - f"There shouldn't be a trace already exists for chat_id {chat_id}" - ) + # FIXED: Instead of asserting, clean up any existing trace/span + if chat_id in self.chat_traces: + self.log(f"Found existing trace for chat_id {chat_id}, cleaning up...") + self.cleanup_existing_trace(chat_id) + + # Periodic cleanup to prevent memory leaks + self.cleanup_stale_traces() # Create a new trace and span self.log(f"Creating new chat trace for chat_id: {chat_id}") - # Body copy for traces and span - trace_body = body.copy() - span_body = body.copy() - - # Extract metadata from body - metadata = trace_body.pop("metadata", {}) - metadata.update({"chat_id": chat_id, "user_id": user_email}) + try: + # Body copy for traces and span + trace_body = body.copy() + span_body = body.copy() + + # Extract metadata from body + metadata = trace_body.pop("metadata", {}) + metadata.update({"chat_id": chat_id, "user_id": user_email}) + + # We don't need the model at the trace level + trace_body.pop("model", None) + + trace_payload = { + "name": f"{__name__}", + "input": trace_body, + "metadata": metadata, + "thread_id": chat_id, + } - # We don't need the model at the trace level - trace_body.pop("model", None) + if self.valves.debug: + print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}") - trace_payload = { - "name": f"{__name__}", - "input": trace_body, - "metadata": metadata, - "thread_id": chat_id, - } + trace = self.opik.trace(**trace_payload) - if self.valves.debug: - print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}") + span_metadata = metadata.copy() + span_metadata.update({"interface": "open-webui"}) - trace = self.opik.trace(**trace_payload) + # Extract the model from body + span_body.pop("model", None) + # We don't need the metadata in the input for the span + span_body.pop("metadata", None) - span_metadata = metadata.copy() - span_metadata.update({"interface": "open-webui"}) + # Extract the model and provider from metadata + model = span_metadata.get("model", {}).get("id", None) + provider = span_metadata.get("model", {}).get("owned_by", None) - # Extract the model from body - span_body.pop("model", None) - # We don't need the metadata in the input for the span - span_body.pop("metadata", None) + span_payload = { + "name": chat_id, + "model": model, + "provider": provider, + "input": span_body, + "metadata": span_metadata, + "type": "llm", + } - # Extract the model and provider from metadata - model = span_metadata.get("model", {}).get("id", None) - provider = span_metadata.get("model", {}).get("owned_by", None) + if self.valves.debug: + print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}") - span_payload = { - "name": chat_id, - "model": model, - "provider": provider, - "input": span_body, - "metadata": span_metadata, - "type": "llm", - } + span = trace.span(**span_payload) - if 
self.valves.debug: - print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}") + self.chat_traces[chat_id] = trace + self.chat_spans[chat_id] = span + self.log(f"Trace and span objects successfully created for chat_id: {chat_id}") - span = trace.span(**span_payload) - - self.chat_traces[chat_id] = trace - self.chat_spans[chat_id] = span - self.log(f"Trace and span objects successfully created for chat_id: {chat_id}") + except Exception as e: + self.log(f"Error creating Opik trace/span for chat_id {chat_id}: {e}") + # Clean up on error + self.chat_traces.pop(chat_id, None) + self.chat_spans.pop(chat_id, None) + # Don't fail the request, just log the error + self.log(f"Continuing without Opik logging for this request") return body @@ -217,58 +271,74 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: trace = self.chat_traces[chat_id] span = self.chat_spans[chat_id] - # Body copy for traces and span - trace_body = body.copy() - span_body = body.copy() - - # Get the last assistant message from the conversation - assistant_message_obj = get_last_assistant_message_obj(body["messages"]) - - # Extract usage if available - usage = None - self.log(f"Assistant message obj: {assistant_message_obj}") - if assistant_message_obj: - message_usage = assistant_message_obj.get("usage", {}) - if isinstance(message_usage, dict): - input_tokens = message_usage.get( - "prompt_eval_count" - ) or message_usage.get("prompt_tokens") - output_tokens = message_usage.get("eval_count") or message_usage.get( - "completion_tokens" - ) - if input_tokens is not None and output_tokens is not None: - usage = { - "prompt_tokens": input_tokens, - "completion_tokens": output_tokens, - "total_tokens": input_tokens + output_tokens, - } - self.log(f"Usage data extracted: {usage}") - - # Chat_id is already logged as trace thread - span_body.pop("chat_id", None) - - # End the span with the final assistant message and updated conversation - span_payload = { - "output": span_body, # include the entire conversation - "usage": usage, - } + try: + # Body copy for traces and span + trace_body = body.copy() + span_body = body.copy() + + # Get the last assistant message from the conversation + assistant_message_obj = get_last_assistant_message_obj(body["messages"]) + + # Extract usage if available + usage = None + self.log(f"Assistant message obj: {assistant_message_obj}") + if assistant_message_obj: + message_usage = assistant_message_obj.get("usage", {}) + if isinstance(message_usage, dict): + input_tokens = message_usage.get( + "prompt_eval_count" + ) or message_usage.get("prompt_tokens") + output_tokens = message_usage.get("eval_count") or message_usage.get( + "completion_tokens" + ) + if input_tokens is not None and output_tokens is not None: + usage = { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + } + self.log(f"Usage data extracted: {usage}") + + # Chat_id is already logged as trace thread + span_body.pop("chat_id", None) + + # End the span with the final assistant message and updated conversation + span_payload = { + "output": span_body, # include the entire conversation + "usage": usage, + } - if self.valves.debug: - print( - f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}" - ) + if self.valves.debug: + print( + f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}" + ) - span.end(**span_payload) - self.log(f"span ended for chat_id: {chat_id}") + span.end(**span_payload) + 
self.log(f"span ended for chat_id: {chat_id}") - # Chat_id is already logged as trace thread - span_body.pop("chat_id", None) + # Chat_id is already logged as trace thread + trace_body.pop("chat_id", None) - # Optionally update the trace with the final assistant output - trace.end(output=trace_body) + # Optionally update the trace with the final assistant output + trace.end(output=trace_body) + self.log(f"trace ended for chat_id: {chat_id}") - # Force the creation of a new trace and span for the next chat even if they are part of the same thread - del self.chat_traces[chat_id] - del self.chat_spans[chat_id] + except Exception as e: + self.log(f"Error ending Opik trace/span for chat_id {chat_id}: {e}") + # Try to end gracefully even if there are errors + try: + span.end(output={"status": "error", "error": str(e)}) + except: + pass + try: + trace.end(output={"status": "error", "error": str(e)}) + except: + pass + + finally: + # Always clean up the dictionaries, even if ending the trace/span failed + self.chat_traces.pop(chat_id, None) + self.chat_spans.pop(chat_id, None) + self.log(f"Cleaned up trace/span references for chat_id: {chat_id}") return body From b6cc9ae1375460221994109b572e198e56b31bb4 Mon Sep 17 00:00:00 2001 From: Assaf Date: Tue, 24 Jun 2025 12:59:39 +0300 Subject: [PATCH 2/3] Update opik_filter_pipeline.py --- examples/filters/opik_filter_pipeline.py | 182 ++++++++++++----------- 1 file changed, 92 insertions(+), 90 deletions(-) diff --git a/examples/filters/opik_filter_pipeline.py b/examples/filters/opik_filter_pipeline.py index 5494488d..3098e140 100644 --- a/examples/filters/opik_filter_pipeline.py +++ b/examples/filters/opik_filter_pipeline.py @@ -98,50 +98,47 @@ def set_opik(self): f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings." 
             )
 
-    def cleanup_existing_trace(self, chat_id: str):
-        """Safely cleanup existing trace and span for a chat_id"""
-        try:
-            if chat_id in self.chat_spans:
+    def cleanup_stale_spans(self, chat_id: str):
+        """Clean up any existing span for a chat_id to prepare for a new one"""
+        if chat_id in self.chat_spans:
+            try:
                 existing_span = self.chat_spans[chat_id]
-                try:
-                    existing_span.end(output={"status": "interrupted", "reason": "new_request_received"})
-                    self.log(f"Ended existing span for chat_id: {chat_id}")
-                except Exception as e:
-                    self.log(f"Warning: Could not end existing span: {e}")
-
-            if chat_id in self.chat_traces:
-                existing_trace = self.chat_traces[chat_id]
-                try:
-                    existing_trace.end(output={"status": "interrupted", "reason": "new_request_received"})
-                    self.log(f"Ended existing trace for chat_id: {chat_id}")
-                except Exception as e:
-                    self.log(f"Warning: Could not end existing trace: {e}")
-
-            # Clean up the dictionaries
-            self.chat_traces.pop(chat_id, None)
-            self.chat_spans.pop(chat_id, None)
-            self.log(f"Cleaned up existing trace/span for chat_id: {chat_id}")
-
-        except Exception as e:
-            self.log(f"Error during cleanup for chat_id {chat_id}: {e}")
-            # Force cleanup even if there are errors
-            self.chat_traces.pop(chat_id, None)
-            self.chat_spans.pop(chat_id, None)
-
-    def cleanup_stale_traces(self, max_count: int = 100):
+                # End the previous span before creating a new one
+                existing_span.end(output={"status": "interrupted", "reason": "new_message_received"})
+                self.log(f"Ended previous span for chat_id: {chat_id}")
+            except Exception as e:
+                self.log(f"Warning: Could not end existing span for {chat_id}: {e}")
+            finally:
+                # Always remove from tracking
+                self.chat_spans.pop(chat_id, None)
+
+    def cleanup_orphaned_traces(self, max_count: int = 100):
         """Clean up traces if we have too many active ones"""
         if len(self.chat_traces) > max_count:
             self.log(f"Too many active traces ({len(self.chat_traces)}), cleaning up oldest ones")
             # Clean up oldest traces (simple FIFO approach)
             chat_ids_to_remove = list(self.chat_traces.keys())[:len(self.chat_traces) - max_count + 10]
             for chat_id in chat_ids_to_remove:
-                self.cleanup_existing_trace(chat_id)
+                try:
+                    if chat_id in self.chat_spans:
+                        span = self.chat_spans[chat_id]
+                        span.end(output={"status": "cleanup", "reason": "too_many_active_traces"})
+                except Exception:
+                    pass
+                try:
+                    if chat_id in self.chat_traces:
+                        trace = self.chat_traces[chat_id]
+                        trace.end(output={"status": "cleanup", "reason": "too_many_active_traces"})
+                except Exception:
+                    pass
+                self.chat_traces.pop(chat_id, None)
+                self.chat_spans.pop(chat_id, None)
 
     async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
         """
         Inlet handles the incoming request (usually a user message).
         - If no trace exists yet for this chat_id, we create a new trace.
-        - If a trace does exist, we clean it up and create a new one.
+        - If a trace does exist, we reuse it and create a new span for the new user message.
""" if self.valves.debug: print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") @@ -177,43 +174,59 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: user_email = user.get("email") if user else None - # FIXED: Instead of asserting, clean up any existing trace/span - if chat_id in self.chat_traces: - self.log(f"Found existing trace for chat_id {chat_id}, cleaning up...") - self.cleanup_existing_trace(chat_id) - # Periodic cleanup to prevent memory leaks - self.cleanup_stale_traces() + self.cleanup_orphaned_traces() - # Create a new trace and span - self.log(f"Creating new chat trace for chat_id: {chat_id}") + # FIXED: Check if trace already exists + trace = None + if chat_id in self.chat_traces: + # Reuse existing trace for continuing conversation + trace = self.chat_traces[chat_id] + self.log(f"Reusing existing trace for chat_id: {chat_id}") + + # Clean up any existing span to prepare for new one + self.cleanup_stale_spans(chat_id) + else: + # Create a new trace for new conversation + self.log(f"Creating new chat trace for chat_id: {chat_id}") + + try: + # Body copy for trace + trace_body = body.copy() - try: - # Body copy for traces and span - trace_body = body.copy() - span_body = body.copy() + # Extract metadata from body + trace_metadata = trace_body.pop("metadata", {}) + trace_metadata.update({"chat_id": chat_id, "user_id": user_email}) - # Extract metadata from body - metadata = trace_body.pop("metadata", {}) - metadata.update({"chat_id": chat_id, "user_id": user_email}) + # We don't need the model at the trace level + trace_body.pop("model", None) - # We don't need the model at the trace level - trace_body.pop("model", None) + trace_payload = { + "name": f"{__name__}", + "input": trace_body, + "metadata": trace_metadata, + "thread_id": chat_id, + } - trace_payload = { - "name": f"{__name__}", - "input": trace_body, - "metadata": metadata, - "thread_id": chat_id, - } + if self.valves.debug: + print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}") - if self.valves.debug: - print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}") + trace = self.opik.trace(**trace_payload) + self.chat_traces[chat_id] = trace + self.log(f"New trace created for chat_id: {chat_id}") - trace = self.opik.trace(**trace_payload) + except Exception as e: + self.log(f"Error creating Opik trace for chat_id {chat_id}: {e}") + # Continue without Opik logging for this request + return body + + # Create a new span (whether trace is new or existing) + try: + # Body copy for span + span_body = body.copy() span_metadata = metadata.copy() - span_metadata.update({"interface": "open-webui"}) + span_metadata.update({"interface": "open-webui", "user_id": user_email}) # Extract the model from body span_body.pop("model", None) @@ -224,8 +237,11 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: model = span_metadata.get("model", {}).get("id", None) provider = span_metadata.get("model", {}).get("owned_by", None) + # Generate unique span name with timestamp + span_name = f"{chat_id}-{int(time.time() * 1000)}" + span_payload = { - "name": chat_id, + "name": span_name, "model": model, "provider": provider, "input": span_body, @@ -237,18 +253,12 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}") span = trace.span(**span_payload) - - self.chat_traces[chat_id] = trace self.chat_spans[chat_id] = span - self.log(f"Trace and span objects 
successfully created for chat_id: {chat_id}") + self.log(f"New span created for chat_id: {chat_id}") except Exception as e: - self.log(f"Error creating Opik trace/span for chat_id {chat_id}: {e}") - # Clean up on error - self.chat_traces.pop(chat_id, None) - self.chat_spans.pop(chat_id, None) + self.log(f"Error creating Opik span for chat_id {chat_id}: {e}") # Don't fail the request, just log the error - self.log(f"Continuing without Opik logging for this request") return body @@ -261,19 +271,17 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: chat_id = body.get("chat_id") - # If no trace or span exist, attempt to register again - if chat_id not in self.chat_traces or chat_id not in self.chat_spans: + # If no span exists, we can't log this response + if chat_id not in self.chat_spans: self.log( - f"[WARNING] No matching chat trace found for chat_id: {chat_id}, chat won't be logged." + f"[WARNING] No active span found for chat_id: {chat_id}, response won't be logged." ) return body - trace = self.chat_traces[chat_id] span = self.chat_spans[chat_id] try: - # Body copy for traces and span - trace_body = body.copy() + # Body copy for span span_body = body.copy() # Get the last assistant message from the conversation @@ -314,31 +322,25 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: ) span.end(**span_payload) - self.log(f"span ended for chat_id: {chat_id}") - - # Chat_id is already logged as trace thread - trace_body.pop("chat_id", None) - - # Optionally update the trace with the final assistant output - trace.end(output=trace_body) - self.log(f"trace ended for chat_id: {chat_id}") + self.log(f"Span ended for chat_id: {chat_id}") except Exception as e: - self.log(f"Error ending Opik trace/span for chat_id {chat_id}: {e}") + self.log(f"Error ending Opik span for chat_id {chat_id}: {e}") # Try to end gracefully even if there are errors try: span.end(output={"status": "error", "error": str(e)}) except: pass - try: - trace.end(output={"status": "error", "error": str(e)}) - except: - pass finally: - # Always clean up the dictionaries, even if ending the trace/span failed - self.chat_traces.pop(chat_id, None) + # Clean up the span reference (but keep the trace for potential future messages) self.chat_spans.pop(chat_id, None) - self.log(f"Cleaned up trace/span references for chat_id: {chat_id}") + self.log(f"Cleaned up span reference for chat_id: {chat_id}") + + # NOTE: We deliberately DON'T clean up the trace here, as it should persist + # for the duration of the conversation. Traces will be cleaned up by: + # 1. The cleanup_orphaned_traces method when there are too many + # 2. Server restart/shutdown + # 3. Manual cleanup if needed return body From 6f4d86392dcccae6dcafc2d45a312ce335086882 Mon Sep 17 00:00:00 2001 From: Assaf Date: Tue, 24 Jun 2025 13:15:56 +0300 Subject: [PATCH 3/3] Update opik_filter_pipeline.py --- examples/filters/opik_filter_pipeline.py | 271 +++++++++++++++++++++-- 1 file changed, 247 insertions(+), 24 deletions(-) diff --git a/examples/filters/opik_filter_pipeline.py b/examples/filters/opik_filter_pipeline.py index 3098e140..a74f101e 100644 --- a/examples/filters/opik_filter_pipeline.py +++ b/examples/filters/opik_filter_pipeline.py @@ -2,9 +2,9 @@ title: Opik Filter Pipeline author: open-webui date: 2025-03-12 -version: 1.1 +version: 1.3 license: MIT -description: A filter pipeline that uses Opik for LLM observability with improved error handling. 
+description: A comprehensive filter pipeline that uses Opik for LLM observability with improved error handling and universal usage tracking. Supports token counting and billing data for all major LLM providers including Anthropic (Claude), OpenAI (GPT), Google (Gemini), Meta (Llama), Mistral, Cohere, and others. requirements: opik """ @@ -134,6 +134,230 @@ def cleanup_orphaned_traces(self, max_count: int = 100): self.chat_traces.pop(chat_id, None) self.chat_spans.pop(chat_id, None) + def detect_provider_type(self, body: dict, metadata: dict) -> str: + """Detect the LLM provider type based on model name and response structure""" + model_info = metadata.get("model", {}) + model_id = model_info.get("id", "").lower() + model_name = body.get("model", "").lower() + + # Check model names/IDs for provider detection + if any(x in model_id or x in model_name for x in ["claude", "anthropic"]): + return "anthropic" + elif any(x in model_id or x in model_name for x in ["gpt", "openai", "o1"]): + return "openai" + elif any(x in model_id or x in model_name for x in ["gemini", "palm", "bard", "google"]): + return "google" + elif any(x in model_id or x in model_name for x in ["llama", "meta"]): + return "meta" + elif any(x in model_id or x in model_name for x in ["mistral"]): + return "mistral" + elif any(x in model_id or x in model_name for x in ["cohere"]): + return "cohere" + + # Check response structure for provider hints + if "usage" in body and "input_tokens" in body.get("usage", {}): + return "anthropic" + elif "usage" in body and "prompt_tokens" in body.get("usage", {}): + return "openai" + elif "usageMetadata" in body: + return "google" + + return "unknown" + + def extract_usage_data(self, body: dict, metadata: dict = None) -> Optional[dict]: + """Extract token usage data with support for multiple API providers (Anthropic, OpenAI, Gemini, etc.)""" + if metadata is None: + metadata = {} + + provider = self.detect_provider_type(body, metadata) + self.log(f"Detected provider: {provider}") + + usage = None + + # Method 1: Check for usage in response body (multiple provider formats) + if "usage" in body and isinstance(body["usage"], dict): + usage_data = body["usage"] + self.log(f"Found usage data in response body: {usage_data}") + + # Anthropic API format: input_tokens, output_tokens + input_tokens = usage_data.get("input_tokens") + output_tokens = usage_data.get("output_tokens") + + # OpenAI API format: prompt_tokens, completion_tokens + if input_tokens is None or output_tokens is None: + input_tokens = usage_data.get("prompt_tokens") + output_tokens = usage_data.get("completion_tokens") + + # Some variations use different field names + if input_tokens is None or output_tokens is None: + input_tokens = usage_data.get("promptTokens") or usage_data.get("prompt_token_count") + output_tokens = usage_data.get("completionTokens") or usage_data.get("completion_token_count") + + if input_tokens is not None and output_tokens is not None: + total_tokens = usage_data.get("total_tokens") or usage_data.get("totalTokens") or (input_tokens + output_tokens) + usage = { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": total_tokens, + } + self.log(f"Extracted usage data from response body: {usage}") + return usage + + # Method 2: Check for Gemini API format (usageMetadata) + if "usageMetadata" in body and isinstance(body["usageMetadata"], dict): + gemini_usage = body["usageMetadata"] + self.log(f"Found Gemini usage metadata: {gemini_usage}") + + input_tokens = ( + 
gemini_usage.get("promptTokenCount") or + gemini_usage.get("prompt_token_count") or + gemini_usage.get("inputTokens") + ) + output_tokens = ( + gemini_usage.get("candidatesTokenCount") or + gemini_usage.get("candidates_token_count") or + gemini_usage.get("outputTokens") or + gemini_usage.get("completionTokens") + ) + total_tokens = ( + gemini_usage.get("totalTokenCount") or + gemini_usage.get("total_token_count") or + gemini_usage.get("totalTokens") + ) + + if input_tokens is not None and output_tokens is not None: + if total_tokens is None: + total_tokens = input_tokens + output_tokens + usage = { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": total_tokens, + } + self.log(f"Extracted Gemini usage data: {usage}") + return usage + + # Method 3: Check assistant message for usage data (various formats) + assistant_message_obj = get_last_assistant_message_obj(body.get("messages", [])) + if assistant_message_obj: + message_usage = assistant_message_obj.get("usage", {}) + self.log(f"Assistant message usage: {message_usage}") + + if isinstance(message_usage, dict): + # Try multiple field name variations for different providers + input_tokens = ( + message_usage.get("input_tokens") or # Anthropic + message_usage.get("prompt_tokens") or # OpenAI + message_usage.get("prompt_eval_count") or # Some local models + message_usage.get("promptTokenCount") or # Gemini variants + message_usage.get("prompt_token_count") # Alternative naming + ) + output_tokens = ( + message_usage.get("output_tokens") or # Anthropic + message_usage.get("completion_tokens") or # OpenAI + message_usage.get("eval_count") or # Some local models + message_usage.get("candidatesTokenCount") or # Gemini variants + message_usage.get("completion_token_count") # Alternative naming + ) + total_tokens = ( + message_usage.get("total_tokens") or + message_usage.get("totalTokens") or + message_usage.get("totalTokenCount") + ) + + if input_tokens is not None and output_tokens is not None: + if total_tokens is None: + total_tokens = input_tokens + output_tokens + usage = { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": total_tokens, + } + self.log(f"Extracted message-level usage data: {usage}") + return usage + + # Method 4: Check for usage at individual message level (some APIs put it there) + if "messages" in body and isinstance(body["messages"], list): + for message in reversed(body["messages"]): + if message.get("role") == "assistant": + # Check multiple possible usage field locations + usage_sources = [ + message.get("usage", {}), + message.get("usageMetadata", {}), + message.get("metadata", {}).get("usage", {}) if message.get("metadata") else {} + ] + + for msg_usage in usage_sources: + if isinstance(msg_usage, dict) and msg_usage: + self.log(f"Found message usage: {msg_usage}") + + input_tokens = ( + msg_usage.get("input_tokens") or + msg_usage.get("prompt_tokens") or + msg_usage.get("promptTokenCount") or + msg_usage.get("prompt_eval_count") + ) + output_tokens = ( + msg_usage.get("output_tokens") or + msg_usage.get("completion_tokens") or + msg_usage.get("candidatesTokenCount") or + msg_usage.get("eval_count") + ) + total_tokens = ( + msg_usage.get("total_tokens") or + msg_usage.get("totalTokens") or + msg_usage.get("totalTokenCount") + ) + + if input_tokens is not None and output_tokens is not None: + if total_tokens is None: + total_tokens = input_tokens + output_tokens + usage = { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, 
+ "total_tokens": total_tokens, + } + self.log(f"Extracted individual message usage: {usage}") + return usage + + # Method 5: Check alternative response structures (some proxies/wrappers) + for alt_key in ["token_usage", "billing", "cost_info", "metrics"]: + if alt_key in body and isinstance(body[alt_key], dict): + alt_usage = body[alt_key] + self.log(f"Found alternative usage data in {alt_key}: {alt_usage}") + + input_tokens = ( + alt_usage.get("input_tokens") or + alt_usage.get("prompt_tokens") or + alt_usage.get("input") or + alt_usage.get("prompt") + ) + output_tokens = ( + alt_usage.get("output_tokens") or + alt_usage.get("completion_tokens") or + alt_usage.get("output") or + alt_usage.get("completion") + ) + + if input_tokens is not None and output_tokens is not None: + total_tokens = alt_usage.get("total_tokens") or alt_usage.get("total") or (input_tokens + output_tokens) + usage = { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": total_tokens, + } + self.log(f"Extracted alternative usage data: {usage}") + return usage + + self.log("No usage data found in any expected location") + if self.valves.debug: + # Log the full body structure to help debug + self.log(f"Full response body keys: {list(body.keys())}") + if "messages" in body and body["messages"]: + last_message = body["messages"][-1] if body["messages"] else {} + self.log(f"Last message keys: {list(last_message.keys()) if isinstance(last_message, dict) else 'Not a dict'}") + + return None + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: """ Inlet handles the incoming request (usually a user message). @@ -284,28 +508,27 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: # Body copy for span span_body = body.copy() - # Get the last assistant message from the conversation - assistant_message_obj = get_last_assistant_message_obj(body["messages"]) - - # Extract usage if available - usage = None - self.log(f"Assistant message obj: {assistant_message_obj}") - if assistant_message_obj: - message_usage = assistant_message_obj.get("usage", {}) - if isinstance(message_usage, dict): - input_tokens = message_usage.get( - "prompt_eval_count" - ) or message_usage.get("prompt_tokens") - output_tokens = message_usage.get("eval_count") or message_usage.get( - "completion_tokens" - ) - if input_tokens is not None and output_tokens is not None: - usage = { - "prompt_tokens": input_tokens, - "completion_tokens": output_tokens, - "total_tokens": input_tokens + output_tokens, - } - self.log(f"Usage data extracted: {usage}") + # FIXED: Extract usage data using improved method that supports multiple providers + metadata = body.get("metadata", {}) + usage = self.extract_usage_data(body, metadata) + + # Add provider and model information to usage data for better analytics + if usage: + provider = self.detect_provider_type(body, metadata) + model_info = metadata.get("model", {}) + + # Enhance usage data with provider context + usage.update({ + "provider": provider, + "model_id": model_info.get("id", "unknown"), + "model_name": model_info.get("name", "unknown"), + }) + self.log(f"Enhanced usage data with provider info: {usage}") + + if usage: + self.log(f"Successfully extracted usage data: {usage}") + else: + self.log("No usage data found - this might indicate an API integration issue") # Chat_id is already logged as trace thread span_body.pop("chat_id", None)
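
Taken together, the three patches settle on a lifecycle where one trace lives for the whole conversation (keyed by chat_id) and each message turn gets its own span. Below is a minimal, runnable sketch of that bookkeeping, illustrative only and not part of the patches: StubTrace/StubSpan are hypothetical stand-ins for the Opik client objects, and only the dictionary handling mirrors the pipeline.

class StubSpan:
    def end(self, **kwargs):
        print("span ended:", kwargs)


class StubTrace:
    def span(self, **kwargs):
        print("span created:", kwargs.get("name"))
        return StubSpan()

    def end(self, **kwargs):
        print("trace ended:", kwargs)


chat_traces: dict = {}
chat_spans: dict = {}


def inlet(chat_id: str) -> None:
    trace = chat_traces.get(chat_id)
    if trace is None:
        # New conversation: create and remember the trace
        # (self.opik.trace(**trace_payload) in the real pipeline).
        trace = StubTrace()
        chat_traces[chat_id] = trace
    elif chat_id in chat_spans:
        # cleanup_stale_spans: end an interrupted span before starting a new one.
        chat_spans.pop(chat_id).end(output={"status": "interrupted"})
    chat_spans[chat_id] = trace.span(name=f"{chat_id}-turn", type="llm")


def outlet(chat_id: str) -> None:
    # End and drop the span; the trace is deliberately kept for the next turn.
    span = chat_spans.pop(chat_id, None)
    if span is not None:
        span.end(output={"status": "ok"})


inlet("chat-1"); outlet("chat-1")  # first turn: trace and span created
inlet("chat-1"); outlet("chat-1")  # second turn: same trace, new span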
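
detect_provider_type in patch 3 matches substrings of the model id/name before falling back to response shape. The same name-matching heuristic can be read as a lookup table; the sketch below is an illustrative rewrite (the model ids in the demo calls are made up), not the patch's code.

def detect_provider(model_id: str, model_name: str = "") -> str:
    """Table-driven version of the substring heuristic; rule order matters."""
    model_id, model_name = model_id.lower(), model_name.lower()
    rules = [
        ("anthropic", ["claude", "anthropic"]),
        ("openai", ["gpt", "openai", "o1"]),
        ("google", ["gemini", "palm", "bard", "google"]),
        ("meta", ["llama", "meta"]),
        ("mistral", ["mistral"]),
        ("cohere", ["cohere"]),
    ]
    for provider, keys in rules:
        if any(k in model_id or k in model_name for k in keys):
            return provider
    return "unknown"


print(detect_provider("claude-3-5-sonnet"))  # anthropic
print(detect_provider("gpt-4o"))             # openai
print(detect_provider("gemini-1.5-pro"))     # google
print(detect_provider("qwen2.5"))            # unknown -> response-shape fallback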
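
The heart of extract_usage_data is normalizing several wire formats into one prompt/completion/total dict. Here is a minimal sketch of just the three body-level formats (Anthropic, OpenAI, Gemini) with fabricated sample payloads; the real method also checks message-level and proxy-specific fields, and its `or`-chaining over field aliases means a legitimate zero count falls through to the next alias.

from typing import Optional


def normalize_usage(body: dict) -> Optional[dict]:
    u = body.get("usage") or {}
    # Anthropic: input_tokens/output_tokens; OpenAI: prompt_tokens/completion_tokens.
    prompt = u.get("input_tokens") or u.get("prompt_tokens")
    completion = u.get("output_tokens") or u.get("completion_tokens")
    if prompt is None or completion is None:
        # Gemini: usageMetadata with promptTokenCount/candidatesTokenCount.
        g = body.get("usageMetadata") or {}
        prompt = g.get("promptTokenCount")
        completion = g.get("candidatesTokenCount")
    if prompt is None or completion is None:
        return None
    return {
        "prompt_tokens": prompt,
        "completion_tokens": completion,
        "total_tokens": prompt + completion,
    }


# Fabricated payloads covering the three formats:
assert normalize_usage({"usage": {"input_tokens": 10, "output_tokens": 5}}) == {
    "prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}  # Anthropic
assert normalize_usage({"usage": {"prompt_tokens": 8, "completion_tokens": 2}}) == {
    "prompt_tokens": 8, "completion_tokens": 2, "total_tokens": 10}   # OpenAI
assert normalize_usage({"usageMetadata": {"promptTokenCount": 4, "candidatesTokenCount": 3}}) == {
    "prompt_tokens": 4, "completion_tokens": 3, "total_tokens": 7}    # Gemini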