@@ -1752,37 +1752,49 @@ def load_metadata(self):
17521752 def encode_partition (self , partition , * , show_progress = False , worker_processes = 1 ):
17531753 self .load_metadata ()
17541754 partition_path = self .partitions_path / f"wip_{ partition } "
1755- partition_path .mkdir ()
1756- # Now - copy the arrays from arrays_path to partition_path somehow (shutil?)
1757- # and then set self.root to that parition_path.
1758- # Then we should be able to encode just the slice for this partition
1759- # into those arrays. Once that's fully completed, we rename the
1760- # partition_path to just {partition}
1761-
1762-
1763- # def get_array(self, name):
1764- # return self.root["wip_" + name]
1765-
1766- # def finalise_array(self, variable_name):
1767- # source = self.path / ("wip_" + variable_name)
1768- # dest = self.path / variable_name
1769- # # Atomic swap
1770- # os.rename(source, dest)
1771- # logger.info(f"Finalised {variable_name}")
1772-
1773- # def encode_array_slice(self, column, start, stop):
1774- # source_col = self.icf.columns[column.vcf_field]
1775- # array = self.get_array(column.name)
1776- # ba = core.BufferedArray(array, start)
1777- # sanitiser = source_col.sanitiser_factory(ba.buff.shape)
1755+ # If the partition path exists already, keep going. Let's assume
1756+ # that it's an earlier failed attempt. Not worrying about having
1757+ # concurrent encode_partition runs happening for now, but I guess
1758+ # we could make the partition_path depend on a uuid or something
1759+ # to make sure it's unique.
1760+ logger .debug (f"Copying empty arrays in { self .arrays_path } to { partition_path } " )
1761+ shutil .copytree (self .arrays_path , partition_path , dirs_exist_ok = True )
1762+ # self.root = partition_path
1763+ # self.encode_partition_slice([partition], show_progress=show_progress,
1764+ # worker_processes=worker_processes)
1765+ #
1766+ # NOTE not sure what to do here, started making some changes and
1767+ # got too tired.
1768+
1769+ # def get_array(self, name):
1770+ # return self.root[name]
1771+
1772+ # def finalise_array(self, variable_name):
1773+ # source = self.path / ("wip_" + variable_name)
1774+ # dest = self.path / variable_name
1775+ # # Atomic swap
1776+ # os.rename(source, dest)
1777+ # logger.info(f"Finalised {variable_name}")
1778+
1779+ def encode_array_partition (self , column , partition ):
1780+ source_col = self .icf .columns [column .vcf_field ]
1781+ array = self .get_array (column .name )
1782+ ba = core .BufferedArray (array , start )
1783+ sanitiser = source_col .sanitiser_factory (ba .buff .shape )
1784+
1785+ for value in source_col .iter_values (
1786+ partition .start_index , partition .stop_index
1787+ ):
1788+ # We write directly into the buffer in the sanitiser function
1789+ # to make it easier to reason about dimension padding
1790+ j = ba .next_buffer_row ()
1791+ sanitiser (ba .buff , j , value )
1792+ ba .flush ()
1793+ logger .debug (
1794+ f"Encoded { column .name } chunk slice "
1795+ f"{ partition .start_chunk } :{ partition .stop_chunk } "
1796+ )
17781797
1779- # for value in source_col.iter_values(start, stop):
1780- # # We write directly into the buffer in the sanitiser function
1781- # # to make it easier to reason about dimension padding
1782- # j = ba.next_buffer_row()
1783- # sanitiser(ba.buff, j, value)
1784- # ba.flush()
1785- # logger.debug(f"Encoded {column.name} slice {start}:{stop}")
17861798
17871799# def encode_genotypes_slice(self, start, stop):
17881800# source_col = self.icf.columns["FORMAT/GT"]
0 commit comments