Skip to content

Patch ResponseParser.parse() and others to prepare for upstream bump #1347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
Changes
-------

2.22.0 (2025-04-21)
2.22.0 (2025-04-29)
^^^^^^^^^^^^^^^^^^^
* patch `ResponseParser` and subclasses
* fully patch ``ClientArgsCreator.get_client_args()``
* patch ``AioEndpoint.__init__()``
* patch ``EventStream._parse_event()``, ``ResponseParser`` and subclasses
* use SPDX license identifier for project metadata

2.21.1 (2025-03-04)
Expand Down
3 changes: 2 additions & 1 deletion aiobotocore/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from .config import AioConfig
from .endpoint import DEFAULT_HTTP_SESSION_CLS, AioEndpointCreator
from .parsers import create_parser
from .regions import AioEndpointRulesetResolver
from .signers import AioRequestSigner

Expand Down Expand Up @@ -94,7 +95,7 @@ def get_client_args(
serializer = botocore.serialize.create_serializer(
protocol, parameter_validation
)
response_parser = botocore.parsers.create_parser(protocol)
response_parser = create_parser(protocol)

ruleset_resolver = self._build_endpoint_resolver(
endpoints_ruleset_data,
Expand Down
42 changes: 27 additions & 15 deletions aiobotocore/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from aiobotocore.httpchecksum import handle_checksum_body
from aiobotocore.httpsession import AIOHTTPSession
from aiobotocore.parsers import AioResponseParserFactory
from aiobotocore.response import StreamingBody

DEFAULT_HTTP_SESSION_CLS = AIOHTTPSession
Expand Down Expand Up @@ -57,6 +58,28 @@ async def convert_to_response_dict(http_response, operation_model):


class AioEndpoint(Endpoint):
def __init__(
self,
host,
endpoint_prefix,
event_emitter,
response_parser_factory=None,
http_session=None,
):
if response_parser_factory is None:
response_parser_factory = AioResponseParserFactory()

if http_session is None:
raise ValueError('http_session must be provided')

super().__init__(
host=host,
endpoint_prefix=endpoint_prefix,
event_emitter=event_emitter,
response_parser_factory=response_parser_factory,
http_session=http_session,
)

async def close(self):
await self.http_session.close()

Expand Down Expand Up @@ -203,16 +226,9 @@ async def _do_get_response(self, request, operation_model, context):
customized_response_dict=customized_response_dict,
)
parser = self._response_parser_factory.create_parser(protocol)

if asyncio.iscoroutinefunction(parser.parse):
parsed_response = await parser.parse(
response_dict, operation_model.output_shape
)
else:
parsed_response = parser.parse(
response_dict, operation_model.output_shape
)

parsed_response = await parser.parse(
response_dict, operation_model.output_shape
)
parsed_response.update(customized_response_dict)

if http_response.status_code >= 300:
Expand All @@ -239,11 +255,7 @@ async def _add_modeled_error_fields(
error_shape = service_model.shape_for_error_code(error_code)
if error_shape is None:
return

if asyncio.iscoroutinefunction(parser.parse):
modeled_parse = await parser.parse(response_dict, error_shape)
else:
modeled_parse = parser.parse(response_dict, error_shape)
modeled_parse = await parser.parse(response_dict, error_shape)
# TODO: avoid naming conflicts with ResponseMetadata and Error
parsed_response.update(modeled_parse)

Expand Down
13 changes: 12 additions & 1 deletion aiobotocore/eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
EventStreamBuffer,
NoInitialResponseError,
)
from botocore.exceptions import EventStreamError


class AioEventStream(EventStream):
Expand All @@ -14,7 +15,7 @@ def __aiter__(self):

async def __anext__(self):
async for event in self._event_generator:
parsed_event = self._parse_event(event)
parsed_event = await self._parse_event(event)
if parsed_event:
yield parsed_event

Expand All @@ -25,6 +26,16 @@ async def _create_raw_event_generator(self):
for event in event_stream_buffer:
yield event # unfortunately no yield from async func support

async def _parse_event(self, event):
response_dict = event.to_response_dict()
parsed_response = await self._parser.parse(
response_dict, self._output_shape
)
if response_dict['status_code'] == 200:
return parsed_response
else:
raise EventStreamError(parsed_response, self._operation_name)

async def get_initial_response(self):
try:
async for event in self._event_generator:
Expand Down
69 changes: 34 additions & 35 deletions aiobotocore/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
lowercase_dict,
)

from ._helpers import resolve_awaitable
from .eventstream import AioEventStream


Expand All @@ -32,6 +33,39 @@ def create_parser(protocol):


class AioResponseParser(ResponseParser):
async def parse(self, response, shape):
LOG.debug('Response headers: %s', response['headers'])
LOG.debug('Response body:\n%s', response['body'])
if response['status_code'] >= 301:
if self._is_generic_error_response(response):
parsed = self._do_generic_error_parse(response)
elif self._is_modeled_error_shape(shape):
parsed = self._do_modeled_error_parse(response, shape)
# We don't want to decorate the modeled fields with metadata
return parsed
else:
parsed = self._do_error_parse(response, shape)
else:
parsed = await resolve_awaitable(self._do_parse(response, shape))

# We don't want to decorate event stream responses with metadata
if shape and shape.serialization.get('eventstream'):
return parsed

# Add ResponseMetadata if it doesn't exist and inject the HTTP
# status code and headers from the response.
if isinstance(parsed, dict):
response_metadata = parsed.get('ResponseMetadata', {})
response_metadata['HTTPStatusCode'] = response['status_code']
# Ensure that the http header keys are all lower cased. Older
# versions of urllib3 (< 1.11) would unintentionally do this for us
# (see urllib3#633). We need to do this conversion manually now.
headers = response['headers']
response_metadata['HTTPHeaders'] = lowercase_dict(headers)
parsed['ResponseMetadata'] = response_metadata
self._add_checksum_response_metadata(response, response_metadata)
return parsed

def _create_event_stream(self, response, shape):
parser = self._event_stream_parser
name = response['context'].get('operation_name')
Expand Down Expand Up @@ -98,41 +132,6 @@ async def _handle_event_stream(self, response, shape, event_name):
parsed[event_name] = event_stream
return parsed

# this is actually from ResponseParser however for now JSONParser is the
# only class that needs this async
async def parse(self, response, shape):
LOG.debug('Response headers: %s', response['headers'])
LOG.debug('Response body:\n%s', response['body'])
if response['status_code'] >= 301:
if self._is_generic_error_response(response):
parsed = self._do_generic_error_parse(response)
elif self._is_modeled_error_shape(shape):
parsed = self._do_modeled_error_parse(response, shape)
# We don't want to decorate the modeled fields with metadata
return parsed
else:
parsed = self._do_error_parse(response, shape)
else:
parsed = await self._do_parse(response, shape)

# We don't want to decorate event stream responses with metadata
if shape and shape.serialization.get('eventstream'):
return parsed

# Add ResponseMetadata if it doesn't exist and inject the HTTP
# status code and headers from the response.
if isinstance(parsed, dict):
response_metadata = parsed.get('ResponseMetadata', {})
response_metadata['HTTPStatusCode'] = response['status_code']
# Ensure that the http header keys are all lower cased. Older
# versions of urllib3 (< 1.11) would unintentionally do this for us
# (see urllib3#633). We need to do this conversion manually now.
headers = response['headers']
response_metadata['HTTPHeaders'] = lowercase_dict(headers)
parsed['ResponseMetadata'] = response_metadata
self._add_checksum_response_metadata(response, response_metadata)
return parsed


class AioBaseRestParser(BaseRestParser, AioResponseParser):
pass
Expand Down
7 changes: 1 addition & 6 deletions aiobotocore/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,5 @@ async def get_response(operation_model, http_response):
response_dict['body'] = await http_response.content

parser = parsers.create_parser(protocol)
if asyncio.iscoroutinefunction(parser.parse):
parsed = await parser.parse(
response_dict, operation_model.output_shape
)
else:
parsed = parser.parse(response_dict, operation_model.output_shape)
parsed = await parser.parse(response_dict, operation_model.output_shape)
return http_response, parsed
12 changes: 6 additions & 6 deletions tests/botocore_tests/unit/test_eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
NoInitialResponseError,
)
from botocore.exceptions import EventStreamError
from botocore.parsers import EventStreamXMLParser

from aiobotocore.eventstream import AioEventStream
from aiobotocore.parsers import AioEventStreamXMLParser

EMPTY_MESSAGE = (
b'\x00\x00\x00\x10\x00\x00\x00\x00\x05\xc2H\xeb}\x98\xc8\xff',
Expand Down Expand Up @@ -494,7 +494,7 @@ async def test_event_stream_wrapper_iteration():
b"\x00\x00\x00+\x00\x00\x00\x0e4\x8b\xec{\x08event-id\x04\x00",
b"\x00\xa0\x0c{'foo':'bar'}\xd3\x89\x02\x85",
)
parser = mock.Mock(spec=EventStreamXMLParser)
parser = mock.Mock(spec=AioEventStreamXMLParser)
output_shape = mock.Mock()
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
events = [e async for e in event_stream]
Expand All @@ -510,7 +510,7 @@ async def test_event_stream_wrapper_iteration():

async def test_eventstream_wrapper_iteration_error():
raw_stream = create_mock_raw_stream(ERROR_EVENT_MESSAGE[0])
parser = mock.Mock(spec=EventStreamXMLParser)
parser = mock.Mock(spec=AioEventStreamXMLParser)
parser.parse.return_value = {}
output_shape = mock.Mock()
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
Expand All @@ -532,7 +532,7 @@ async def test_event_stream_initial_response():
b'\x05event\x0b:event-type\x07\x00\x10initial-response\r:content-type',
b'\x07\x00\ttext/json{"InitialResponse": "sometext"}\xf6\x98$\x83',
)
parser = mock.Mock(spec=EventStreamXMLParser)
parser = mock.Mock(spec=AioEventStreamXMLParser)
output_shape = mock.Mock()
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
event = await event_stream.get_initial_response()
Expand All @@ -551,7 +551,7 @@ async def test_event_stream_initial_response_wrong_type():
b"\x00\x00\x00+\x00\x00\x00\x0e4\x8b\xec{\x08event-id\x04\x00",
b"\x00\xa0\x0c{'foo':'bar'}\xd3\x89\x02\x85",
)
parser = mock.Mock(spec=EventStreamXMLParser)
parser = mock.Mock(spec=AioEventStreamXMLParser)
output_shape = mock.Mock()
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
with pytest.raises(NoInitialResponseError):
Expand All @@ -560,7 +560,7 @@ async def test_event_stream_initial_response_wrong_type():

async def test_event_stream_initial_response_no_event():
raw_stream = create_mock_raw_stream(b'')
parser = mock.Mock(spec=EventStreamXMLParser)
parser = mock.Mock(spec=AioEventStreamXMLParser)
output_shape = mock.Mock()
event_stream = AioEventStream(raw_stream, output_shape, parser, '')
with pytest.raises(NoInitialResponseError):
Expand Down
6 changes: 3 additions & 3 deletions tests/test_eventstreams.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import botocore.parsers
import pytest

from aiobotocore.eventstream import AioEventStream
from aiobotocore.parsers import AioEventStreamXMLParser

# TODO once Moto supports either S3 Select or Kinesis SubscribeToShard then
# this can be tested against a real AWS API
Expand Down Expand Up @@ -50,7 +50,7 @@ async def test_eventstream_chunking(s3_client):
outputshape = s3_client._service_model.operation_model(
operation_name
).output_shape.members['Payload']
parser = botocore.parsers.EventStreamXMLParser()
parser = AioEventStreamXMLParser()
sr = FakeStreamReader(TEST_STREAM_DATA)

event_stream = AioEventStream(sr, outputshape, parser, operation_name)
Expand Down Expand Up @@ -79,7 +79,7 @@ async def test_eventstream_no_iter(s3_client):
outputshape = s3_client._service_model.operation_model(
operation_name
).output_shape.members['Payload']
parser = botocore.parsers.EventStreamXMLParser()
parser = AioEventStreamXMLParser()
sr = FakeStreamReader(TEST_STREAM_DATA)

event_stream = AioEventStream(sr, outputshape, parser, operation_name)
Expand Down
25 changes: 18 additions & 7 deletions tests/test_patches.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,12 @@ def test_protocol_parsers():
'5b7701c1f5b3cb2daa6eb307cdbdbbb2e9d33e5f',
},
),
(
Endpoint.__init__,
{
'4bafe9733a02817950f5096612410ec4ebc40f55',
},
),
(
Endpoint.create_request,
{
Expand Down Expand Up @@ -716,6 +722,12 @@ def test_protocol_parsers():
'8a9b454943f8ef6e81f5794d641adddd1fdd5248',
},
),
(
EventStream._parse_event,
{
'c5b4e65fe718653a6f4cee4e8647f286f10fae05',
},
),
(
EventStream.get_initial_response,
{
Expand Down Expand Up @@ -779,6 +791,12 @@ def test_protocol_parsers():
'5cf11c9acecd1f60a013f6facbe0f294daa3f390',
},
),
(
ResponseParser.parse,
{
'c2153eac3789855f4fc6a816a1f30a6afe0cf969',
},
),
(
ResponseParser._create_event_stream,
{
Expand Down Expand Up @@ -821,13 +839,6 @@ def test_protocol_parsers():
'3cf7bb1ecff0d72bafd7e7fd6625595b4060abd6',
},
),
# NOTE, if this hits we need to change our ResponseParser impl in JSONParser
(
JSONParser.parse,
{
'c2153eac3789855f4fc6a816a1f30a6afe0cf969',
},
),
(
RestJSONParser._create_event_stream,
{
Expand Down