Skip to content

Commit 35c46a2

Browse files
committed
fixes to encodings, implemented ws frame parsing with decompression support
1 parent ee54671 commit 35c46a2

File tree

7 files changed

+240
-27
lines changed

7 files changed

+240
-27
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from firegex import *
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/usr/bin/env python3
2+
3+
from firegex.cli import run
4+
5+
if __name__ == "__main__":
6+
run()

fgex-lib/firegex/nfproxy/internals/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ def try_to_call(params:list):
121121
new_params = params.copy()
122122
for ele in params[i]:
123123
new_params[i] = ele
124-
for ele in try_to_call(new_params):
125-
yield ele
124+
yield from try_to_call(new_params)
126125
is_base_call = False
127126
break
128127
if is_base_call:
@@ -166,4 +165,9 @@ def compile(glob:dict) -> None:
166165
internal_data.invalid_encoding_action = glob["FGEX_INVALID_ENCODING_ACTION"]
167166

168167
PacketHandlerResult(glob).reset_result()
168+
169+
def fake_exit(*_a, **_k):
170+
print("WARNING: This function should not be called", flush=True)
171+
172+
glob["exit"] = fake_exit
169173

fgex-lib/firegex/nfproxy/models/http.py

Lines changed: 142 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import io
1212
import zlib
1313
import brotli
14+
from websockets.frames import Frame
15+
from websockets.extensions.permessage_deflate import PerMessageDeflate
16+
from pyllhttp import PAUSED_H2_UPGRADE, PAUSED_UPGRADE
1417

1518
@dataclass
1619
class InternalHTTPMessage:
@@ -33,16 +36,21 @@ class InternalHTTPMessage:
3336
method: str = field(default=str)
3437
content_length: int = field(default=0)
3538
stream: bytes = field(default_factory=bytes)
39+
ws_stream: list[Frame] = field(default_factory=list) # Decoded websocket stream
40+
upgrading_to_h2: bool = field(default=False)
41+
upgrading_to_ws: bool = field(default=False)
3642

3743
@dataclass
3844
class InternalHttpBuffer:
3945
"""Internal class to handle HTTP messages"""
4046
_url_buffer: bytes = field(default_factory=bytes)
41-
_header_fields: dict[bytes, bytes] = field(default_factory=dict)
47+
_raw_header_fields: dict[str, str|list[str]] = field(default_factory=dict)
48+
_header_fields: dict[str, str] = field(default_factory=dict)
4249
_body_buffer: bytes = field(default_factory=bytes)
4350
_status_buffer: bytes = field(default_factory=bytes)
4451
_current_header_field: bytes = field(default_factory=bytes)
4552
_current_header_value: bytes = field(default_factory=bytes)
53+
_ws_packet_stream: bytes = field(default_factory=bytes)
4654

4755
class InternalCallbackHandler():
4856

@@ -52,6 +60,8 @@ class InternalCallbackHandler():
5260
raised_error = False
5361
has_begun = False
5462
messages: deque[InternalHTTPMessage] = deque()
63+
_ws_extentions = None
64+
_ws_raised_error = False
5565

5666
def reset_data(self):
5767
self.msg = InternalHTTPMessage()
@@ -92,14 +102,31 @@ def on_header_value(self, value):
92102

93103
def on_header_value_complete(self):
94104
if self.buffers._current_header_field:
95-
self.buffers._header_fields[self.buffers._current_header_field.decode(errors="ignore")] = self.buffers._current_header_value.decode(errors="ignore")
105+
k, v = self.buffers._current_header_field.decode(errors="ignore"), self.buffers._current_header_value.decode(errors="ignore")
106+
old_value = self.buffers._raw_header_fields.get(k, None)
107+
108+
# raw headers are stored exactly as they were received, to allow checking for differences in header encoding
109+
if isinstance(old_value, list):
110+
old_value.append(v)
111+
elif isinstance(old_value, str):
112+
self.buffers._raw_header_fields[k] = [old_value, v]
113+
else:
114+
self.buffers._raw_header_fields[k] = v
115+
116+
# Decoding headers normally
117+
kl = k.lower()
118+
if kl in self.buffers._header_fields:
119+
self.buffers._header_fields[kl] += f", {v}" # Should be considered as a single list separated by commas as said in the RFC
120+
else:
121+
self.buffers._header_fields[kl] = v
122+
96123
self.buffers._current_header_field = b""
97124
self.buffers._current_header_value = b""
98125

99126
def on_headers_complete(self):
100-
self.msg.headers = self.buffers._header_fields
101-
self.msg.lheaders = {k.lower(): v for k, v in self.buffers._header_fields.items()}
102-
self.buffers._header_fields = {}
127+
self.msg.headers = self.buffers._raw_header_fields
128+
self.msg.lheaders = self.buffers._header_fields
129+
self.buffers._raw_header_fields = {}
103130
self.buffers._current_header_field = b""
104131
self.buffers._current_header_value = b""
105132
self.msg.headers_complete = True
@@ -119,6 +146,7 @@ def on_body(self, body: bytes):
119146

120147
def on_message_complete(self):
121148
self.msg.body = self.buffers._body_buffer
149+
self.msg.should_upgrade = self.should_upgrade
122150
self.buffers._body_buffer = b""
123151
encodings = [ele.strip() for ele in self.content_encoding.lower().split(",")]
124152
decode_success = True
@@ -142,7 +170,7 @@ def on_message_complete(self):
142170
print(f"Error decompressing brotli: {e}: skipping", flush=True)
143171
decode_success = False
144172
break
145-
elif enc == "gzip":
173+
elif enc == "gzip" or enc == "x-gzip": #https://datatracker.ietf.org/doc/html/rfc2616#section-3.5
146174
try:
147175
if "gzip" in self.content_encoding.lower():
148176
with gzip.GzipFile(fileobj=io.BytesIO(decoding_body)) as f:
@@ -158,6 +186,8 @@ def on_message_complete(self):
158186
print(f"Error decompressing zstd: {e}: skipping", flush=True)
159187
decode_success = False
160188
break
189+
elif enc == "identity":
190+
pass # No need to do anything https://datatracker.ietf.org/doc/html/rfc2616#section-3.5 (it's possible to be found also if it should't be used)
161191
else:
162192
decode_success = False
163193
break
@@ -214,20 +244,90 @@ def total_size(self) -> int:
214244
def content_length_parsed(self) -> int:
215245
return self.content_length
216246

247+
def _is_input(self) -> bool:
248+
raise NotImplementedError()
249+
217250
def _packet_to_stream(self):
218251
return self.should_upgrade and self.save_body
219252

253+
def _stream_parser(self, data: bytes):
254+
if self.msg.upgrading_to_ws:
255+
if self._ws_raised_error:
256+
self.msg.stream += data
257+
self.msg.total_size += len(data)
258+
return
259+
self.buffers._ws_packet_stream += data
260+
while True:
261+
try:
262+
new_frame, self.buffers._ws_packet_stream = self._parse_websocket_frame(self.buffers._ws_packet_stream)
263+
except Exception as e:
264+
self._ws_raised_error = True
265+
self.msg.stream += self.buffers._ws_packet_stream
266+
self.buffers._ws_packet_stream = b""
267+
self.msg.total_size += len(data)
268+
return
269+
if new_frame is None:
270+
break
271+
self.msg.ws_stream.append(new_frame)
272+
self.msg.total_size += len(new_frame.data)
273+
if self.msg.upgrading_to_h2:
274+
self.msg.total_size += len(data)
275+
self.msg.stream += data
276+
277+
def _parse_websocket_ext(self):
278+
ext_ws = []
279+
req_ext = []
280+
for ele in self.msg.lheaders.get("sec-websocket-extensions", "").split(","):
281+
for xt in ele.split(";"):
282+
req_ext.append(xt.strip().lower())
283+
284+
for ele in req_ext:
285+
if ele == "permessage-deflate":
286+
ext_ws.append(PerMessageDeflate(False, False, 15, 15))
287+
return ext_ws
288+
289+
def _parse_websocket_frame(self, data: bytes) -> tuple[Frame|None, bytes]:
290+
# mask = is_input
291+
if self._ws_extentions is None:
292+
self._ws_extentions = self._parse_websocket_ext()
293+
read_buffering = bytearray()
294+
def read_exact(n: int):
295+
nonlocal read_buffering
296+
buffer = bytearray(read_buffering)
297+
while len(buffer) < n:
298+
data = yield
299+
if data is None:
300+
raise RuntimeError("Should not send None to this generator")
301+
buffer.extend(data)
302+
new_data = bytes(buffer[:n])
303+
read_buffering = buffer[n:]
304+
return new_data
305+
306+
parsing = Frame.parse(read_exact, extensions=self._ws_extentions, mask=self._is_input())
307+
parsing.send(None)
308+
try:
309+
parsing.send(bytearray(data))
310+
except StopIteration as e:
311+
return e.value, read_buffering
312+
313+
return None, read_buffering
314+
220315
def parse_data(self, data: bytes):
221316
if self._packet_to_stream(): # This is a websocket upgrade!
222-
self.msg.message_complete = True # The message is complete but becomed a stream, so need to be called every time a new packet is received
223-
self.msg.total_size += len(data)
224-
self.msg.stream += data #buffering stream
317+
self._stream_parser(data)
225318
else:
226319
try:
227-
self.execute(data)
320+
reason, consumed = self.execute(data)
321+
if reason == PAUSED_UPGRADE:
322+
self.msg.upgrading_to_ws = True
323+
self.msg.message_complete = True
324+
self._stream_parser(data[consumed:])
325+
elif reason == PAUSED_H2_UPGRADE:
326+
self.msg.upgrading_to_h2 = True
327+
self.msg.message_complete = True
328+
self._stream_parser(data[consumed:])
228329
except Exception as e:
229330
self.raised_error = True
230-
print(f"Error parsing HTTP packet: {e} with data {data}", flush=True)
231331
raise e
232332

233333
def pop_message(self):
@@ -241,18 +341,23 @@ class InternalHttpRequest(InternalCallbackHandler, pyllhttp.Request):
241341
def __init__(self):
242342
super(InternalCallbackHandler, self).__init__()
243343
super(pyllhttp.Request, self).__init__()
344+
345+
def _is_input(self):
346+
return True
244347

245348
class InternalHttpResponse(InternalCallbackHandler, pyllhttp.Response):
246349
def __init__(self):
247350
super(InternalCallbackHandler, self).__init__()
248351
super(pyllhttp.Response, self).__init__()
352+
353+
def _is_input(self):
354+
return False
249355

250356
class InternalBasicHttpMetaClass:
251357
"""Internal class to handle HTTP requests and responses"""
252358

253359
def __init__(self, parser: InternalHttpRequest|InternalHttpResponse, msg: InternalHTTPMessage):
254360
self._parser = parser
255-
self.stream = b""
256361
self.raised_error = False
257362
self._message: InternalHTTPMessage|None = msg
258363
self._contructor_hook()
@@ -313,12 +418,32 @@ def keep_alive(self) -> bool:
313418
@property
314419
def should_upgrade(self) -> bool:
315420
"""If the message should upgrade"""
316-
return self._message.should_upgrade
421+
return self._parser.should_upgrade
317422

318423
@property
319424
def content_length(self) -> int|None:
320425
"""Content length of the message"""
321426
return self._message.content_length
427+
428+
@property
429+
def upgrading_to_h2(self) -> bool:
430+
"""If the message is upgrading to HTTP/2"""
431+
return self._message.upgrading_to_h2
432+
433+
@property
434+
def upgrading_to_ws(self) -> bool:
435+
"""If the message is upgrading to Websocket"""
436+
return self._message.upgrading_to_ws
437+
438+
@property
439+
def ws_stream(self) -> list[Frame]:
440+
"""Websocket stream"""
441+
return self._message.ws_stream
442+
443+
@property
444+
def stream(self) -> bytes:
445+
"""Stream of the message"""
446+
return self._message.stream
322447

323448
def get_header(self, header: str, default=None) -> str:
324449
"""Get a header from the message without caring about the case"""
@@ -391,8 +516,8 @@ def _fetch_packet(cls, internal_data: DataStreamCtx):
391516
if not headers_were_set and parser.msg.headers_complete:
392517
messages_tosend.append(parser.msg) # Also the current message needs to be sent due to complete headers
393518

394-
if headers_were_set and parser.msg.message_complete and parser.msg.should_upgrade and parser.save_body:
395-
messages_tosend.append(parser.msg) # Also the current message needs to beacase a websocket stream is going on
519+
if parser._packet_to_stream():
520+
messages_tosend.append(parser.msg) # Also the current message needs to beacase a stream is going on
396521

397522
messages_to_call = len(messages_tosend)
398523

@@ -423,7 +548,7 @@ def method(self) -> bytes:
423548
return self._parser.msg.method
424549

425550
def __repr__(self):
426-
return f"<HttpRequest method={self.method} url={self.url} headers={self.headers} body={self.body} http_version={self.http_version} keep_alive={self.keep_alive} should_upgrade={self.should_upgrade} headers_complete={self.headers_complete} message_complete={self.message_complete} content_length={self.content_length} stream={self.stream}>"
551+
return f"<HttpRequest method={self.method} url={self.url} headers={self.headers} body=[{0 if not self.body else len(self.body)} bytes] http_version={self.http_version} keep_alive={self.keep_alive} should_upgrade={self.should_upgrade} headers_complete={self.headers_complete} message_complete={self.message_complete} content_length={self.content_length} stream={self.stream} ws_stream={self.ws_stream}>"
427552

428553
class HttpResponse(InternalBasicHttpMetaClass):
429554
"""
@@ -445,7 +570,7 @@ def status_code(self) -> int:
445570
return self._parser.msg.status
446571

447572
def __repr__(self):
448-
return f"<HttpResponse status_code={self.status_code} url={self.url} headers={self.headers} body={self.body} http_version={self.http_version} keep_alive={self.keep_alive} should_upgrade={self.should_upgrade} headers_complete={self.headers_complete} message_complete={self.message_complete} content_length={self.content_length} stream={self.stream}>"
573+
return f"<HttpResponse status_code={self.status_code} url={self.url} headers={self.headers} body=[{0 if not self.body else len(self.body)} bytes] http_version={self.http_version} keep_alive={self.keep_alive} should_upgrade={self.should_upgrade} headers_complete={self.headers_complete} message_complete={self.message_complete} content_length={self.content_length} stream={self.stream} ws_stream={self.ws_stream}>"
449574

450575
class HttpRequestHeader(HttpRequest):
451576
"""

fgex-lib/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ zstd # waiting for pull request to be merged
55
brotli # waiting for pull request to be merged
66
watchfiles
77
fgex
8+
websockets
89
pyllhttp

frontend/src/components/NFProxy/NFProxyDocs.tsx

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,10 @@ export const NFProxyDocs = () => {
232232
<strong>url: </strong> The url of the request (read only)
233233
</List.Item>
234234
<List.Item>
235-
<strong>headers: </strong> The headers of the request (read only). The keys and values are exactly the same as the original request (case sensitive).
235+
<strong>headers: </strong> The headers of the request (read only). The keys and values are exactly the same as the original request (case sensitive). (values can be list in case the same header field is repeated)
236236
</List.Item>
237237
<List.Item>
238-
<strong>get_header(key:str, default = None): </strong> A function that returns the value of a header: it matches the key without case sensitivity. If the header is not found, it returns the default value.
238+
<strong>get_header(key:str, default = None): </strong> A function that returns the value of a header: it matches the key without case sensitivity. If the header is not found, it returns the default value. (if the same header field is repeated, its value is concatenated with a comma, this function will never return a list)
239239
</List.Item>
240240
<List.Item>
241241
<strong>user_agent: </strong> The user agent of the request (read only)
@@ -261,6 +261,15 @@ export const NFProxyDocs = () => {
261261
<List.Item>
262262
<strong>should_upgrade: </strong> It's true if the connection should be upgraded, false if it's not. (read only)
263263
</List.Item>
264+
<List.Item>
265+
<strong>upgrading_to_h2: </strong> It's true if the connection is upgrading to h2, false if it's not. (read only)
266+
</List.Item>
267+
<List.Item>
268+
<strong>ws_stream: </strong> It's a list of websockets.frames.Frame decoded (permessage-deflate is supported). (read only) [<a href="https://websockets.readthedocs.io/en/stable/">docs</a>]
269+
</List.Item>
270+
<List.Item>
271+
<strong>upgrading_to_ws: </strong> It's true if the connection is upgrading to ws, false if it's not. (read only)
272+
</List.Item>
264273
<List.Item>
265274
<strong>method: </strong> The method of the request (read only)
266275
</List.Item>
@@ -294,10 +303,10 @@ export const NFProxyDocs = () => {
294303
<strong>url: </strong> The url of the response (read only)
295304
</List.Item>
296305
<List.Item>
297-
<strong>headers: </strong> The headers of the response (read only). The keys and values are exactly the same as the original response (case sensitive).
306+
<strong>headers: </strong> The headers of the response (read only). The keys and values are exactly the same as the original response (case sensitive). (values can be list in case the same header field is repeated)
298307
</List.Item>
299308
<List.Item>
300-
<strong>get_header(key:str, default = None): </strong> A function that returns the value of a header: it matches the key without case sensitivity. If the header is not found, it returns the default value.
309+
<strong>get_header(key:str, default = None): </strong> A function that returns the value of a header: it matches the key without case sensitivity. If the header is not found, it returns the default value. (if the same header field is repeated, its value is concatenated with a comma, this function will never return a list)
301310
</List.Item>
302311
<List.Item>
303312
<strong>user_agent: </strong> The user agent of the response (read only)
@@ -323,6 +332,15 @@ export const NFProxyDocs = () => {
323332
<List.Item>
324333
<strong>should_upgrade: </strong> It's true if the connection should be upgraded, false if it's not. (read only)
325334
</List.Item>
335+
<List.Item>
336+
<strong>upgrading_to_h2: </strong> It's true if the connection is upgrading to h2, false if it's not. (read only)
337+
</List.Item>
338+
<List.Item>
339+
<strong>ws_stream: </strong> It's a list of websockets.frames.Frame decoded (permessage-deflate is supported). (read only) [<a href="https://websockets.readthedocs.io/en/stable/">docs</a>]
340+
</List.Item>
341+
<List.Item>
342+
<strong>upgrading_to_ws: </strong> It's true if the connection is upgrading to ws, false if it's not. (read only)
343+
</List.Item>
326344
<List.Item>
327345
<strong>status_code: </strong> The status code of the response (read only) (int)
328346
</List.Item>

0 commit comments

Comments
 (0)