Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 77 additions & 3 deletions src/pybag/mcap_reader.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import logging
import heapq
from collections.abc import Generator
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
from typing import Any, Callable
from typing import Any, Callable, Iterable

from pybag.deserialize import MessageDeserializerFactory
from pybag.mcap.error import McapUnknownEncodingError, McapUnknownTopicError
from pybag.mcap.record_reader import (
BaseMcapRecordReader,
McapRecordReaderFactory
McapRecordReaderFactory,
)

# GLOBAL TODOs:
Expand Down Expand Up @@ -40,7 +41,10 @@ def __init__(self, reader: BaseMcapRecordReader):
self._message_deserializer = MessageDeserializerFactory.from_profile(self._profile)

@staticmethod
def from_file(file_path: Path | str) -> 'McapFileReader':
def from_file(file_path: Path | str | Iterable[Path | str]) -> 'McapFileReader':
    """Build a reader for one MCAP file, or for several files read as one.

    Args:
        file_path: A single ``str``/``Path``, or an iterable of them.

    Returns:
        A ``McapFileReader`` for a single path. For any other iterable the
        items are collected into a list and passed to
        ``McapMultipleFileReader.from_files``.
    """
    # str is itself iterable, so single paths must be recognised explicitly
    # before the generic Iterable check.
    is_single_path = isinstance(file_path, (str, Path))
    if is_single_path or not isinstance(file_path, Iterable):
        record_reader = McapRecordReaderFactory.from_file(file_path)
        return McapFileReader(record_reader)
    return McapMultipleFileReader.from_files(list(file_path))

Expand Down Expand Up @@ -133,3 +137,73 @@ def __exit__(
) -> None:
self.close()


class McapMultipleFileReader(McapFileReader):
    """Reader that presents several MCAP files as a single logical recording.

    Every query is answered by delegating to the wrapped per-file readers and
    combining their results.
    """

    def __init__(self, readers: list[McapFileReader]):
        """Wrap already-opened readers.

        Args:
            readers: The per-file readers to combine. The list is stored as
                given (not copied).

        Note:
            ``McapFileReader.__init__`` is not invoked, so attributes it sets
            are absent on this object; the public methods are overridden
            below to work purely in terms of ``self._readers``.
        """
        self._readers = readers

    @staticmethod
    def from_files(file_paths: list[Path | str]) -> 'McapMultipleFileReader':
        """Open every path in ``file_paths`` and combine them into one reader.

        If opening any file fails, the readers opened so far are closed before
        the error propagates, so no file handles are leaked.
        """
        readers: list[McapFileReader] = []
        try:
            for path in file_paths:
                readers.append(McapFileReader.from_file(path))
        except Exception:
            for opened in readers:
                opened.close()
            raise
        return McapMultipleFileReader(readers)

    def get_topics(self) -> list[str]:  # type: ignore[override]
        """Return the union of topics across all files.

        Topics are listed in first-seen order (reader order, then each
        reader's own order), so the result is deterministic.
        """
        # A dict is used as an insertion-ordered set.
        seen: dict[str, None] = {}
        for reader in self._readers:
            seen.update(dict.fromkeys(reader.get_topics()))
        return list(seen)

    def get_message_count(self, topic: str) -> int:  # type: ignore[override]
        """Return the total number of messages on ``topic`` across all files.

        Raises:
            McapUnknownTopicError: If no file contains ``topic``. A topic that
                exists but has no messages yields ``0`` rather than an error.
        """
        counts = [
            reader.get_message_count(topic)
            for reader in self._readers
            if topic in reader.get_topics()
        ]
        # Distinguish "topic unknown everywhere" from "topic known, zero messages".
        if not counts:
            raise McapUnknownTopicError(f'Topic {topic} not found in MCAP files')
        return sum(counts)

    @property  # type: ignore[override]
    def start_time(self) -> int:
        """Smallest ``start_time`` reported by any wrapped reader.

        Raises:
            ValueError: If there are no wrapped readers.
        """
        return min(reader.start_time for reader in self._readers)

    @property  # type: ignore[override]
    def end_time(self) -> int:
        """Largest ``end_time`` reported by any wrapped reader.

        Raises:
            ValueError: If there are no wrapped readers.
        """
        return max(reader.end_time for reader in self._readers)

    def messages(  # type: ignore[override]
        self,
        topic: str,
        start_time: float | None = None,
        end_time: float | None = None,
        filter: Callable[[DecodedMessage], bool] | None = None,
    ) -> Generator[DecodedMessage, None, None]:
        """Yield messages on ``topic`` from all files, merged by ``log_time``.

        Args:
            topic: Topic to read; files that do not contain it are skipped.
            start_time: Forwarded unchanged to each reader's ``messages``.
            end_time: Forwarded unchanged to each reader's ``messages``.
            filter: Optional predicate applied to each merged message; only
                messages for which it returns true are yielded.

        Raises:
            McapUnknownTopicError: If no file contains ``topic`` (raised on
                first iteration, since this is a generator).

        Note:
            The merge is only globally ordered if each per-file stream is
            itself ordered by ``log_time`` - presumably guaranteed by
            ``McapFileReader.messages``; confirm against that implementation.
        """
        streams = [
            reader.messages(topic, start_time, end_time)
            for reader in self._readers
            if topic in reader.get_topics()
        ]
        if not streams:
            raise McapUnknownTopicError(f'Topic {topic} not found in MCAP files')

        # heapq.merge is a lazy, stable k-way merge: on equal log_time the
        # message from the earlier reader comes first, and message objects
        # themselves are never compared.
        merged = heapq.merge(*streams, key=lambda message: message.log_time)
        for message in merged:
            if filter is None or filter(message):
                yield message

    def close(self) -> None:  # type: ignore[override]
        """Close every wrapped reader.

        All readers are closed even if some fail; the first failure is
        re-raised once every reader has been given the chance to close.
        """
        first_error: Exception | None = None
        for reader in self._readers:
            try:
                reader.close()
            except Exception as exc:
                # Keep going so one bad reader does not leak the others.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
29 changes: 29 additions & 0 deletions tests/test_multi_mcap_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pathlib import Path
from tempfile import TemporaryDirectory

from pybag.mcap_reader import McapFileReader
from pybag.mcap_writer import McapFileWriter
from pybag.ros2.humble import std_msgs


def test_read_multiple_files_as_one() -> None:
    """Two MCAP files passed as a list are read as one time-ordered stream."""
    with TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)
        file1 = temp_path / "one.mcap"
        file2 = temp_path / "two.mcap"

        # Log times alternate between the files (1, 3 vs 2, 4) so a plain
        # concatenation would produce the wrong order.
        with McapFileWriter.open(file1, chunk_size=1) as writer:
            writer.write_message("/chatter", 1, std_msgs.String(data="hello"))
            writer.write_message("/chatter", 3, std_msgs.String(data="again"))
        with McapFileWriter.open(file2, chunk_size=1) as writer:
            writer.write_message("/chatter", 2, std_msgs.String(data="world"))
            writer.write_message("/chatter", 4, std_msgs.String(data="!!"))

        reader = McapFileReader.from_file([file1, file2])
        try:
            assert reader.get_message_count("/chatter") == 4
            messages = list(reader.messages("/chatter"))
            assert [m.data.data for m in messages] == ["hello", "world", "again", "!!"]
            assert reader.start_time == 1
            assert reader.end_time == 4
        finally:
            # Release the file handles before TemporaryDirectory deletes the
            # files; open handles can make that cleanup fail on some platforms.
            reader.close()

Loading