-
Notifications
You must be signed in to change notification settings - Fork 0
Add ROS1 message encoding #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
siliconlad
wants to merge
1
commit into
main
Choose a base branch
from
implement-ros1-message-encoding
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
import struct | ||
from typing import Any | ||
|
||
from pybag.encoding import MessageDecoder, MessageEncoder | ||
from pybag.io.raw_reader import BytesReader | ||
from pybag.io.raw_writer import BytesWriter | ||
|
||
|
||
class Ros1Decoder(MessageDecoder): | ||
"""Decode primitive values from a ROS1 byte stream.""" | ||
|
||
def __init__(self, data: bytes, *, little_endian: bool = True) -> None: | ||
self._is_little_endian = little_endian | ||
self._data = BytesReader(data) | ||
|
||
def parse(self, type_str: str) -> Any: | ||
return getattr(self, type_str)() | ||
|
||
# Primitive parsers ------------------------------------------------- | ||
|
||
def bool(self) -> bool: | ||
return struct.unpack("?", self._data.read(1))[0] | ||
|
||
def int8(self) -> int: | ||
fmt = "<b" if self._is_little_endian else ">b" | ||
return struct.unpack(fmt, self._data.read(1))[0] | ||
|
||
def uint8(self) -> int: | ||
fmt = "<B" if self._is_little_endian else ">B" | ||
return struct.unpack(fmt, self._data.read(1))[0] | ||
|
||
def byte(self) -> bytes: | ||
return self._data.read(1) | ||
|
||
def char(self) -> str: | ||
fmt = "<c" if self._is_little_endian else ">c" | ||
return struct.unpack(fmt, self._data.read(1))[0].decode() | ||
|
||
def int16(self) -> int: | ||
fmt = "<h" if self._is_little_endian else ">h" | ||
return struct.unpack(fmt, self._data.read(2))[0] | ||
|
||
def uint16(self) -> int: | ||
fmt = "<H" if self._is_little_endian else ">H" | ||
return struct.unpack(fmt, self._data.read(2))[0] | ||
|
||
def int32(self) -> int: | ||
fmt = "<i" if self._is_little_endian else ">i" | ||
return struct.unpack(fmt, self._data.read(4))[0] | ||
|
||
def uint32(self) -> int: | ||
fmt = "<I" if self._is_little_endian else ">I" | ||
return struct.unpack(fmt, self._data.read(4))[0] | ||
|
||
def int64(self) -> int: | ||
fmt = "<q" if self._is_little_endian else ">q" | ||
return struct.unpack(fmt, self._data.read(8))[0] | ||
|
||
def uint64(self) -> int: | ||
fmt = "<Q" if self._is_little_endian else ">Q" | ||
return struct.unpack(fmt, self._data.read(8))[0] | ||
|
||
def float32(self) -> float: | ||
fmt = "<f" if self._is_little_endian else ">f" | ||
return struct.unpack(fmt, self._data.read(4))[0] | ||
|
||
def float64(self) -> float: | ||
fmt = "<d" if self._is_little_endian else ">d" | ||
return struct.unpack(fmt, self._data.read(8))[0] | ||
|
||
def string(self) -> str: | ||
length = self.uint32() | ||
if length == 0: | ||
return "" | ||
return self._data.read(length).decode() | ||
|
||
# Container parsers -------------------------------------------------- | ||
|
||
def array(self, type: str, length: int) -> list: | ||
return [getattr(self, type)() for _ in range(length)] | ||
|
||
def sequence(self, type: str) -> list: | ||
length = self.uint32() | ||
return [getattr(self, type)() for _ in range(length)] | ||
|
||
|
||
class Ros1Encoder(MessageEncoder): | ||
"""Encode primitive values into a ROS1 byte stream.""" | ||
|
||
def __init__(self, *, little_endian: bool = True) -> None: | ||
self._is_little_endian = little_endian | ||
self._payload = BytesWriter() | ||
|
||
def encode(self, type_str: str, value: Any) -> None: | ||
getattr(self, type_str)(value) | ||
|
||
def save(self) -> bytes: | ||
return self._payload.as_bytes() | ||
|
||
# Primitive encoders ------------------------------------------------- | ||
|
||
def bool(self, value: bool) -> None: | ||
self._payload.write(struct.pack("?", value)) | ||
|
||
def int8(self, value: int) -> None: | ||
fmt = "<b" if self._is_little_endian else ">b" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def uint8(self, value: int) -> None: | ||
fmt = "<B" if self._is_little_endian else ">B" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def byte(self, value: bytes) -> None: | ||
self._payload.write(value) | ||
|
||
def char(self, value: str) -> None: | ||
fmt = "<c" if self._is_little_endian else ">c" | ||
self._payload.write(struct.pack(fmt, value.encode())) | ||
|
||
def int16(self, value: int) -> None: | ||
fmt = "<h" if self._is_little_endian else ">h" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def uint16(self, value: int) -> None: | ||
fmt = "<H" if self._is_little_endian else ">H" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def int32(self, value: int) -> None: | ||
fmt = "<i" if self._is_little_endian else ">i" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def uint32(self, value: int) -> None: | ||
fmt = "<I" if self._is_little_endian else ">I" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def int64(self, value: int) -> None: | ||
fmt = "<q" if self._is_little_endian else ">q" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def uint64(self, value: int) -> None: | ||
fmt = "<Q" if self._is_little_endian else ">Q" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def float32(self, value: float) -> None: | ||
fmt = "<f" if self._is_little_endian else ">f" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def float64(self, value: float) -> None: | ||
fmt = "<d" if self._is_little_endian else ">d" | ||
self._payload.write(struct.pack(fmt, value)) | ||
|
||
def string(self, value: str) -> None: | ||
encoded = value.encode() | ||
self.uint32(len(encoded)) | ||
self._payload.write(encoded) | ||
|
||
# Container encoders ------------------------------------------------- | ||
|
||
def array(self, type: str, values: list[Any]) -> None: | ||
for v in values: | ||
getattr(self, type)(v) | ||
|
||
def sequence(self, type: str, values: list[Any]) -> None: | ||
self.uint32(len(values)) | ||
for v in values: | ||
getattr(self, type)(v) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import pytest | ||
|
||
from pybag.encoding.ros1 import Ros1Decoder, Ros1Encoder | ||
|
||
|
||
@pytest.mark.parametrize("little_endian", [True, False]) | ||
def test_encode_decode_all_primitive_types(little_endian: bool) -> None: | ||
data_values = [ | ||
("bool", [True, False]), | ||
("int8", [-8, 8]), | ||
("uint8", [0, 200]), | ||
("int16", [-12_345, 12_345]), | ||
("uint16", [0, 54_321]), | ||
("int32", [-12_345_678, 12_345_678]), | ||
("uint32", [0, 12_345_678]), | ||
("int64", [-12_345_678_901, 12_345_678_901]), | ||
("uint64", [0, 9_876_543_210]), | ||
("float32", [-0.5, 0.5]), | ||
("float64", [0.0009765625, -0.0009765625]), | ||
("string", ["", "hello world"]), | ||
] | ||
|
||
encoder = Ros1Encoder(little_endian=little_endian) | ||
for type_name, values in data_values: | ||
for v in values: | ||
encoder.encode(type_name, v) | ||
|
||
data = encoder.save() | ||
|
||
decoder = Ros1Decoder(data, little_endian=little_endian) | ||
for type_name, values in data_values: | ||
for v in values: | ||
assert decoder.parse(type_name) == v | ||
|
||
|
||
@pytest.mark.parametrize("little_endian", [True, False]) | ||
def test_encode_decode_array(little_endian: bool) -> None: | ||
encoder = Ros1Encoder(little_endian=little_endian) | ||
encoder.array("int32", [1, 2, 3]) | ||
|
||
decoder = Ros1Decoder(encoder.save(), little_endian=little_endian) | ||
assert decoder.array("int32", 3) == [1, 2, 3] | ||
|
||
|
||
@pytest.mark.parametrize("little_endian", [True, False]) | ||
def test_encode_decode_sequence(little_endian: bool) -> None: | ||
encoder = Ros1Encoder(little_endian=little_endian) | ||
encoder.sequence("int32", [1, 2, 3]) | ||
|
||
decoder = Ros1Decoder(encoder.save(), little_endian=little_endian) | ||
assert decoder.sequence("int32") == [1, 2, 3] |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P1] Treat
char
as signed byte rather than UTF‑8 stringROS1 defines
char
as a synonym forint8
, so it should round‑trip as a signed 8‑bit integer. The current implementation converts the byte to a UTF‑8 character (struct.unpack("<c")[0].decode()
) and the encoder requiresvalue.encode()
. Anychar
value ≥0x80 or negative will raiseUnicodeDecodeError
on decode, and callers that pass the typical integer values produced by ROS messages will fail because integers have noencode
method. This prevents validchar
fields from being serialized/deserialized and makes the API inconsistent with the other integer types.Useful? React with 👍 / 👎.