Skip to content

Commit 272a492

Browse files
committed
fix nasty process sync bug
1 parent 3e63adf commit 272a492

File tree

4 files changed

+100
-58
lines changed

4 files changed

+100
-58
lines changed

zarr/core.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def _refresh_metadata(self):
148148
self._load_metadata()
149149

150150
def _refresh_metadata_nosync(self):
151-
if not self._cache_metadata:
151+
if not self._cache_metadata and not self._is_view:
152152
self._load_metadata_nosync()
153153

154154
def _flush_metadata_nosync(self):
@@ -205,6 +205,8 @@ def chunk_store(self):
205205
def shape(self):
206206
"""A tuple of integers describing the length of each dimension of
207207
the array."""
208+
# N.B., shape may change if array is resized, hence need to refresh
209+
# metadata
208210
self._refresh_metadata()
209211
return self._shape
210212

@@ -267,6 +269,8 @@ def _size(self):
267269
@property
268270
def size(self):
269271
"""The total number of elements in the array."""
272+
# N.B., this property depends on shape, and shape may change if array
273+
# is resized, hence need to refresh metadata
270274
self._refresh_metadata()
271275
return self._size
272276

@@ -283,6 +287,8 @@ def _nbytes(self):
283287
def nbytes(self):
284288
"""The total number of bytes that would be required to store the
285289
array without compression."""
290+
# N.B., this property depends on shape, and shape may change if array
291+
# is resized, hence need to refresh metadata
286292
self._refresh_metadata()
287293
return self._nbytes
288294

@@ -559,7 +565,7 @@ def __setitem__(self, key, value):
559565

560566
# refresh metadata
561567
if not self._cache_metadata:
562-
self._load_metadata()
568+
self._load_metadata_nosync()
563569

564570
# normalize selection
565571
selection = normalize_array_selection(key, self._shape)
@@ -793,7 +799,11 @@ def _encode_chunk(self, chunk):
793799
return cdata
794800

795801
def __repr__(self):
796-
self._refresh_metadata()
802+
# N.B., __repr__ needs to be synchronized to ensure consistent view
803+
# of metadata AND when retrieving nbytes_stored from filesystem storage
804+
return self._synchronized_op(self._repr_nosync)
805+
806+
def _repr_nosync(self):
797807

798808
# main line
799809
r = '%s(' % type(self).__name__
@@ -842,13 +852,9 @@ def __getstate__(self):
842852
def __setstate__(self, state):
843853
self.__init__(*state)
844854

845-
def _write_op(self, f, *args, **kwargs):
846-
847-
# guard condition
848-
if self._read_only:
849-
err_read_only()
855+
def _synchronized_op(self, f, *args, **kwargs):
850856

851-
# synchronization
857+
# no synchronization
852858
if self._synchronizer is None:
853859
self._refresh_metadata_nosync()
854860
return f(*args, **kwargs)
@@ -858,7 +864,16 @@ def _write_op(self, f, *args, **kwargs):
858864
mkey = self._key_prefix + array_meta_key
859865
with self._synchronizer[mkey]:
860866
self._refresh_metadata_nosync()
861-
return f(*args, **kwargs)
867+
result = f(*args, **kwargs)
868+
return result
869+
870+
def _write_op(self, f, *args, **kwargs):
871+
872+
# guard condition
873+
if self._read_only:
874+
err_read_only()
875+
876+
return self._synchronized_op(f, *args, **kwargs)
862877

863878
def resize(self, *args):
864879
"""Change the shape of the array by growing or shrinking one or more
@@ -915,11 +930,15 @@ def _resize_nosync(self, *args):
915930
# remove any chunks not within range
916931
for key in listdir(self._chunk_store, self._path):
917932
if key not in [array_meta_key, attrs_key]:
918-
cidx = map(int, key.split('.'))
919-
if all(i < c for i, c in zip(cidx, new_cdata_shape)):
920-
pass # keep the chunk
933+
try:
934+
cidx = list(map(int, key.split('.')))
935+
except ValueError as e:
936+
raise RuntimeError('unexpected key: %r' % key)
921937
else:
922-
del self._chunk_store[self._key_prefix + key]
938+
if all(i < c for i, c in zip(cidx, new_cdata_shape)):
939+
pass # keep the chunk
940+
else:
941+
del self._chunk_store[self._key_prefix + key]
923942

924943
def append(self, data, axis=0):
925944
"""Append `data` to `axis`.

zarr/storage.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
from zarr.meta import encode_array_metadata, encode_group_metadata
1818
from zarr.compat import PY2, binary_type
1919
from zarr.codecs import codec_registry
20-
from zarr.errors import PermissionError, err_contains_group, \
21-
err_contains_array, err_path_not_found, err_bad_compressor, \
22-
err_fspath_exists_notdir, err_read_only
20+
from zarr.errors import err_contains_group, err_contains_array, \
21+
err_path_not_found, err_bad_compressor, err_fspath_exists_notdir, \
22+
err_read_only
2323

2424

2525
array_meta_key = '.zarray'

zarr/sync.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import os
66

77

8+
import fasteners
9+
10+
811
class ThreadSynchronizer(object):
912
"""Provides synchronization using thread locks."""
1013

@@ -41,10 +44,8 @@ def __init__(self, path):
4144
self.path = path
4245

4346
def __getitem__(self, item):
44-
import fasteners
45-
lock = fasteners.InterProcessLock(
46-
os.path.join(self.path, item)
47-
)
47+
path = os.path.join(self.path, item)
48+
lock = fasteners.InterProcessLock(path)
4849
return lock
4950

5051
# pickling and unpickling should be handled automatically

zarr/tests/test_sync.py

Lines changed: 59 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import atexit
55
import json
66
import shutil
7-
import os
8-
from multiprocessing.pool import ThreadPool, Pool as ProcessPool
7+
from multiprocessing.pool import ThreadPool
8+
from multiprocessing import Pool as ProcessPool
99
from multiprocessing import cpu_count
1010
import tempfile
11+
import traceback
12+
import sys
1113

1214

1315
import numpy as np
@@ -50,20 +52,28 @@ def init_attributes(self, store, read_only=False):
5052

5153

5254
def _append(arg):
53-
z, i = arg
54-
import numpy as np
55-
x = np.empty(1000, dtype='i4')
56-
x[:] = i
57-
shape = z.append(x)
58-
return shape
55+
try:
56+
z, i = arg
57+
import numpy as np
58+
x = np.empty(1000, dtype='i4')
59+
x[:] = i
60+
shape = z.append(x)
61+
return shape
62+
except Exception as e:
63+
traceback.print_exc(file=sys.stderr)
64+
raise
5965

6066

6167
def _set_arange(arg):
62-
z, i = arg
63-
import numpy as np
64-
x = np.arange(i*1000, (i*1000)+1000, 1)
65-
z[i*1000:(i*1000)+1000] = x
66-
return i
68+
try:
69+
z, i = arg
70+
import numpy as np
71+
x = np.arange(i*1000, (i*1000)+1000, 1)
72+
z[i*1000:(i*1000)+1000] = x
73+
return i
74+
except Exception as e:
75+
traceback.print_exc(file=sys.stderr)
76+
raise
6777

6878

6979
class MixinArraySyncTests(object):
@@ -77,15 +87,15 @@ def test_parallel_setitem(self):
7787
pool = self.create_pool()
7888

7989
# parallel setitem
80-
future = pool.map_async(_set_arange, zip([arr] * n, range(n)))
81-
results = sorted(future.get(60))
82-
pool.close()
83-
pool.terminate()
90+
results = pool.map(_set_arange, zip([arr] * n, range(n)), chunksize=1)
91+
results = sorted(results)
8492

8593
print(results)
8694
eq(list(range(n)), results)
8795
assert_array_equal(np.arange(n * 1000), arr[:])
8896

97+
pool.terminate()
98+
8999
def test_parallel_append(self):
90100
n = 100
91101

@@ -95,15 +105,15 @@ def test_parallel_append(self):
95105
pool = self.create_pool()
96106

97107
# parallel append
98-
future = pool.map_async(_append, zip([arr] * n, range(n)))
99-
results = sorted(future.get(60))
100-
pool.close()
101-
pool.terminate()
108+
results = pool.map(_append, zip([arr] * n, range(n)), chunksize=1)
109+
results = sorted(results)
102110

103111
print(results)
104112
eq([((i+2)*1000,) for i in range(n)], results)
105113
eq(((n+1)*1000,), arr.shape)
106114

115+
pool.terminate()
116+
107117

108118
class TestArrayWithThreadSynchronizer(TestArray, MixinArraySyncTests):
109119

@@ -136,12 +146,13 @@ def create_pool(self):
136146
class TestArrayWithProcessSynchronizer(TestArray, MixinArraySyncTests):
137147

138148
def create_array(self, read_only=False, **kwargs):
139-
path = 'test_sync'
140-
if os.path.exists(path):
141-
shutil.rmtree(path)
149+
path = tempfile.mkdtemp()
150+
atexit.register(atexit_rmtree, path)
142151
store = DirectoryStore(path)
143152
init_array(store, **kwargs)
144-
synchronizer = ProcessSynchronizer('test_sync_locks')
153+
sync_path = tempfile.mkdtemp()
154+
atexit.register(atexit_rmtree, sync_path)
155+
synchronizer = ProcessSynchronizer(sync_path)
145156
return Array(store, synchronizer=synchronizer,
146157
read_only=read_only, cache_metadata=False)
147158

@@ -161,7 +172,7 @@ def test_repr(self):
161172
eq(l1, l2)
162173

163174
def create_pool(self):
164-
pool = ProcessPool(cpu_count())
175+
pool = ProcessPool(processes=cpu_count())
165176
return pool
166177

167178

@@ -187,15 +198,20 @@ def test_parallel_create_group(self):
187198

188199
# parallel create group
189200
n = 100
190-
future = pool.map_async(
191-
_create_group, zip([g] * n, [str(i) for i in range(n)]))
192-
results = sorted(future.get(60))
201+
results = pool.map(
202+
_create_group,
203+
zip([g] * n, [str(i) for i in range(n)]),
204+
chunksize=1
205+
)
206+
results = sorted(results)
193207
pool.close()
194208
pool.terminate()
195209

196210
print(results)
197211
eq(n, len(g))
198212

213+
pool.terminate()
214+
199215
def test_parallel_require_group(self):
200216

201217
# setup
@@ -204,15 +220,20 @@ def test_parallel_require_group(self):
204220

205221
# parallel require group
206222
n = 100
207-
future = pool.map_async(
208-
_require_group, zip([g] * n, [str(i//10) for i in range(n)]))
209-
results = sorted(future.get(60))
223+
results = pool.map(
224+
_require_group,
225+
zip([g] * n, [str(i//10) for i in range(n)]),
226+
chunksize=1
227+
)
228+
results = sorted(results)
210229
pool.close()
211230
pool.terminate()
212231

213232
print(results)
214233
eq(n//10, len(g))
215234

235+
pool.terminate()
236+
216237

217238
class TestGroupWithThreadSynchronizer(TestGroup, MixinGroupSyncTests):
218239

@@ -247,9 +268,8 @@ def test_synchronizer_property(self):
247268
class TestGroupWithProcessSynchronizer(TestGroup, MixinGroupSyncTests):
248269

249270
def create_store(self):
250-
path = 'test_sync'
251-
if os.path.exists(path):
252-
shutil.rmtree(path)
271+
path = tempfile.mkdtemp()
272+
atexit.register(atexit_rmtree, path)
253273
store = DirectoryStore(path)
254274
return store, None
255275

@@ -258,13 +278,15 @@ def create_group(self, store=None, path=None, read_only=False,
258278
if store is None:
259279
store, chunk_store = self.create_store()
260280
init_group(store, path=path, chunk_store=chunk_store)
261-
synchronizer = ProcessSynchronizer('test_sync_locks')
281+
sync_path = tempfile.mkdtemp()
282+
atexit.register(atexit_rmtree, sync_path)
283+
synchronizer = ProcessSynchronizer(sync_path)
262284
g = Group(store, path=path, read_only=read_only,
263285
synchronizer=synchronizer, chunk_store=chunk_store)
264286
return g
265287

266288
def create_pool(self):
267-
pool = ProcessPool(cpu_count())
289+
pool = ProcessPool(processes=cpu_count())
268290
return pool
269291

270292
def test_group_repr(self):

0 commit comments

Comments
 (0)