diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6d704db0..201db661 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+# 0.0.10 2024-05-XX
+- Change output format of dexplode-init and dencode-init
+
 # 0.0.9 2024-05-02
 - Change on-disk format for explode and schema
diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py
index 670a323f..7e7aabbd 100644
--- a/bio2zarr/cli.py
+++ b/bio2zarr/cli.py
@@ -5,7 +5,6 @@
 import click
 import coloredlogs
-import humanfriendly
 import numcodecs
 import tabulate
@@ -65,6 +64,13 @@ def list_commands(self, ctx):
     help="Partition indexes are interpreted as one-based",
 )
 
+json = click.option(
+    "--json",
+    is_flag=True,
+    flag_value=True,
+    help="Output summary data in JSON format",
+)
+
 version = click.version_option(version=f"{provenance.__version__}")
 
 worker_processes = click.option(
@@ -166,6 +172,16 @@ def get_compressor(cname):
     return numcodecs.get_codec(config)
 
 
+def show_work_summary(work_summary, json):
+    if json:
+        output = work_summary.asjson()
+    else:
+        data = work_summary.asdict()
+        output = tabulate.tabulate(list(data.items()), tablefmt="plain")
+        # output = "\n".join(f"{k}\t{v}" for k, v in data.items())
+    click.echo(output)
+
+
 @click.command
 @vcfs
 @new_icf_path
@@ -199,6 +215,7 @@ def explode(
 @force
 @column_chunk_size
 @compressor
+@json
 @verbose
 @worker_processes
 def dexplode_init(
@@ -208,6 +225,7 @@ def dexplode_init(
     force,
     column_chunk_size,
     compressor,
+    json,
     verbose,
     worker_processes,
 ):
@@ -217,7 +235,7 @@ def dexplode_init(
     """
     setup_logging(verbose)
     check_overwrite_dir(icf_path, force)
-    num_partitions = vcf.explode_init(
+    work_summary = vcf.explode_init(
         icf_path,
         vcfs,
         target_num_partitions=num_partitions,
@@ -226,7 +244,7 @@ def dexplode_init(
         compressor=get_compressor(compressor),
         show_progress=True,
     )
-    click.echo(num_partitions)
+    show_work_summary(work_summary, json)
 
 
 @click.command
@@ -331,6 +349,7 @@ def encode(
 @variants_chunk_size
 @samples_chunk_size
 @max_variant_chunks
+@json
 @verbose
 def dencode_init(
     icf_path,
@@ -341,6 +360,7 @@ def dencode_init(
     variants_chunk_size,
     samples_chunk_size,
     max_variant_chunks,
+    json,
     verbose,
 ):
     """
@@ -358,7 +378,7 @@ def dencode_init(
     setup_logging(verbose)
     check_overwrite_dir(zarr_path, force)
-    num_partitions, max_memory = vcf.encode_init(
+    work_summary = vcf.encode_init(
         icf_path,
         zarr_path,
         target_num_partitions=num_partitions,
@@ -368,15 +388,7 @@ def dencode_init(
         max_variant_chunks=max_variant_chunks,
         show_progress=True,
     )
-    formatted_size = humanfriendly.format_size(max_memory, binary=True)
-    # NOTE adding the size to the stdout here so that users can parse it
-    # and use in their submission scripts. This is a first pass, and
-    # will most likely change as we see what works and doesn't.
-    # NOTE we probably want to format this as a table, which lists
-    # some other properties, line by line
-    # NOTE This size number is also not quite enough, you need a bit of
-    # headroom with it (probably 10% or so). We should include this.
- click.echo(f"{num_partitions}\t{formatted_size}") + show_work_summary(work_summary, json) @click.command diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index 40af85de..f333c0ed 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -204,6 +204,10 @@ def num_contigs(self): def num_filters(self): return len(self.filters) + @property + def num_samples(self): + return len(self.samples) + @staticmethod def fromdict(d): if d["format_version"] != ICF_METADATA_FORMAT_VERSION: @@ -982,6 +986,19 @@ def check_field_clobbering(icf_metadata): ) +@dataclasses.dataclass +class IcfWriteSummary: + num_partitions: int + num_samples: int + num_variants: int + + def asdict(self): + return dataclasses.asdict(self) + + def asjson(self): + return json.dumps(self.asdict(), indent=4) + + class IntermediateColumnarFormatWriter: def __init__(self, path): self.path = pathlib.Path(path) @@ -1038,7 +1055,11 @@ def init( logger.info("Writing WIP metadata") with open(self.wip_path / "metadata.json", "w") as f: json.dump(self.metadata.asdict(), f, indent=4) - return self.num_partitions + return IcfWriteSummary( + num_partitions=self.num_partitions, + num_variants=icf_metadata.num_records, + num_samples=icf_metadata.num_samples, + ) def mkdirs(self): num_dirs = len(self.metadata.fields) @@ -1371,6 +1392,7 @@ def variant_chunk_nbytes(self): """ Returns the nbytes for a single variant chunk of this array. """ + # TODO WARNING IF this is a string chunk_items = self.chunks[0] for size in self.shape[1:]: chunk_items *= size @@ -1643,6 +1665,21 @@ def fromdict(d): return ret +@dataclasses.dataclass +class VcfZarrWriteSummary: + num_partitions: int + num_samples: int + num_variants: int + num_chunks: int + max_encoding_memory: str + + def asdict(self): + return dataclasses.asdict(self) + + def asjson(self): + return json.dumps(self.asdict(), indent=4) + + class VcfZarrWriter: def __init__(self, path): self.path = pathlib.Path(path) @@ -1718,13 +1755,22 @@ def init( store = zarr.DirectoryStore(self.arrays_path) root = zarr.group(store=store) - for column in self.schema.fields.values(): - self.init_array(root, column, partitions[-1].stop) + total_chunks = 0 + for field in self.schema.fields.values(): + a = self.init_array(root, field, partitions[-1].stop) + total_chunks += a.nchunks logger.info("Writing WIP metadata") with open(self.wip_path / "metadata.json", "w") as f: json.dump(self.metadata.asdict(), f, indent=4) - return len(partitions) + + return VcfZarrWriteSummary( + num_variants=self.icf.num_records, + num_samples=self.icf.num_samples, + num_partitions=self.num_partitions, + num_chunks=total_chunks, + max_encoding_memory=display_size(self.get_max_encoding_memory()), + ) def encode_samples(self, root): if self.schema.samples != self.icf.metadata.samples: @@ -1794,6 +1840,7 @@ def init_array(self, root, variable, variants_dim_size): } ) logger.debug(f"Initialised {a}") + return a ####################### # encode_partition @@ -2062,6 +2109,9 @@ def get_max_encoding_memory(self): """ Return the approximate maximum memory used to encode a variant chunk. """ + # NOTE This size number is also not quite enough, you need a bit of + # headroom with it (probably 10% or so). We should include this. + # FIXME this is actively wrong for String columns. See if we can do better. 
         max_encoding_mem = max(
             col.variant_chunk_nbytes for col in self.schema.fields.values()
         )
@@ -2190,14 +2240,13 @@ def encode_init(
         schema = VcfZarrSchema.fromjson(f.read())
     zarr_path = pathlib.Path(zarr_path)
     vzw = VcfZarrWriter(zarr_path)
-    vzw.init(
+    return vzw.init(
         icf,
         target_num_partitions=target_num_partitions,
         schema=schema,
         dimension_separator=dimension_separator,
         max_variant_chunks=max_variant_chunks,
     )
-    return vzw.num_partitions, vzw.get_max_encoding_memory()
 
 
 def encode_partition(zarr_path, partition):
diff --git a/tests/test_cli.py b/tests/test_cli.py
index f37a3fc7..1e07b0ca 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -1,3 +1,5 @@
+import dataclasses
+import json
 from unittest import mock
 
 import click.testing as ct
@@ -46,6 +48,17 @@
 DEFAULT_DENCODE_FINALISE_ARGS = dict(show_progress=True)
 
 
+@dataclasses.dataclass
+class FakeWorkSummary:
+    num_partitions: int
+
+    def asdict(self):
+        return dataclasses.asdict(self)
+
+    def asjson(self):
+        return json.dumps(self.asdict())
+
+
 class TestWithMocks:
     vcf_path = "tests/data/vcf/sample.vcf.gz"
@@ -262,7 +275,7 @@ def test_vcf_explode_missing_and_existing_vcf(self, mocked, tmp_path):
         assert "'no_such_file' does not exist" in result.stderr
         mocked.assert_not_called()
 
-    @mock.patch("bio2zarr.vcf.explode_init", return_value=5)
+    @mock.patch("bio2zarr.vcf.explode_init", return_value=FakeWorkSummary(5))
     def test_vcf_dexplode_init(self, mocked, tmp_path):
         runner = ct.CliRunner(mix_stderr=False)
         icf_path = tmp_path / "icf"
@@ -273,7 +286,7 @@ def test_vcf_dexplode_init(self, mocked, tmp_path):
         )
         assert result.exit_code == 0
         assert len(result.stderr) == 0
-        assert result.stdout == "5\n"
+        assert list(result.stdout.split()) == ["num_partitions", "5"]
         mocked.assert_called_once_with(
             str(icf_path),
             (self.vcf_path,),
@@ -415,7 +428,7 @@ def test_encode(self, mocked, tmp_path):
             **DEFAULT_ENCODE_ARGS,
         )
 
-    @mock.patch("bio2zarr.vcf.encode_init", return_value=(10, 1024))
+    @mock.patch("bio2zarr.vcf.encode_init", return_value=FakeWorkSummary(10))
    def test_dencode_init(self, mocked, tmp_path):
         icf_path = tmp_path / "icf"
         icf_path.mkdir()
@@ -427,7 +440,7 @@ def test_dencode_init(self, mocked, tmp_path):
             catch_exceptions=False,
         )
         assert result.exit_code == 0
-        assert result.stdout == "10\t1 KiB\n"
+        assert list(result.stdout.split()) == ["num_partitions", "10"]
         assert len(result.stderr) == 0
         mocked.assert_called_once_with(
             str(icf_path),
@@ -529,11 +542,11 @@ def test_dexplode(self, tmp_path, one_based):
         runner = ct.CliRunner(mix_stderr=False)
         result = runner.invoke(
             cli.vcf2zarr,
-            f"dexplode-init {self.vcf_path} {icf_path} 5",
+            f"dexplode-init {self.vcf_path} {icf_path} 5 --json",
             catch_exceptions=False,
         )
         assert result.exit_code == 0
-        assert result.stdout.strip() == "3"
+        assert json.loads(result.stdout)["num_partitions"] == 3
 
         for j in range(3):
             if one_based:
@@ -598,11 +611,11 @@ def test_dencode(self, tmp_path, one_based):
         assert result.exit_code == 0
         result = runner.invoke(
             cli.vcf2zarr,
-            f"dencode-init {icf_path} {zarr_path} 5 --variants-chunk-size=3",
+            f"dencode-init {icf_path} {zarr_path} 5 --variants-chunk-size=3 --json",
             catch_exceptions=False,
         )
         assert result.exit_code == 0
-        assert result.stdout.split()[0] == "3"
+        assert json.loads(result.stdout)["num_partitions"] == 3
 
         for j in range(3):
             if one_based:
diff --git a/tests/test_icf.py b/tests/test_icf.py
index 2cc21432..29e1eae0 100644
--- a/tests/test_icf.py
+++ b/tests/test_icf.py
@@ -105,8 +105,8 @@ class TestIcfWriterExample:
     def test_init_paths(self, tmp_path):
         icf_path = tmp_path / "x.icf"
         assert not icf_path.exists()
-        num_partitions = vcf.explode_init(icf_path, [self.data_path])
-        assert num_partitions == 3
+        summary = vcf.explode_init(icf_path, [self.data_path])
+        assert summary.num_partitions == 3
         assert icf_path.exists()
         wip_path = icf_path / "wip"
         assert wip_path.exists()
@@ -118,9 +118,9 @@
     def test_finalise_paths(self, tmp_path):
         icf_path = tmp_path / "x.icf"
         wip_path = icf_path / "wip"
-        num_partitions = vcf.explode_init(icf_path, [self.data_path])
+        summary = vcf.explode_init(icf_path, [self.data_path])
         assert icf_path.exists()
-        for j in range(num_partitions):
+        for j in range(summary.num_partitions):
             vcf.explode_partition(icf_path, j)
         assert wip_path.exists()
         vcf.explode_finalise(icf_path)
@@ -270,8 +270,8 @@ def run_explode(self, tmp_path, **kwargs):
 
     def run_dexplode(self, tmp_path, **kwargs):
         icf_path = tmp_path / "icf"
-        partitions = vcf.explode_init(icf_path, [self.data_path], **kwargs)
-        for j in range(partitions):
+        summary = vcf.explode_init(icf_path, [self.data_path], **kwargs)
+        for j in range(summary.num_partitions):
             vcf.explode_partition(icf_path, j)
         vcf.explode_finalise(icf_path)
         return vcf.IntermediateColumnarFormat(icf_path)
diff --git a/tests/test_vcf.py b/tests/test_vcf.py
index cdfddd78..2cdf6c5f 100644
--- a/tests/test_vcf.py
+++ b/tests/test_vcf.py
@@ -420,10 +420,8 @@ class TestVcfZarrWriterExample:
     def test_init_paths(self, icf_path, tmp_path):
         zarr_path = tmp_path / "x.zarr"
         assert not zarr_path.exists()
-        num_partitions, _ = vcf.encode_init(
-            icf_path, zarr_path, 7, variants_chunk_size=3
-        )
-        assert num_partitions == 3
+        summary = vcf.encode_init(icf_path, zarr_path, 7, variants_chunk_size=3)
+        assert summary.num_partitions == 3
         assert zarr_path.exists()
         wip_path = zarr_path / "wip"
         assert wip_path.exists()
@@ -442,12 +440,10 @@ def test_finalise_paths(self, icf_path, tmp_path):
         zarr_path = tmp_path / "x.zarr"
         assert not zarr_path.exists()
-        num_partitions, _ = vcf.encode_init(
-            icf_path, zarr_path, 7, variants_chunk_size=3
-        )
+        summary = vcf.encode_init(icf_path, zarr_path, 7, variants_chunk_size=3)
         wip_path = zarr_path / "wip"
         assert wip_path.exists()
-        for j in range(num_partitions):
+        for j in range(summary.num_partitions):
             vcf.encode_partition(zarr_path, j)
             assert (wip_path / "partitions" / f"p{j}").exists()
         vcf.encode_finalise(zarr_path)
@@ -531,7 +527,7 @@ def generate_vcf(self, path, info_field=None, format_field=None, num_rows=1):
             pos = str(k + 1)
             print("\t".join(["1", pos, "A", "T", ".", ".", ".", "."]), file=out)
 
-        print(open(path).read())
+        # print(open(path).read())
         # This also compresses the input file
         pysam.tabix_index(str(path), preset="vcf")
diff --git a/tests/test_vcf_examples.py b/tests/test_vcf_examples.py
index 909efc72..e6a73121 100644
--- a/tests/test_vcf_examples.py
+++ b/tests/test_vcf_examples.py
@@ -872,13 +872,13 @@ def test_split_explode(tmp_path):
         "tests/data/vcf/sample.vcf.gz.3.split/X.vcf.gz",
     ]
     out = tmp_path / "test.explode"
-    num_partitions = vcf.explode_init(out, paths, target_num_partitions=15)
-    assert num_partitions == 3
+    work_summary = vcf.explode_init(out, paths, target_num_partitions=15)
+    assert work_summary.num_partitions == 3
 
     with pytest.raises(FileNotFoundError):
         pcvcf = vcf.IntermediateColumnarFormat(out)
 
-    for j in range(num_partitions):
+    for j in range(work_summary.num_partitions):
         vcf.explode_partition(out, j)
     vcf.explode_finalise(out)
     pcvcf = vcf.IntermediateColumnarFormat(out)
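
Example consumer of the new JSON summary output. The NOTE removed from dencode_init above says the stdout summary exists so users can parse it in their submission scripts, and the NOTE kept in get_max_encoding_memory says the reported memory figure needs roughly 10% headroom. A minimal sketch under those assumptions, where vcf2zarr is on PATH and humanfriendly is available to parse the formatted size; plan_dencode_jobs and the 1.1 headroom factor are illustrative, not part of this patch:

    # Hypothetical submission-script helper (not part of this patch).
    import json
    import subprocess

    import humanfriendly  # assumed available; parses sizes like "1 KiB"


    def plan_dencode_jobs(icf_path, zarr_path, num_partitions):
        # dencode-init prints the work summary as JSON when --json is given.
        result = subprocess.run(
            ["vcf2zarr", "dencode-init", str(icf_path), str(zarr_path),
             str(num_partitions), "--json"],
            check=True, capture_output=True, text=True,
        )
        summary = json.loads(result.stdout)
        # One dencode-partition job per partition.
        num_jobs = summary["num_partitions"]
        # Budget ~10% headroom over the reported per-chunk encoding memory.
        mem_bytes = humanfriendly.parse_size(summary["max_encoding_memory"])
        return num_jobs, int(mem_bytes * 1.1)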