Skip to content

Commit e6b9a19

Browse files
Merge pull request #136 from jeromekelleher/distributed-encode
Distributed encode
2 parents fa2162d + 7835d3f commit e6b9a19

File tree

7 files changed

+706
-297
lines changed

7 files changed

+706
-297
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
# 0.0.6 2024-04-xx
1+
# 0.0.6 2024-04-24
22

33
- Only use NOSHUFFLE by default on ``call_genotype`` and bool arrays.
4+
- Add initial implementation of distributed encode
45

56
# 0.0.5 2024-04-17
67

bio2zarr/cli.py

Lines changed: 125 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import click
77
import coloredlogs
8+
import humanfriendly
89
import numcodecs
910
import tabulate
1011

@@ -39,6 +40,14 @@ def list_commands(self, ctx):
3940
"zarr_path", type=click.Path(file_okay=False, dir_okay=True)
4041
)
4142

43+
zarr_path = click.argument(
44+
"zarr_path", type=click.Path(exists=True, file_okay=False, dir_okay=True)
45+
)
46+
47+
num_partitions = click.argument("num_partitions", type=click.IntRange(min=1))
48+
49+
partition = click.argument("partition", type=click.IntRange(min=0))
50+
4251
verbose = click.option("-v", "--verbose", count=True, help="Increase verbosity")
4352

4453
force = click.option(
@@ -92,6 +101,27 @@ def list_commands(self, ctx):
92101
help="Chunk size in the samples dimension",
93102
)
94103

104+
schema = click.option("-s", "--schema", default=None, type=click.Path(exists=True))
105+
106+
max_variant_chunks = click.option(
107+
"-V",
108+
"--max-variant-chunks",
109+
type=int,
110+
default=None,
111+
help=(
112+
"Truncate the output in the variants dimension to have "
113+
"this number of chunks. Mainly intended to help with "
114+
"schema tuning."
115+
),
116+
)
117+
118+
max_memory = click.option(
119+
"-M",
120+
"--max-memory",
121+
default=None,
122+
help="An approximate bound on overall memory usage (e.g. 10G),",
123+
)
124+
95125

96126
def setup_logging(verbosity):
97127
level = "WARNING"
@@ -158,7 +188,7 @@ def explode(
158188
@click.command
159189
@vcfs
160190
@new_icf_path
161-
@click.argument("num_partitions", type=click.IntRange(min=1))
191+
@num_partitions
162192
@force
163193
@column_chunk_size
164194
@compressor
@@ -194,7 +224,7 @@ def dexplode_init(
194224

195225
@click.command
196226
@icf_path
197-
@click.argument("partition", type=click.IntRange(min=0))
227+
@partition
198228
@verbose
199229
def dexplode_partition(icf_path, partition, verbose):
200230
"""
@@ -207,14 +237,14 @@ def dexplode_partition(icf_path, partition, verbose):
207237

208238

209239
@click.command
210-
@click.argument("path", type=click.Path(), required=True)
240+
@icf_path
211241
@verbose
212-
def dexplode_finalise(path, verbose):
242+
def dexplode_finalise(icf_path, verbose):
213243
"""
214244
Final step for distributed conversion of VCF(s) to intermediate columnar format.
215245
"""
216246
setup_logging(verbose)
217-
vcf.explode_finalise(path)
247+
vcf.explode_finalise(icf_path)
218248

219249

220250
@click.command
@@ -244,26 +274,11 @@ def mkschema(icf_path):
244274
@new_zarr_path
245275
@force
246276
@verbose
247-
@click.option("-s", "--schema", default=None, type=click.Path(exists=True))
277+
@schema
248278
@variants_chunk_size
249279
@samples_chunk_size
250-
@click.option(
251-
"-V",
252-
"--max-variant-chunks",
253-
type=int,
254-
default=None,
255-
help=(
256-
"Truncate the output in the variants dimension to have "
257-
"this number of chunks. Mainly intended to help with "
258-
"schema tuning."
259-
),
260-
)
261-
@click.option(
262-
"-M",
263-
"--max-memory",
264-
default=None,
265-
help="An approximate bound on overall memory usage (e.g. 10G),",
266-
)
280+
@max_variant_chunks
281+
@max_memory
267282
@worker_processes
268283
def encode(
269284
icf_path,
@@ -288,13 +303,96 @@ def encode(
288303
schema_path=schema,
289304
variants_chunk_size=variants_chunk_size,
290305
samples_chunk_size=samples_chunk_size,
291-
max_v_chunks=max_variant_chunks,
306+
max_variant_chunks=max_variant_chunks,
292307
worker_processes=worker_processes,
293308
max_memory=max_memory,
294309
show_progress=True,
295310
)
296311

297312

313+
@click.command
314+
@icf_path
315+
@new_zarr_path
316+
@num_partitions
317+
@force
318+
@schema
319+
@variants_chunk_size
320+
@samples_chunk_size
321+
@max_variant_chunks
322+
@verbose
323+
def dencode_init(
324+
icf_path,
325+
zarr_path,
326+
num_partitions,
327+
force,
328+
schema,
329+
variants_chunk_size,
330+
samples_chunk_size,
331+
max_variant_chunks,
332+
verbose,
333+
):
334+
"""
335+
Initialise conversion of intermediate format to VCF Zarr. This will
336+
set up the specified ZARR_PATH to perform this conversion over
337+
NUM_PARTITIONS.
338+
339+
The output of this command is the actual number of partitions generated
340+
(which may be less than the requested number, if there are not sufficient
341+
chunks in the variants dimension) and a rough lower-bound on the amount
342+
of memory required to encode a partition.
343+
344+
NOTE: the format of this output will likely change in subsequent releases;
345+
it should not be considered machine-readable for now.
346+
"""
347+
setup_logging(verbose)
348+
check_overwrite_dir(zarr_path, force)
349+
num_partitions, max_memory = vcf.encode_init(
350+
icf_path,
351+
zarr_path,
352+
target_num_partitions=num_partitions,
353+
schema_path=schema,
354+
variants_chunk_size=variants_chunk_size,
355+
samples_chunk_size=samples_chunk_size,
356+
max_variant_chunks=max_variant_chunks,
357+
show_progress=True,
358+
)
359+
formatted_size = humanfriendly.format_size(max_memory, binary=True)
360+
# NOTE adding the size to the stdout here so that users can parse it
361+
# and use in their submission scripts. This is a first pass, and
362+
# will most likely change as we see what works and doesn't.
363+
# NOTE we probably want to format this as a table, which lists
364+
# some other properties, line by line
365+
# NOTE This size number is also not quite enough, you need a bit of
366+
# headroom with it (probably 10% or so). We should include this.
367+
click.echo(f"{num_partitions}\t{formatted_size}")
368+
369+
370+
@click.command
371+
@zarr_path
372+
@partition
373+
@verbose
374+
def dencode_partition(zarr_path, partition, verbose):
375+
"""
376+
Convert a partition from intermediate columnar format to VCF Zarr.
377+
Must be called *after* the Zarr path has been initialised with dencode_init.
378+
Partition indexes must be from 0 (inclusive) to the number of partitions
379+
returned by dencode_init (exclusive).
380+
"""
381+
setup_logging(verbose)
382+
vcf.encode_partition(zarr_path, partition)
383+
384+
385+
@click.command
386+
@zarr_path
387+
@verbose
388+
def dencode_finalise(zarr_path, verbose):
389+
"""
390+
Final step for distributed conversion of ICF to VCF Zarr.
391+
"""
392+
setup_logging(verbose)
393+
vcf.encode_finalise(zarr_path, show_progress=True)
394+
395+
298396
@click.command(name="convert")
299397
@vcfs
300398
@new_zarr_path
@@ -382,6 +480,9 @@ def vcf2zarr():
382480
vcf2zarr.add_command(dexplode_init)
383481
vcf2zarr.add_command(dexplode_partition)
384482
vcf2zarr.add_command(dexplode_finalise)
483+
vcf2zarr.add_command(dencode_init)
484+
vcf2zarr.add_command(dencode_partition)
485+
vcf2zarr.add_command(dencode_finalise)
385486

386487

387488
@click.command(name="convert")

bio2zarr/core.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def flush(self):
110110
sync_flush_2d_array(
111111
self.buff[: self.buffer_row], self.array, self.array_offset
112112
)
113+
# FIXME the array.name doesn't seem to be working here for some reason
113114
logger.debug(
114115
f"Flushed <{self.array.name} {self.array.shape} "
115116
f"{self.array.dtype}> "
@@ -131,8 +132,7 @@ def sync_flush_2d_array(np_buffer, zarr_array, offset):
131132
# encoder implementations.
132133
s = slice(offset, offset + np_buffer.shape[0])
133134
samples_chunk_size = zarr_array.chunks[1]
134-
# TODO use zarr chunks here to support non-uniform chunking later
135-
# and for simplicity
135+
# TODO use zarr chunks here for simplicity
136136
zarr_array_width = zarr_array.shape[1]
137137
start = 0
138138
while start < zarr_array_width:
@@ -192,7 +192,7 @@ def __init__(self, worker_processes=1, progress_config=None):
192192
self.progress_config = progress_config
193193
self.progress_bar = tqdm.tqdm(
194194
total=progress_config.total,
195-
desc=f"{progress_config.title:>7}",
195+
desc=f"{progress_config.title:>8}",
196196
unit_scale=True,
197197
unit=progress_config.units,
198198
smoothing=0.1,

0 commit comments

Comments
 (0)