From 006fd3345b1cd12731bbb8ecd367791317c109a9 Mon Sep 17 00:00:00 2001 From: utnim2 Date: Fri, 20 Jun 2025 12:27:20 +0530 Subject: [PATCH 1/2] feat(tracing): resolve merge conflicts and mypy errors --- .../opik/decorator/base_track_decorator.py | 97 ++++++++++++------- 1 file changed, 62 insertions(+), 35 deletions(-) diff --git a/sdks/python/src/opik/decorator/base_track_decorator.py b/sdks/python/src/opik/decorator/base_track_decorator.py index 6cca282f7df..6813464dbb9 100644 --- a/sdks/python/src/opik/decorator/base_track_decorator.py +++ b/sdks/python/src/opik/decorator/base_track_decorator.py @@ -419,56 +419,83 @@ def _after_call( generators_trace_to_end: Optional[trace.TraceData] = None, flush: bool = False, ) -> None: - if self.disabled: - return - try: - if generators_span_to_end is None: - span_data_to_end, trace_data_to_end = pop_end_candidates() - else: - span_data_to_end, trace_data_to_end = ( - generators_span_to_end, - generators_trace_to_end, - ) + self.__after_call_unsafe( + output=output, + error_info=error_info, + capture_output=capture_output, + generators_span_to_end=generators_span_to_end, + generators_trace_to_end=generators_trace_to_end, + flush=flush, + ) + except Exception as exception: + LOGGER.error( + logging_messages.UNEXPECTED_EXCEPTION_ON_SPAN_FINALIZATION_FOR_TRACKED_FUNCTION, + output, + str(exception), + exc_info=True, + ) + + def __after_call_unsafe( + self, + output: Optional[Any], + error_info: Optional[ErrorInfoDict], + capture_output: bool, + generators_span_to_end: Optional[span.SpanData] = None, + generators_trace_to_end: Optional[trace.TraceData] = None, + flush: bool = False, + ) -> None: + # Always pop from context to keep it clean, even if tracing is disabled. + if generators_span_to_end is None: + span_data_to_end, trace_data_to_end = pop_end_candidates() + else: + span_data_to_end, trace_data_to_end = ( + generators_span_to_end, + generators_trace_to_end, + ) + + # Now, check if we should actually submit the trace. 
+ if self.disabled or not is_tracing_active(): + return - if output is not None: + if output is not None: + try: end_arguments = self._end_span_inputs_preprocessor( output=output, capture_output=capture_output, current_span_data=span_data_to_end, ) - else: - end_arguments = arguments_helpers.EndSpanParameters( - error_info=error_info + except Exception as e: + LOGGER.error( + logging_messages.UNEXPECTED_EXCEPTION_ON_SPAN_FINALIZATION_FOR_TRACKED_FUNCTION, + output, + str(e), + exc_info=True, ) - client = opik_client.get_client_cached() + end_arguments = arguments_helpers.EndSpanParameters( + output={"output": output} + ) + else: + end_arguments = arguments_helpers.EndSpanParameters(error_info=error_info) - span_data_to_end.init_end_time().update( - **end_arguments.to_kwargs(), - ) + client = opik_client.get_client_cached() - client.span(**span_data_to_end.__dict__) + span_data_to_end.init_end_time().update( + **end_arguments.to_kwargs(), + ) - if trace_data_to_end is not None: - trace_data_to_end.init_end_time().update( - **end_arguments.to_kwargs( - ignore_keys=["usage", "model", "provider"] - ), - ) + client.span(**span_data_to_end.as_parameters) - client.trace(**trace_data_to_end.__dict__) + if trace_data_to_end is not None: + trace_data_to_end.init_end_time().update( + **end_arguments.to_kwargs(ignore_keys=["usage", "model", "provider"]), + ) - if flush: - client.flush() + client.trace(**trace_data_to_end.as_parameters) - except Exception as exception: - LOGGER.error( - logging_messages.UNEXPECTED_EXCEPTION_ON_SPAN_FINALIZATION_FOR_TRACKED_FUNCTION, - output, - str(exception), - exc_info=True, - ) + if flush: + client.flush() @abc.abstractmethod def _streams_handler( From 2d99f005db7917a255bfd45f8e47972f7e8f97d0 Mon Sep 17 00:00:00 2001 From: utnim2 Date: Fri, 20 Jun 2025 21:27:06 +0530 Subject: [PATCH 2/2] feat: added openai tts --- .../opik/decorator/base_track_decorator.py | 55 +-- sdks/python/src/opik/dict_utils.py | 10 + .../openai/openai_audio_speech_decorator.py | 150 +++++++ .../opik/integrations/openai/opik_tracker.py | 33 ++ .../openai/speech_chunks_aggregator.py | 14 + .../integrations/openai/stream_patchers.py | 39 +- .../llm_usage/openai_audio_speech_usage.py | 31 ++ sdks/python/src/opik/llm_usage/opik_usage.py | 15 + .../src/opik/llm_usage/opik_usage_factory.py | 1 + .../openai/requirements.txt | 1 + .../openai/test_openai_audio_speech.py | 382 ++++++++++++++++++ 11 files changed, 677 insertions(+), 54 deletions(-) create mode 100644 sdks/python/src/opik/integrations/openai/openai_audio_speech_decorator.py create mode 100644 sdks/python/src/opik/integrations/openai/speech_chunks_aggregator.py create mode 100644 sdks/python/src/opik/llm_usage/openai_audio_speech_usage.py create mode 100644 sdks/python/tests/library_integration/openai/test_openai_audio_speech.py diff --git a/sdks/python/src/opik/decorator/base_track_decorator.py b/sdks/python/src/opik/decorator/base_track_decorator.py index 0177fbbbda9..b4097858d2a 100644 --- a/sdks/python/src/opik/decorator/base_track_decorator.py +++ b/sdks/python/src/opik/decorator/base_track_decorator.py @@ -485,33 +485,9 @@ def __after_call_unsafe( generators_trace_to_end: Optional[trace.TraceData] = None, flush: bool = False, ) -> None: - try: - self.__after_call_unsafe( - output=output, - error_info=error_info, - capture_output=capture_output, - generators_span_to_end=generators_span_to_end, - generators_trace_to_end=generators_trace_to_end, - flush=flush, - ) - except Exception as exception: - LOGGER.error( - 
logging_messages.UNEXPECTED_EXCEPTION_ON_SPAN_FINALIZATION_FOR_TRACKED_FUNCTION, - output, - str(exception), - exc_info=True, - ) + if self.disabled: + return - def __after_call_unsafe( - self, - output: Optional[Any], - error_info: Optional[ErrorInfoDict], - capture_output: bool, - generators_span_to_end: Optional[span.SpanData] = None, - generators_trace_to_end: Optional[trace.TraceData] = None, - flush: bool = False, - ) -> None: - # Always pop from context to keep it clean, even if tracing is disabled. if generators_span_to_end is None: span_data_to_end, trace_data_to_end = pop_end_candidates() else: @@ -520,10 +496,6 @@ def __after_call_unsafe( generators_trace_to_end, ) - # Now, check if we should actually submit the trace. - if self.disabled or not is_tracing_active(): - return - if output is not None: try: end_arguments = self._end_span_inputs_preprocessor( @@ -531,14 +503,6 @@ def __after_call_unsafe( capture_output=capture_output, current_span_data=span_data_to_end, ) - except Exception as e: - LOGGER.error( - logging_messages.UNEXPECTED_EXCEPTION_ON_SPAN_FINALIZATION_FOR_TRACKED_FUNCTION, - output, - str(e), - exc_info=True, - ) - except Exception as e: LOGGER.error( logging_messages.UNEXPECTED_EXCEPTION_ON_SPAN_FINALIZATION_FOR_TRACKED_FUNCTION, @@ -549,42 +513,27 @@ def __after_call_unsafe( end_arguments = arguments_helpers.EndSpanParameters( output={"output": output} - output={"output": output} ) else: end_arguments = arguments_helpers.EndSpanParameters(error_info=error_info) - else: - end_arguments = arguments_helpers.EndSpanParameters(error_info=error_info) client = opik_client.get_client_cached() - client = opik_client.get_client_cached() - span_data_to_end.init_end_time().update( - **end_arguments.to_kwargs(), - ) span_data_to_end.init_end_time().update( **end_arguments.to_kwargs(), ) - client.span(**span_data_to_end.as_parameters) client.span(**span_data_to_end.as_parameters) - if trace_data_to_end is not None: - trace_data_to_end.init_end_time().update( - **end_arguments.to_kwargs(ignore_keys=["usage", "model", "provider"]), - ) if trace_data_to_end is not None: trace_data_to_end.init_end_time().update( **end_arguments.to_kwargs(ignore_keys=["usage", "model", "provider"]), ) client.trace(**trace_data_to_end.as_parameters) - client.trace(**trace_data_to_end.as_parameters) if flush: client.flush() - if flush: - client.flush() @abc.abstractmethod def _streams_handler( diff --git a/sdks/python/src/opik/dict_utils.py b/sdks/python/src/opik/dict_utils.py index 17e8feb6e94..754f7717cdc 100644 --- a/sdks/python/src/opik/dict_utils.py +++ b/sdks/python/src/opik/dict_utils.py @@ -80,3 +80,13 @@ def split_dict_by_keys(input_dict: Dict, keys: List) -> Tuple[Dict, Dict]: def _is_dict(item: Any) -> bool: return isinstance(item, dict) + + +def add_prefix_to_keys_of_a_dict( + d: Dict[str, Any], prefix: str, delim: str = "." +) -> Dict[str, Any]: + """ + For example, if d = {"key1": "value1"}, prefix = "parent", delim = "." 
+    the result will be {"parent.key1": "value1"}
+    """
+    return {f"{prefix}{delim}{key}": value for key, value in d.items()}
diff --git a/sdks/python/src/opik/integrations/openai/openai_audio_speech_decorator.py b/sdks/python/src/opik/integrations/openai/openai_audio_speech_decorator.py
new file mode 100644
index 00000000000..acbe5b4eaf6
--- /dev/null
+++ b/sdks/python/src/opik/integrations/openai/openai_audio_speech_decorator.py
@@ -0,0 +1,150 @@
+import logging
+from typing import (
+    Any,
+    AsyncIterator,
+    Callable,
+    Dict,
+    Iterator,
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from typing_extensions import override
+
+from opik import dict_utils, llm_usage
+from opik.api_objects import span
+from opik.decorator import (
+    arguments_helpers,
+    base_track_decorator,
+)
+from opik.types import LLMProvider
+from openai import Stream, AsyncStream
+from openai._response import ResponseContextManager
+
+from . import stream_patchers
+
+LOGGER = logging.getLogger(__name__)
+
+KWARGS_KEYS_TO_LOG_AS_INPUTS = ["input"]
+RESPONSE_KEYS_TO_LOG_AS_OUTPUT: List[str] = []
+
+
+class OpenaiAudioSpeechTrackDecorator(base_track_decorator.BaseTrackDecorator):
+    """
+    An implementation of BaseTrackDecorator designed specifically for tracking
+    calls of OpenAI's `audio.speech.create` function.
+    """
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.provider = "openai"
+
+    @override
+    def _start_span_inputs_preprocessor(
+        self,
+        func: Callable,
+        track_options: arguments_helpers.TrackOptions,
+        args: Optional[Tuple],
+        kwargs: Optional[Dict[str, Any]],
+    ) -> arguments_helpers.StartSpanParameters:
+        assert (
+            kwargs is not None
+        ), "Expected kwargs to be not None in audio.speech.create(**kwargs)"
+
+        name = track_options.name if track_options.name is not None else func.__name__
+        metadata = track_options.metadata if track_options.metadata is not None else {}
+
+        input, new_metadata = dict_utils.split_dict_by_keys(
+            kwargs, keys=KWARGS_KEYS_TO_LOG_AS_INPUTS
+        )
+        metadata = dict_utils.deepmerge(metadata, new_metadata)
+        metadata.update(
+            {
+                "created_from": "openai",
+                "type": "openai_audio_speech",
+            }
+        )
+        tags = ["openai"]
+
+        result = arguments_helpers.StartSpanParameters(
+            name=name,
+            input=input,
+            type=track_options.type,
+            tags=tags,
+            metadata=metadata,
+            project_name=track_options.project_name,
+            model=kwargs.get("model", None),
+            provider=self.provider,
+        )
+
+        return result
+
+    @override
+    def _end_span_inputs_preprocessor(
+        self,
+        output: Any,
+        capture_output: bool,
+        current_span_data: span.SpanData,
+    ) -> arguments_helpers.EndSpanParameters:
+        # The speech API does not report usage, so the input text length is
+        # logged as a character-count proxy for the token count.
+        opik_usage = None
+        if current_span_data.input and current_span_data.input.get("input"):
+            opik_usage = llm_usage.try_build_opik_usage_or_log_error(
+                provider=LLMProvider.OPENAI,
+                usage={"total_tokens": len(current_span_data.input["input"])},
+                logger=LOGGER,
+                error_message="Failed to log token usage from openai call",
+            )
+        result = arguments_helpers.EndSpanParameters(
+            output={},
+            usage=opik_usage,
+            metadata={},
+            model=current_span_data.model,
+            provider=self.provider,
+        )
+        return result
+
+    @override
+    def _streams_handler(
+        self,
+        output: Any,
+        capture_output: bool,
+        generations_aggregator: Optional[Callable[[List[Any]], str]],
+    ) -> Optional[Union[Iterator, AsyncIterator]]:
+        if not capture_output:
+            return output
+
+        if isinstance(output, (Stream, ResponseContextManager)):
+            span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()
+            return stream_patchers.patch_sync_stream(
+                stream=output,
+                span_to_end=span_to_end,
+                trace_to_end=trace_to_end,
+                generations_aggregator=generations_aggregator,
+                finally_callback=self._after_call,
+            )
+        if isinstance(output, AsyncStream):
+            span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()
+            return stream_patchers.patch_async_stream(
+                stream=output,
+                span_to_end=span_to_end,
+                trace_to_end=trace_to_end,
+                generations_aggregator=generations_aggregator,
+                finally_callback=self._after_call,
+            )
+
+        return None
diff --git a/sdks/python/src/opik/integrations/openai/opik_tracker.py b/sdks/python/src/opik/integrations/openai/opik_tracker.py
index 55ff865d9b1..3e3a9f5c0cc 100644
--- a/sdks/python/src/opik/integrations/openai/opik_tracker.py
+++ b/sdks/python/src/opik/integrations/openai/opik_tracker.py
@@ -4,7 +4,9 @@
 
 from . import (
     chat_completion_chunks_aggregator,
+    openai_audio_speech_decorator,
     openai_chat_completions_decorator,
+    speech_chunks_aggregator,
 )
 
 OpenAIClient = TypeVar("OpenAIClient", openai.OpenAI, openai.AsyncOpenAI)
@@ -38,12 +40,43 @@ def track_openai(
 
     _patch_openai_chat_completions(openai_client, project_name)
 
+    if hasattr(openai_client, "audio"):
+        _patch_openai_audio_speech(openai_client, project_name)
+
     if hasattr(openai_client, "responses"):
         _patch_openai_responses(openai_client, project_name)
 
     return openai_client
 
 
+def _patch_openai_audio_speech(
+    openai_client: OpenAIClient,
+    project_name: Optional[str] = None,
+) -> None:
+    audio_speech_decorator_factory = (
+        openai_audio_speech_decorator.OpenaiAudioSpeechTrackDecorator()
+    )
+    if openai_client.base_url.host != "api.openai.com":
+        audio_speech_decorator_factory.provider = openai_client.base_url.host
+
+    audio_speech_create_decorator = audio_speech_decorator_factory.track(
+        type="llm",
+        name="audio_speech_create",
+        generations_aggregator=speech_chunks_aggregator.aggregate,
+        project_name=project_name,
+    )
+
+    openai_client.audio.speech.create = audio_speech_create_decorator(
+        openai_client.audio.speech.create
+    )
+    if hasattr(openai_client.audio.speech, "with_streaming_response"):
+        openai_client.audio.speech.with_streaming_response.create = (
+            audio_speech_create_decorator(
+                openai_client.audio.speech.with_streaming_response.create
+            )
+        )
+
+
 def _patch_openai_chat_completions(
     openai_client: OpenAIClient,
     project_name: Optional[str] = None,
diff --git a/sdks/python/src/opik/integrations/openai/speech_chunks_aggregator.py b/sdks/python/src/opik/integrations/openai/speech_chunks_aggregator.py
new file mode 100644
index 00000000000..61a8ab8a78a
--- /dev/null
+++ b/sdks/python/src/opik/integrations/openai/speech_chunks_aggregator.py
@@ -0,0 +1,14 @@
+from typing import List
+
+import pydantic
+
+
+class SpeechChunksAggregated(pydantic.BaseModel):
+    pass
+
+
+def aggregate(chunks: List[bytes]) -> SpeechChunksAggregated:
+    # Raw audio bytes are intentionally not logged as span output.
+    return SpeechChunksAggregated()
diff --git a/sdks/python/src/opik/integrations/openai/stream_patchers.py b/sdks/python/src/opik/integrations/openai/stream_patchers.py
index 428c29b15cb..6c367509d22 100644
--- a/sdks/python/src/opik/integrations/openai/stream_patchers.py
+++ b/sdks/python/src/opik/integrations/openai/stream_patchers.py
@@ -6,11 +6,16 @@
 from opik.decorator import generator_wrappers, error_info_collector
 import functools
 import openai
+from openai._response import ResponseContextManager
 
 LOGGER = logging.getLogger(__name__)
 
 original_stream_iter_method = openai.Stream.__iter__
 original_async_stream_aiter_method = openai.AsyncStream.__aiter__
+if hasattr(ResponseContextManager, "__enter__"):
+    original_sync_context_manager_enter_method = ResponseContextManager.__enter__
+if hasattr(ResponseContextManager, "__aenter__"):
+    original_async_context_manager_aenter_method = ResponseContextManager.__aenter__
 
 StreamItem = TypeVar("StreamItem")
 AggregatedResult = TypeVar("AggregatedResult")
@@ -32,6 +37,31 @@ def patch_sync_stream(
     ```
     """
 
+    if isinstance(stream, ResponseContextManager):
+
+        def ContextManager__enter__decorator(dunder_enter_func: Callable) -> Callable:
+            @functools.wraps(dunder_enter_func)
+            def wrapper(
+                self: ResponseContextManager,
+            ) -> Iterator[StreamItem]:
+                if not hasattr(self, "opik_tracked_instance"):
+                    # Instances not created via a tracked call keep the
+                    # original behavior; they carry no span/trace to end.
+                    return dunder_enter_func(self)
+
+                response = dunder_enter_func(self)
+                return patch_sync_stream(
+                    response,
+                    self.span_to_end,
+                    self.trace_to_end,
+                    generations_aggregator,
+                    finally_callback,
+                )
+
+            return wrapper
+
+        ResponseContextManager.__enter__ = ContextManager__enter__decorator(
+            original_sync_context_manager_enter_method
+        )
+        stream.opik_tracked_instance = True
+        stream.span_to_end = span_to_end
+        stream.trace_to_end = trace_to_end
+        return stream
 
     def Stream__iter__decorator(dunder_iter_func: Callable) -> Callable:
         @functools.wraps(dunder_iter_func)
         def wrapper(
@@ -41,7 +71,14 @@ def wrapper(
             try:
                 accumulated_items: List[StreamItem] = []
                 error_info: Optional[ErrorInfoDict] = None
-                for item in dunder_iter_func(self):
+                # HACK: for OpenAI audio speech the streamed response is not
+                # itself an iterator; the audio bytes are exposed through its
+                # `iter_bytes` method instead.
+                items_iterator = (
+                    dunder_iter_func(self)
+                    if not hasattr(self, "iter_bytes")
+                    else self.iter_bytes()
+                )
+                for item in items_iterator:
                     accumulated_items.append(item)
                     yield item
             except Exception as exception:
diff --git a/sdks/python/src/opik/llm_usage/openai_audio_speech_usage.py b/sdks/python/src/opik/llm_usage/openai_audio_speech_usage.py
new file mode 100644
index 00000000000..b7ed365bf61
--- /dev/null
+++ b/sdks/python/src/opik/llm_usage/openai_audio_speech_usage.py
@@ -0,0 +1,31 @@
+from typing import Dict, Any
+
+from opik import dict_utils
+import pydantic
+
+
+class OpenAIAudioSpeechUsage(pydantic.BaseModel):
+    """
+    Represents the usage of a call to OpenAI's audio speech API. The length of
+    the input text is logged as the token count, since the API itself does not
+    report usage.
+    """
+
+    total_tokens: int
+
+    def to_backend_compatible_flat_dict(self, parent_key_prefix: str) -> Dict[str, Any]:
+        """
+        For example:
+        {
+            "original_usage.total_tokens": 12,
+        }
+        """
+        original_usage: Dict[str, Any] = dict_utils.add_prefix_to_keys_of_a_dict(
+            self.model_dump(), parent_key_prefix
+        )
+
+        return original_usage
+
+    @classmethod
+    def from_original_usage_dict(
+        cls, usage: Dict[str, Any]
+    ) -> "OpenAIAudioSpeechUsage":
+        return cls(**usage)
diff --git a/sdks/python/src/opik/llm_usage/opik_usage.py b/sdks/python/src/opik/llm_usage/opik_usage.py
index ac39fc18cdd..b62a18c2786 100644
--- a/sdks/python/src/opik/llm_usage/opik_usage.py
+++ b/sdks/python/src/opik/llm_usage/opik_usage.py
@@ -7,6 +7,7 @@
     unknown_usage,
     bedrock_usage,
     openai_responses_usage,
+    openai_audio_speech_usage,
 )
 from opik import dict_utils
 
@@ -16,6 +17,7 @@
     anthropic_usage.AnthropicUsage,
     bedrock_usage.BedrockUsage,
     openai_responses_usage.OpenAIResponsesUsage,
+    openai_audio_speech_usage.OpenAIAudioSpeechUsage,
     unknown_usage.UnknownUsage,
 ]
 
@@ -141,3 +143,16 @@ def from_openai_responses_dict(cls, usage: Dict[str, Any]) -> "OpikUsage":
         total_tokens=provider_usage.total_tokens,
         provider_usage=provider_usage,
     )
+
+    @classmethod
+    def from_openai_audio_speech_dict(cls, usage: Dict[str, Any]) -> "OpikUsage":
+        provider_usage = (
+            openai_audio_speech_usage.OpenAIAudioSpeechUsage.from_original_usage_dict(
+                usage
+            )
+        )
+
+        return cls(
+            total_tokens=provider_usage.total_tokens,
+            provider_usage=provider_usage,
+        )
diff --git a/sdks/python/src/opik/llm_usage/opik_usage_factory.py b/sdks/python/src/opik/llm_usage/opik_usage_factory.py
index 831336e62b4..194edc25874 100644
--- a/sdks/python/src/opik/llm_usage/opik_usage_factory.py
+++ b/sdks/python/src/opik/llm_usage/opik_usage_factory.py
@@ -15,6 +15,7 @@
     LLMProvider.OPENAI: [
         opik_usage.OpikUsage.from_openai_completions_dict,
         opik_usage.OpikUsage.from_openai_responses_dict,
+        opik_usage.OpikUsage.from_openai_audio_speech_dict,
     ],
     LLMProvider.GOOGLE_VERTEXAI: [opik_usage.OpikUsage.from_google_dict],
     LLMProvider.GOOGLE_AI: [opik_usage.OpikUsage.from_google_dict],
diff --git a/sdks/python/tests/library_integration/openai/requirements.txt b/sdks/python/tests/library_integration/openai/requirements.txt
index 0ac042e2517..db3502a07b0 100644
--- a/sdks/python/tests/library_integration/openai/requirements.txt
+++ b/sdks/python/tests/library_integration/openai/requirements.txt
@@ -1,2 +1,3 @@
 openai
 openai-agents
+respx
diff --git a/sdks/python/tests/library_integration/openai/test_openai_audio_speech.py b/sdks/python/tests/library_integration/openai/test_openai_audio_speech.py
new file mode 100644
index 00000000000..1f59e19ebd7
--- /dev/null
+++ b/sdks/python/tests/library_integration/openai/test_openai_audio_speech.py
@@ -0,0 +1,382 @@
+import opik
+import openai
+import pytest
+from opik.config import OPIK_PROJECT_DEFAULT_NAME
+from opik.integrations.openai import track_openai
+from ...testlib import (
+    ANY_BUT_NONE,
+    ANY_DICT,
+    ANY_STRING,
+    SpanModel,
+    TraceModel,
+    assert_dict_has_keys,
+    assert_equal,
+)
+
+MODEL_FOR_TESTS = "tts-1"
+INPUT_FOR_TESTS = "Hello, world!"
+ + +def _assert_metadata_contains_required_keys(metadata): + REQUIRED_METADATA_KEYS = [ + "model", + "voice", + "created_from", + "type", + ] + assert_dict_has_keys(metadata, REQUIRED_METADATA_KEYS) + + +@pytest.mark.parametrize( + "project_name, expected_project_name", + [ + (None, OPIK_PROJECT_DEFAULT_NAME), + ("openai-integration-test", "openai-integration-test"), + ], +) +def test_openai_client_audio_speech_create__happyflow( + respx_mock, fake_backend, project_name, expected_project_name +): + respx_mock.post("https://api.aimlapi.com/v1/audio/speech").respond( + 200, content=b"audio data" + ) + + client = openai.OpenAI(api_key="fake-key", base_url="https://api.aimlapi.com/v1") + wrapped_client = track_openai( + openai_client=client, + project_name=project_name, + ) + _ = wrapped_client.audio.speech.create( + model=MODEL_FOR_TESTS, + input=INPUT_FOR_TESTS, + voice="alloy", + ) + + opik.flush_tracker() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output={}, + tags=["openai"], + metadata=ANY_DICT, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + type="llm", + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output={}, + tags=["openai"], + metadata=ANY_DICT, + usage={ + "total_tokens": len(INPUT_FOR_TESTS), + "original_usage.total_tokens": len(INPUT_FOR_TESTS), + }, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[], + model=MODEL_FOR_TESTS, + provider="api.aimlapi.com", + ) + ], + ) + assert len(fake_backend.trace_trees) == 1 + trace_tree = fake_backend.trace_trees[0] + + assert_equal(EXPECTED_TRACE_TREE, trace_tree) + + llm_span_metadata = trace_tree.spans[0].metadata + _assert_metadata_contains_required_keys(llm_span_metadata) + + +def test_openai_client_audio_speech_create__raises_an_error__span_and_trace_finished_gracefully__error_info_is_logged( + respx_mock, + fake_backend, +): + respx_mock.post("https://api.aimlapi.com/v1/audio/speech").respond( + 400, json={"error": "Bad Request"} + ) + client = openai.OpenAI(api_key="fake-key", base_url="https://api.aimlapi.com/v1") + wrapped_client = track_openai(client) + + with pytest.raises(openai.OpenAIError): + _ = wrapped_client.audio.speech.create( + model=MODEL_FOR_TESTS, + input=INPUT_FOR_TESTS, + voice="alloy", + ) + + opik.flush_tracker() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output=None, + tags=["openai"], + metadata={ + "created_from": "openai", + "type": "openai_audio_speech", + "model": MODEL_FOR_TESTS, + "voice": "alloy", + }, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + project_name=ANY_BUT_NONE, + error_info={ + "exception_type": ANY_STRING, + "message": ANY_STRING, + "traceback": ANY_STRING, + }, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + type="llm", + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output=None, + tags=["openai"], + metadata={ + "created_from": "openai", + "type": "openai_audio_speech", + "model": MODEL_FOR_TESTS, + "voice": "alloy", + }, + usage=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=ANY_BUT_NONE, + model=MODEL_FOR_TESTS, + provider="api.aimlapi.com", + error_info={ + "exception_type": ANY_STRING, + "message": ANY_STRING, + "traceback": ANY_STRING, + }, + spans=[], + ) + 
], + ) + + assert len(fake_backend.trace_trees) == 1 + + trace_tree = fake_backend.trace_trees[0] + assert_equal(EXPECTED_TRACE_TREE, trace_tree) + + +def test_openai_client_audio_speech_create__openai_call_made_in_another_tracked_function__openai_span_attached_to_existing_trace( + respx_mock, + fake_backend, +): + respx_mock.post("https://api.aimlapi.com/v1/audio/speech").respond( + 200, content=b"audio data" + ) + project_name = "openai-integration-test" + + @opik.track(project_name=project_name) + def f(): + client = openai.OpenAI( + api_key="fake-key", base_url="https://api.aimlapi.com/v1" + ) + wrapped_client = track_openai( + openai_client=client, + project_name="openai-integration-test-nested-level", + ) + + _ = wrapped_client.audio.speech.create( + model=MODEL_FOR_TESTS, + input=INPUT_FOR_TESTS, + voice="alloy", + ) + + f() + + opik.flush_tracker() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="f", + input={}, + output=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + name="f", + input={}, + output=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + model=None, + provider=None, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + type="llm", + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output={}, + tags=["openai"], + metadata=ANY_DICT, + usage={ + "total_tokens": len(INPUT_FOR_TESTS), + "original_usage.total_tokens": len(INPUT_FOR_TESTS), + }, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + model=MODEL_FOR_TESTS, + provider="api.aimlapi.com", + ) + ], + ) + ], + ) + + assert len(fake_backend.trace_trees) == 1 + + trace_tree = fake_backend.trace_trees[0] + + assert_equal(EXPECTED_TRACE_TREE, trace_tree) + + llm_span_metadata = trace_tree.spans[0].spans[0].metadata + _assert_metadata_contains_required_keys(llm_span_metadata) + + +def test_openai_client_audio_speech_create__stream_mode_is_on__generator_tracked_correctly( + respx_mock, + fake_backend, +): + respx_mock.post("https://api.aimlapi.com/v1/audio/speech").respond( + 200, content=b"audio data" + ) + client = openai.OpenAI(api_key="fake-key", base_url="https://api.aimlapi.com/v1") + wrapped_client = track_openai(client) + + with wrapped_client.audio.speech.with_streaming_response.create( + model=MODEL_FOR_TESTS, + input=INPUT_FOR_TESTS, + voice="alloy", + ) as stream: + for item in stream.iter_bytes(): + pass + + opik.flush_tracker() + + assert len(fake_backend.trace_trees) == 1 + trace_tree = fake_backend.trace_trees[0] + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output={}, + tags=["openai"], + metadata=ANY_DICT, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + project_name=OPIK_PROJECT_DEFAULT_NAME, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + type="llm", + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output={}, + tags=["openai"], + metadata=ANY_DICT, + usage={ + "total_tokens": len(INPUT_FOR_TESTS), + "original_usage.total_tokens": len(INPUT_FOR_TESTS), + }, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=OPIK_PROJECT_DEFAULT_NAME, + spans=[], + model=MODEL_FOR_TESTS, + provider="api.aimlapi.com", + ) + ], + ) + assert_equal(EXPECTED_TRACE_TREE, trace_tree) + llm_span_metadata = trace_tree.spans[0].metadata + 
_assert_metadata_contains_required_keys(llm_span_metadata) + + +@pytest.mark.asyncio +async def test_async_openai_client_audio_speech_create__happyflow( + respx_mock, fake_backend +): + respx_mock.post("https://api.aimlapi.com/v1/audio/speech").respond( + 200, content=b"audio data" + ) + client = openai.AsyncOpenAI( + api_key="fake-key", base_url="https://api.aimlapi.com/v1" + ) + wrapped_client = track_openai( + openai_client=client, + ) + + await wrapped_client.audio.speech.create( + model=MODEL_FOR_TESTS, + input=INPUT_FOR_TESTS, + voice="alloy", + ) + + opik.flush_tracker() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output={}, + tags=["openai"], + metadata=ANY_DICT, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + project_name=OPIK_PROJECT_DEFAULT_NAME, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + type="llm", + name="audio_speech_create", + input={"input": INPUT_FOR_TESTS}, + output={}, + tags=["openai"], + metadata=ANY_DICT, + usage={ + "total_tokens": len(INPUT_FOR_TESTS), + "original_usage.total_tokens": len(INPUT_FOR_TESTS), + }, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=OPIK_PROJECT_DEFAULT_NAME, + spans=[], + model=MODEL_FOR_TESTS, + provider="api.aimlapi.com", + ) + ], + ) + assert len(fake_backend.trace_trees) == 1 + trace_tree = fake_backend.trace_trees[0] + + assert_equal(EXPECTED_TRACE_TREE, trace_tree) + + llm_span_metadata = trace_tree.spans[0].metadata + _assert_metadata_contains_required_keys(llm_span_metadata)
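
Usage sketch, mirroring the tests above: a minimal example of the tracking this
series adds. The output file name is illustrative, and write_to_file is assumed
to be openai-python's standard helper on binary speech responses.

    import opik
    import openai
    from opik.integrations.openai import track_openai

    # Wrapping the client patches audio.speech.create and its streaming
    # variant; each call produces an "audio_speech_create" LLM span whose
    # usage is the length of the input text.
    client = track_openai(openai.OpenAI())

    response = client.audio.speech.create(
        model="tts-1",
        input="Hello, world!",
        voice="alloy",
    )
    response.write_to_file("speech.mp3")

    # Streaming responses are traced through the patched context manager.
    with client.audio.speech.with_streaming_response.create(
        model="tts-1", input="Hello, world!", voice="alloy"
    ) as stream:
        for chunk in stream.iter_bytes():
            pass  # consume the audio bytes

    opik.flush_tracker()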