Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ test = [
"opentelemetry-sdk",
"pytest-recording",
"openai",
"ddgs",
"duckduckgo-search",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need this to be ddgs as it got renamed for agno

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool.

"yfinance",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ instruments = [
test = [
"beeai-framework >= 0.1.36",
"opentelemetry-sdk",
"opentelemetry-exporter-otlp"
"opentelemetry-exporter-otlp",
"pytest",
"pytest-vcr",
"pytest-asyncio",
"vcrpy"
]

[project.entry-points.opentelemetry_instrumentor]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, Optional

if TYPE_CHECKING:
from beeai_framework.context import RunContextFinishEvent, RunContextStartEvent
Expand All @@ -18,7 +18,9 @@
class Processor:
kind: ClassVar[OpenInferenceSpanKindValues] = OpenInferenceSpanKindValues.UNKNOWN

def __init__(self, event: "RunContextStartEvent", meta: "EventMeta"):
def __init__(
self, event: "RunContextStartEvent", meta: "EventMeta", span_name: Optional[str] = None
):
from beeai_framework.context import RunContext

assert isinstance(meta.creator, RunContext)
Expand All @@ -27,7 +29,7 @@ def __init__(self, event: "RunContextStartEvent", meta: "EventMeta"):
assert meta.trace is not None
self.run_id = meta.trace.run_id

self.span = SpanWrapper(name=target_cls.__name__, kind=type(self).kind)
self.span = SpanWrapper(name=span_name or target_cls.__name__, kind=type(self).kind)
self.span.started_at = meta.created_at
self.span.attributes.update(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@
from beeai_framework.context import RunContext
from typing_extensions import override

from openinference.instrumentation import safe_json_dumps
from openinference.instrumentation.beeai.processors.base import Processor
from openinference.semconv.trace import (
EmbeddingAttributes,
OpenInferenceSpanKindValues,
SpanAttributes,
)

# TODO: Update to use SpanAttributes.EMBEDDING_INVOCATION_PARAMETERS when released in semconv
_EMBEDDING_INVOCATION_PARAMETERS = "embedding.invocation_parameters"


class EmbeddingModelProcessor(Processor):
kind: ClassVar[OpenInferenceSpanKindValues] = OpenInferenceSpanKindValues.EMBEDDING

def __init__(self, event: "RunContextStartEvent", meta: "EventMeta"):
super().__init__(event, meta)
super().__init__(event, meta, span_name="CreateEmbeddings")

assert isinstance(meta.creator, RunContext)
assert isinstance(meta.creator.instance, EmbeddingModel)
Expand All @@ -34,6 +38,7 @@ def __init__(self, event: "RunContextStartEvent", meta: "EventMeta"):
{
SpanAttributes.EMBEDDING_MODEL_NAME: llm.model_id,
SpanAttributes.LLM_PROVIDER: llm.provider_id,
SpanAttributes.LLM_SYSTEM: "beeai",
}
)

Expand All @@ -45,20 +50,35 @@ async def update(
) -> None:
await super().update(event, meta)

# Add event to the span but don't create child spans
self.span.add_event(f"{meta.name} ({meta.path})", timestamp=meta.created_at)
self.span.child(meta.name, event=(event, meta))

if isinstance(event, EmbeddingModelStartEvent):
# Extract invocation parameters
invocation_params = {}
if hasattr(event.input, "__dict__"):
input_dict = vars(event.input)
# Remove the actual text values from invocation parameters
invocation_params = {k: v for k, v in input_dict.items() if k != "values"}
if invocation_params:
self.span.set_attribute(
_EMBEDDING_INVOCATION_PARAMETERS,
safe_json_dumps(invocation_params),
)

for idx, txt in enumerate(event.input.values):
self.span.set_attribute(
f"{SpanAttributes.EMBEDDING_EMBEDDINGS}.{idx}.{EmbeddingAttributes.EMBEDDING_TEXT}",
txt,
)
elif isinstance(event, EmbeddingModelSuccessEvent):
for idx, embedding in enumerate(event.value.embeddings):
# Ensure the embedding vector is a list, not a tuple
# Always convert to list to handle tuples from BeeAI framework
vector = list(embedding)
self.span.set_attribute(
f"{SpanAttributes.EMBEDDING_EMBEDDINGS}.{idx}.{EmbeddingAttributes.EMBEDDING_VECTOR}",
embedding,
vector,
)

if event.value.usage:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# BeeAI Instrumentation Tests

## Re-recording VCR Cassettes

When tests fail due to outdated VCR cassettes (e.g., API authentication errors or changed responses), follow these steps to re-record:

### Prerequisites
1. Ensure `OPENAI_API_KEY` is set in your environment with a valid API key
2. The `passenv = OPENAI_API_KEY` directive must be present in the root `tox.ini` file

### Steps to Re-record

1. Delete the existing cassette file:
```bash
rm tests/cassettes/test_openai_embeddings.yaml
```

2. Run the tests with VCR in record mode using tox:
```bash
OPENAI_API_KEY=$OPENAI_API_KEY uvx --with tox-uv tox -r -e py313-ci-beeai -- tests/test_instrumentor.py::test_openai_embeddings -xvs --vcr-record=once
```

### Important Notes
- The test reads `OPENAI_API_KEY` from the environment, falling back to "sk-test" if not set
- VCR will cache responses including authentication errors (401), so always delete the cassette before re-recording
- The `--vcr-record=once` flag ensures the cassette is only recorded when it doesn't exist
- Use `-r` flag with tox to ensure a clean environment when re-recording

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Generator

import pytest
from opentelemetry import trace as trace_api
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from openinference.instrumentation.beeai import BeeAIInstrumentor


@pytest.fixture(scope="session")
def in_memory_span_exporter() -> InMemorySpanExporter:
    """Session-wide exporter that accumulates finished spans in memory for inspection."""
    exporter = InMemorySpanExporter()
    return exporter


@pytest.fixture(scope="session")
def tracer_provider(
    in_memory_span_exporter: InMemorySpanExporter,
) -> trace_api.TracerProvider:
    """SDK tracer provider that exports synchronously into the in-memory exporter."""
    provider = trace_sdk.TracerProvider()
    provider.add_span_processor(
        span_processor=SimpleSpanProcessor(span_exporter=in_memory_span_exporter)
    )
    return provider


@pytest.fixture(autouse=True)
def instrument(
    tracer_provider: trace_api.TracerProvider,
    in_memory_span_exporter: InMemorySpanExporter,
) -> Generator[None, None, None]:
    """Instrument BeeAI for every test, then uninstrument and drop captured spans.

    Autouse: each test runs with a freshly-cleared exporter so span counts are
    exact, and instrumentation is removed again during teardown.
    """
    BeeAIInstrumentor().instrument(tracer_provider=tracer_provider)
    in_memory_span_exporter.clear()
    yield
    # Teardown: remove instrumentation and discard any spans the test produced.
    BeeAIInstrumentor().uninstrument()
    in_memory_span_exporter.clear()

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import json
import os
from typing import Mapping, cast

import pytest
from beeai_framework.adapters.openai import OpenAIEmbeddingModel
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.util.types import AttributeValue

from openinference.semconv.trace import (
EmbeddingAttributes,
OpenInferenceSpanKindValues,
SpanAttributes,
)


@pytest.mark.vcr(
    decode_compressed_response=True,
    before_record_request=lambda _: _.headers.clear() or _,
    before_record_response=lambda _: {**_, "headers": {}},
)
@pytest.mark.asyncio
async def test_openai_embeddings(in_memory_span_exporter: InMemorySpanExporter) -> None:
    """Verify a BeeAI OpenAI embedding call emits one fully-attributed EMBEDDING span."""

    def emb_key(index: int, suffix: str) -> str:
        # Flattened OpenInference attribute key for embedding `index`.
        return f"{SpanAttributes.EMBEDDING_EMBEDDINGS}.{index}.{suffix}"

    # API key from environment - only used when re-recording the cassette
    # When using the cassette, the key is not needed
    model = OpenAIEmbeddingModel(
        model_id="text-embedding-3-small",
        api_key=os.getenv("OPENAI_API_KEY", "sk-test"),
    )

    # Run the embedding request for two test texts.
    result = await model.create(["Hello world", "Test embedding"])

    # One vector must come back per input text.
    assert result is not None
    assert result.embeddings is not None
    assert len(result.embeddings) == 2

    # Exactly one span should have been exported for the whole call.
    finished_spans = in_memory_span_exporter.get_finished_spans()
    assert len(finished_spans) == 1
    span = finished_spans[0]
    assert span is not None

    attributes = dict(cast(Mapping[str, AttributeValue], span.attributes))

    # Core span identity per the OpenInference embedding spec.
    assert (
        attributes.get(SpanAttributes.OPENINFERENCE_SPAN_KIND)
        == OpenInferenceSpanKindValues.EMBEDDING.value
    )
    assert attributes.get(SpanAttributes.EMBEDDING_MODEL_NAME) == "text-embedding-3-small"
    assert attributes.get(SpanAttributes.LLM_SYSTEM) == "beeai"
    assert attributes.get(SpanAttributes.LLM_PROVIDER) == "openai"

    # Input texts are recorded per embedding index.
    assert attributes.get(emb_key(0, EmbeddingAttributes.EMBEDDING_TEXT)) == "Hello world"
    assert attributes.get(emb_key(1, EmbeddingAttributes.EMBEDDING_TEXT)) == "Test embedding"

    # Vectors: present, sequence-typed (OTel may store them as tuples), full
    # dimensionality, and leading values matching the recorded cassette.
    expected_leading_values = [
        (-0.002078542485833168, -0.04908587411046028),
        (-0.005330947693437338, -0.03916504979133606),
    ]
    for idx, (first, second) in enumerate(expected_leading_values):
        vector = attributes.get(emb_key(idx, EmbeddingAttributes.EMBEDDING_VECTOR))
        assert vector is not None
        assert isinstance(vector, (list, tuple))
        assert len(vector) == 1536  # text-embedding-3-small dimension
        assert vector[0] == pytest.approx(first)
        assert vector[1] == pytest.approx(second)

    # Invocation parameters are serialized JSON with the text values stripped out.
    invocation_params = attributes.get("embedding.invocation_parameters")
    assert isinstance(invocation_params, str)
    assert json.loads(invocation_params) == {"abort_signal": None, "max_retries": 0}

    # Token accounting from the provider response.
    assert attributes.get(SpanAttributes.LLM_TOKEN_COUNT_TOTAL) == 4
    assert attributes.get(SpanAttributes.LLM_TOKEN_COUNT_PROMPT) == 4
    assert attributes.get(SpanAttributes.LLM_TOKEN_COUNT_COMPLETION) == 0
Loading
Loading