Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pybag = "pybag.cli.main:main"
[project.optional-dependencies]
zstd = ["zstandard>=0.23.0"]
lz4 = ["lz4>=4.4.3"]
cli = ["rich>=13.7.0"]

[build-system]
requires = ["hatchling"]
Expand All @@ -26,6 +27,7 @@ test = [
"mcap-ros2-support>=0.5.5",
"rosbags>=0.10.10",
"numpy>=2.2.3",
"pybag[cli]",
]

[tool.isort]
Expand Down
246 changes: 246 additions & 0 deletions src/pybag/cli/info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
import argparse
from datetime import datetime
from pathlib import Path

from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text

from pybag.mcap.record_reader import (
BaseMcapRecordReader,
McapRecordReaderFactory
)


def format_duration(nanoseconds: int) -> str:
"""Format duration from nanoseconds to human readable format."""
seconds = nanoseconds / 1_000_000_000

if seconds < 60:
return f"{seconds:.2f}s"
elif seconds < 3600:
minutes = seconds / 60
return f"{minutes:.1f}m"
else:
hours = seconds / 3600
return f"{hours:.1f}h"


def format_timestamp(nanoseconds: int) -> str:
"""Format timestamp from nanoseconds since epoch to human readable format."""
seconds = nanoseconds / 1_000_000_000
return datetime.fromtimestamp(seconds).strftime("%Y-%m-%d %H:%M:%S")


def format_file_size(bytes_size: int) -> str:
"""Format file size in human readable format."""
file_size = float(bytes_size)
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if file_size < 1024:
if file_size != int(file_size):
return f"{file_size:.1f} {unit}"
return f"{int(file_size)} {unit}"
file_size = file_size / 1024
return f"{bytes_size:.1f} PB"


def format_frequency(message_count: int, duration_ns: int) -> str:
"""Calculate and format message frequency."""
if duration_ns == 0:
return "N/A"

duration_seconds = duration_ns / 1_000_000_000
frequency = message_count / duration_seconds

if frequency >= 1:
return f"{frequency:.1f} Hz"
else:
period = 1 / frequency
return f"1/{period:.1f} Hz"


def create_file_info_panel(reader: BaseMcapRecordReader, file_path: Path) -> Panel:
"""Create a panel with basic file information."""
stats = reader.get_statistics()
duration = stats.message_end_time - stats.message_start_time
file_size = file_path.stat().st_size

info_text = Text()
info_text.append(f"File: {file_path.name}\n", style="bold")
info_text.append(f"Size: {format_file_size(file_size)}\n")
info_text.append(f"Duration: {format_duration(duration)}\n")
info_text.append(f"Start: {format_timestamp(stats.message_start_time)}\n")
info_text.append(f"End: {format_timestamp(stats.message_end_time)}")

return Panel(info_text, title="File Information", border_style="dim")


def create_summary_panel(reader: BaseMcapRecordReader) -> Panel:
"""Create a panel with summary statistics."""
stats = reader.get_statistics()

summary_text = Text()
summary_text.append(f"Messages: {stats.message_count:,} ", style="bold")
summary_text.append(f"Topics: {stats.channel_count} ")
summary_text.append(f"Schemas: {stats.schema_count}\n")
summary_text.append(f"Chunks: {stats.chunk_count} ")
summary_text.append(f"Attachments: {stats.attachment_count} ")
summary_text.append(f"Metadata: {stats.metadata_count}")

return Panel(summary_text, title="Summary", border_style="dim")


def create_topics_table(reader: BaseMcapRecordReader) -> Table:
"""Create a table with topic information."""
from rich.box import SIMPLE
table = Table(show_header=True, header_style="bold", box=SIMPLE, border_style="dim")
table.add_column("Topic", no_wrap=True, justify="center")
table.add_column("Messages", justify="center")
table.add_column("Frequency", justify="center")
table.add_column("Type", no_wrap=True, justify="center")

stats = reader.get_statistics()
duration = stats.message_end_time - stats.message_start_time
topics = reader.get_channels()
schemas = reader.get_schemas()

# Sort topics by message count (descending)
sorted_topics = sorted(
topics.items(),
key=lambda x: stats.channel_message_counts.get(x[0], 0),
reverse=True
)

for channel_id, channel in sorted_topics:
message_count = stats.channel_message_counts.get(channel_id, 0)
frequency = format_frequency(message_count, duration)

# Get schema name if available
schema_name = "Unknown"
if channel.schema_id in schemas:
schema_name = schemas[channel.schema_id].name

table.add_row(
channel.topic,
f"{message_count:,}",
frequency,
schema_name
)

return table


def create_schemas_table(reader: BaseMcapRecordReader) -> Table:
"""Create a table with schema information."""
table = Table(show_header=True, header_style="bold", box=None, border_style="dim")
table.add_column("ID", justify="right", style="cyan")
table.add_column("Name", style="green")
table.add_column("Encoding")
table.add_column("Size", justify="right")
table.add_column("Topics", justify="right")

schemas = reader.get_schemas()
channels = reader.get_channels()

# Count how many channels use each schema
schema_usage = {}
for channel in channels.values():
schema_id = channel.schema_id
schema_usage[schema_id] = schema_usage.get(schema_id, 0) + 1

# Sort schemas by usage (descending), then by name
sorted_schemas = sorted(
schemas.items(),
key=lambda x: (schema_usage.get(x[0], 0), x[1].name),
reverse=True
)

for schema_id, schema in sorted_schemas:
usage_count = schema_usage.get(schema_id, 0)
size_formatted = format_file_size(len(schema.data))
usage_text = str(usage_count)

table.add_row(
str(schema_id),
schema.name,
schema.encoding,
size_formatted,
usage_text
)

return table


def info_command(args: argparse.Namespace) -> None:
"""Execute the info command."""
console = Console()
file_path = Path(args.mcap_path)

if not file_path.exists():
console.print(f"[bold red]Error:[/bold red] File '{file_path}' does not exist")
return

try:
reader = McapRecordReaderFactory.from_file(file_path)
stats = reader.get_statistics()
header = reader.get_header()
duration = stats.message_end_time - stats.message_start_time
file_size = file_path.stat().st_size

# Print clean title
console.print(f"\n[bold blue]{file_path.name}[/bold blue]\n")

# File properties
file_info = Text()
file_info.append("Size: ", style="dim")
file_info.append(f"{format_file_size(file_size)}")
file_info.append(" Duration: ", style="dim")
file_info.append(f"{format_duration(duration)}")
if header.profile:
file_info.append(" Profile: ", style="dim")
file_info.append(header.profile, style="cyan")
console.print(file_info)

# Content summary
content_info = Text()
content_info.append("Messages: ", style="dim")
content_info.append(f"{stats.message_count:,}", style="bold")
content_info.append(" Topics: ", style="dim")
content_info.append(f"{stats.channel_count}")
content_info.append(" Schemas: ", style="dim")
content_info.append(f"{stats.schema_count}")
content_info.append(" Chunks: ", style="dim")
content_info.append(f"{stats.chunk_count}")
console.print(content_info)

# Time range section
console.print()
console.print("[dim]Time Range:[/dim]")
console.print(f" {format_timestamp(stats.message_start_time)} → {format_timestamp(stats.message_end_time)}")
console.print(f" [dim]{stats.message_start_time} → {stats.message_end_time} ns[/dim]")

# Topics section
console.print()
if stats.channel_count == 0:
console.print("[dim]Topics: none[/dim]")
else:
console.print("[dim]Topics:[/dim]")
topics_table = create_topics_table(reader)
console.print(topics_table)

except Exception as e:
console.print(f"[bold red]Error reading MCAP file:[/bold red] {e}")


def add_info_parser(subparsers) -> None:
"""Add the info command parser."""
info_parser = subparsers.add_parser(
"info",
help="Display information about an MCAP file"
)
info_parser.add_argument(
"mcap_path",
help="Path to the MCAP file"
)
info_parser.set_defaults(func=info_command)
6 changes: 4 additions & 2 deletions src/pybag/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import argparse

from .info import add_info_parser


def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
Expand All @@ -9,11 +11,11 @@ def build_parser() -> argparse.ArgumentParser:
"MCAP and bag files."
),
)
parser.set_defaults(func=lambda args: parser.print_help())
subparsers = parser.add_subparsers(dest="command")

# In the future subcommands will be added here.
add_info_parser(subparsers)

parser.set_defaults(func=lambda args: parser.print_help())
return parser


Expand Down
8 changes: 8 additions & 0 deletions src/pybag/mcap/record_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ def __init__(self, file: BaseReader, *, check_crc: bool = False):
self._version = McapRecordParser.parse_magic_bytes(self._file)
logger.debug(f'MCAP version: {self._version}')

# Parse and store header
self._header = McapRecordParser.parse_header(self._file)
logger.debug(f'MCAP profile: {self._header.profile}')

footer = self.get_footer()

# Summary section start
Expand Down Expand Up @@ -227,6 +231,10 @@ def close(self) -> None:
"""Close the MCAP file and release all resources."""
self._file.close()

def get_header(self) -> HeaderRecord:
"""Get the header record from the MCAP file."""
return self._header

# Getters for records

def get_header(self) -> HeaderRecord:
Expand Down
Loading