Skip to content

Commit 499f350

Browse files
committed
Merge branch 'issue325-udp-zk-reuse'
2 parents 510b41f + 0a7c637 commit 499f350

File tree

5 files changed

+105
-28
lines changed

5 files changed

+105
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ without compromising stable operations.
2828
- Calrissian integration: avoid unnecessary pulls of `alpine` image ([#1132](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1132))
2929
- Calrissian integration: refactor config to a `CalrissianConfig` sub-config ([#1009](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1009))
3030
- `save_result`: allow non-string values in `GTiff` `file_metadata` ([#1142](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1142))
31+
- Add `udp_registry_zookeeper_client_reuse` config for `KazooClient` reuse in `ZooKeeperUserDefinedProcessRepository` ([#1037](https://github.com/Open-EO/openeo-geopyspark-driver/pull/1037))
3132

3233

3334
## 0.64.1

openeogeotrellis/backend.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,14 @@ def __init__(
337337
else ZooKeeperServiceRegistry()
338338
)
339339

340-
user_defined_processes = (
341-
# TODO #283 #285: eliminate is_ci_context, use some kind of config structure
342-
InMemoryUserDefinedProcessRepository() if not use_zookeeper or ConfigParams().is_ci_context
343-
else ZooKeeperUserDefinedProcessRepository(hosts=ConfigParams().zookeepernodes)
344-
)
340+
# TODO #283 #285: eliminate is_ci_context, use some kind of config structure
341+
if not use_zookeeper or ConfigParams().is_ci_context:
342+
user_defined_processes = InMemoryUserDefinedProcessRepository()
343+
else:
344+
user_defined_processes = ZooKeeperUserDefinedProcessRepository(
345+
hosts=ConfigParams().zookeepernodes,
346+
zk_client_reuse=get_backend_config().udp_registry_zookeeper_client_reuse,
347+
)
345348

346349
requests_session = requests_with_retry(total=3, backoff_factor=2)
347350
vault = Vault(ConfigParams().vault_addr, requests_session)

openeogeotrellis/config/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ class GpsBackendConfig(OpenEoBackendConfig):
117117
use_zk_job_registry: bool = True
118118
zk_job_registry_max_specification_size: Optional[int] = None
119119

120+
udp_registry_zookeeper_client_reuse: bool = False
121+
120122
ejr_api: Optional[str] = os.environ.get("OPENEO_EJR_API")
121123
ejr_backend_id: str = "unknown"
122124
ejr_credentials_vault_path: Optional[str] = os.environ.get("OPENEO_EJR_CREDENTIALS_VAULT_PATH")

openeogeotrellis/user_defined_process_repository.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
import contextlib
22
import json
33
import logging
4-
from typing import List, Dict
5-
from typing import Union
4+
from typing import Dict, List, Union
65

76
from kazoo.client import KazooClient
87
from kazoo.exceptions import NodeExistsError, NoNodeError
98
from kazoo.handlers.threading import KazooTimeoutError
109
from kazoo.retry import KazooRetry
11-
12-
from openeo_driver.backend import UserDefinedProcessMetadata, UserDefinedProcesses
10+
from openeo_driver.backend import UserDefinedProcesses, UserDefinedProcessMetadata
1311
from openeo_driver.errors import ProcessGraphNotFoundException
12+
1413
from openeogeotrellis.configparams import ConfigParams
1514

1615

@@ -20,19 +19,19 @@ class ZooKeeperUserDefinedProcessRepository(UserDefinedProcesses):
2019

2120
_log = logging.getLogger(__name__)
2221

23-
def __init__(self, hosts: List[str], root: str = "/openeo/udps"):
24-
self._hosts = ','.join(hosts)
22+
def __init__(self, hosts: List[str], root: str = "/openeo/udps", *, zk_client_reuse: bool = False):
23+
self._hosts = ",".join(hosts)
2524
self._root = root
25+
self._zk_client_reuse = zk_client_reuse
26+
self._zk_client_cache = None
2627

2728
@staticmethod
2829
def _serialize(spec: dict) -> bytes:
29-
return json.dumps({
30-
'specification': spec
31-
}).encode()
30+
return json.dumps({"specification": spec}).encode("utf8")
3231

3332
@staticmethod
3433
def _deserialize(data: bytes) -> dict:
35-
return json.loads(data.decode())
34+
return json.loads(data.decode("utf8"))
3635

3736
def save(self, user_id: str, process_id: str, spec: dict) -> None:
3837
with self._zk_client() as zk:
@@ -82,16 +81,33 @@ def delete(self, user_id: str, process_id: str) -> None:
8281

8382
@contextlib.contextmanager
8483
def _zk_client(self):
85-
kz_retry = KazooRetry(max_tries=10, delay=0.5, backoff=2)
86-
zk = KazooClient(hosts=self._hosts,connection_retry=kz_retry,
87-
command_retry=kz_retry, timeout=3.0)
88-
zk.start(timeout=15.0)
84+
create_new_client = (not self._zk_client_reuse) or (self._zk_client_cache is None)
85+
if create_new_client:
86+
kz_retry = KazooRetry(max_tries=10, delay=0.5, backoff=2)
87+
zk = KazooClient(
88+
hosts=self._hosts,
89+
connection_retry=kz_retry,
90+
command_retry=kz_retry,
91+
timeout=3.0,
92+
)
93+
zk.start(timeout=15.0)
94+
if self._zk_client_reuse:
95+
self._zk_client_cache = zk
96+
else:
97+
zk = self._zk_client_cache
8998

9099
try:
91100
yield zk
92101
finally:
93-
zk.stop()
94-
zk.close()
102+
if not self._zk_client_reuse:
103+
zk.stop()
104+
zk.close()
105+
106+
def __del__(self):
107+
if self._zk_client_cache:
108+
self._zk_client_cache.stop()
109+
self._zk_client_cache.close()
110+
self._zk_client_cache = None
95111

96112

97113
class InMemoryUserDefinedProcessRepository(UserDefinedProcesses):
@@ -122,7 +138,7 @@ def delete(self, user_id: str, process_id: str) -> None:
122138

123139

124140
def main():
125-
repo = ZooKeeperUserDefinedProcessRepository(hosts=ConfigParams().zookeepernodes)
141+
repo = ZooKeeperUserDefinedProcessRepository(hosts=ConfigParams().zookeepernodes, zk_client_reuse=True)
126142

127143
user_id = 'vdboschj'
128144
process_graph_id = 'evi'

tests/test_user_defined_process_repository.py

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import collections
22
import json
3+
from typing import Iterator
34
from unittest import mock
45

56
import pytest
@@ -16,7 +17,6 @@
1617
"add": {"process_id": "add", "arguments": {"x": 2, "y": 3}, "result": True}
1718
}
1819

19-
PG_2PLUS3_JSON = json.dumps(PG_2PLUS3)
2020

2121

2222
class TestInMemoryUserDefinedProcessRepository:
@@ -73,10 +73,28 @@ def udp_db(self) -> ZooKeeperUserDefinedProcessRepository:
7373
return udp_db
7474

7575
@pytest.fixture
76-
def zk(self) -> mock.MagicMock:
76+
def kazoo_client_factory(self) -> Iterator[mock.MagicMock]:
77+
"""
78+
Inspectable mock for `KazooClient` used by `ZooKeeperUserDefinedProcessRepository` to
79+
inspect the calls made to it (e.g. check client reuse).
80+
"""
81+
with mock.patch("openeogeotrellis.user_defined_process_repository.KazooClient") as KazooClient:
82+
yield KazooClient
83+
84+
@pytest.fixture
85+
def zk(self, kazoo_client_factory) -> mock.MagicMock:
7786
"""Fixture for `KazooClient instance used by ZooKeeperUserDefinedProcessRepository """
78-
with mock.patch('openeogeotrellis.user_defined_process_repository.KazooClient') as KazooClient:
79-
yield KazooClient()
87+
return kazoo_client_factory()
88+
89+
def _build_get_return_value(self, *, process_id="evi", pg=PG_2PLUS3):
90+
"""Helper to build a return value for a mocked `zk.get`"""
91+
data = ZooKeeperUserDefinedProcessRepository._serialize(
92+
spec={
93+
"id": process_id,
94+
"process_graph": pg,
95+
}
96+
)
97+
return (data, self.ZnodeStat(version=123))
8098

8199
def test_save_create(self, udp_db, zk):
82100
udp_db.save(user_id="john", process_id="evi", spec={"id": "evi", "process_graph": PG_2PLUS3})
@@ -95,8 +113,7 @@ def test_get_miss(self, udp_db, zk):
95113
assert res is None
96114

97115
def test_get(self, udp_db, zk):
98-
data = ('{"specification": {"id": "evi", "process_graph": %s}}' % PG_2PLUS3_JSON).encode("utf-8")
99-
zk.get.return_value = (data, self.ZnodeStat(version=123))
116+
zk.get.return_value = self._build_get_return_value(process_id="evi", pg=PG_2PLUS3)
100117
res = udp_db.get(user_id="john", process_id="evi")
101118
assert res == UserDefinedProcessMetadata(process_graph=PG_2PLUS3, id="evi")
102119

@@ -114,3 +131,41 @@ def test_delete_miss(self, udp_db, zk):
114131
zk.delete.side_effect = NoNodeError()
115132
with pytest.raises(ProcessGraphNotFoundException):
116133
udp_db.delete(user_id="john", process_id="evi1")
134+
135+
@pytest.mark.parametrize(
136+
["zk_client_reuse", "expected_create_call_counts", "expected_shutdown_call_counts"],
137+
[
138+
(False, [0, 1, 2, 3], [3, 3]),
139+
(True, [0, 1, 1, 1], [0, 1]),
140+
],
141+
)
142+
def test_client_reuse(
143+
self, kazoo_client_factory, zk_client_reuse, expected_create_call_counts, expected_shutdown_call_counts
144+
):
145+
udp_db = ZooKeeperUserDefinedProcessRepository(hosts=["zk1.test", "zk2.test"], zk_client_reuse=zk_client_reuse)
146+
147+
# Check reuse
148+
create_call_counts = []
149+
create_call_counts.append(kazoo_client_factory.call_count)
150+
udp_db.save(user_id="john", process_id="aaa", spec={"id": "aaa", "process_graph": PG_2PLUS3})
151+
create_call_counts.append(kazoo_client_factory.call_count)
152+
udp_db.save(user_id="john", process_id="bbb", spec={"id": "bbb", "process_graph": PG_2PLUS3})
153+
create_call_counts.append(kazoo_client_factory.call_count)
154+
155+
kazoo_client_factory.return_value.get.return_value = self._build_get_return_value(process_id="ccc")
156+
_ = udp_db.get(user_id="john", process_id="ccc")
157+
create_call_counts.append(kazoo_client_factory.call_count)
158+
159+
assert create_call_counts == expected_create_call_counts
160+
161+
# Check stop/close operations on `del`
162+
stop_call_counts = []
163+
close_call_counts = []
164+
stop_call_counts.append(kazoo_client_factory.return_value.stop.call_count)
165+
close_call_counts.append(kazoo_client_factory.return_value.close.call_count)
166+
del udp_db
167+
stop_call_counts.append(kazoo_client_factory.return_value.stop.call_count)
168+
close_call_counts.append(kazoo_client_factory.return_value.close.call_count)
169+
170+
assert stop_call_counts == expected_shutdown_call_counts
171+
assert close_call_counts == expected_shutdown_call_counts

0 commit comments

Comments
 (0)