Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
848a8b3
Hard forked httpx_sse, moved into core_utilities, made fixes to line …
aditya-arolkar-swe Oct 6, 2025
574461e
updated parser to: copy http_sse code into output, use new in-house S…
aditya-arolkar-swe Oct 6, 2025
fe130b9
updated changelog
aditya-arolkar-swe Oct 6, 2025
5ea9466
Automated update of seed files
aditya-arolkar-swe Oct 6, 2025
c29f4ae
skip terminator sse events
aditya-arolkar-swe Oct 7, 2025
8e6ad98
Merge branch 'aa/in-house-sse-processing' of https://github.com/fern-…
aditya-arolkar-swe Oct 7, 2025
130ae61
Merge branch 'main' of https://github.com/fern-api/fern into aa/in-ho…
aditya-arolkar-swe Oct 7, 2025
9a8c4e8
typing fixes
aditya-arolkar-swe Oct 7, 2025
6d4128e
Fix formatting issues: remove trailing whitespace and apply ruff form…
aditya-arolkar-swe Oct 7, 2025
dd4c6f4
Automated update of seed files
fern-support Oct 7, 2025
f6491bb
improved changelog language
aditya-arolkar-swe Oct 7, 2025
7951ea7
Merge branch 'aa/in-house-sse-processing' of https://github.com/fern-…
aditya-arolkar-swe Oct 7, 2025
56d26d3
SSE Support arbitrary charsets from the headers - defaulting to utf-8
aditya-arolkar-swe Oct 7, 2025
7188132
added SSE unit tests
aditya-arolkar-swe Oct 7, 2025
ec6d9cc
Merge branch 'main' of https://github.com/fern-api/fern into aa/in-ho…
aditya-arolkar-swe Oct 7, 2025
2089bce
properly copy core/http_sse to src/.../core/http_sse
aditya-arolkar-swe Oct 7, 2025
2ba8ea5
better typing in test_http_sse
aditya-arolkar-swe Oct 7, 2025
77ef32c
formatting
aditya-arolkar-swe Oct 7, 2025
37b1c9b
refactored 'server-sent-event-examples' output (seed gen was broken),…
aditya-arolkar-swe Oct 7, 2025
1b8e8c8
Hard forked httpx_sse, moved into core_utilities, made fixes to line …
aditya-arolkar-swe Oct 6, 2025
6f55245
updated parser to: copy http_sse code into output, use new in-house S…
aditya-arolkar-swe Oct 6, 2025
c837357
updated changelog
aditya-arolkar-swe Oct 6, 2025
86accef
skip terminator sse events
aditya-arolkar-swe Oct 7, 2025
1485998
Automated update of seed files
aditya-arolkar-swe Oct 6, 2025
385ee44
typing fixes
aditya-arolkar-swe Oct 7, 2025
6388b30
Fix formatting issues: remove trailing whitespace and apply ruff form…
aditya-arolkar-swe Oct 7, 2025
abf5eea
improved changelog language
aditya-arolkar-swe Oct 7, 2025
0ad05ab
Automated update of seed files
fern-support Oct 7, 2025
f25543f
SSE Support arbitrary charsets from the headers - defaulting to utf-8
aditya-arolkar-swe Oct 7, 2025
f384986
added SSE unit tests
aditya-arolkar-swe Oct 7, 2025
e0266e0
properly copy core/http_sse to src/.../core/http_sse
aditya-arolkar-swe Oct 7, 2025
d2da4a7
better typing in test_http_sse
aditya-arolkar-swe Oct 7, 2025
706989d
formatting
aditya-arolkar-swe Oct 7, 2025
00aa224
refactored 'server-sent-event-examples' output (seed gen was broken),…
aditya-arolkar-swe Oct 7, 2025
c271c1e
Updated AsyncGenerator typing
aditya-arolkar-swe Oct 8, 2025
2c876f2
Properly generator stream terminator as a string i.e. was returning […
aditya-arolkar-swe Oct 8, 2025
a4bb7d5
updated http_sse's __init__.py generation
aditya-arolkar-swe Oct 8, 2025
b5c5f7f
manual seed updates to fixture 'server-sent-event-examples'
aditya-arolkar-swe Oct 8, 2025
2050678
merges resolved + added fern ignore for sse test
aditya-arolkar-swe Oct 8, 2025
a699f38
Merge branch 'main' of https://github.com/fern-api/fern into aa/in-ho…
aditya-arolkar-swe Oct 8, 2025
35b45dc
Automated update of seed files
fern-support Oct 8, 2025
49cbea8
updated sse tests
aditya-arolkar-swe Oct 8, 2025
c7f85c5
Merge branch 'aa/in-house-sse-processing' of https://github.com/fern-…
aditya-arolkar-swe Oct 8, 2025
940fbca
formatting fixes
aditya-arolkar-swe Oct 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
14 changes: 14 additions & 0 deletions generators/python/core_utilities/shared/http_sse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from ._api import EventSource, aconnect_sse, connect_sse
from ._exceptions import SSEError
from ._models import ServerSentEvent

__version__ = "0.4.1"

__all__ = [
"__version__",
"EventSource",
"connect_sse",
"aconnect_sse",
"ServerSentEvent",
"SSEError",
]
110 changes: 110 additions & 0 deletions generators/python/core_utilities/shared/http_sse/_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import re
from contextlib import asynccontextmanager, contextmanager
from typing import Any, AsyncGenerator, AsyncIterator, Iterator, cast

import httpx
from ._decoders import SSEDecoder
from ._exceptions import SSEError
from ._models import ServerSentEvent


class EventSource:
    """Expose a streaming ``httpx.Response`` as a sequence of Server-Sent Events.

    The sync (``iter_sse``) and async (``aiter_sse``) iterators share the same
    pipeline: raw bytes are decoded incrementally with the charset taken from
    the ``Content-Type`` header, split on ``"\\n"`` (a trailing ``"\\r"`` is
    dropped, so CRLF streams work), and fed line by line to an ``SSEDecoder``.
    """

    def __init__(self, response: httpx.Response) -> None:
        self._response = response

    def _check_content_type(self) -> None:
        """Raise ``SSEError`` unless the Content-Type contains ``text/event-stream``."""
        # Only the media type is inspected; parameters after ";" are ignored here.
        content_type = self._response.headers.get("content-type", "").partition(";")[0]
        if "text/event-stream" not in content_type:
            raise SSEError(
                f"Expected response header Content-Type to contain 'text/event-stream', got {content_type!r}"
            )

    def _get_charset(self) -> str:
        """Extract charset from Content-Type header, fallback to UTF-8."""
        content_type = self._response.headers.get("content-type", "")

        # Parse charset parameter using regex
        charset_match = re.search(r"charset=([^;\s]+)", content_type, re.IGNORECASE)
        if charset_match:
            charset = charset_match.group(1).strip("\"'")
            # Validate that it's a known *text* encoding: a round trip through
            # str.encode/bytes.decode rejects unknown names and non-text codecs.
            try:
                "test".encode(charset).decode(charset)
                return charset
            except (LookupError, UnicodeError):
                # If charset is invalid, fall back to UTF-8
                pass

        # Default to UTF-8 if no charset specified or invalid charset
        return "utf-8"

    def _new_text_decoder(self) -> Any:
        """Return an incremental bytes-to-text decoder for the response charset.

        An incremental decoder keeps partial multi-byte sequences between
        calls, so a character split across two network chunks is decoded
        correctly instead of being replaced with U+FFFD on both sides.
        """
        import codecs  # local import: keeps this class self-contained

        return codecs.getincrementaldecoder(self._get_charset())(errors="replace")

    @staticmethod
    def _decode_lines(decoder: SSEDecoder, lines: Iterator[str]) -> Iterator[ServerSentEvent]:
        """Feed complete lines to ``decoder`` and yield every event it emits."""
        for line in lines:
            # An empty line (after dropping the CR of a CRLF pair) is what
            # makes the decoder dispatch the event buffered so far.
            sse = decoder.decode(line.rstrip("\r"))
            if sse is not None:
                yield sse

    @property
    def response(self) -> httpx.Response:
        """The underlying ``httpx.Response``."""
        return self._response

    def iter_sse(self) -> Iterator[ServerSentEvent]:
        """Iterate over the events of a synchronous streaming response.

        Raises:
            SSEError: if the response Content-Type is not ``text/event-stream``.
        """
        self._check_content_type()
        decoder = SSEDecoder()
        text_decoder = self._new_text_decoder()

        buffer = ""
        for chunk in self._response.iter_bytes():
            buffer += text_decoder.decode(chunk)

            # NOTE: only a literal "\n" ends a line. Newlines inside a JSON
            # payload must arrive escaped ("\\n"); a literal blank line is
            # always interpreted as the end of the current event.
            # Everything before the last "\n" is complete; the tail (possibly
            # empty) is kept until more data arrives.
            *lines, buffer = buffer.split("\n")
            yield from self._decode_lines(decoder, iter(lines))

        # Flush bytes still held by the incremental decoder, then hand any
        # unterminated final line to the decoder.
        buffer += text_decoder.decode(b"", final=True)
        if buffer.strip():
            yield from self._decode_lines(decoder, iter([buffer]))

    async def aiter_sse(self) -> AsyncGenerator[ServerSentEvent, None]:
        """Iterate over the events of an asynchronous streaming response.

        Mirrors ``iter_sse`` (same charset handling and line splitting) rather
        than relying on ``aiter_lines()``, so both code paths parse a given
        byte stream into the same events.

        Raises:
            SSEError: if the response Content-Type is not ``text/event-stream``.
        """
        self._check_content_type()
        decoder = SSEDecoder()
        text_decoder = self._new_text_decoder()
        chunks = cast(AsyncGenerator[bytes, None], self._response.aiter_bytes())

        buffer = ""
        try:
            async for chunk in chunks:
                buffer += text_decoder.decode(chunk)
                *lines, buffer = buffer.split("\n")
                for sse in self._decode_lines(decoder, iter(lines)):
                    yield sse

            buffer += text_decoder.decode(b"", final=True)
            if buffer.strip():
                for sse in self._decode_lines(decoder, iter([buffer])):
                    yield sse
        finally:
            # Close the byte stream even if the consumer stops iterating early.
            await chunks.aclose()


@contextmanager
def connect_sse(client: httpx.Client, method: str, url: str, **kwargs: Any) -> Iterator[EventSource]:
    """Open a streaming request and yield an ``EventSource`` wrapping the response.

    Args:
        client: The ``httpx.Client`` used to send the request.
        method: HTTP method.
        url: Request URL.
        **kwargs: Forwarded to ``client.stream``. Any ``headers`` given are
            sent with ``Accept`` and ``Cache-Control`` overridden.

    Yields:
        An ``EventSource`` that is valid while the context is open.
    """
    # Copy into a fresh httpx.Headers so the caller's headers object is never
    # mutated, and so None / lists of pairs / Headers instances all work.
    headers = httpx.Headers(kwargs.pop("headers", None))
    headers["Accept"] = "text/event-stream"
    headers["Cache-Control"] = "no-store"

    with client.stream(method, url, headers=headers, **kwargs) as response:
        yield EventSource(response)


@asynccontextmanager
async def aconnect_sse(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    **kwargs: Any,
) -> AsyncIterator[EventSource]:
    """Open an async streaming request and yield an ``EventSource`` for it.

    Args:
        client: The ``httpx.AsyncClient`` used to send the request.
        method: HTTP method.
        url: Request URL.
        **kwargs: Forwarded to ``client.stream``. Any ``headers`` given are
            sent with ``Accept`` and ``Cache-Control`` overridden.

    Yields:
        An ``EventSource`` that is valid while the context is open.
    """
    # Copy into a fresh httpx.Headers so the caller's headers object is never
    # mutated, and so None / lists of pairs / Headers instances all work.
    headers = httpx.Headers(kwargs.pop("headers", None))
    headers["Accept"] = "text/event-stream"
    headers["Cache-Control"] = "no-store"

    async with client.stream(method, url, headers=headers, **kwargs) as response:
        yield EventSource(response)
59 changes: 59 additions & 0 deletions generators/python/core_utilities/shared/http_sse/_decoders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import List, Optional

from ._models import ServerSentEvent


class SSEDecoder:
    """Assemble Server-Sent Events from a stream fed one line at a time.

    Field lines (``event``, ``data``, ``id``, ``retry``) are buffered; an
    empty line dispatches the buffered fields as a ``ServerSentEvent``.
    """

    def __init__(self) -> None:
        self._event = ""
        self._data: List[str] = []
        self._last_event_id = ""
        self._retry: Optional[int] = None

    def decode(self, line: str) -> Optional[ServerSentEvent]:
        """Consume one line (without its terminator) of the event stream.

        Args:
            line: A single line of the stream. A trailing ``"\\r"`` left over
                from a CRLF terminator is tolerated.

        Returns:
            The completed event when ``line`` is blank and at least one of
            ``event``, ``data`` or ``retry`` was buffered since the previous
            dispatch; otherwise ``None``.
        """
        # See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation  # noqa: E501

        # Defensive: callers normally strip terminators, but a stray CR must
        # neither leak into field values nor hide a blank (dispatch) line.
        line = line.rstrip("\r")

        if not line:
            # The last event id is intentionally sticky across events, so it
            # must not count as "pending" data here; otherwise every blank
            # line after the first `id:` field (e.g. following a keep-alive
            # comment) would dispatch an empty phantom event.
            if not self._event and not self._data and self._retry is None:
                return None

            sse = ServerSentEvent(
                event=self._event,
                data="\n".join(self._data),
                id=self._last_event_id,
                retry=self._retry,
            )

            # NOTE: as per the SSE spec, do not reset last_event_id.
            self._event = ""
            self._data = []
            self._retry = None

            return sse

        # Lines starting with a colon are comments.
        if line.startswith(":"):
            return None

        fieldname, _, value = line.partition(":")

        # A single leading space after the colon is not part of the value.
        if value.startswith(" "):
            value = value[1:]

        if fieldname == "event":
            self._event = value
        elif fieldname == "data":
            self._data.append(value)
        elif fieldname == "id":
            # Ids containing NUL are ignored.
            if "\0" not in value:
                self._last_event_id = value
        elif fieldname == "retry":
            try:
                self._retry = int(value)
            except (TypeError, ValueError):
                pass  # Non-integer retry values are ignored.
        else:
            pass  # Field is ignored.

        return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import httpx


class SSEError(httpx.TransportError):
    """Error raised when a response cannot be handled as a Server-Sent Events stream."""
15 changes: 15 additions & 0 deletions generators/python/core_utilities/shared/http_sse/_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ServerSentEvent:
    """Immutable record holding the fields of one Server-Sent Event."""

    # Event type; "message" when not supplied.
    event: str = "message"
    # Event payload as text.
    data: str = ""
    # Event id; empty string when not supplied.
    id: str = ""
    # Reconnection time carried by the event, if any.
    retry: Optional[int] = None

    def json(self) -> Any:
        """Deserialize ``data`` as JSON and return the resulting object."""
        payload = self.data
        return json.loads(payload)
9 changes: 9 additions & 0 deletions generators/python/sdk/versions.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# yaml-language-server: $schema=../../../fern-versions-yml.schema.json
# For unreleased changes, use unreleased.yml
- version: 4.31.0
changelogEntry:
- summary: |
- Removed external dependency on httpx-sse by bringing SSE handling in-house.
- Fixed SSE handling of long events and of events containing escaped newlines.
type: feat
createdAt: "2025-10-06"
irVersion: 60

- version: 4.30.4-rc1
changelogEntry:
- summary: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from ..context.sdk_generator_context import SdkGeneratorContext
from fern_python.codegen import AST
from fern_python.external_dependencies.httpx_sse import HttpxSSE
from fern_python.external_dependencies.json import Json
from fern_python.generators.sdk.client_generator.constants import CHUNK_VARIABLE, RESPONSE_VARIABLE
from fern_python.generators.sdk.client_generator.pagination.abstract_paginator import (
Expand Down Expand Up @@ -99,7 +98,18 @@ def _handle_success_stream(self, *, writer: AST.NodeWriter, stream_response: ir_
AST.VariableDeclaration(
name=EndpointResponseCodeWriter.EVENT_SOURCE_VARIABLE,
initializer=AST.Expression(
AST.ClassInstantiation(HttpxSSE.EVENT_SOURCE, [AST.Expression(RESPONSE_VARIABLE)])
AST.ClassInstantiation(
class_=AST.ClassReference(
qualified_name_excluding_import=(),
import_=AST.ReferenceImport(
module=AST.Module.local(
*self._context.core_utilities._module_path, "http_sse", "_api"
),
named_import="EventSource",
),
),
args=[AST.Expression(RESPONSE_VARIABLE)],
)
),
),
AST.ForStatement(
Expand All @@ -119,7 +129,7 @@ def _handle_success_stream(self, *, writer: AST.NodeWriter, stream_response: ir_
conditions=[
AST.IfConditionLeaf(
condition=AST.Expression(
f"{EndpointResponseCodeWriter.SSE_VARIABLE}.data == {stream_response_union.terminator}"
f"{EndpointResponseCodeWriter.SSE_VARIABLE}.data == {repr(stream_response_union.terminator)}"
),
code=[AST.ReturnStatement()],
),
Expand All @@ -131,16 +141,77 @@ def _handle_success_stream(self, *, writer: AST.NodeWriter, stream_response: ir_
AST.YieldStatement(
self._context.core_utilities.get_construct(
self._get_streaming_response_data_type(stream_response),
AST.Expression(
Json.loads(
AST.Expression(f"{EndpointResponseCodeWriter.SSE_VARIABLE}.data")
)
),
AST.Expression(f"{EndpointResponseCodeWriter.SSE_VARIABLE}.json()"),
),
),
],
handlers=[
noop_except_handler,
AST.ExceptHandler(
body=[
AST.Expression(
AST.FunctionInvocation(
function_definition=AST.Reference(
qualified_name_excluding_import=(),
import_=AST.ReferenceImport(
module=AST.Module.built_in(("logging",)),
named_import="warning",
),
),
args=[
AST.Expression(
f'f"Skipping SSE event with invalid JSON: {{e}}, sse: {{{EndpointResponseCodeWriter.SSE_VARIABLE}!r}}"'
)
],
)
),
],
exception_type="JSONDecodeError",
name="e",
),
AST.ExceptHandler(
body=[
AST.Expression(
AST.FunctionInvocation(
function_definition=AST.Reference(
qualified_name_excluding_import=(),
import_=AST.ReferenceImport(
module=AST.Module.built_in(("logging",)),
named_import="warning",
),
),
args=[
AST.Expression(
f'f"Skipping SSE event due to model construction error: {{type(e).__name__}}: {{e}}, sse: {{{EndpointResponseCodeWriter.SSE_VARIABLE}!r}}"'
)
],
)
),
],
exception_type="(TypeError, ValueError, KeyError, AttributeError)",
name="e",
),
AST.ExceptHandler(
body=[
AST.Expression(
AST.FunctionInvocation(
function_definition=AST.Reference(
qualified_name_excluding_import=(),
import_=AST.ReferenceImport(
module=AST.Module.built_in(("logging",)),
named_import="error",
),
),
args=[
AST.Expression(
f'f"Unexpected error processing SSE event: {{type(e).__name__}}: {{e}}, sse: {{{EndpointResponseCodeWriter.SSE_VARIABLE}!r}}"'
)
],
)
),
],
exception_type="Exception",
name="e",
),
],
),
],
Expand All @@ -156,7 +227,7 @@ def _handle_success_stream(self, *, writer: AST.NodeWriter, stream_response: ir_
conditions=[
AST.IfConditionLeaf(
condition=AST.Expression(
f"{EndpointResponseCodeWriter.STREAM_TEXT_VARIABLE} == {stream_response_union.terminator}"
f"{EndpointResponseCodeWriter.STREAM_TEXT_VARIABLE} == {repr(stream_response_union.terminator)}"
),
code=[AST.ReturnStatement()],
),
Expand Down
Loading
Loading