Commit 90303c1

Issue #115 fix ConnectionClosedError issue
1 parent 8b08a3f commit 90303c1

2 files changed: +26, -15 lines

src/openeo_aggregator/partitionedjobs/zookeeper.py

Lines changed: 17 additions & 15 deletions
@@ -95,15 +95,16 @@ def obtain_new_pjob_id(self, user_id: str, initial_value: bytes = b"", attempts:
         """Obtain new, unique partitioned job id"""
         # A couple of pjob_id attempts: start with current time based name and a suffix to counter collisions (if any)
         base_pjob_id = "pj-" + Clock.utcnow().strftime("%Y%m%d-%H%M%S")
-        for pjob_id in [base_pjob_id] + [f"{base_pjob_id}-{i}" for i in range(1, attempts)]:
-            try:
-                self._client.create(path=self._path(user_id, pjob_id), value=initial_value, makepath=True)
-                # We obtained our unique id
-                return pjob_id
-            except NodeExistsError:
-                # TODO: check that NodeExistsError is thrown on existing job_ids
-                # TODO: add a sleep() to back off a bit?
-                continue
+        with self._connect():
+            for pjob_id in [base_pjob_id] + [f"{base_pjob_id}-{i}" for i in range(1, attempts)]:
+                try:
+                    self._client.create(path=self._path(user_id, pjob_id), value=initial_value, makepath=True)
+                    # We obtained our unique id
+                    return pjob_id
+                except NodeExistsError:
+                    # TODO: check that NodeExistsError is thrown on existing job_ids
+                    # TODO: add a sleep() to back off a bit?
+                    continue
         raise PartitionedJobFailure("Too much attempts to create new pjob_id")

     def insert(self, user_id: str, pjob: PartitionedJob) -> str:
@@ -147,12 +148,13 @@ def insert_sjob(
         title: Optional[str] = None,
         status: str = STATUS_INSERTED,
     ):
-        self._client.create(
-            path=self._path(user_id, pjob_id, "sjobs", sjob_id),
-            value=self.serialize(process_graph=subjob.process_graph, backend_id=subjob.backend_id, title=title),
-            makepath=True,
-        )
-        self.set_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=status, create=True)
+        with self._connect():
+            self._client.create(
+                path=self._path(user_id, pjob_id, "sjobs", sjob_id),
+                value=self.serialize(process_graph=subjob.process_graph, backend_id=subjob.backend_id, title=title),
+                makepath=True,
+            )
+            self.set_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=status, create=True)

     def get_pjob_metadata(self, user_id: str, pjob_id: str) -> dict:
         """Get metadata of partitioned job, given by storage id."""

src/openeo_aggregator/testing.py

Lines changed: 9 additions & 0 deletions
@@ -32,7 +32,12 @@ def stop(self):
         assert self.state == "open"
         self.state = "closed"

+    def _assert_open(self):
+        if not self.state == "open":
+            raise kazoo.exceptions.ConnectionClosedError("Connection has been closed")
+
     def create(self, path: str, value, makepath: bool = False):
+        self._assert_open()
         if path in self.data:
             raise kazoo.exceptions.NodeExistsError()
         parent = str(pathlib.Path(path).parent)
@@ -44,20 +49,24 @@ def create(self, path: str, value, makepath: bool = False):
         self.data[path] = value

     def exists(self, path):
+        self._assert_open()
         return path in self.data

     def get(self, path):
+        self._assert_open()
         if path not in self.data:
             raise kazoo.exceptions.NoNodeError()
         return (self.data[path], None)

     def get_children(self, path):
+        self._assert_open()
         if path not in self.data:
             raise kazoo.exceptions.NoNodeError()
         parent = path.split("/")
         return [p.split("/")[-1] for p in self.data if p.split("/")[:-1] == parent]

     def set(self, path, value, version=-1):
+        self._assert_open()
         if path not in self.data:
             raise kazoo.exceptions.NoNodeError()
         self.data[path] = value
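With the _assert_open() guard, the fake client mimics real kazoo behavior: any operation after stop() raises ConnectionClosedError. A hypothetical usage sketch follows; the class name DummyKazooClient and its start() method are assumptions, as neither is visible in this hunk.

import kazoo.exceptions
import pytest

from openeo_aggregator.testing import DummyKazooClient  # assumed class name


def test_operations_fail_after_stop():
    client = DummyKazooClient()
    client.start()  # assumed counterpart of stop(), setting state to "open"
    client.create(path="/pj/pj-123", value=b"meta", makepath=True)
    client.stop()
    # The new _assert_open() guard should make this raise, like real kazoo does
    # on a closed connection.
    with pytest.raises(kazoo.exceptions.ConnectionClosedError):
        client.get("/pj/pj-123")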
