Improved info interface on encode/explode init #184

Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
# 0.0.10 2024-05-XX
- Change output format of dexplode-init and dencode-init

# 0.0.9 2024-05-02

- Change on-disk format for explode and schema

38 changes: 25 additions & 13 deletions bio2zarr/cli.py
@@ -5,7 +5,6 @@

import click
import coloredlogs
import humanfriendly
import numcodecs
import tabulate

@@ -65,6 +64,13 @@ def list_commands(self, ctx):
help="Partition indexes are interpreted as one-based",
)

json = click.option(
"--json",
is_flag=True,
flag_value=True,
help="Output summary data in JSON format",
)
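
For context, this follows the file's existing pattern of defining shared click options once at module level and applying them as decorators on each command. A minimal self-contained sketch of that pattern (the names here are illustrative, not from this diff):

```python
import click

# A shared flag option, defined once and reused across commands.
json_option = click.option(
    "--json", is_flag=True, help="Output summary data in JSON format"
)


@click.command()
@json_option
def demo(json):
    # The flag arrives as a plain bool argument.
    click.echo(f"json mode: {json}")
```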

version = click.version_option(version=f"{provenance.__version__}")

worker_processes = click.option(
@@ -166,6 +172,16 @@ def get_compressor(cname):
return numcodecs.get_codec(config)


def show_work_summary(work_summary, json):
if json:
output = work_summary.asjson()
else:
data = work_summary.asdict()
output = tabulate.tabulate(list(data.items()), tablefmt="plain")
# output = "\n".join(f"{k}\t{v}" for k, v in data.items())
click.echo(output)
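
show_work_summary is the single point through which both dexplode-init and dencode-init now report their results. A quick sketch of what the two branches produce, assuming a summary dict shaped like the new dataclasses in bio2zarr/vcf.py below:

```python
import json

import tabulate

data = {"num_partitions": 5, "num_samples": 3, "num_variants": 9}

# --json branch: indented JSON with stable field names for scripts to parse.
print(json.dumps(data, indent=4))

# Default branch: a plain two-column table, one "name value" row per line, e.g.
#   num_partitions  5
#   num_samples     3
#   num_variants    9
print(tabulate.tabulate(list(data.items()), tablefmt="plain"))
```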


@click.command
@vcfs
@new_icf_path
@@ -199,6 +215,7 @@ def explode(
@force
@column_chunk_size
@compressor
@json
@verbose
@worker_processes
def dexplode_init(
@@ -208,6 +225,7 @@
force,
column_chunk_size,
compressor,
json,
verbose,
worker_processes,
):
@@ -217,7 +235,7 @@
"""
setup_logging(verbose)
check_overwrite_dir(icf_path, force)
num_partitions = vcf.explode_init(
work_summary = vcf.explode_init(
icf_path,
vcfs,
target_num_partitions=num_partitions,
@@ -226,7 +244,7 @@
compressor=get_compressor(compressor),
show_progress=True,
)
click.echo(num_partitions)
show_work_summary(work_summary, json)


@click.command
@@ -331,6 +349,7 @@ def encode(
@variants_chunk_size
@samples_chunk_size
@max_variant_chunks
@json
@verbose
def dencode_init(
icf_path,
@@ -341,6 +360,7 @@
variants_chunk_size,
samples_chunk_size,
max_variant_chunks,
json,
verbose,
):
"""
@@ -358,7 +378,7 @@
"""
setup_logging(verbose)
check_overwrite_dir(zarr_path, force)
num_partitions, max_memory = vcf.encode_init(
work_summary = vcf.encode_init(
icf_path,
zarr_path,
target_num_partitions=num_partitions,
@@ -368,15 +388,7 @@
max_variant_chunks=max_variant_chunks,
show_progress=True,
)
formatted_size = humanfriendly.format_size(max_memory, binary=True)
# NOTE adding the size to the stdout here so that users can parse it
# and use in their submission scripts. This is a first pass, and
# will most likely change as we see what works and doesn't.
# NOTE we probably want to format this as a table, which lists
# some other properties, line by line
# NOTE This size number is also not quite enough, you need a bit of
# headroom with it (probably 10% or so). We should include this.
click.echo(f"{num_partitions}\t{formatted_size}")
show_work_summary(work_summary, json)
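
With the old tab-separated `num_partitions	size` line gone, downstream submission scripts would parse the JSON form instead. A hypothetical consumer: the executable name and invocation are assumptions based on the tests in this PR, and the field names come from VcfZarrWriteSummary below.

```python
import json
import subprocess

# Hypothetical: run dencode-init with the new --json flag and parse the result.
out = subprocess.run(
    ["vcf2zarr", "dencode-init", "sample.icf", "sample.zarr", "5", "--json"],
    capture_output=True, text=True, check=True,
).stdout
summary = json.loads(out)

# Schedule one job per partition, sized using the memory estimate.
print(summary["num_partitions"], summary["max_encoding_memory"])
```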


@click.command

61 changes: 55 additions & 6 deletions bio2zarr/vcf.py
@@ -204,6 +204,10 @@ def num_contigs(self):
def num_filters(self):
return len(self.filters)

@property
def num_samples(self):
return len(self.samples)

@staticmethod
def fromdict(d):
if d["format_version"] != ICF_METADATA_FORMAT_VERSION:
@@ -982,6 +986,19 @@ def check_field_clobbering(icf_metadata):
)


@dataclasses.dataclass
class IcfWriteSummary:
num_partitions: int
num_samples: int
num_variants: int

def asdict(self):
return dataclasses.asdict(self)

def asjson(self):
return json.dumps(self.asdict(), indent=4)
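
For illustration, a standalone restatement of the new dataclass with its JSON output (the values here are made up):

```python
import dataclasses
import json


@dataclasses.dataclass
class IcfWriteSummary:
    num_partitions: int
    num_samples: int
    num_variants: int

    def asjson(self):
        return json.dumps(dataclasses.asdict(self), indent=4)


print(IcfWriteSummary(num_partitions=3, num_samples=3, num_variants=9).asjson())
# {
#     "num_partitions": 3,
#     "num_samples": 3,
#     "num_variants": 9
# }
```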


class IntermediateColumnarFormatWriter:
def __init__(self, path):
self.path = pathlib.Path(path)
@@ -1038,7 +1055,11 @@ def init(
logger.info("Writing WIP metadata")
with open(self.wip_path / "metadata.json", "w") as f:
json.dump(self.metadata.asdict(), f, indent=4)
return self.num_partitions
return IcfWriteSummary(
num_partitions=self.num_partitions,
num_variants=icf_metadata.num_records,
num_samples=icf_metadata.num_samples,
)

def mkdirs(self):
num_dirs = len(self.metadata.fields)
@@ -1371,6 +1392,7 @@ def variant_chunk_nbytes(self):
"""
Returns the nbytes for a single variant chunk of this array.
"""
# TODO warn if this is a string column; the nbytes estimate is unreliable there
chunk_items = self.chunks[0]
for size in self.shape[1:]:
chunk_items *= size
@@ -1643,6 +1665,21 @@ def fromdict(d):
return ret


@dataclasses.dataclass
class VcfZarrWriteSummary:
num_partitions: int
num_samples: int
num_variants: int
num_chunks: int
max_encoding_memory: str

def asdict(self):
return dataclasses.asdict(self)

def asjson(self):
return json.dumps(self.asdict(), indent=4)


class VcfZarrWriter:
def __init__(self, path):
self.path = pathlib.Path(path)
@@ -1718,13 +1755,22 @@ def init(
store = zarr.DirectoryStore(self.arrays_path)
root = zarr.group(store=store)

for column in self.schema.fields.values():
self.init_array(root, column, partitions[-1].stop)
total_chunks = 0
for field in self.schema.fields.values():
a = self.init_array(root, field, partitions[-1].stop)
total_chunks += a.nchunks

logger.info("Writing WIP metadata")
with open(self.wip_path / "metadata.json", "w") as f:
json.dump(self.metadata.asdict(), f, indent=4)
return len(partitions)

return VcfZarrWriteSummary(
num_variants=self.icf.num_records,
num_samples=self.icf.num_samples,
num_partitions=self.num_partitions,
num_chunks=total_chunks,
max_encoding_memory=display_size(self.get_max_encoding_memory()),
)

def encode_samples(self, root):
if self.schema.samples != self.icf.metadata.samples:
@@ -1794,6 +1840,7 @@ def init_array(self, root, variable, variants_dim_size):
}
)
logger.debug(f"Initialised {a}")
return a

#######################
# encode_partition
@@ -2062,6 +2109,9 @@ def get_max_encoding_memory(self):
"""
Return the approximate maximum memory used to encode a variant chunk.
"""
# NOTE This size number is also not quite enough, you need a bit of
# headroom with it (probably 10% or so). We should include this.
# FIXME this is actively wrong for String columns. See if we can do better.
max_encoding_mem = max(
col.variant_chunk_nbytes for col in self.schema.fields.values()
)
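
The NOTE above suggests padding this estimate before using it as a hard memory limit. A hypothetical back-of-envelope sketch — the ~10% factor comes from the comment, nothing else here is in the diff:

```python
# Hypothetical headroom calculation hinted at by the NOTE above.
estimated_chunk_bytes = 1024**3  # e.g. 1 GiB for the largest variant chunk
headroom_factor = 1.10           # ~10% slack, per the comment
request_bytes = int(estimated_chunk_bytes * headroom_factor)
print(f"request at least {request_bytes} bytes per worker")
```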
@@ -2190,14 +2240,13 @@ def encode_init(
schema = VcfZarrSchema.fromjson(f.read())
zarr_path = pathlib.Path(zarr_path)
vzw = VcfZarrWriter(zarr_path)
vzw.init(
return vzw.init(
icf,
target_num_partitions=target_num_partitions,
schema=schema,
dimension_separator=dimension_separator,
max_variant_chunks=max_variant_chunks,
)
return vzw.num_partitions, vzw.get_max_encoding_memory()


def encode_partition(zarr_path, partition):

29 changes: 21 additions & 8 deletions tests/test_cli.py
@@ -1,3 +1,5 @@
import dataclasses
import json
from unittest import mock

import click.testing as ct
@@ -46,6 +48,17 @@
DEFAULT_DENCODE_FINALISE_ARGS = dict(show_progress=True)


@dataclasses.dataclass
class FakeWorkSummary:
num_partitions: int

def asdict(self):
return dataclasses.asdict(self)

def asjson(self):
return json.dumps(self.asdict())


class TestWithMocks:
vcf_path = "tests/data/vcf/sample.vcf.gz"

@@ -262,7 +275,7 @@ def test_vcf_explode_missing_and_existing_vcf(self, mocked, tmp_path):
assert "'no_such_file' does not exist" in result.stderr
mocked.assert_not_called()

@mock.patch("bio2zarr.vcf.explode_init", return_value=5)
@mock.patch("bio2zarr.vcf.explode_init", return_value=FakeWorkSummary(5))
def test_vcf_dexplode_init(self, mocked, tmp_path):
runner = ct.CliRunner(mix_stderr=False)
icf_path = tmp_path / "icf"
@@ -273,7 +286,7 @@
)
assert result.exit_code == 0
assert len(result.stderr) == 0
assert result.stdout == "5\n"
assert list(result.stdout.split()) == ["num_partitions", "5"]
mocked.assert_called_once_with(
str(icf_path),
(self.vcf_path,),
@@ -415,7 +428,7 @@ def test_encode(self, mocked, tmp_path):
**DEFAULT_ENCODE_ARGS,
)

@mock.patch("bio2zarr.vcf.encode_init", return_value=(10, 1024))
@mock.patch("bio2zarr.vcf.encode_init", return_value=FakeWorkSummary(10))
def test_dencode_init(self, mocked, tmp_path):
icf_path = tmp_path / "icf"
icf_path.mkdir()
@@ -427,7 +440,7 @@
catch_exceptions=False,
)
assert result.exit_code == 0
assert result.stdout == "10\t1 KiB\n"
assert list(result.stdout.split()) == ["num_partitions", "10"]
assert len(result.stderr) == 0
mocked.assert_called_once_with(
str(icf_path),
@@ -529,11 +542,11 @@ def test_dexplode(self, tmp_path, one_based):
runner = ct.CliRunner(mix_stderr=False)
result = runner.invoke(
cli.vcf2zarr,
f"dexplode-init {self.vcf_path} {icf_path} 5",
f"dexplode-init {self.vcf_path} {icf_path} 5 --json",
catch_exceptions=False,
)
assert result.exit_code == 0
assert result.stdout.strip() == "3"
assert json.loads(result.stdout)["num_partitions"] == 3

for j in range(3):
if one_based:
@@ -598,11 +611,11 @@ def test_dencode(self, tmp_path, one_based):
assert result.exit_code == 0
result = runner.invoke(
cli.vcf2zarr,
f"dencode-init {icf_path} {zarr_path} 5 --variants-chunk-size=3",
f"dencode-init {icf_path} {zarr_path} 5 --variants-chunk-size=3 --json",
catch_exceptions=False,
)
assert result.exit_code == 0
assert result.stdout.split()[0] == "3"
assert json.loads(result.stdout)["num_partitions"] == 3

for j in range(3):
if one_based:

12 changes: 6 additions & 6 deletions tests/test_icf.py
@@ -105,8 +105,8 @@ class TestIcfWriterExample:
def test_init_paths(self, tmp_path):
icf_path = tmp_path / "x.icf"
assert not icf_path.exists()
num_partitions = vcf.explode_init(icf_path, [self.data_path])
assert num_partitions == 3
summary = vcf.explode_init(icf_path, [self.data_path])
assert summary.num_partitions == 3
assert icf_path.exists()
wip_path = icf_path / "wip"
assert wip_path.exists()
@@ -118,9 +118,9 @@ def test_finalise_paths(self, tmp_path):
def test_finalise_paths(self, tmp_path):
icf_path = tmp_path / "x.icf"
wip_path = icf_path / "wip"
num_partitions = vcf.explode_init(icf_path, [self.data_path])
summary = vcf.explode_init(icf_path, [self.data_path])
assert icf_path.exists()
for j in range(num_partitions):
for j in range(summary.num_partitions):
vcf.explode_partition(icf_path, j)
assert wip_path.exists()
vcf.explode_finalise(icf_path)
@@ -270,8 +270,8 @@ def run_explode(self, tmp_path, **kwargs):

def run_dexplode(self, tmp_path, **kwargs):
icf_path = tmp_path / "icf"
partitions = vcf.explode_init(icf_path, [self.data_path], **kwargs)
for j in range(partitions):
summary = vcf.explode_init(icf_path, [self.data_path], **kwargs)
for j in range(summary.num_partitions):
vcf.explode_partition(icf_path, j)
vcf.explode_finalise(icf_path)
return vcf.IntermediateColumnarFormat(icf_path)