From 957323e955a78480f99db25de2ab44e95db09c49 Mon Sep 17 00:00:00 2001 From: Angus Stewart Date: Wed, 3 Sep 2025 21:31:04 +0200 Subject: [PATCH] Add ROS1 message encoding --- src/pybag/encoding/ros1.py | 166 ++++++++++++++++++++++++++++++++++++ tests/encoding/test_ros1.py | 51 +++++++++++ 2 files changed, 217 insertions(+) create mode 100644 src/pybag/encoding/ros1.py create mode 100644 tests/encoding/test_ros1.py diff --git a/src/pybag/encoding/ros1.py b/src/pybag/encoding/ros1.py new file mode 100644 index 0000000..351d12c --- /dev/null +++ b/src/pybag/encoding/ros1.py @@ -0,0 +1,166 @@ +import struct +from typing import Any + +from pybag.encoding import MessageDecoder, MessageEncoder +from pybag.io.raw_reader import BytesReader +from pybag.io.raw_writer import BytesWriter + + +class Ros1Decoder(MessageDecoder): + """Decode primitive values from a ROS1 byte stream.""" + + def __init__(self, data: bytes, *, little_endian: bool = True) -> None: + self._is_little_endian = little_endian + self._data = BytesReader(data) + + def parse(self, type_str: str) -> Any: + return getattr(self, type_str)() + + # Primitive parsers ------------------------------------------------- + + def bool(self) -> bool: + return struct.unpack("?", self._data.read(1))[0] + + def int8(self) -> int: + fmt = "b" + return struct.unpack(fmt, self._data.read(1))[0] + + def uint8(self) -> int: + fmt = "B" + return struct.unpack(fmt, self._data.read(1))[0] + + def byte(self) -> bytes: + return self._data.read(1) + + def char(self) -> str: + fmt = "c" + return struct.unpack(fmt, self._data.read(1))[0].decode() + + def int16(self) -> int: + fmt = "h" + return struct.unpack(fmt, self._data.read(2))[0] + + def uint16(self) -> int: + fmt = "H" + return struct.unpack(fmt, self._data.read(2))[0] + + def int32(self) -> int: + fmt = "i" + return struct.unpack(fmt, self._data.read(4))[0] + + def uint32(self) -> int: + fmt = "I" + return struct.unpack(fmt, self._data.read(4))[0] + + def int64(self) -> int: + fmt = "q" + return struct.unpack(fmt, self._data.read(8))[0] + + def uint64(self) -> int: + fmt = "Q" + return struct.unpack(fmt, self._data.read(8))[0] + + def float32(self) -> float: + fmt = "f" + return struct.unpack(fmt, self._data.read(4))[0] + + def float64(self) -> float: + fmt = "d" + return struct.unpack(fmt, self._data.read(8))[0] + + def string(self) -> str: + length = self.uint32() + if length == 0: + return "" + return self._data.read(length).decode() + + # Container parsers -------------------------------------------------- + + def array(self, type: str, length: int) -> list: + return [getattr(self, type)() for _ in range(length)] + + def sequence(self, type: str) -> list: + length = self.uint32() + return [getattr(self, type)() for _ in range(length)] + + +class Ros1Encoder(MessageEncoder): + """Encode primitive values into a ROS1 byte stream.""" + + def __init__(self, *, little_endian: bool = True) -> None: + self._is_little_endian = little_endian + self._payload = BytesWriter() + + def encode(self, type_str: str, value: Any) -> None: + getattr(self, type_str)(value) + + def save(self) -> bytes: + return self._payload.as_bytes() + + # Primitive encoders ------------------------------------------------- + + def bool(self, value: bool) -> None: + self._payload.write(struct.pack("?", value)) + + def int8(self, value: int) -> None: + fmt = "b" + self._payload.write(struct.pack(fmt, value)) + + def uint8(self, value: int) -> None: + fmt = "B" + self._payload.write(struct.pack(fmt, value)) + + def byte(self, value: bytes) -> None: + self._payload.write(value) + + def char(self, value: str) -> None: + fmt = "c" + self._payload.write(struct.pack(fmt, value.encode())) + + def int16(self, value: int) -> None: + fmt = "h" + self._payload.write(struct.pack(fmt, value)) + + def uint16(self, value: int) -> None: + fmt = "H" + self._payload.write(struct.pack(fmt, value)) + + def int32(self, value: int) -> None: + fmt = "i" + self._payload.write(struct.pack(fmt, value)) + + def uint32(self, value: int) -> None: + fmt = "I" + self._payload.write(struct.pack(fmt, value)) + + def int64(self, value: int) -> None: + fmt = "q" + self._payload.write(struct.pack(fmt, value)) + + def uint64(self, value: int) -> None: + fmt = "Q" + self._payload.write(struct.pack(fmt, value)) + + def float32(self, value: float) -> None: + fmt = "f" + self._payload.write(struct.pack(fmt, value)) + + def float64(self, value: float) -> None: + fmt = "d" + self._payload.write(struct.pack(fmt, value)) + + def string(self, value: str) -> None: + encoded = value.encode() + self.uint32(len(encoded)) + self._payload.write(encoded) + + # Container encoders ------------------------------------------------- + + def array(self, type: str, values: list[Any]) -> None: + for v in values: + getattr(self, type)(v) + + def sequence(self, type: str, values: list[Any]) -> None: + self.uint32(len(values)) + for v in values: + getattr(self, type)(v) diff --git a/tests/encoding/test_ros1.py b/tests/encoding/test_ros1.py new file mode 100644 index 0000000..a416da9 --- /dev/null +++ b/tests/encoding/test_ros1.py @@ -0,0 +1,51 @@ +import pytest + +from pybag.encoding.ros1 import Ros1Decoder, Ros1Encoder + + +@pytest.mark.parametrize("little_endian", [True, False]) +def test_encode_decode_all_primitive_types(little_endian: bool) -> None: + data_values = [ + ("bool", [True, False]), + ("int8", [-8, 8]), + ("uint8", [0, 200]), + ("int16", [-12_345, 12_345]), + ("uint16", [0, 54_321]), + ("int32", [-12_345_678, 12_345_678]), + ("uint32", [0, 12_345_678]), + ("int64", [-12_345_678_901, 12_345_678_901]), + ("uint64", [0, 9_876_543_210]), + ("float32", [-0.5, 0.5]), + ("float64", [0.0009765625, -0.0009765625]), + ("string", ["", "hello world"]), + ] + + encoder = Ros1Encoder(little_endian=little_endian) + for type_name, values in data_values: + for v in values: + encoder.encode(type_name, v) + + data = encoder.save() + + decoder = Ros1Decoder(data, little_endian=little_endian) + for type_name, values in data_values: + for v in values: + assert decoder.parse(type_name) == v + + +@pytest.mark.parametrize("little_endian", [True, False]) +def test_encode_decode_array(little_endian: bool) -> None: + encoder = Ros1Encoder(little_endian=little_endian) + encoder.array("int32", [1, 2, 3]) + + decoder = Ros1Decoder(encoder.save(), little_endian=little_endian) + assert decoder.array("int32", 3) == [1, 2, 3] + + +@pytest.mark.parametrize("little_endian", [True, False]) +def test_encode_decode_sequence(little_endian: bool) -> None: + encoder = Ros1Encoder(little_endian=little_endian) + encoder.sequence("int32", [1, 2, 3]) + + decoder = Ros1Decoder(encoder.save(), little_endian=little_endian) + assert decoder.sequence("int32") == [1, 2, 3]