Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 51 additions & 13 deletions src/pybag/mcap_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,26 +73,58 @@ def end_time(self) -> int:

def messages(
self,
topic: str,
topic: str | list[str] | None = None,
topics: list[str] | None = None,
start_time: float | None = None,
Comment on lines 74 to 77

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Preserve positional parameters after adding multi-topic argument

Introducing the new topics parameter between topic and start_time changes the positional call signature. Existing code that previously called reader.messages('/foo', 0, 10) (topic, start_time, end_time) will now map 0 to topics and raise ValueError('Specify either "topic" or "topics", not both') before the call even runs. This is a backward-incompatible API break that will surface immediately in any caller using positional arguments for start_time/end_time. Consider making topics keyword-only or placing it after the time arguments to retain compatibility.

Useful? React with 👍 / 👎.

end_time: float | None = None,
) -> Generator[DecodedMessage, None, None]:
"""
Iterate over messages in the MCAP file.
"""Iterate over messages in the MCAP file.

Args:
topic: Topic to filter by.
start_time: Start time to filter by. If None, start from the beginning of the file.
end_time: End time to filter by. If None, read to the end of the file.
topic: Single topic or pattern to filter by. Deprecated, use ``topics`` instead.
topics: List of topics or glob patterns to filter by.
start_time: Start time to filter by. If ``None``, start from the beginning of the
file.
end_time: End time to filter by. If ``None``, read to the end of the file.

Returns:
An iterator over DecodedMessage objects.
An iterator over :class:`DecodedMessage` objects.
"""
channel_id = self._reader.get_channel_id(topic)
if channel_id is None:
raise McapUnknownEncodingError(f'Topic {topic} not found in MCAP file')

for message in self._reader.get_messages(channel_id, start_time, end_time):
if topics is None:
if topic is None:
raise McapUnknownTopicError('No topics provided')
topics = [topic] if isinstance(topic, str) else list(topic)
else:
if topic is not None:
raise ValueError('Specify either "topic" or "topics", not both')

import fnmatch
channel_map = {c.topic: cid for cid, c in self._reader.get_channels().items()}
channel_ids: set[int] = set()
for pattern in topics:
matched = fnmatch.filter(channel_map.keys(), pattern)
if not matched:
raise McapUnknownTopicError(f'Topic {pattern} not found in MCAP file')
channel_ids.update(channel_map[m] for m in matched)

import heapq

generators = [
self._reader.get_messages(cid, start_time, end_time)
for cid in sorted(channel_ids)
]
heap: list[tuple[int, int, Any, Any]] = []
for idx, gen in enumerate(generators):
try:
msg = next(gen)
except StopIteration:
continue
heapq.heappush(heap, (msg.log_time, idx, msg, gen))

while heap:
log_time, idx, message, gen = heapq.heappop(heap)
channel_id = message.channel_id
message_deserializer = self._message_deserializer
if message_deserializer is None:
message_deserializer = MessageDeserializerFactory.from_message(
Expand All @@ -103,16 +135,22 @@ def messages(
raise McapUnknownEncodingError(f'Unknown encoding type: {self._profile}')

yield DecodedMessage(
message.channel_id,
channel_id,
message.sequence,
message.log_time,
log_time,
message.publish_time,
message_deserializer.deserialize_message(
message,
self._reader.get_message_schema(message)
)
)

try:
next_msg = next(gen)
except StopIteration:
continue
heapq.heappush(heap, (next_msg.log_time, idx, next_msg, gen))


if __name__ == '__main__':
import json
Expand Down
30 changes: 30 additions & 0 deletions tests/read/test_read_multiple_topics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from pathlib import Path
from tempfile import TemporaryDirectory

from rosbags.rosbag2 import StoragePlugin, Writer
from rosbags.typesys import Stores, get_typestore

from pybag.mcap_reader import McapFileReader


def _find_mcap_file(temp_dir: str) -> Path:
return next(Path(temp_dir).rglob('*.mcap'))


def test_read_multiple_topics_and_patterns():
typestore = get_typestore(Stores.ROS2_JAZZY)
String = typestore.types['std_msgs/msg/String']

with TemporaryDirectory() as temp_dir:
with Writer(Path(temp_dir) / 'rosbags', version=9, storage_plugin=StoragePlugin.MCAP) as writer:
conn1 = writer.add_connection('/pose/first', String.__msgtype__, typestore=typestore)
writer.write(conn1, 0, typestore.serialize_cdr(String(data='pose'), String.__msgtype__))
conn2 = writer.add_connection('/cmd_vel', String.__msgtype__, typestore=typestore)
writer.write(conn2, 1, typestore.serialize_cdr(String(data='cmd'), String.__msgtype__))

mcap_file = _find_mcap_file(temp_dir)
reader = McapFileReader.from_file(mcap_file)
messages = list(reader.messages(topics=['/pose/*', '/cmd_vel']))

assert [m.log_time for m in messages] == [0, 1]
assert [m.data.data for m in messages] == ['pose', 'cmd']