Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 28 additions & 36 deletions utils/python-rpc/framework/rpc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Copyright (c) 2018-2024, The Monero Project

#
# All rights reserved.
#
Expand Down Expand Up @@ -33,56 +32,49 @@

class Response(dict):
def __init__(self, d):
for k in d.keys():
if type(d[k]) == dict:
self[k] = Response(d[k])
elif type(d[k]) == list:
self[k] = []
for i in range(len(d[k])):
if type(d[k][i]) == dict:
self[k].append(Response(d[k][i]))
else:
self[k].append(d[k][i])
for k, v in d.items():
if isinstance(v, dict):
self[k] = Response(v)
elif isinstance(v, list):
self[k] = [Response(i) if isinstance(i, dict) else i for i in v]
else:
self[k] = d[k]
self[k] = v

def __getattr__(self, key):
return self[key]
try:
return self[key]
except KeyError:
raise AttributeError(f"'Response' object has no attribute '{key}'")

def __setattr__(self, key, value):
self[key] = value
def __eq__(self, other):
if type(other) == dict:
return self == Response(other)
if self.keys() != other.keys():
return False
for k in self.keys():
if self[k] != other[k]:
return False
return True

class JSONRPC(object):
def __init__(self, url, username=None, password=None):
class JSONRPC:
def __init__(self, url, username=None, password=None, timeout=10):
self.url = url
self.username = username
self.password = password
self.timeout = timeout

def send_request(self, path, inputs, result_field = None):
res = requests.post(
self.url + path,
def send_request(self, path, inputs, result_field=None):
response = requests.post(
f"{self.url}{path}",
data=json.dumps(inputs),
headers={'content-type': 'application/json'},
auth=HTTPDigestAuth(self.username, self.password) if self.username is not None else None)
res = res.json()

assert 'error' not in res, res
auth=HTTPDigestAuth(self.username, self.password) if self.username else None,
timeout=self.timeout
)
response.raise_for_status()

res = response.json()

if 'error' in res:
raise ValueError(f"Error in response: {res['error']}")

if result_field:
res = res[result_field]
return Response(res.get(result_field, {}))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a subtle, and incorrect, behavior change: The previous code would raise a KeyError if the response doesn't contain result_field, whereas the new code returns an empty response, thereby silently ignoring malformed responses. I would suggest reverting this change.


return Response(res)

def send_json_rpc_request(self, inputs):
return self.send_request("/json_rpc", inputs, 'result')



return self.send_request("/json_rpc", inputs, 'result')
Loading