Skip to content

Commit 25b5186

Browse files
committed
Patch EventStream._parse_event() and ResponseParser.parse()
1 parent 3587e36 commit 25b5186

File tree

8 files changed

+73
-74
lines changed

8 files changed

+73
-74
lines changed

CHANGES.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Changes
55
^^^^^^^^^^^^^^^^^^^
66
* fully patch ``ClientArgsCreator.get_client_args()``
77
* patch ``AioEndpoint.__init__()``
8-
* patch `ResponseParser` and subclasses
8+
* patch ``EventStream._parse_event()``, ``ResponseParser`` and subclasses
99
* use SPDX license identifier for project metadata
1010

1111
2.21.1 (2025-03-04)

aiobotocore/endpoint.py

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -226,16 +226,9 @@ async def _do_get_response(self, request, operation_model, context):
226226
customized_response_dict=customized_response_dict,
227227
)
228228
parser = self._response_parser_factory.create_parser(protocol)
229-
230-
if asyncio.iscoroutinefunction(parser.parse):
231-
parsed_response = await parser.parse(
232-
response_dict, operation_model.output_shape
233-
)
234-
else:
235-
parsed_response = parser.parse(
236-
response_dict, operation_model.output_shape
237-
)
238-
229+
parsed_response = await parser.parse(
230+
response_dict, operation_model.output_shape
231+
)
239232
parsed_response.update(customized_response_dict)
240233

241234
if http_response.status_code >= 300:
@@ -262,11 +255,7 @@ async def _add_modeled_error_fields(
262255
error_shape = service_model.shape_for_error_code(error_code)
263256
if error_shape is None:
264257
return
265-
266-
if asyncio.iscoroutinefunction(parser.parse):
267-
modeled_parse = await parser.parse(response_dict, error_shape)
268-
else:
269-
modeled_parse = parser.parse(response_dict, error_shape)
258+
modeled_parse = await parser.parse(response_dict, error_shape)
270259
# TODO: avoid naming conflicts with ResponseMetadata and Error
271260
parsed_response.update(modeled_parse)
272261

aiobotocore/eventstream.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
EventStreamBuffer,
44
NoInitialResponseError,
55
)
6+
from botocore.exceptions import EventStreamError
67

78

89
class AioEventStream(EventStream):
@@ -14,7 +15,7 @@ def __aiter__(self):
1415

1516
async def __anext__(self):
1617
async for event in self._event_generator:
17-
parsed_event = self._parse_event(event)
18+
parsed_event = await self._parse_event(event)
1819
if parsed_event:
1920
yield parsed_event
2021

@@ -25,6 +26,16 @@ async def _create_raw_event_generator(self):
2526
for event in event_stream_buffer:
2627
yield event # unfortunately no yield from async func support
2728

29+
async def _parse_event(self, event):
30+
response_dict = event.to_response_dict()
31+
parsed_response = await self._parser.parse(
32+
response_dict, self._output_shape
33+
)
34+
if response_dict['status_code'] == 200:
35+
return parsed_response
36+
else:
37+
raise EventStreamError(parsed_response, self._operation_name)
38+
2839
async def get_initial_response(self):
2940
try:
3041
async for event in self._event_generator:

aiobotocore/parsers.py

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
lowercase_dict,
1919
)
2020

21+
from ._helpers import resolve_awaitable
2122
from .eventstream import AioEventStream
2223

2324

@@ -32,6 +33,39 @@ def create_parser(protocol):
3233

3334

3435
class AioResponseParser(ResponseParser):
36+
async def parse(self, response, shape):
37+
LOG.debug('Response headers: %s', response['headers'])
38+
LOG.debug('Response body:\n%s', response['body'])
39+
if response['status_code'] >= 301:
40+
if self._is_generic_error_response(response):
41+
parsed = self._do_generic_error_parse(response)
42+
elif self._is_modeled_error_shape(shape):
43+
parsed = self._do_modeled_error_parse(response, shape)
44+
# We don't want to decorate the modeled fields with metadata
45+
return parsed
46+
else:
47+
parsed = self._do_error_parse(response, shape)
48+
else:
49+
parsed = await resolve_awaitable(self._do_parse(response, shape))
50+
51+
# We don't want to decorate event stream responses with metadata
52+
if shape and shape.serialization.get('eventstream'):
53+
return parsed
54+
55+
# Add ResponseMetadata if it doesn't exist and inject the HTTP
56+
# status code and headers from the response.
57+
if isinstance(parsed, dict):
58+
response_metadata = parsed.get('ResponseMetadata', {})
59+
response_metadata['HTTPStatusCode'] = response['status_code']
60+
# Ensure that the http header keys are all lower cased. Older
61+
# versions of urllib3 (< 1.11) would unintentionally do this for us
62+
# (see urllib3#633). We need to do this conversion manually now.
63+
headers = response['headers']
64+
response_metadata['HTTPHeaders'] = lowercase_dict(headers)
65+
parsed['ResponseMetadata'] = response_metadata
66+
self._add_checksum_response_metadata(response, response_metadata)
67+
return parsed
68+
3569
def _create_event_stream(self, response, shape):
3670
parser = self._event_stream_parser
3771
name = response['context'].get('operation_name')
@@ -98,41 +132,6 @@ async def _handle_event_stream(self, response, shape, event_name):
98132
parsed[event_name] = event_stream
99133
return parsed
100134

101-
# this is actually from ResponseParser however for now JSONParser is the
102-
# only class that needs this async
103-
async def parse(self, response, shape):
104-
LOG.debug('Response headers: %s', response['headers'])
105-
LOG.debug('Response body:\n%s', response['body'])
106-
if response['status_code'] >= 301:
107-
if self._is_generic_error_response(response):
108-
parsed = self._do_generic_error_parse(response)
109-
elif self._is_modeled_error_shape(shape):
110-
parsed = self._do_modeled_error_parse(response, shape)
111-
# We don't want to decorate the modeled fields with metadata
112-
return parsed
113-
else:
114-
parsed = self._do_error_parse(response, shape)
115-
else:
116-
parsed = await self._do_parse(response, shape)
117-
118-
# We don't want to decorate event stream responses with metadata
119-
if shape and shape.serialization.get('eventstream'):
120-
return parsed
121-
122-
# Add ResponseMetadata if it doesn't exist and inject the HTTP
123-
# status code and headers from the response.
124-
if isinstance(parsed, dict):
125-
response_metadata = parsed.get('ResponseMetadata', {})
126-
response_metadata['HTTPStatusCode'] = response['status_code']
127-
# Ensure that the http header keys are all lower cased. Older
128-
# versions of urllib3 (< 1.11) would unintentionally do this for us
129-
# (see urllib3#633). We need to do this conversion manually now.
130-
headers = response['headers']
131-
response_metadata['HTTPHeaders'] = lowercase_dict(headers)
132-
parsed['ResponseMetadata'] = response_metadata
133-
self._add_checksum_response_metadata(response, response_metadata)
134-
return parsed
135-
136135

137136
class AioBaseRestParser(BaseRestParser, AioResponseParser):
138137
pass

aiobotocore/response.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,5 @@ async def get_response(operation_model, http_response):
150150
response_dict['body'] = await http_response.content
151151

152152
parser = parsers.create_parser(protocol)
153-
if asyncio.iscoroutinefunction(parser.parse):
154-
parsed = await parser.parse(
155-
response_dict, operation_model.output_shape
156-
)
157-
else:
158-
parsed = parser.parse(response_dict, operation_model.output_shape)
153+
parsed = await parser.parse(response_dict, operation_model.output_shape)
159154
return http_response, parsed

tests/botocore_tests/unit/test_eventstream.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
NoInitialResponseError,
2828
)
2929
from botocore.exceptions import EventStreamError
30-
from botocore.parsers import EventStreamXMLParser
3130

3231
from aiobotocore.eventstream import AioEventStream
32+
from aiobotocore.parsers import AioEventStreamXMLParser
3333

3434
EMPTY_MESSAGE = (
3535
b'\x00\x00\x00\x10\x00\x00\x00\x00\x05\xc2H\xeb}\x98\xc8\xff',
@@ -494,7 +494,7 @@ async def test_event_stream_wrapper_iteration():
494494
b"\x00\x00\x00+\x00\x00\x00\x0e4\x8b\xec{\x08event-id\x04\x00",
495495
b"\x00\xa0\x0c{'foo':'bar'}\xd3\x89\x02\x85",
496496
)
497-
parser = mock.Mock(spec=EventStreamXMLParser)
497+
parser = mock.Mock(spec=AioEventStreamXMLParser)
498498
output_shape = mock.Mock()
499499
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
500500
events = [e async for e in event_stream]
@@ -510,7 +510,7 @@ async def test_event_stream_wrapper_iteration():
510510

511511
async def test_eventstream_wrapper_iteration_error():
512512
raw_stream = create_mock_raw_stream(ERROR_EVENT_MESSAGE[0])
513-
parser = mock.Mock(spec=EventStreamXMLParser)
513+
parser = mock.Mock(spec=AioEventStreamXMLParser)
514514
parser.parse.return_value = {}
515515
output_shape = mock.Mock()
516516
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
@@ -532,7 +532,7 @@ async def test_event_stream_initial_response():
532532
b'\x05event\x0b:event-type\x07\x00\x10initial-response\r:content-type',
533533
b'\x07\x00\ttext/json{"InitialResponse": "sometext"}\xf6\x98$\x83',
534534
)
535-
parser = mock.Mock(spec=EventStreamXMLParser)
535+
parser = mock.Mock(spec=AioEventStreamXMLParser)
536536
output_shape = mock.Mock()
537537
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
538538
event = await event_stream.get_initial_response()
@@ -551,7 +551,7 @@ async def test_event_stream_initial_response_wrong_type():
551551
b"\x00\x00\x00+\x00\x00\x00\x0e4\x8b\xec{\x08event-id\x04\x00",
552552
b"\x00\xa0\x0c{'foo':'bar'}\xd3\x89\x02\x85",
553553
)
554-
parser = mock.Mock(spec=EventStreamXMLParser)
554+
parser = mock.Mock(spec=AioEventStreamXMLParser)
555555
output_shape = mock.Mock()
556556
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
557557
with pytest.raises(NoInitialResponseError):
@@ -560,7 +560,7 @@ async def test_event_stream_initial_response_wrong_type():
560560

561561
async def test_event_stream_initial_response_no_event():
562562
raw_stream = create_mock_raw_stream(b'')
563-
parser = mock.Mock(spec=EventStreamXMLParser)
563+
parser = mock.Mock(spec=AioEventStreamXMLParser)
564564
output_shape = mock.Mock()
565565
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
566566
with pytest.raises(NoInitialResponseError):

tests/test_eventstreams.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import botocore.parsers
21
import pytest
32

43
from aiobotocore.eventstream import AioEventStream
4+
from aiobotocore.parsers import AioEventStreamXMLParser
55

66
# TODO once Moto supports either S3 Select or Kinesis SubscribeToShard then
77
# this can be tested against a real AWS API
@@ -50,7 +50,7 @@ async def test_eventstream_chunking(s3_client):
5050
outputshape = s3_client._service_model.operation_model(
5151
operation_name
5252
).output_shape.members['Payload']
53-
parser = botocore.parsers.EventStreamXMLParser()
53+
parser = AioEventStreamXMLParser()
5454
sr = FakeStreamReader(TEST_STREAM_DATA)
5555

5656
event_stream = AioEventStream(sr, outputshape, parser, operation_name)
@@ -79,7 +79,7 @@ async def test_eventstream_no_iter(s3_client):
7979
outputshape = s3_client._service_model.operation_model(
8080
operation_name
8181
).output_shape.members['Payload']
82-
parser = botocore.parsers.EventStreamXMLParser()
82+
parser = AioEventStreamXMLParser()
8383
sr = FakeStreamReader(TEST_STREAM_DATA)
8484

8585
event_stream = AioEventStream(sr, outputshape, parser, operation_name)

tests/test_patches.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,12 @@ def test_protocol_parsers():
722722
'8a9b454943f8ef6e81f5794d641adddd1fdd5248',
723723
},
724724
),
725+
(
726+
EventStream._parse_event,
727+
{
728+
'c5b4e65fe718653a6f4cee4e8647f286f10fae05',
729+
},
730+
),
725731
(
726732
EventStream.get_initial_response,
727733
{
@@ -785,6 +791,12 @@ def test_protocol_parsers():
785791
'5cf11c9acecd1f60a013f6facbe0f294daa3f390',
786792
},
787793
),
794+
(
795+
ResponseParser.parse,
796+
{
797+
'c2153eac3789855f4fc6a816a1f30a6afe0cf969',
798+
},
799+
),
788800
(
789801
ResponseParser._create_event_stream,
790802
{
@@ -827,13 +839,6 @@ def test_protocol_parsers():
827839
'3cf7bb1ecff0d72bafd7e7fd6625595b4060abd6',
828840
},
829841
),
830-
# NOTE, if this hits we need to change our ResponseParser impl in JSONParser
831-
(
832-
JSONParser.parse,
833-
{
834-
'c2153eac3789855f4fc6a816a1f30a6afe0cf969',
835-
},
836-
),
837842
(
838843
RestJSONParser._create_event_stream,
839844
{

0 commit comments

Comments
 (0)