Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions src/pybag/encoding/ros1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import struct
from typing import Any

from pybag.encoding import MessageDecoder, MessageEncoder
from pybag.io.raw_reader import BytesReader
from pybag.io.raw_writer import BytesWriter


class Ros1Decoder(MessageDecoder):
"""Decode primitive values from a ROS1 byte stream."""

def __init__(self, data: bytes, *, little_endian: bool = True) -> None:
self._is_little_endian = little_endian
self._data = BytesReader(data)

def parse(self, type_str: str) -> Any:
return getattr(self, type_str)()

# Primitive parsers -------------------------------------------------

def bool(self) -> bool:
return struct.unpack("?", self._data.read(1))[0]

def int8(self) -> int:
fmt = "<b" if self._is_little_endian else ">b"
return struct.unpack(fmt, self._data.read(1))[0]

def uint8(self) -> int:
fmt = "<B" if self._is_little_endian else ">B"
return struct.unpack(fmt, self._data.read(1))[0]

def byte(self) -> bytes:
return self._data.read(1)

def char(self) -> str:
fmt = "<c" if self._is_little_endian else ">c"
Comment on lines +32 to +36

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Treat char as signed byte rather than UTF‑8 string

ROS1 defines char as a synonym for int8, so it should round‑trip as a signed 8‑bit integer. The current implementation converts the byte to a UTF‑8 character (struct.unpack("<c")[0].decode()) and the encoder requires value.encode(). Any char value ≥0x80 or negative will raise UnicodeDecodeError on decode, and callers that pass the typical integer values produced by ROS messages will fail because integers have no encode method. This prevents valid char fields from being serialized/deserialized and makes the API inconsistent with the other integer types.

Useful? React with 👍 / 👎.

return struct.unpack(fmt, self._data.read(1))[0].decode()

def int16(self) -> int:
fmt = "<h" if self._is_little_endian else ">h"
return struct.unpack(fmt, self._data.read(2))[0]

def uint16(self) -> int:
fmt = "<H" if self._is_little_endian else ">H"
return struct.unpack(fmt, self._data.read(2))[0]

def int32(self) -> int:
fmt = "<i" if self._is_little_endian else ">i"
return struct.unpack(fmt, self._data.read(4))[0]

def uint32(self) -> int:
fmt = "<I" if self._is_little_endian else ">I"
return struct.unpack(fmt, self._data.read(4))[0]

def int64(self) -> int:
fmt = "<q" if self._is_little_endian else ">q"
return struct.unpack(fmt, self._data.read(8))[0]

def uint64(self) -> int:
fmt = "<Q" if self._is_little_endian else ">Q"
return struct.unpack(fmt, self._data.read(8))[0]

def float32(self) -> float:
fmt = "<f" if self._is_little_endian else ">f"
return struct.unpack(fmt, self._data.read(4))[0]

def float64(self) -> float:
fmt = "<d" if self._is_little_endian else ">d"
return struct.unpack(fmt, self._data.read(8))[0]

def string(self) -> str:
length = self.uint32()
if length == 0:
return ""
return self._data.read(length).decode()

# Container parsers --------------------------------------------------

def array(self, type: str, length: int) -> list:
return [getattr(self, type)() for _ in range(length)]

def sequence(self, type: str) -> list:
length = self.uint32()
return [getattr(self, type)() for _ in range(length)]


class Ros1Encoder(MessageEncoder):
"""Encode primitive values into a ROS1 byte stream."""

def __init__(self, *, little_endian: bool = True) -> None:
self._is_little_endian = little_endian
self._payload = BytesWriter()

def encode(self, type_str: str, value: Any) -> None:
getattr(self, type_str)(value)

def save(self) -> bytes:
return self._payload.as_bytes()

# Primitive encoders -------------------------------------------------

def bool(self, value: bool) -> None:
self._payload.write(struct.pack("?", value))

def int8(self, value: int) -> None:
fmt = "<b" if self._is_little_endian else ">b"
self._payload.write(struct.pack(fmt, value))

def uint8(self, value: int) -> None:
fmt = "<B" if self._is_little_endian else ">B"
self._payload.write(struct.pack(fmt, value))

def byte(self, value: bytes) -> None:
self._payload.write(value)

def char(self, value: str) -> None:
fmt = "<c" if self._is_little_endian else ">c"
self._payload.write(struct.pack(fmt, value.encode()))

def int16(self, value: int) -> None:
fmt = "<h" if self._is_little_endian else ">h"
self._payload.write(struct.pack(fmt, value))

def uint16(self, value: int) -> None:
fmt = "<H" if self._is_little_endian else ">H"
self._payload.write(struct.pack(fmt, value))

def int32(self, value: int) -> None:
fmt = "<i" if self._is_little_endian else ">i"
self._payload.write(struct.pack(fmt, value))

def uint32(self, value: int) -> None:
fmt = "<I" if self._is_little_endian else ">I"
self._payload.write(struct.pack(fmt, value))

def int64(self, value: int) -> None:
fmt = "<q" if self._is_little_endian else ">q"
self._payload.write(struct.pack(fmt, value))

def uint64(self, value: int) -> None:
fmt = "<Q" if self._is_little_endian else ">Q"
self._payload.write(struct.pack(fmt, value))

def float32(self, value: float) -> None:
fmt = "<f" if self._is_little_endian else ">f"
self._payload.write(struct.pack(fmt, value))

def float64(self, value: float) -> None:
fmt = "<d" if self._is_little_endian else ">d"
self._payload.write(struct.pack(fmt, value))

def string(self, value: str) -> None:
encoded = value.encode()
self.uint32(len(encoded))
self._payload.write(encoded)

# Container encoders -------------------------------------------------

def array(self, type: str, values: list[Any]) -> None:
for v in values:
getattr(self, type)(v)

def sequence(self, type: str, values: list[Any]) -> None:
self.uint32(len(values))
for v in values:
getattr(self, type)(v)
51 changes: 51 additions & 0 deletions tests/encoding/test_ros1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import pytest

from pybag.encoding.ros1 import Ros1Decoder, Ros1Encoder


@pytest.mark.parametrize("little_endian", [True, False])
def test_encode_decode_all_primitive_types(little_endian: bool) -> None:
data_values = [
("bool", [True, False]),
("int8", [-8, 8]),
("uint8", [0, 200]),
("int16", [-12_345, 12_345]),
("uint16", [0, 54_321]),
("int32", [-12_345_678, 12_345_678]),
("uint32", [0, 12_345_678]),
("int64", [-12_345_678_901, 12_345_678_901]),
("uint64", [0, 9_876_543_210]),
("float32", [-0.5, 0.5]),
("float64", [0.0009765625, -0.0009765625]),
("string", ["", "hello world"]),
]

encoder = Ros1Encoder(little_endian=little_endian)
for type_name, values in data_values:
for v in values:
encoder.encode(type_name, v)

data = encoder.save()

decoder = Ros1Decoder(data, little_endian=little_endian)
for type_name, values in data_values:
for v in values:
assert decoder.parse(type_name) == v


@pytest.mark.parametrize("little_endian", [True, False])
def test_encode_decode_array(little_endian: bool) -> None:
encoder = Ros1Encoder(little_endian=little_endian)
encoder.array("int32", [1, 2, 3])

decoder = Ros1Decoder(encoder.save(), little_endian=little_endian)
assert decoder.array("int32", 3) == [1, 2, 3]


@pytest.mark.parametrize("little_endian", [True, False])
def test_encode_decode_sequence(little_endian: bool) -> None:
encoder = Ros1Encoder(little_endian=little_endian)
encoder.sequence("int32", [1, 2, 3])

decoder = Ros1Decoder(encoder.save(), little_endian=little_endian)
assert decoder.sequence("int32") == [1, 2, 3]