Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sdks/python/src/opik/dict_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,13 @@ def split_dict_by_keys(input_dict: Dict, keys: List) -> Tuple[Dict, Dict]:

def _is_dict(item: Any) -> bool:
return isinstance(item, dict)


def add_prefix_to_keys_of_a_dict(
    d: Dict[str, Any], prefix: str, delim: str = "."
) -> Dict[str, Any]:
    """
    Build a new dictionary whose keys are the keys of `d` prepended with
    `prefix` followed by `delim`. Values are carried over as-is and `d`
    itself is left unmodified.

    For example, if d = {"key1": "value1"}, prefix = "parent", delim = "."
    the result will be {"parent.key1": "value1"}
    """
    prefixed: Dict[str, Any] = {}
    for key, value in d.items():
        prefixed[f"{prefix}{delim}{key}"] = value
    return prefixed
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import logging
import inspect
from typing import (
Any,
AsyncIterator,
Callable,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
)

# from openai.types.audio import speech_create_params
from typing_extensions import override

from opik import dict_utils, llm_usage
from opik.api_objects import span
from opik.decorator import (
arguments_helpers,
base_track_decorator,
)
from opik.types import LLMProvider
from openai import Stream, AsyncStream
from openai._response import ResponseContextManager

from . import stream_patchers

LOGGER = logging.getLogger(__name__)

KWARGS_KEYS_TO_LOG_AS_INPUTS = ["input"]
RESPONSE_KEYS_TO_LOG_AS_OUTPUT = []


class OpenaiAudioSpeechTrackDecorator(base_track_decorator.BaseTrackDecorator):
"""
An implementation of BaseTrackDecorator designed specifically for tracking
calls of OpenAI's `audio.speech.create` function.
"""

def __init__(self) -> None:
    """Initialise the base track decorator and default the provider to "openai"."""
    super().__init__()
    # Default only: `_patch_openai_audio_speech` overwrites this attribute with
    # the client's base-URL host when that host is not "api.openai.com".
    self.provider = "openai"

@override
def _start_span_inputs_preprocessor(
    self,
    func: Callable,
    track_options: arguments_helpers.TrackOptions,
    args: Optional[Tuple],
    kwargs: Optional[Dict[str, Any]],
) -> arguments_helpers.StartSpanParameters:
    """
    Build the span-start parameters for a tracked `audio.speech.create` call.

    The call's keyword arguments are split with `dict_utils.split_dict_by_keys`
    using `KWARGS_KEYS_TO_LOG_AS_INPUTS`: the first returned dict is logged as
    the span input, the second one is merged into the metadata taken from
    `track_options`. The span name falls back to `func.__name__` when
    `track_options.name` is not set. `args` is not used.

    Raises:
        AssertionError: if `kwargs` is None.
    """
    assert (
        kwargs is not None
    ), "Expected kwargs to be not None in audio.speech.create(**kwargs)"

    if track_options.name is not None:
        span_name = track_options.name
    else:
        span_name = func.__name__

    base_metadata = track_options.metadata
    if base_metadata is None:
        base_metadata = {}

    span_input, kwargs_metadata = dict_utils.split_dict_by_keys(
        kwargs, keys=KWARGS_KEYS_TO_LOG_AS_INPUTS
    )
    span_metadata = dict_utils.deepmerge(base_metadata, kwargs_metadata)
    # Integration markers are written last so they win over any same-named
    # keys coming from the track options or the call kwargs.
    span_metadata.update(created_from="openai", type="openai_audio_speech")

    return arguments_helpers.StartSpanParameters(
        name=span_name,
        input=span_input,
        type=track_options.type,
        tags=["openai"],
        metadata=span_metadata,
        project_name=track_options.project_name,
        model=kwargs.get("model", None),
        provider=self.provider,
    )

@override
def _end_span_inputs_preprocessor(
self,
output: Any,
capture_output: bool,
current_span_data: span.SpanData,
) -> arguments_helpers.EndSpanParameters:
opik_usage = None
if current_span_data.input and current_span_data.input.get("input"):
opik_usage = llm_usage.try_build_opik_usage_or_log_error(
provider=LLMProvider.OPENAI,
usage={"total_tokens": len(current_span_data.input["input"])},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look correct (already mentioned in the tests comment).
We need to pass the real usage data from the response here.

logger=LOGGER,
error_message="Failed to log token usage from openai call",
)
result = arguments_helpers.EndSpanParameters(
output={},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be None, let's encode it with base64.

usage=opik_usage,
metadata={},
model=current_span_data.model,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should extract the exact model name from the provider response

provider=self.provider,
)
return result

@override
def _streams_handler(
    self,
    output: Any,
    capture_output: bool,
    generations_aggregator: Optional[Callable[[List[Any]], str]],
) -> Optional[Union[Iterator, AsyncIterator]]:
    """
    Route a streaming result of the tracked call to the matching stream patcher.

    Returns:
        - `output` itself, untouched, when `capture_output` is false;
        - the result of `stream_patchers.patch_sync_stream` when `output` is a
          `Stream` or a `ResponseContextManager`;
        - the result of `stream_patchers.patch_async_stream` when `output` is
          an `AsyncStream`;
        - None for any other kind of output.
    """
    if not capture_output:
        return output

    if isinstance(output, (Stream, ResponseContextManager)):
        patch_stream = stream_patchers.patch_sync_stream
    elif isinstance(output, AsyncStream):
        patch_stream = stream_patchers.patch_async_stream
    else:
        return None

    # End candidates are popped only once we know the output is a stream we patch.
    span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()
    return patch_stream(
        stream=output,
        span_to_end=span_to_end,
        trace_to_end=trace_to_end,
        generations_aggregator=generations_aggregator,
        finally_callback=self._after_call,
    )

async def _acall_and_repack(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
if inspect.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = func(*args, **kwargs)
return self._handle_response(result=result, **kwargs)

def _handle_response(self, result: Any, **kwargs: Any) -> Any:
is_stream_response = self._is_stream_response(result)
is_stream_manager_response = self._is_stream_manager_response(result)
33 changes: 33 additions & 0 deletions sdks/python/src/opik/integrations/openai/opik_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

from . import (
chat_completion_chunks_aggregator,
openai_audio_speech_decorator,
openai_chat_completions_decorator,
speech_chunks_aggregator,
)
import opik.semantic_version as semantic_version

Expand Down Expand Up @@ -44,12 +46,43 @@ def track_openai(

_patch_openai_chat_completions(openai_client, project_name)

if hasattr(openai_client, "audio"):
_patch_openai_audio_speech(openai_client, project_name)

if hasattr(openai_client, "responses"):
_patch_openai_responses(openai_client, project_name)

return openai_client


def _patch_openai_audio_speech(
    openai_client: OpenAIClient,
    project_name: Optional[str] = None,
) -> None:
    """
    Replace `audio.speech.create` on `openai_client` with tracked versions.

    A single `OpenaiAudioSpeechTrackDecorator` is created for the client. Its
    provider is set to the client's base-URL host unless that host is
    "api.openai.com". The resulting decorator wraps `audio.speech.create` and,
    when present, `audio.speech.with_streaming_response.create`.
    """
    decorator_factory = openai_audio_speech_decorator.OpenaiAudioSpeechTrackDecorator()
    client_host = openai_client.base_url.host
    if client_host != "api.openai.com":
        decorator_factory.provider = client_host

    track_speech_create = decorator_factory.track(
        type="llm",
        name="audio_speech_create",
        generations_aggregator=speech_chunks_aggregator.aggregate,
        project_name=project_name,
    )

    speech_api = openai_client.audio.speech
    speech_api.create = track_speech_create(speech_api.create)
    # NOTE(review): `with_streaming_response` is deliberately first touched only
    # after `create` has been patched, as in the original ordering - confirm
    # against the SDK before reordering these statements.
    if hasattr(speech_api, "with_streaming_response"):
        streaming_api = speech_api.with_streaming_response
        streaming_api.create = track_speech_create(streaming_api.create)


def _patch_openai_chat_completions(
openai_client: OpenAIClient,
project_name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import logging
from typing import Any, Dict, List, Optional

import pydantic

LOGGER = logging.getLogger(__name__)


# Result type returned by `aggregate` for streamed speech chunks.
# It currently declares no fields, so instances carry no data.
class SpeechChunksAggregated(pydantic.BaseModel):
    pass


def aggregate(chunks: List[bytes]) -> SpeechChunksAggregated:
    """
    Produce the aggregated result for a list of streamed speech chunks.

    The chunks are currently not inspected: an empty `SpeechChunksAggregated`
    is returned regardless of the input.
    """
    aggregated = SpeechChunksAggregated()
    return aggregated
40 changes: 38 additions & 2 deletions sdks/python/src/opik/integrations/openai/stream_patchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
from opik.decorator import generator_wrappers, error_info_collector
import functools
import openai
from openai._response import ResponseContextManager
import openai.lib.streaming.chat


LOGGER = logging.getLogger(__name__)

# Raw low-level stream methods
original_stream_iter_method = openai.Stream.__iter__
original_async_stream_aiter_method = openai.AsyncStream.__aiter__
if hasattr(ResponseContextManager, "__enter__"):
original_sync_context_manager_enter_method = ResponseContextManager.__enter__
if hasattr(ResponseContextManager, "__aenter__"):
original_async_context_manager_aenter_method = ResponseContextManager.__aenter__

# Stream manager (factory object) methods
original_chat_completion_stream_manager_enter_method = (
Expand Down Expand Up @@ -43,6 +47,31 @@ def patch_sync_stream(
```

"""
if isinstance(stream, ResponseContextManager):

def ContextManager__enter__decorator(dunder_enter_func: Callable) -> Callable:
@functools.wraps(dunder_enter_func)
def wrapper(
self: ResponseContextManager,
) -> Iterator[StreamItem]:
response = dunder_enter_func(self)
return patch_sync_stream(
response,
self.span_to_end,
self.trace_to_end,
generations_aggregator,
finally_callback,
)

return wrapper

ResponseContextManager.__enter__ = ContextManager__enter__decorator(
original_sync_context_manager_enter_method
)
stream.opik_tracked_instance = True
stream.span_to_end = span_to_end
stream.trace_to_end = trace_to_end
return stream

def Stream__iter__decorator(dunder_iter_func: Callable) -> Callable:
@functools.wraps(dunder_iter_func)
Expand All @@ -52,7 +81,14 @@ def wrapper(
try:
accumulated_items: List[StreamItem] = []
error_info: Optional[ErrorInfoDict] = None
for item in dunder_iter_func(self):
# HACK: a bit ugly, but for openai audio speech, the stream object is not an iterator
# but an object with `iter_bytes` method
items_iterator = (
dunder_iter_func(self)
if not hasattr(self, "iter_bytes")
else self.iter_bytes()
)
for item in items_iterator:
accumulated_items.append(item)
yield item
except Exception as exception:
Expand Down
31 changes: 31 additions & 0 deletions sdks/python/src/opik/llm_usage/openai_audio_speech_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Dict, Any

from opik import dict_utils
import pydantic


class OpenAIAudioSpeechUsage(pydantic.BaseModel):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this class, audio tokens are already part of the existing and known openai token usage dicts.

"""
A class used to represent the token usage of a call to OpenAI's audio speech API.
"""

total_tokens: int

def to_backend_compatible_flat_dict(self, parent_key_prefix: str) -> Dict[str, Any]:
    """
    Dump this model and prefix every key with `parent_key_prefix`
    (via `dict_utils.add_prefix_to_keys_of_a_dict`, default delimiter).

    For example, with parent_key_prefix = "original_usage":
    {
        "original_usage.total_tokens": 12,
    }
    """
    dumped_fields = self.model_dump()
    prefixed_usage: Dict[str, int] = dict_utils.add_prefix_to_keys_of_a_dict(  # type: ignore
        dumped_fields, parent_key_prefix
    )
    return prefixed_usage

@classmethod
def from_original_usage_dict(cls, usage: Dict[str, Any]) -> "OpenAIAudioSpeechUsage":
    """Construct the usage model by passing the items of `usage` as keyword arguments."""
    usage_model = cls(**usage)
    return usage_model
15 changes: 15 additions & 0 deletions sdks/python/src/opik/llm_usage/opik_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
unknown_usage,
bedrock_usage,
openai_responses_usage,
openai_audio_speech_usage,
)
import opik.dict_utils as dict_utils

Expand All @@ -17,6 +18,7 @@
anthropic_usage.AnthropicUsage,
bedrock_usage.BedrockUsage,
openai_responses_usage.OpenAIResponsesUsage,
openai_audio_speech_usage.OpenAIAudioSpeechUsage,
unknown_usage.UnknownUsage,
]

Expand Down Expand Up @@ -168,3 +170,16 @@ def from_openai_responses_dict(cls, usage: Dict[str, Any]) -> "OpikUsage":
total_tokens=provider_usage.total_tokens,
provider_usage=provider_usage,
)

@classmethod
def from_openai_audio_speech_dict(cls, usage: Dict[str, Any]) -> "OpikUsage":
    """
    Create an `OpikUsage` from an OpenAI audio speech usage dictionary.

    The dictionary is first parsed into an `OpenAIAudioSpeechUsage`; its
    `total_tokens` value is copied to the top-level `total_tokens` and the
    parsed object is stored as `provider_usage`.
    """
    usage_model = openai_audio_speech_usage.OpenAIAudioSpeechUsage
    provider_usage = usage_model.from_original_usage_dict(usage)
    return cls(
        total_tokens=provider_usage.total_tokens,
        provider_usage=provider_usage,
    )
1 change: 1 addition & 0 deletions sdks/python/src/opik/llm_usage/opik_usage_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
LLMProvider.OPENAI: [
opik_usage.OpikUsage.from_openai_completions_dict,
opik_usage.OpikUsage.from_openai_responses_dict,
opik_usage.OpikUsage.from_openai_audio_speech_dict,
],
LLMProvider.GOOGLE_VERTEXAI: [opik_usage.OpikUsage.from_google_dict],
LLMProvider.GOOGLE_AI: [opik_usage.OpikUsage.from_google_dict],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
openai
openai-agents
respx
eval-type-backport # to support Python 3.9 TypeError: Unable to evaluate type annotation 'str | None'.

Loading