Skip to content

Add more explicit typing #261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ ignore = [
"D212", # multi-line-summary-first-line (incompatible with formatter)
"D213", # Multi-line docstring summary should start at the second line
"ISC001", # incompatible with formatter

# Moving imports into type-checking blocks can mess with pytest.patch()
"TC001", # Move application import {} into a type-checking block
"TC002", # Move third-party import {} into a type-checking block
"TC003", # Move standard library import {} into a type-checking block

"TRY003", # Avoid specifying long messages outside the exception class
"TRY400", # Use `logging.exception` instead of `logging.error`
]
Expand Down
13 changes: 7 additions & 6 deletions custom_components/zaptec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def create_entities_from_zaptec(

return entities

def create_streams(self):
def create_streams(self) -> None:
"""Create the streams for all installations."""
for install in self.zaptec.installations:
if install.id in self.zaptec:
Expand All @@ -501,7 +501,7 @@ def create_streams(self):
)
self.streams.append((task, install))

async def cancel_streams(self):
async def cancel_streams(self) -> None:
"""Cancel all streams for the account."""
for task, install in self.streams:
_LOGGER.debug("Cancelling stream for %s", install.qual_id)
Expand Down Expand Up @@ -707,7 +707,7 @@ async def trigger_poll(self) -> None:
finally:
self._trigger_task = None

def cleanup_task(_task: asyncio.Task):
def cleanup_task(_task: asyncio.Task) -> None:
"""Cleanup the task after it has run."""
self._trigger_task = None

Expand Down Expand Up @@ -797,6 +797,7 @@ def _get_zaptec_value(
Raises:
KeyUnavailableError: If key doesn't exist or obj doesn't have
`.get()`, which indicates that obj isn't a Mapping-like object

"""
obj = self.zaptec_obj
key = key or self.key
Expand Down Expand Up @@ -835,7 +836,7 @@ def _log_zaptec_attribute(self) -> str:
return f".{v}"

@callback
def _log_value(self, attribute: str | None, force=False):
def _log_value(self, attribute: str | None, force=False) -> None:
"""Helper to log a new value."""
if attribute is None:
return
Expand All @@ -854,7 +855,7 @@ def _log_value(self, attribute: str | None, force=False):
)

@callback
def _log_unavailable(self, exception: Exception | None = None):
def _log_unavailable(self, exception: Exception | None = None) -> None:
"""Helper to log when unavailable."""
available = self._attr_available
prev_available = self._prev_available
Expand Down Expand Up @@ -885,7 +886,7 @@ def _log_unavailable(self, exception: Exception | None = None):
_LOGGER.info("Entity %s is available", self.entity_id)

@property
def key(self):
def key(self) -> str:
"""Helper to retrieve the key from the entity description."""
return self.entity_description.key

Expand Down
8 changes: 4 additions & 4 deletions custom_components/zaptec/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ async def _refresh_token(self):
_LOGGER.debug(" TOKEN OK")
return

elif response.status == 400:
if response.status == 400:
data = await response.json()
raise log_exc(
AuthenticationError(
Expand Down Expand Up @@ -1145,11 +1145,11 @@ async def request(self, url: str, *, method="get", data=None, base_url=API_URL):
kwargs["headers"]["Authorization"] = f"Bearer {self._access_token}"
continue # Retry request

elif response.status in (201, 204): # Created, no content
if response.status in (201, 204): # Created, no content
content = await response.read()
return content

elif response.status == 200: # OK
if response.status == 200: # OK
# Read the JSON payload
try:
json_result = await response.json(content_type=None)
Expand Down Expand Up @@ -1292,7 +1292,7 @@ async def build(self):

# Update the observation, settings and commands ids based on the
# discovered device types.
ZCONST.update_ids_from_schema({chg["DeviceType"] for chg in self.chargers})
ZCONST.update_ids_from_schema({str(chg["DeviceType"]) for chg in self.chargers})

self.is_built = True

Expand Down
2 changes: 1 addition & 1 deletion custom_components/zaptec/binary_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _update_from_zaptec(self) -> None:
class ZaptecBinarySensorWithAttrs(ZaptecBinarySensor):
"""Zaptec binary sensor with additional attributes."""

def _post_init(self):
def _post_init(self) -> None:
self._attr_extra_state_attributes = self.zaptec_obj.asdict()
self._attr_unique_id = self.zaptec_obj.id

Expand Down
2 changes: 1 addition & 1 deletion custom_components/zaptec/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def _get_chargers(self) -> tuple[dict[str, str], dict[str, str]]:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"

def charger_text(charger: Charger):
def charger_text(charger: Charger) -> str:
"""Format the charger text for display."""
text = f"{charger.name} ({charger.get('DeviceId', '-')})"
if circuit := charger.get("CircuitName"):
Expand Down
2 changes: 1 addition & 1 deletion custom_components/zaptec/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class ZaptecChargeSensor(ZaptecSensorTranslate):
def _update_from_zaptec(self) -> None:
"""Update the entity from Zaptec data."""
# Called from ZaptecBaseEntity._handle_coordinator_update()
self._attr_native_value = self._get_zaptec_value(lower_case_str=True)
self._attr_native_value: str = self._get_zaptec_value(lower_case_str=True)
self._attr_icon = self.CHARGE_MODE_ICON_MAP.get(
self._attr_native_value, self.CHARGE_MODE_ICON_MAP["unknown"]
)
Expand Down
57 changes: 29 additions & 28 deletions custom_components/zaptec/zconst.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections import UserDict
import json
import logging
from typing import ClassVar, Literal
from typing import Any, ClassVar, Literal

from .misc import to_under

Expand All @@ -26,7 +26,7 @@
# Helper wrapper for reading constants from the API
#
class ZConst(UserDict):
"""Zaptec constants wrapper class"""
"""Zaptec constants wrapper class."""

observations: dict[str, int]
settings: dict[str, int]
Expand All @@ -53,7 +53,9 @@ class ZConst(UserDict):
}
"""Mapping of charger models to device serial number prefixes."""

def get_remap(self, wanted, device_types=None) -> dict:
def get_remap(
self, wanted: list[str], device_types: set[str] | None = None
) -> dict[str, int]:
"""Parse the Zaptec constants and return a remap dict.

Parse the given zaptec constants record `CONST` and generate
Expand Down Expand Up @@ -83,7 +85,7 @@ def get_remap(self, wanted, device_types=None) -> dict:
ids.update({v: k for k, v in ids.items()})
return ids

def update_ids_from_schema(self, device_types):
def update_ids_from_schema(self, device_types: set[str] | None) -> None:
"""Update the id from a schema.

Read observations, settings and command ids from the
Expand All @@ -105,27 +107,27 @@ def update_ids_from_schema(self, device_types):
# DATA EXTRACTION
#
@property
def charger_operation_modes_list(self):
def charger_operation_modes_list(self) -> list[str]:
"""Return a list of all charger operation modes."""
return list(self.get("ChargerOperationModes", {}))

@property
def device_types_list(self):
def device_types_list(self) -> list[str]:
"""Return a list of all device types."""
return list(self.get("DeviceTypes", {}))

@property
def installation_authentication_type_list(self):
def installation_authentication_type_list(self) -> list[str]:
"""Return a list of all installation authentication types."""
return list(self.get("InstallationAuthenticationType", {}))

@property
def installation_types_list(self):
def installation_types_list(self) -> list[str]:
"""Return a list of all installation types."""
return list(self.get("InstallationTypes", {}))

@property
def network_types_list(self):
def network_types_list(self) -> list[str]:
"""Return a list of all electrical network types."""
return list(self.get("NetworkTypes", {}))

Expand All @@ -143,53 +145,52 @@ def serial_to_model(self) -> dict[str, str]:
# Want these to be methods so they can be used for type convertions of
# attributes in the API responses.
#
def type_authentication_type(self, v):
def type_authentication_type(self, val: int) -> str:
"""Convert the authentication type to a string."""
modes = {str(v): k for k, v in self.get("InstallationAuthenticationType", {}).items()}
return modes.get(str(v), str(v))
return modes.get(str(val), str(val))

def type_completed_session(self, data):
def type_completed_session(self, val: str) -> dict[str, Any]:
"""Convert the CompletedSession to a dict."""
data = json.loads(data)
data = json.loads(val)
if "SignedSession" in data:
data["SignedSession"] = self.type_ocmf(data["SignedSession"])
return data

def type_device_type(self, v):
def type_device_type(self, val: int) -> str:
"""Convert the device type to a string."""
modes = {str(v): k for k, v in self.get("DeviceTypes", {}).items()}
return modes.get(str(v), str(v))
return modes.get(str(val), str(val))

def type_installation_type(self, v):
def type_installation_type(self, val: int) -> str:
"""Convert the installation type to a string."""
modes = {
str(v.get("Id")): v.get("Name") for v in self.get("InstallationTypes", {}).values()
}
return modes.get(str(v), str(v))
return modes.get(str(val), str(val))

def type_network_type(self, v):
def type_network_type(self, val: int) -> str:
"""Convert the network type to a string."""
modes = {str(v): k for k, v in self.get("NetworkTypes", {}).items()}
return modes.get(str(v), str(v))
return modes.get(str(val), str(val))

def type_ocmf(self, data):
def type_ocmf(self, data: str) -> dict[str, Any]:
"""Open Charge Metering Format (OCMF) type."""
# https://github.com/SAFE-eV/OCMF-Open-Charge-Metering-Format/blob/master/OCMF-en.md
sects = data.split("|")
if len(sects) not in (2, 3) or sects[0] != "OCMF":
raise ValueError(f"Invalid OCMF data: {data}")
data = json.loads(sects[1])
return data
return json.loads(sects[1])

def type_charger_operation_mode(self, v):
def type_charger_operation_mode(self, val: int) -> str:
"""Convert the operation mode to a string."""
modes = {str(v): k for k, v in self.get("ChargerOperationModes", {}).items()}
return modes.get(str(v), str(v))
return modes.get(str(val), str(val))

def type_user_roles(self, v):
def type_user_roles(self, val: int) -> str:
"""Convert the user roles to a string."""
val = int(v)
if not val:
if val == 0:
return "None"
roles = set(k for k, v in self.get("UserRoles", {}).items() if v & val == v)
# v != 0 is needed to avoid the 0 == 0 case leading to None always being included
roles = {k for k, v in self.get("UserRoles", {}).items() if v != 0 and (v & val) == v}
return ", ".join(roles)