Closed
Commits (19)
825b602
feat(crewai): Enable Tracing Support For CrewAI AgentAction
ialisaleh Oct 16, 2025
883c73b
feat(crewai): WIP - Verify process_llm_response Method Approach
ialisaleh Oct 17, 2025
c5eb1ba
feat(crewai): WIP - Fix process_llm_response Method Approach
ialisaleh Oct 17, 2025
df856e4
feat(crewai): Refactor _handle_agent_action Method Approach
ialisaleh Oct 20, 2025
692cf5e
feat(crewai): Fix Ruff Format Check Errors
ialisaleh Oct 20, 2025
8610d3b
feat(crewai): Fix Ruff Format Check Errors
ialisaleh Oct 20, 2025
725f42a
feat(crewai): Fix MyPy Check Errors
ialisaleh Oct 20, 2025
0c6473d
feat(crewai): Fix Testcase
ialisaleh Oct 20, 2025
7faa184
feat(crewai): Updated Testcase
ialisaleh Oct 20, 2025
009fa4e
feat(crewai): Fix Ruff Format Check Errors
ialisaleh Oct 20, 2025
d2a71ac
Merge branch 'Arize-ai:main' into alisaleh/ticket-2037
ialisaleh Oct 21, 2025
637cf29
feat(crewai): Fix MyPy Check Errors
ialisaleh Oct 21, 2025
80d31d6
feat(crewai): Resolve Issues & Add Testcase For AgentAction
ialisaleh Oct 21, 2025
9df7efd
feat(crewai): Fix Errors & Add AgentFinish For Output
ialisaleh Oct 21, 2025
4d63ddd
feat(crewai): Fix Ruff Format Check Errors
ialisaleh Oct 21, 2025
b92bbff
feat(crewai): Refactor & Add _serialize_agent_object Method
ialisaleh Oct 21, 2025
72ce06f
feat(crewai): Fix MyPy Check Errors
ialisaleh Oct 21, 2025
f379059
Merge branch 'Arize-ai:main' into alisaleh/ticket-2037
ialisaleh Oct 21, 2025
91caaee
Merge branch 'Arize-ai:main' into alisaleh/ticket-2037
ialisaleh Oct 23, 2025
@@ -26,7 +26,8 @@
os.environ["CREWAI_DISABLE_TELEMETRY"] = "true"

# Make sure to set the OPENAI_API_KEY environment variable
os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"
# os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"
os.environ["OPENAI_API_KEY"] = "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"

Bug: API Key Hardcoded in Repository

A real OpenAI API key, identified by the sk-proj-... pattern, was hardcoded and committed to the repository. This exposes sensitive credentials, creating a security vulnerability that could lead to unauthorized API usage.
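A minimal remediation sketch (assuming the key is supplied via the environment; the check and error message below are illustrative and not part of this PR):

import os

# Expect the key to come from the environment (e.g. a local .env file or a CI secret)
# rather than from a string literal committed to the repository.
if not os.environ.get("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set; export it before running this example.")

Any key that has already been committed should be treated as compromised and rotated.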




def create_research_writer_crew(crew_name: Optional[str] = None) -> Crew:
@@ -11,6 +11,7 @@
TraceConfig,
)
from openinference.instrumentation.crewai._wrappers import (
_AgentActionWrapper,
_CrewKickoffWrapper,
_ExecuteCoreWrapper,
_FlowKickoffWrapper,
@@ -25,6 +26,7 @@

class CrewAIInstrumentor(BaseInstrumentor): # type: ignore
__slots__ = (
"_original_agent_action",
"_original_execute_core",
"_original_crew_kickoff",
"_original_flow_kickoff",
@@ -47,6 +49,18 @@ def _instrument(self, **kwargs: Any) -> None:
config=config,
)

agent_action_wrapper = _AgentActionWrapper(tracer=self._tracer)
self._original_agent_action = getattr(
import_module("crewai.agents.crew_agent_executor").CrewAgentExecutor,
"_handle_agent_action",
None,
)
wrap_function_wrapper(
module="crewai.agents.crew_agent_executor",
name="CrewAgentExecutor._handle_agent_action",
wrapper=agent_action_wrapper,
)

execute_core_wrapper = _ExecuteCoreWrapper(tracer=self._tracer)
self._original_execute_core = getattr(import_module("crewai").Task, "_execute_core", None)
wrap_function_wrapper(
@@ -82,6 +96,13 @@ def _instrument(self, **kwargs: Any) -> None:
)

def _uninstrument(self, **kwargs: Any) -> None:
if self._original_agent_action is not None:
crew_agent_executor_module = import_module("crewai.agents.crew_agent_executor")
crew_agent_executor_module.CrewAgentExecutor._handle_agent_action = (
self._original_agent_action
)
self._original_agent_action = None

if self._original_execute_core is not None:
task_module = import_module("crewai")
task_module.Task._execute_core = self._original_execute_core
@@ -1,4 +1,5 @@
import json
from dataclasses import asdict, is_dataclass
from enum import Enum
from inspect import signature
from typing import Any, Callable, Iterator, List, Mapping, Optional, Tuple, cast
@@ -13,7 +14,11 @@
get_output_attributes,
safe_json_dumps,
)
from openinference.semconv.trace import OpenInferenceSpanKindValues, SpanAttributes
from openinference.semconv.trace import (
OpenInferenceMimeTypeValues,
OpenInferenceSpanKindValues,
SpanAttributes,
)


class SafeJSONEncoder(json.JSONEncoder):
@@ -158,6 +163,32 @@ def _get_execute_core_span_name(instance: Any, wrapped: Callable[..., Any], agen
return str(base_method)


def _get_agent_action(obj: Any) -> Tuple[str | None, dict[str, Any] | None]:
"""Generate a JSON string & Python dict from AgentAction object."""
try:
# Detect class name without importing CrewAI directly
if obj.__class__.__name__ == "AgentAction":
# Handle both dataclass & normal class versions
data = asdict(obj) if is_dataclass(obj) else vars(obj) # type: ignore[call-overload]
return safe_json_dumps(data, cls=SafeJSONEncoder), data
except Exception as e:
return f"SerializationError: {str(e)}", None
return None, None


def _get_agent_finish(obj: Any) -> Tuple[str | None, dict[str, Any] | None]:
"""Generate a JSON string & Python dict from AgentFinish object."""
try:
# Detect class name without importing CrewAI directly
if obj.__class__.__name__ == "AgentFinish":
# Handle both dataclass & normal class versions
data = asdict(obj) if is_dataclass(obj) else vars(obj) # type: ignore[call-overload]
return safe_json_dumps(data, cls=SafeJSONEncoder), data
except Exception as e:
return f"SerializationError: {str(e)}", None
return None, None


def _find_parent_agent(current_role: str, agents: List[Any]) -> Optional[str]:
for i, a in enumerate(agents):
if a.role == current_role and i != 0:
@@ -167,6 +198,71 @@ def _find_parent_agent(current_role: str, agents: List[Any]) -> Optional[str]:
return None


class _AgentActionWrapper:
def __init__(self, tracer: trace_api.Tracer) -> None:
self._tracer = tracer

def __call__(
self,
wrapped: Callable[..., Any],
instance: Any,
args: Tuple[Any, ...],
kwargs: Mapping[str, Any],
) -> Any:
if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)
span_name = "AgentAction"
with self._tracer.start_as_current_span(
span_name,
record_exception=False,
set_status_on_exception=False,
attributes=dict(
_flatten(
{
OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.UNKNOWN,
}
)
),
) as span:
# Get AgentAction object from args
agent_action_obj = args[0] if args else None
agent_action_json, agent_action_dict = _get_agent_action(agent_action_obj)

if agent_action_json:
span.set_attributes(
dict(
get_input_attributes(
agent_action_json, mime_type=OpenInferenceMimeTypeValues.JSON
)
)
)
if agent_action_dict:
span.set_attributes(dict(_flatten(agent_action_dict)))

try:
response = wrapped(*args, **kwargs)
except Exception as exception:
span.set_status(trace_api.Status(trace_api.StatusCode.ERROR, str(exception)))
span.record_exception(exception)
raise

span.set_status(trace_api.StatusCode.OK)

# Get AgentFinish object from response
agent_finish_json, _ = _get_agent_finish(response)

if agent_finish_json:
span.set_attributes(
dict(
get_output_attributes(
agent_finish_json, mime_type=OpenInferenceMimeTypeValues.JSON
)
)
)
span.set_attributes(dict(get_attributes_from_context()))
return response


class _ExecuteCoreWrapper:
def __init__(self, tracer: trace_api.Tracer) -> None:
self._tracer = tracer
@@ -4,8 +4,8 @@

import pytest
from crewai import LLM, Agent, Crew, Task
from crewai.flow.flow import Flow, listen, start # type: ignore[import-untyped]
from crewai.tools import BaseTool # type: ignore[import-untyped]
from crewai.flow.flow import Flow, listen, start
from crewai.tools import BaseTool
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.util._importlib_metadata import entry_points
@@ -22,7 +22,7 @@
os.environ["CREWAI_DISABLE_TELEMETRY"] = "true"


class MockScrapeWebsiteTool(BaseTool): # type: ignore[misc]
class MockScrapeWebsiteTool(BaseTool):
"""Mock tool to replace ScrapeWebsiteTool and avoid chromadb dependency."""

name: str = "scrape_website"
@@ -60,7 +60,9 @@ def test_crewai_instrumentation(in_memory_span_exporter: InMemorySpanExporter) -
analyze_task, scrape_task = kickoff_crew()

spans = in_memory_span_exporter.get_finished_spans()
assert len(spans) == 5, f"Expected 5 spans (2 tool + 2 agent + 1 crew), got {len(spans)}"
assert len(spans) == 7, (
f"Expected 7 spans (1 crew + 2 agents + 2 tools + 2 agent_actions), got {len(spans)}"
)

crew_spans = get_spans_by_kind(spans, OpenInferenceSpanKindValues.CHAIN.value)
assert len(crew_spans) == 1
@@ -75,6 +77,12 @@
_verify_agent_span(agent_spans[0], agent_spans[0].name, scrape_task.description)
_verify_agent_span(agent_spans[1], agent_spans[1].name, analyze_task.description)

tool_spans = get_spans_by_kind(spans, OpenInferenceSpanKindValues.TOOL.value)
assert len(tool_spans) == 2

agent_action_spans = get_spans_by_kind(spans, OpenInferenceSpanKindValues.UNKNOWN.value)
assert len(agent_action_spans) == 2

# Clear spans exporter
in_memory_span_exporter.clear()

@@ -149,13 +157,13 @@ def kickoff_crew() -> Tuple[Task, Task]:
def kickoff_flow() -> Flow[Any]:
"""Initialize a CrewAI setup with a minimal Flow."""

class SimpleFlow(Flow[Any]): # type: ignore[misc]
@start() # type: ignore[misc]
class SimpleFlow(Flow[Any]):
@start()
def step_one(self) -> str:
"""First step that produces an output."""
return "Step One Output"

@listen(step_one) # type: ignore[misc]
@listen(step_one)
def step_two(self, step_one_output: str) -> str:
"""Second step that consumes the output from first step."""
return f"Step Two Received: {step_one_output}"