Skip to content

Commit 8904bac

Browse files
committed
ndjson support
1 parent 79f3578 commit 8904bac

3 files changed

Lines changed: 335 additions & 1 deletion

File tree

README.md

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Micro HTTP server for MicroPython and CPython.
1414
- Event mode for streaming large uploads
1515
- WebSocket support (RFC 6455)
1616
- Server-Sent Events (SSE) streaming
17+
- NDJSON streaming responses (`application/x-ndjson`)
1718
- Memory-efficient (~32KB RAM minimum)
1819

1920
## Installation
@@ -468,9 +469,21 @@ Parameters:
468469
- `retry` - Reconnection time in milliseconds
469470
- Returns `True` on success, `False` if socket is closed
470471

472+
**`response_ndjson(self, headers=None, cookies=None)`**
473+
474+
- Start NDJSON streaming response (`application/x-ndjson`)
475+
- Thin wrapper over `response_stream()` with NDJSON content type
476+
- Returns `True` on success, `False` if socket is closed
477+
478+
**`send_ndjson(self, obj)`**
479+
480+
- Serialize one JSON-serializable value as a single NDJSON line (`json.dumps(obj) + '\n'`)
481+
- `obj` - any JSON-serializable value (dict/list/str/int/float/bool/None)
482+
- Returns `True` on success, `False` if socket is closed
483+
471484
**`response_stream_end(self)`**
472485

473-
- End stream and close connection
486+
- End stream and close connection (used for both SSE and NDJSON)
474487

475488
**`accept_body(self, streaming=False, to_file=None)`** (event mode only)
476489

@@ -575,6 +588,79 @@ data: {"temp": 23.5}
575588
- `: comment` — ignored by client, used for keep-alive pings
576589

577590

591+
## NDJSON Streaming
592+
593+
NDJSON (Newline-Delimited JSON, `application/x-ndjson`) is a minimal one-way server-to-client streaming format: one JSON value per line, separated by `\n`. Compared to SSE it has no field structure (`event:`/`id:`/`retry:`) and no browser-side reconnect API — just raw JSON records. Useful for bulk data exports, log tailing, internal APIs, and any non-browser HTTP client that consumes a stream of records.
594+
595+
### Basic NDJSON Stream
596+
597+
```python
598+
import uhttp.server
599+
600+
server = uhttp.server.HttpServer(port=8080)
601+
602+
while True:
603+
client = server.wait(timeout=0.1)
604+
if not client:
605+
continue
606+
607+
if client.path == '/export':
608+
if client.response_ndjson():
609+
for row in db.iter_rows():
610+
if not client.send_ndjson(row):
611+
break # client disconnected
612+
client.response_stream_end()
613+
else:
614+
client.respond("hello")
615+
```
616+
617+
### Stream Termination
618+
619+
NDJSON has no in-band end-of-stream marker — the client detects end of stream by **TCP connection close** (EOF on `recv()`). Therefore `response_stream_end()` always closes the connection (no keep-alive).
620+
621+
If you need to signal *why* the stream ended (e.g. completion vs. server-initiated abort), send a sentinel record on the application level before closing:
622+
623+
```python
624+
client.send_ndjson({'_end': True, 'reason': 'done'})
625+
client.response_stream_end()
626+
```
627+
628+
### Wire Format
629+
630+
```
631+
{"id":1,"temp":23.5}
632+
{"id":2,"temp":23.7}
633+
{"id":3,"temp":23.6}
634+
```
635+
636+
- One JSON value per line, terminated by `\n`
637+
- No leading/trailing wrapping; concatenation of records is the body
638+
- Each `send_ndjson()` call emits exactly one line
639+
- `json.dumps()` escapes embedded newlines, so records cannot break the framing
640+
641+
### Client Example
642+
643+
```python
644+
import requests, json
645+
with requests.get('http://localhost:8080/export', stream=True) as r:
646+
for line in r.iter_lines():
647+
if line:
648+
record = json.loads(line)
649+
print(record)
650+
```
651+
652+
### NDJSON vs SSE
653+
654+
| Aspect | NDJSON | SSE |
655+
|---------------------|------------------------------|------------------------------|
656+
| Content type | `application/x-ndjson` | `text/event-stream` |
657+
| Per-record metadata | none (just JSON) | `event:`, `id:`, `retry:` |
658+
| Browser API | none (manual `fetch` stream) | `EventSource` w/ auto-reconnect |
659+
| Multi-line payload | not allowed (one line = one record) | `data:` repeated per line |
660+
| End of stream | TCP close | TCP close (or app-level event) |
661+
| Typical use | bulk export, logs, APIs | live UI updates in browser |
662+
663+
578664
## WebSocket Support
579665

580666
uHTTP supports WebSocket connections (RFC 6455) in both event mode and non-event mode.

tests/test_ndjson.py

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Test NDJSON streaming response (server -> client)
4+
"""
5+
import unittest
6+
import socket
7+
import time
8+
import json
9+
import threading
10+
from uhttp import server as uhttp_server
11+
12+
13+
class TestNDJSON(unittest.TestCase):
14+
"""Test suite for NDJSON streaming responses"""
15+
16+
server = None
17+
server_thread = None
18+
nd_clients = []
19+
PORT = 9986
20+
21+
@classmethod
22+
def setUpClass(cls):
23+
cls.server = uhttp_server.HttpServer(port=cls.PORT)
24+
25+
def run_server():
26+
try:
27+
while cls.server:
28+
client = cls.server.wait(timeout=0.1)
29+
30+
if client:
31+
if client.path == '/stream':
32+
if client.response_ndjson():
33+
cls.nd_clients.append({
34+
'client': client,
35+
'counter': 0,
36+
'last_send': time.time(),
37+
'mode': 'dict',
38+
})
39+
elif client.path == '/stream-mixed':
40+
if client.response_ndjson():
41+
cls.nd_clients.append({
42+
'client': client,
43+
'counter': 0,
44+
'last_send': time.time(),
45+
'mode': 'mixed',
46+
})
47+
elif client.path == '/stream-headers':
48+
if client.response_ndjson(
49+
headers={'X-Stream': 'ndjson'}):
50+
cls.nd_clients.append({
51+
'client': client,
52+
'counter': 0,
53+
'last_send': time.time(),
54+
'mode': 'dict',
55+
})
56+
else:
57+
client.respond("Not found", status=404)
58+
59+
for sc in list(cls.nd_clients):
60+
if time.time() - sc['last_send'] > 0.05:
61+
sc['counter'] += 1
62+
mode = sc['mode']
63+
64+
if sc['counter'] >= 4:
65+
sc['client'].response_stream_end()
66+
cls.nd_clients.remove(sc)
67+
elif mode == 'dict':
68+
sc['client'].send_ndjson(
69+
{'n': sc['counter'], 'msg': 'hello'})
70+
sc['last_send'] = time.time()
71+
elif mode == 'mixed':
72+
# rotate through different JSON types
73+
values = [
74+
{'n': sc['counter']},
75+
[1, 2, sc['counter']],
76+
f'string {sc["counter"]}',
77+
sc['counter']]
78+
sc['client'].send_ndjson(
79+
values[(sc['counter'] - 1) % len(values)])
80+
sc['last_send'] = time.time()
81+
82+
except Exception:
83+
pass
84+
85+
cls.server_thread = threading.Thread(target=run_server, daemon=True)
86+
cls.server_thread.start()
87+
time.sleep(0.5)
88+
89+
@classmethod
90+
def tearDownClass(cls):
91+
if cls.server:
92+
cls.server.close()
93+
cls.server = None
94+
95+
def setUp(self):
96+
TestNDJSON.nd_clients = []
97+
98+
def _recv_all(self, sock, timeout=3.0):
99+
sock.settimeout(timeout)
100+
all_data = b""
101+
try:
102+
while True:
103+
chunk = sock.recv(4096)
104+
if not chunk:
105+
break
106+
all_data += chunk
107+
except socket.timeout:
108+
pass
109+
return all_data
110+
111+
def _make_request(self, path):
112+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
113+
sock.connect(('localhost', self.PORT))
114+
request = (
115+
f"GET {path} HTTP/1.1\r\n"
116+
f"Host: localhost\r\n"
117+
f"Connection: close\r\n"
118+
f"\r\n"
119+
).encode()
120+
sock.sendall(request)
121+
return sock
122+
123+
def _split_response(self, data):
124+
"""Split raw HTTP response into (headers_text, body_bytes)"""
125+
sep = data.find(b"\r\n\r\n")
126+
self.assertGreater(sep, 0, "no header/body separator")
127+
return data[:sep].decode(), data[sep + 4:]
128+
129+
def test_stream_headers(self):
130+
"""NDJSON response has correct content-type and cache-control"""
131+
sock = self._make_request('/stream')
132+
try:
133+
data = self._recv_all(sock)
134+
headers, _ = self._split_response(data)
135+
self.assertIn("200 OK", headers)
136+
self.assertIn("content-type: application/x-ndjson", headers)
137+
self.assertIn("cache-control: no-cache", headers)
138+
finally:
139+
sock.close()
140+
141+
def test_custom_headers_passthrough(self):
142+
"""Custom headers passed to response_ndjson appear in response"""
143+
sock = self._make_request('/stream-headers')
144+
try:
145+
data = self._recv_all(sock)
146+
headers, _ = self._split_response(data)
147+
self.assertIn("X-Stream: ndjson", headers)
148+
self.assertIn("content-type: application/x-ndjson", headers)
149+
finally:
150+
sock.close()
151+
152+
def test_ndjson_lines_parse(self):
153+
"""Each body line is a valid JSON object terminated by \\n"""
154+
sock = self._make_request('/stream')
155+
try:
156+
data = self._recv_all(sock)
157+
_, body = self._split_response(data)
158+
159+
# body must end with \n on the last record
160+
self.assertTrue(body.endswith(b"\n"))
161+
162+
lines = body.split(b"\n")
163+
# last element is empty string after trailing \n
164+
self.assertEqual(lines[-1], b"")
165+
records = [json.loads(l) for l in lines[:-1]]
166+
167+
self.assertEqual(len(records), 3)
168+
for i, rec in enumerate(records, start=1):
169+
self.assertEqual(rec, {'n': i, 'msg': 'hello'})
170+
finally:
171+
sock.close()
172+
173+
def test_no_embedded_newlines(self):
174+
"""Each NDJSON line contains exactly one record (no embedded \\n)"""
175+
sock = self._make_request('/stream')
176+
try:
177+
data = self._recv_all(sock)
178+
_, body = self._split_response(data)
179+
180+
for line in body.split(b"\n")[:-1]:
181+
self.assertNotIn(b"\n", line)
182+
# must be parseable on its own
183+
json.loads(line)
184+
finally:
185+
sock.close()
186+
187+
def test_mixed_json_types(self):
188+
"""send_ndjson accepts dict/list/str/int per-record"""
189+
sock = self._make_request('/stream-mixed')
190+
try:
191+
data = self._recv_all(sock)
192+
_, body = self._split_response(data)
193+
lines = body.split(b"\n")[:-1]
194+
self.assertEqual(len(lines), 3)
195+
self.assertEqual(json.loads(lines[0]), {'n': 1})
196+
self.assertEqual(json.loads(lines[1]), [1, 2, 2])
197+
self.assertEqual(json.loads(lines[2]), 'string 3')
198+
finally:
199+
sock.close()
200+
201+
def test_stream_end_closes_connection(self):
202+
"""response_stream_end closes the socket after final record"""
203+
sock = self._make_request('/stream')
204+
try:
205+
data = self._recv_all(sock, timeout=3.0)
206+
_, body = self._split_response(data)
207+
self.assertTrue(len(body) > 0)
208+
remaining = sock.recv(1024)
209+
self.assertEqual(remaining, b"")
210+
finally:
211+
sock.close()
212+
213+
214+
if __name__ == '__main__':
215+
unittest.main()

uhttp/server.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
CONTENT_TYPE_MULTIPART_REPLACE = (
3838
'multipart/x-mixed-replace; boundary=' + BOUNDARY)
3939
CONTENT_TYPE_EVENT_STREAM = 'text/event-stream'
40+
CONTENT_TYPE_NDJSON = 'application/x-ndjson'
4041
CACHE_CONTROL = 'cache-control'
4142
CACHE_CONTROL_NO_CACHE = 'no-cache'
4243
LOCATION = 'Location'
@@ -1754,6 +1755,38 @@ def send_event(self, data=None, event=None, event_id=None, retry=None):
17541755
return False
17551756
return True
17561757

1758+
def response_ndjson(self, headers=None, cookies=None):
1759+
"""Start NDJSON streaming response (application/x-ndjson).
1760+
1761+
Thin wrapper over response_stream() with NDJSON content-type.
1762+
Use send_ndjson() to send objects, response_stream_end() to finish.
1763+
1764+
Returns True on success, False if socket is closed.
1765+
"""
1766+
return self.response_stream(
1767+
content_type=CONTENT_TYPE_NDJSON,
1768+
headers=headers, cookies=cookies)
1769+
1770+
def send_ndjson(self, obj):
1771+
"""Send one JSON-serializable object as an NDJSON line.
1772+
1773+
Args:
1774+
obj: any JSON-serializable value (dict/list/str/int/float/bool/None)
1775+
1776+
Returns True on success, False if socket is closed.
1777+
"""
1778+
if self._socket is None:
1779+
return False
1780+
try:
1781+
# two _send() calls reuse _send_buffer so the line goes out as
1782+
# a single TCP segment (same pattern as send_event)
1783+
self._send(_json.dumps(obj))
1784+
self._send('\n')
1785+
except OSError:
1786+
self.close()
1787+
return False
1788+
return True
1789+
17571790
def response_stream_end(self):
17581791
"""End streaming response and close connection"""
17591792
self._is_multipart = False

0 commit comments

Comments
 (0)