Skip to content

Commit 2022c04 — "fix metadata refresh issues"

Browse files

Committed; 1 parent: ac5ae5e.

File tree

3 files changed

+80
-61
lines changed

3 files changed

+80
-61
lines changed

docs/tutorial.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,14 @@ which can be used to append data to any axis. E.g.::
155155
compressor: Blosc(cname='lz4', clevel=5, shuffle=1)
156156
store: dict
157157
>>> z.append(a)
158+
(20000, 1000)
158159
>>> z
159160
Array((20000, 1000), int32, chunks=(1000, 100), order=C)
160161
nbytes: 76.3M; nbytes_stored: 3.8M; ratio: 20.3; initialized: 200/200
161162
compressor: Blosc(cname='lz4', clevel=5, shuffle=1)
162163
store: dict
163164
>>> z.append(np.vstack([a, a]), axis=1)
165+
(20000, 2000)
164166
>>> z
165167
Array((20000, 2000), int32, chunks=(1000, 100), order=C)
166168
nbytes: 152.6M; nbytes_stored: 7.5M; ratio: 20.3; initialized: 400/400

zarr/core.py

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -97,19 +97,23 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None,
9797
self._is_view = False
9898

9999
# initialize metadata
100-
if self._synchronizer is None:
101-
self._load_metadata()
102-
else:
103-
mkey = self._key_prefix + array_meta_key
104-
with self._synchronizer[mkey]:
105-
self._load_metadata()
100+
self._load_metadata()
106101

107102
# initialize attributes
108103
akey = self._key_prefix + attrs_key
109104
self._attrs = Attributes(store, key=akey, read_only=read_only,
110105
synchronizer=synchronizer)
111106

112107
def _load_metadata(self):
108+
"""(Re)load metadata from store."""
109+
if self._synchronizer is None:
110+
self._load_metadata_nosync()
111+
else:
112+
mkey = self._key_prefix + array_meta_key
113+
with self._synchronizer[mkey]:
114+
self._load_metadata_nosync()
115+
116+
def _load_metadata_nosync(self):
113117
try:
114118
mkey = self._key_prefix + array_meta_key
115119
meta_bytes = self._store[mkey]
@@ -136,10 +140,18 @@ def _load_metadata(self):
136140
# setup filters
137141
filters = meta['filters']
138142
if filters:
139-
filters = [get_codec(f) for f in filters]
143+
filters = [get_codec(config) for config in filters]
140144
self._filters = filters
141145

142-
def _flush_metadata(self):
146+
def _refresh_metadata(self):
147+
if not self._cache_metadata:
148+
self._load_metadata()
149+
150+
def _refresh_metadata_nosync(self):
151+
if not self._cache_metadata:
152+
self._load_metadata_nosync()
153+
154+
def _flush_metadata_nosync(self):
143155
if self._is_view:
144156
raise PermissionError('not permitted for views')
145157

@@ -193,9 +205,7 @@ def chunk_store(self):
193205
def shape(self):
194206
"""A tuple of integers describing the length of each dimension of
195207
the array."""
196-
# refresh metadata because shape can change if array is resized
197-
if not self._cache_metadata:
198-
self._load_metadata()
208+
self._refresh_metadata()
199209
return self._shape
200210

201211
@shape.setter
@@ -206,38 +216,32 @@ def shape(self, value):
206216
def chunks(self):
207217
"""A tuple of integers describing the length of each dimension of a
208218
chunk of the array."""
209-
# should be immutable
210219
return self._chunks
211220

212221
@property
213222
def dtype(self):
214223
"""The NumPy data type."""
215-
# should be immutable
216224
return self._dtype
217225

218226
@property
219227
def compressor(self):
220228
"""Primary compression codec."""
221-
# should be immutable
222229
return self._compressor
223230

224231
@property
225232
def fill_value(self):
226233
"""A value used for uninitialized portions of the array."""
227-
# should be immutable
228234
return self._fill_value
229235

230236
@property
231237
def order(self):
232238
"""A string indicating the order in which bytes are arranged within
233239
chunks of the array."""
234-
# should be immutable
235240
return self._order
236241

237242
@property
238243
def filters(self):
239244
"""One or more codecs used to transform data prior to compression."""
240-
# should be immutable
241245
return self._filters
242246

243247
@property
@@ -256,21 +260,31 @@ def ndim(self):
256260
"""Number of dimensions."""
257261
return len(self.shape)
258262

263+
@property
264+
def _size(self):
265+
return reduce(operator.mul, self._shape)
266+
259267
@property
260268
def size(self):
261269
"""The total number of elements in the array."""
262-
return reduce(operator.mul, self.shape)
270+
self._refresh_metadata()
271+
return self._size
263272

264273
@property
265274
def itemsize(self):
266275
"""The size in bytes of each item in the array."""
267276
return self.dtype.itemsize
268277

278+
@property
279+
def _nbytes(self):
280+
return self._size * self.itemsize
281+
269282
@property
270283
def nbytes(self):
271284
"""The total number of bytes that would be required to store the
272285
array without compression."""
273-
return self.size * self.itemsize
286+
self._refresh_metadata()
287+
return self._nbytes
274288

275289
@property
276290
def nbytes_stored(self):
@@ -287,18 +301,28 @@ def nbytes_stored(self):
287301
else:
288302
return m + n
289303

304+
@property
305+
def _cdata_shape(self):
306+
return tuple(
307+
int(np.ceil(s / c)) for s, c in zip(self._shape, self._chunks)
308+
)
309+
290310
@property
291311
def cdata_shape(self):
292312
"""A tuple of integers describing the number of chunks along each
293313
dimension of the array."""
294-
return tuple(
295-
int(np.ceil(s / c)) for s, c in zip(self.shape, self.chunks)
296-
)
314+
self._refresh_metadata()
315+
return self._cdata_shape
316+
317+
@property
318+
def _nchunks(self):
319+
return reduce(operator.mul, self._cdata_shape)
297320

298321
@property
299322
def nchunks(self):
300323
"""Total number of chunks."""
301-
return reduce(operator.mul, self.cdata_shape)
324+
self._refresh_metadata()
325+
return self._nchunks
302326

303327
@property
304328
def nchunks_initialized(self):
@@ -769,48 +793,45 @@ def _encode_chunk(self, chunk):
769793
return cdata
770794

771795
def __repr__(self):
772-
773-
# refresh metadata
774-
if not self._cache_metadata:
775-
self._load_metadata()
796+
self._refresh_metadata()
776797

777798
# main line
778799
r = '%s(' % type(self).__name__
779800
if self.name:
780801
r += '%s, ' % self.name
781-
r += '%s, ' % str(self.shape)
782-
r += '%s, ' % str(self.dtype)
783-
r += 'chunks=%s, ' % str(self.chunks)
784-
r += 'order=%s' % self.order
802+
r += '%s, ' % str(self._shape)
803+
r += '%s, ' % str(self._dtype)
804+
r += 'chunks=%s, ' % str(self._chunks)
805+
r += 'order=%s' % self._order
785806
r += ')'
786807

787808
# storage size info
788-
r += '\n nbytes: %s' % human_readable_size(self.nbytes)
809+
r += '\n nbytes: %s' % human_readable_size(self._nbytes)
789810
if self.nbytes_stored > 0:
790811
r += '; nbytes_stored: %s' % human_readable_size(
791812
self.nbytes_stored)
792-
r += '; ratio: %.1f' % (self.nbytes / self.nbytes_stored)
813+
r += '; ratio: %.1f' % (self._nbytes / self.nbytes_stored)
793814
r += '; initialized: %s/%s' % (self.nchunks_initialized,
794-
self.nchunks)
815+
self._nchunks)
795816

796817
# filters
797-
if self.filters:
818+
if self._filters:
798819
# first line
799-
r += '\n filters: %r' % self.filters[0]
820+
r += '\n filters: %r' % self._filters[0]
800821
# subsequent lines
801-
for f in self.filters[1:]:
822+
for f in self._filters[1:]:
802823
r += '\n %r' % f
803824

804825
# compressor
805-
if self.compressor:
806-
r += '\n compressor: %r' % self.compressor
826+
if self._compressor:
827+
r += '\n compressor: %r' % self._compressor
807828

808829
# storage and synchronizer classes
809-
r += '\n store: %s' % type(self.store).__name__
810-
if self.store != self.chunk_store:
811-
r += '; chunk_store: %s' % type(self.chunk_store).__name__
812-
if self.synchronizer is not None:
813-
r += '; synchronizer: %s' % type(self.synchronizer).__name__
830+
r += '\n store: %s' % type(self._store).__name__
831+
if self._store != self._chunk_store:
832+
r += '; chunk_store: %s' % type(self._chunk_store).__name__
833+
if self._synchronizer is not None:
834+
r += '; synchronizer: %s' % type(self._synchronizer).__name__
814835

815836
return r
816837

@@ -829,24 +850,14 @@ def _write_op(self, f, *args, **kwargs):
829850

830851
# synchronization
831852
if self._synchronizer is None:
832-
833-
# refresh metadata
834-
if not self._cache_metadata:
835-
self._load_metadata()
836-
853+
self._refresh_metadata_nosync()
837854
return f(*args, **kwargs)
838855

839856
else:
840-
841857
# synchronize on the array
842858
mkey = self._key_prefix + array_meta_key
843-
844859
with self._synchronizer[mkey]:
845-
846-
# refresh metadata
847-
if not self._cache_metadata:
848-
self._load_metadata()
849-
860+
self._refresh_metadata_nosync()
850861
return f(*args, **kwargs)
851862

852863
def resize(self, *args):
@@ -894,7 +905,7 @@ def _resize_nosync(self, *args):
894905

895906
# update metadata
896907
self._shape = new_shape
897-
self._flush_metadata()
908+
self._flush_metadata_nosync()
898909

899910
# determine the new number and arrangement of chunks
900911
chunks = self._chunks
@@ -920,6 +931,10 @@ def append(self, data, axis=0):
920931
axis : int
921932
Axis along which to append.
922933
934+
Returns
935+
-------
936+
new_shape : tuple
937+
923938
Notes
924939
-----
925940
The size of all dimensions other than `axis` must match between this
@@ -937,12 +952,14 @@ def append(self, data, axis=0):
937952
compressor: Blosc(cname='lz4', clevel=5, shuffle=1)
938953
store: dict
939954
>>> z.append(a)
955+
(20000, 1000)
940956
>>> z
941957
Array((20000, 1000), int32, chunks=(1000, 100), order=C)
942958
nbytes: 76.3M; nbytes_stored: 3.8M; ratio: 20.3; initialized: 200/200
943959
compressor: Blosc(cname='lz4', clevel=5, shuffle=1)
944960
store: dict
945961
>>> z.append(np.vstack([a, a]), axis=1)
962+
(20000, 2000)
946963
>>> z
947964
Array((20000, 2000), int32, chunks=(1000, 100), order=C)
948965
nbytes: 152.6M; nbytes_stored: 7.5M; ratio: 20.3; initialized: 400/400

zarr/tests/test_sync.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ def test_parallel_setitem(self):
7878

7979
# parallel setitem
8080
future = pool.map_async(_set_arange, zip([arr] * n, range(n)))
81-
results = future.get(60)
81+
results = sorted(future.get(60))
8282
print(results)
83-
eq(list(range(n)), sorted(results))
83+
eq(list(range(n)), results)
8484
assert_array_equal(np.arange(n * 1000), arr[:])
8585

8686
def test_parallel_append(self):
@@ -93,9 +93,9 @@ def test_parallel_append(self):
9393

9494
# parallel append
9595
future = pool.map_async(_append, zip([arr] * n, range(n)))
96-
results = future.get(60)
96+
results = sorted(future.get(60))
9797
print(results)
98-
eq([((i+2)*1000,) for i in range(n)], sorted(results))
98+
eq([((i+2)*1000,) for i in range(n)], results)
9999
eq(((n+1)*1000,), arr.shape)
100100

101101

0 commit comments

Comments (0)