Skip to content

Commit 0b9a810

Browse files
committed
ingestion: implement strategy architecture and PydanticAI strategy
1 parent a3df987 commit 0b9a810

File tree

4 files changed

+78
-1
lines changed

4 files changed

+78
-1
lines changed

src/polar_sdk/ingestion/_base.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import atexit
22
import contextlib
3+
import datetime
34
import logging
45
import queue
56
import threading
@@ -12,6 +13,8 @@
1213
if TYPE_CHECKING:
1314
from polar_sdk import EventsModelTypedDict
1415

16+
from .strategies._base import S
17+
1518
logger = logging.getLogger("polar_sdk.ingestion")
1619

1720

@@ -20,7 +23,7 @@ class Ingestion:
2023
Event ingestion client for Polar.
2124
2225
This class handles the ingestion of events into the Polar API without blocking
23-
the main thread, by using background thread sending them in batches.
26+
the main thread, by using a background thread sending them in batches.
2427
2528
:param access_token: The access_token required for authentication
2629
:param server: The server by name to use for all methods
@@ -58,12 +61,23 @@ def __init__(
5861

5962
atexit.register(self.close)
6063

64+
def strategy(self, strategy_class: type["S"], event_name: str) -> "S":
65+
"""
66+
Instantiate a strategy tied to this ingestion client.
67+
68+
:param strategy_class: The strategy class to instantiate.
69+
:param event_name: The name of the events that'll be reported by the strategy.
70+
"""
71+
return strategy_class(event_name, self)
72+
6173
def ingest(self, event: "EventsModelTypedDict") -> None:
6274
"""
6375
Send an event to the ingestion queue.
6476
6577
:param event: The event to send.
6678
"""
79+
if event.get("timestamp") is None:
80+
event["timestamp"] = datetime.datetime.now(datetime.timezone.utc)
6781
self._queue.put(event, block=False)
6882

6983
def flush(self, max_batch_size: Union[int, None] = None) -> None:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from ._base import BaseStrategy
2+
from .pydantic_ai import PydanticAIStrategy
3+
4+
__all__ = ["BaseStrategy", "PydanticAIStrategy"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from typing import Any, TypeVar
2+
3+
from .._base import Ingestion
4+
5+
6+
class BaseStrategy:
7+
"""
8+
Base structure of an event ingestion strategy.
9+
10+
:param event_name: The name of the events that'll be reported by the strategy.
11+
:param ingestion: The ingestion client to use for sending events.
12+
"""
13+
14+
def __init__(self, event_name: str, ingestion: Ingestion) -> None:
15+
self.event_name = event_name
16+
self._ingestion = ingestion
17+
18+
def ingest(self, external_customer_id: str, *args: Any, **kwargs: Any) -> None:
19+
"""
20+
Send an event to the ingestion queue.
21+
22+
:param external_customer_id: The external customer ID to associate with the event.
23+
"""
24+
raise NotImplementedError()
25+
26+
27+
S = TypeVar("S", bound=BaseStrategy)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from typing import Any
2+
3+
from pydantic_ai.agent import AgentRunResult
4+
5+
from ._base import BaseStrategy
6+
7+
8+
class PydanticAIStrategy(BaseStrategy):
9+
"""
10+
An ingestion strategy to report usage from PydanticAI agent runs.
11+
"""
12+
13+
def ingest(self, external_customer_id: str, result: AgentRunResult[Any]) -> None:
14+
"""
15+
Report usage from a PydanticAI agent run.
16+
17+
:param external_customer_id: The external customer ID to associate with the event.
18+
:param result: The result of the agent run.
19+
"""
20+
usage = result.usage()
21+
self._ingestion.ingest(
22+
{
23+
"name": self.event_name,
24+
"external_customer_id": external_customer_id,
25+
"metadata": {
26+
"request_tokens": usage.request_tokens or 0,
27+
"response_tokens": usage.response_tokens or 0,
28+
"total_tokens": usage.total_tokens or 0,
29+
"requests": usage.requests or 0,
30+
},
31+
}
32+
)

0 commit comments

Comments
 (0)