Skip to content

Commit a616dcf

Browse files
Roughly working distributed encode
1 parent 24b1711 commit a616dcf

File tree

1 file changed

+55
-30
lines changed

1 file changed

+55
-30
lines changed

bio2zarr/vcf.py

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import logging
77
import math
8+
import os
89
import pathlib
910
import pickle
1011
import shutil
@@ -1592,7 +1593,8 @@ def fromdict(d):
15921593
f"{d['format_version']} != {VZW_METADATA_FORMAT_VERSION}"
15931594
)
15941595
ret = VcfZarrWriterMetadata(**d)
1595-
ret.schema = VcfZarrSchema(**ret.schema)
1596+
ret.schema = VcfZarrSchema.fromdict(ret.schema)
1597+
ret.partitions = [VcfZarrPartition(**p) for p in ret.partitions]
15961598
return ret
15971599

15981600

@@ -1751,35 +1753,28 @@ def load_metadata(self):
17511753

17521754
def encode_partition(self, partition, *, show_progress=False, worker_processes=1):
17531755
self.load_metadata()
1754-
partition_path = self.partitions_path / f"wip_{partition}"
1755-
# If the partition path exists already, keep going. Let's assume
1756-
# that it's an earlier failed attempt. Not worrying about having
1757-
# concurrent encode_partition runs happening for now, but I guess
1758-
# we could make the partition_path depend on a uuid or something
1759-
# to make sure it's unique.
1760-
logger.debug(f"Copying empty arrays in {self.arrays_path} to {partition_path}")
1761-
shutil.copytree(self.arrays_path, partition_path, dirs_exist_ok=True)
1762-
# self.root = partition_path
1763-
# self.encode_partition_slice([partition], show_progress=show_progress,
1764-
# worker_processes=worker_processes)
1765-
#
1766-
# NOTE not sure what to do here, started making some changes and
1767-
# got too tired.
1768-
1769-
# def get_array(self, name):
1770-
# return self.root[name]
1771-
1772-
# def finalise_array(self, variable_name):
1773-
# source = self.path / ("wip_" + variable_name)
1774-
# dest = self.path / variable_name
1775-
# # Atomic swap
1776-
# os.rename(source, dest)
1777-
# logger.info(f"Finalised {variable_name}")
1778-
1779-
def encode_array_partition(self, column, partition):
1756+
partition_path = self.partitions_path / f"{partition}"
1757+
partition_path.mkdir(exist_ok=True)
1758+
logger.debug(f"Creating partition dir {partition_path}")
1759+
1760+
for col in self.metadata.schema.columns.values():
1761+
if col.vcf_field is not None:
1762+
self.encode_array_partition(col, partition, partition_path)
1763+
1764+
def encode_array_partition(self, column, partition_index, partition_path):
1765+
wip_path = partition_path / f"wip_{column.name}"
1766+
final_path = partition_path / column.name
1767+
# Create an empty array like the definition
1768+
src = self.arrays_path / column.name
1769+
# TODO add overwrite here
1770+
shutil.copytree(src, wip_path)
1771+
1772+
array = zarr.open(wip_path)
1773+
logger.debug(f"Opened empty array {array}")
1774+
1775+
partition = self.metadata.partitions[partition_index]
1776+
ba = core.BufferedArray(array, partition.start_index)
17801777
source_col = self.icf.columns[column.vcf_field]
1781-
array = self.get_array(column.name)
1782-
ba = core.BufferedArray(array, start)
17831778
sanitiser = source_col.sanitiser_factory(ba.buff.shape)
17841779

17851780
for value in source_col.iter_values(
@@ -1790,11 +1785,41 @@ def encode_array_partition(self, column, partition):
17901785
j = ba.next_buffer_row()
17911786
sanitiser(ba.buff, j, value)
17921787
ba.flush()
1788+
1789+
# Atomic swap
1790+
os.rename(wip_path, final_path)
17931791
logger.debug(
1794-
f"Encoded {column.name} chunk slice "
1792+
f"Encoded {column.name} partition "
17951793
f"{partition.start_chunk}:{partition.stop_chunk}"
17961794
)
17971795

1796+
#######################
1797+
# finalise
1798+
#######################
1799+
1800+
def finalise_array(self, name):
1801+
logger.debug(f"Finalising {name}")
1802+
for partition in range(len(self.metadata.partitions)):
1803+
# Move all the files in partition dir to dest dir
1804+
1805+
partition_path = self.partitions_path / f"{partition}"
1806+
src = partition_path / name
1807+
dest = self.arrays_path / name
1808+
for chunk_file in list(src.iterdir()):
1809+
if not chunk_file.name.startswith("."):
1810+
# TODO check for a count of then number of files
1811+
os.rename(chunk_file, dest / chunk_file.name)
1812+
# Finally, once all the chunks have moved into the arrays dir,
1813+
# we move it out of wip
1814+
os.rename(self.arrays_path / name, self.path / name)
1815+
1816+
def finalise(self):
1817+
self.load_metadata()
1818+
1819+
for col in self.metadata.schema.columns.values():
1820+
if col.vcf_field is not None:
1821+
self.finalise_array(col.name)
1822+
17981823

17991824
# def encode_genotypes_slice(self, start, stop):
18001825
# source_col = self.icf.columns["FORMAT/GT"]

0 commit comments

Comments
 (0)