Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
# Changelog
## 1.5.6 /2025-10-08
* Clean Up Error Handling by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/193
* Avoids ID of 'None' in queries by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/196
* Allows AsyncSubstrateInterface's Websocket connection to not automatically shut down by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/197
* return type annotation for `get_metadata_call_function` by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/199
* Change responses["results"] to deque by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/198
* do not attempt to reconnect if there are open subscriptions by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/200

**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.5...v1.5.6

## 1.5.5 /2025-10-06
* Improve timeout task cancellation by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/190

Expand Down
118 changes: 58 additions & 60 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import websockets.exceptions
from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string
from scalecodec import GenericVariant
from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject
from scalecodec.type_registry import load_type_registry_preset
from scalecodec.types import (
Expand Down Expand Up @@ -526,9 +527,9 @@ class Websocket:
def __init__(
self,
ws_url: str,
max_subscriptions=1024,
max_connections=100,
shutdown_timer=5,
max_subscriptions: int = 1024,
max_connections: int = 100,
shutdown_timer: Optional[float] = 5.0,
options: Optional[dict] = None,
_log_raw_websockets: bool = False,
retry_timeout: float = 60.0,
Expand All @@ -542,7 +543,9 @@ def __init__(
ws_url: Websocket URL to connect to
max_subscriptions: Maximum number of subscriptions per websocket connection
max_connections: Maximum number of connections total
shutdown_timer: Number of seconds to shut down websocket connection after last use
shutdown_timer: Number of seconds to shut down websocket connection after last use. If set to `None`, the
connection will never be automatically shut down. Use this for very long-running processes, where you
will manually shut down the connection if ever you intend to close it.
options: Options to pass to the websocket connection
_log_raw_websockets: Whether to log raw websockets in the "raw_websocket" logger
retry_timeout: Timeout in seconds to retry websocket connection
Expand Down Expand Up @@ -643,6 +646,10 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
self._attempts += 1
is_retry = True
if should_reconnect is True:
if len(self._received_subscriptions) > 0:
return SubstrateRequestException(
f"Unable to reconnect because there are currently open subscriptions."
)
for original_id, payload in list(self._inflight.items()):
self._received[original_id] = loop.create_future()
to_send = json.loads(payload)
Expand All @@ -659,25 +666,33 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
return e
elif isinstance(e := send_task.result(), Exception):
return e
elif len(self._received_subscriptions) > 0:
return SubstrateRequestException(
f"Currently open subscriptions while disconnecting. "
f"Ensure these are unsubscribed from before closing in the future."
)
return None

async def __aexit__(self, exc_type, exc_val, exc_tb):
if not self.state != State.CONNECTING:
if self._exit_task is not None:
self._exit_task.cancel()
try:
await self._exit_task
except asyncio.CancelledError:
pass
if self.ws is not None:
self._exit_task = asyncio.create_task(self._exit_with_timer())
if self.shutdown_timer is not None:
if not self.state != State.CONNECTING:
if self._exit_task is not None:
self._exit_task.cancel()
try:
await self._exit_task
except asyncio.CancelledError:
pass
if self.ws is not None:
self._exit_task = asyncio.create_task(self._exit_with_timer())

async def _exit_with_timer(self):
"""
Allows for graceful shutdown of websocket connection after specified number of seconds, allowing
for reuse of the websocket connection.
"""
try:
await asyncio.sleep(self.shutdown_timer)
if self.shutdown_timer is not None:
await asyncio.sleep(self.shutdown_timer)
await self.shutdown()
except asyncio.CancelledError:
pass
Expand Down Expand Up @@ -1407,7 +1422,8 @@ async def retrieve_pending_extrinsics(self) -> list:
runtime = await self.init_runtime()

result_data = await self.rpc_request("author_pendingExtrinsics", [])

if "error" in result_data:
raise SubstrateRequestException(result_data["error"]["message"])
extrinsics = []

for extrinsic_data in result_data["result"]:
Expand Down Expand Up @@ -2141,6 +2157,8 @@ async def get_parent_block_hash(self, block_hash) -> str:

async def _get_parent_block_hash(self, block_hash) -> str:
block_header = await self.rpc_request("chain_getHeader", [block_hash])
if "error" in block_header:
raise SubstrateRequestException(block_header["error"]["message"])

if block_header["result"] is None:
raise SubstrateRequestException(f'Block not found for "{block_hash}"')
Expand Down Expand Up @@ -2172,15 +2190,7 @@ async def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any:
response = await self.rpc_request(
"state_getStorage", [storage_key, block_hash]
)

if "result" in response:
return response.get("result")
elif "error" in response:
raise SubstrateRequestException(response["error"]["message"])
else:
raise SubstrateRequestException(
"Unknown error occurred during retrieval of events"
)
return response.get("result")

@cached_fetcher(max_size=SUBSTRATE_RUNTIME_CACHE_SIZE)
async def get_block_runtime_info(self, block_hash: str) -> dict:
Expand Down Expand Up @@ -2236,9 +2246,6 @@ async def get_block_metadata(
params = [block_hash]
response = await self.rpc_request("state_getMetadata", params)

if "error" in response:
raise SubstrateRequestException(response["error"]["message"])

if (result := response.get("result")) and decode:
metadata_decoder = runtime_config.create_scale_object(
"MetadataVersioned", data=ScaleBytes(result)
Expand Down Expand Up @@ -2304,8 +2311,13 @@ async def _preprocess(
metadata=runtime.metadata,
)
method = "state_getStorageAt"
queryable = (
str(query_for)
if query_for is not None
else f"{method}{random.randint(0, 7000)}"
)
return Preprocessed(
str(query_for),
queryable,
method,
[storage_key.to_hex(), block_hash],
value_scale_type,
Expand Down Expand Up @@ -2553,7 +2565,7 @@ async def _get_block_hash(self, block_id: int) -> str:
return (await self.rpc_request("chain_getBlockHash", [block_id]))["result"]

async def get_chain_head(self) -> str:
result = await self._make_rpc_request(
response = await self._make_rpc_request(
[
self.make_payload(
"rpc_request",
Expand All @@ -2562,8 +2574,11 @@ async def get_chain_head(self) -> str:
)
]
)
self.last_block_hash = result["rpc_request"][0]["result"]
return result["rpc_request"][0]["result"]
result = response["rpc_request"][0]
if "error" in result:
raise SubstrateRequestException(result["error"]["message"])
self.last_block_hash = result["result"]
return result["result"]

async def compose_call(
self,
Expand Down Expand Up @@ -2690,9 +2705,6 @@ async def query_multi(
runtime=runtime,
)

if "error" in response:
raise SubstrateRequestException(response["error"]["message"])

result = []

storage_key_map = {s.to_hex(): s for s in storage_keys}
Expand Down Expand Up @@ -3044,12 +3056,7 @@ async def get_chain_finalised_head(self):

"""
response = await self.rpc_request("chain_getFinalizedHead", [])

if response is not None:
if "error" in response:
raise SubstrateRequestException(response["error"]["message"])

return response.get("result")
return response["result"]

async def _do_runtime_call_old(
self,
Expand Down Expand Up @@ -3092,6 +3099,8 @@ async def _do_runtime_call_old(
[f"{api}_{method}", param_data.hex(), block_hash],
runtime=runtime,
)
if "error" in result_data:
raise SubstrateRequestException(result_data["error"]["message"])
result_vec_u8_bytes = hex_to_bytes(result_data["result"])
result_bytes = await self.decode_scale(
"Vec<u8>", result_vec_u8_bytes, runtime=runtime
Expand Down Expand Up @@ -3185,6 +3194,8 @@ async def runtime_call(
[f"{api}_{method}", param_data.hex(), block_hash],
runtime=runtime,
)
if "error" in result_data:
raise SubstrateRequestException(result_data["error"]["message"])
output_type_string = f"scale_info::{runtime_call_def['output']}"

# Decode result
Expand Down Expand Up @@ -3237,6 +3248,8 @@ async def get_account_next_index(self, account_address: str) -> int:
nonce_obj = await self.rpc_request(
"account_nextIndex", [account_address]
)
if "error" in nonce_obj:
raise SubstrateRequestException(nonce_obj["error"]["message"])
self._nonces[account_address] = nonce_obj["result"]
else:
self._nonces[account_address] += 1
Expand Down Expand Up @@ -3622,9 +3635,6 @@ async def query_map(
method="state_getKeys", params=[prefix, block_hash], runtime=runtime
)

if "error" in response:
raise SubstrateRequestException(response["error"]["message"])

result_keys = response.get("result")

result = []
Expand All @@ -3640,8 +3650,6 @@ async def query_map(
params=[result_keys, block_hash],
runtime=runtime,
)
if "error" in response:
raise SubstrateRequestException(response["error"]["message"])
for result_group in response["result"]:
result = decode_query_map(
result_group["changes"],
Expand Down Expand Up @@ -3680,8 +3688,6 @@ async def query_map(
)
)
for response in all_responses:
if "error" in response:
raise SubstrateRequestException(response["error"]["message"])
for result_group in response["result"]:
changes.extend(result_group["changes"])

Expand Down Expand Up @@ -3905,9 +3911,6 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]:
"author_submitExtrinsic", [str(extrinsic.data)]
)

if "result" not in response:
raise SubstrateRequestException(response.get("error"))

result = AsyncExtrinsicReceipt(
substrate=self, extrinsic_hash=response["result"]
)
Expand All @@ -3919,18 +3922,17 @@ async def get_metadata_call_function(
module_name: str,
call_function_name: str,
block_hash: Optional[str] = None,
) -> Optional[list]:
) -> Optional[GenericVariant]:
"""
Retrieves a list of all call functions in metadata active for given block_hash (or chaintip if block_hash
is omitted)
Retrieves specified call from the metadata at the block specified, or the chain tip if omitted.

Args:
module_name: name of the module
call_function_name: name of the call function
block_hash: optional block hash

Returns:
list of call functions
The dict-like call definition, if found. None otherwise.
"""
runtime = await self.init_runtime(block_hash=block_hash)

Expand Down Expand Up @@ -3994,12 +3996,8 @@ async def get_block_number(self, block_hash: Optional[str] = None) -> int:
"""Async version of `substrateinterface.base.get_block_number` method."""
response = await self.rpc_request("chain_getHeader", [block_hash])

if "error" in response:
raise SubstrateRequestException(response["error"]["message"])

elif "result" in response:
if response["result"]:
return int(response["result"]["number"], 16)
if response["result"]:
return int(response["result"]["number"], 16)
raise SubstrateRequestException(
f"Unable to retrieve block number for {block_hash}"
)
Expand Down
Loading
Loading