Skip to content

Commit 6b8f1aa

Browse files
committed
Cluster API: Naming things and refactoring
1 parent b07278b commit 6b8f1aa

File tree

13 files changed

+259
-246
lines changed

13 files changed

+259
-246
lines changed

cratedb_toolkit/cli.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from .docs.cli import cli as docs_cli
1111
from .info.cli import cli as info_cli
1212
from .io.cli import cli as io_cli
13-
from .job.cli import cli_list_jobs
1413
from .query.cli import cli as query_cli
1514
from .settings.cli import cli as settings_cli
1615
from .shell.cli import cli as shell_cli
@@ -37,4 +36,3 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
3736
cli.add_command(shell_cli, name="shell")
3837
cli.add_command(settings_cli, name="settings")
3938
cli.add_command(tail_cli, name="tail")
40-
cli.add_command(cli_list_jobs)

cratedb_toolkit/cluster/cli.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import contextlib
2+
import json
23
import logging
4+
import sys
35

46
import click
7+
import yaml
58
from click import ClickException
69
from click_aliases import ClickAliasedGroup
710

@@ -45,6 +48,15 @@ def help_stop():
4548
"""
4649

4750

51+
def help_list_jobs():
52+
"""
53+
List jobs on cluster.
54+
55+
ctk cluster list-jobs
56+
croud clusters import-jobs
57+
"""
58+
59+
4860
@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
4961
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
5062
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@@ -106,6 +118,39 @@ def stop(ctx: click.Context, cluster_id: str, cluster_name: str):
106118
jd(cluster.info.asdict())
107119

108120

121+
@make_command(cli, name="list-jobs", help=help_list_jobs)
122+
@option_cluster_id
123+
@option_cluster_name
124+
@click.option(
125+
"--format",
126+
"output_format",
127+
type=click.Choice(["json", "yaml"], case_sensitive=False),
128+
default="json",
129+
required=False,
130+
help="The output format for the result of the operation",
131+
)
132+
@click.pass_context
133+
def list_jobs(ctx: click.Context, cluster_id: str, cluster_name: str, output_format: str):
134+
"""
135+
List jobs on cluster.
136+
"""
137+
138+
with handle_command_errors("list cluster jobs"):
139+
# Acquire the database cluster handle and acquire job information.
140+
cluster = ManagedCluster(cluster_id=cluster_id, cluster_name=cluster_name).probe()
141+
if not cluster.operation:
142+
raise CroudException("Cluster does not support job operations")
143+
data = cluster.operation.list_jobs()
144+
145+
# Display job information.
146+
if output_format == "json":
147+
print(json.dumps(data, indent=2), file=sys.stdout) # noqa: T201
148+
elif output_format == "yaml":
149+
print(yaml.dump(data), file=sys.stdout) # noqa: T201
150+
else:
151+
raise ValueError(f"Unknown output format: {output_format}")
152+
153+
109154
@contextlib.contextmanager
110155
def handle_command_errors(operation_name):
111156
"""Handle common command errors and exit with appropriate error messages."""

cratedb_toolkit/cluster/core.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import click
1111
from boltons.urlutils import URL
1212

13-
from cratedb_toolkit.cluster.croud import CloudManager
13+
from cratedb_toolkit.cluster.croud import CloudClusterServices, CloudRootServices
1414
from cratedb_toolkit.cluster.guide import DataImportGuide
1515
from cratedb_toolkit.cluster.model import ClientBundle, ClusterBase, ClusterInformation
1616
from cratedb_toolkit.config import CONFIG
@@ -132,7 +132,8 @@ def __init__(
132132
"Failed to address cluster: Either cluster identifier or name needs to be specified"
133133
)
134134

135-
self.cm = CloudManager()
135+
self.root = CloudRootServices()
136+
self.operation: t.Optional[CloudClusterServices] = None
136137
self._jwt_ctx: t.ContextManager = nullcontext()
137138
self._client_bundle: t.Optional[ClientBundle] = None
138139

@@ -148,15 +149,18 @@ def __exit__(self, exc_type, exc_val, exc_tb):
148149

149150
try:
150151
self.close_connections()
151-
if self.stop_on_exit:
152-
self.stop()
153-
logger.info(f"Successfully stopped cluster: id={self.cluster_id}, name={self.cluster_name}")
154152
except Exception as ex:
155-
logger.error(f"Failed to stop cluster: {ex}")
156-
# Don't swallow the original exception
157-
return False
153+
logger.error(f"Failed to close connections: {ex}")
154+
finally:
155+
if self.stop_on_exit:
156+
try:
157+
self.stop()
158+
logger.info(f"Successfully stopped cluster: id={self.cluster_id}, name={self.cluster_name}")
159+
except Exception as ex:
160+
logger.error(f"Failed to stop cluster: {ex}")
158161

159-
return False # Don't suppress any exceptions
162+
# Don't suppress the original exception.
163+
return False
160164

161165
@classmethod
162166
@flexfun(domain="settings")
@@ -202,13 +206,19 @@ def probe(self) -> "ManagedCluster":
202206
Probe a CrateDB Cloud cluster, API-wise.
203207
204208
TODO: Investigate callers, and reduce number of invocations.
209+
TODO: self._jwt_ctx is created once in probe() and reused for every query.
210+
If the per-cluster JWT expires (default 1 h) subsequent queries will fail.
211+
Consider refreshing self._jwt_ctx (or simply calling jwt_token_patch()
212+
inside query()) when probe() is older than e.g. 30 min.
205213
"""
206214
try:
207215
self.info = ClusterInformation.from_id_or_name(cluster_id=self.cluster_id, cluster_name=self.cluster_name)
208216
self.cluster_id = self.info.cloud["id"]
209217
self.cluster_name = self.info.cloud["name"]
210218
self.address = DatabaseAddress.from_httpuri(self.info.cloud["url"])
211219
self._jwt_ctx = jwt_token_patch(self.info.jwt.token)
220+
if self.cluster_id:
221+
self.operation = CloudClusterServices(cluster_id=self.cluster_id)
212222

213223
except (CroudException, DatabaseAddressMissingError) as ex:
214224
self.exists = False
@@ -269,14 +279,13 @@ def deploy(self) -> "ManagedCluster":
269279
270280
Command: ctk cluster start
271281
"""
272-
# FIXME: Accept id or name.
273282
if self.cluster_name is None:
274283
raise DatabaseAddressMissingError("Need cluster name to deploy")
275284

276285
# Find the existing project by name (equals cluster name).
277286
project_id = None
278287
try:
279-
projects = self.cm.list_projects()
288+
projects = self.root.list_projects()
280289
for project in projects:
281290
if project["name"] == self.cluster_name:
282291
project_id = project["id"]
@@ -287,11 +296,11 @@ def deploy(self) -> "ManagedCluster":
287296

288297
# Create a new project if none exists.
289298
if not project_id:
290-
project = self.cm.create_project(name=self.cluster_name, organization_id=self.settings.organization_id)
299+
project = self.root.create_project(name=self.cluster_name, organization_id=self.settings.organization_id)
291300
project_id = project["id"]
292301
logger.info(f"Created project: {project_id}")
293302

294-
cluster_info = self.cm.deploy_cluster(
303+
cluster_info = self.root.deploy_cluster(
295304
name=self.cluster_name, project_id=project_id, subscription_id=self.settings.subscription_id
296305
)
297306

@@ -309,7 +318,8 @@ def resume(self) -> "ManagedCluster":
309318
if self.cluster_id is None:
310319
raise DatabaseAddressMissingError("Need cluster identifier to resume cluster")
311320
logger.info(f"Resuming CrateDB Cloud Cluster: id={self.cluster_id}, name={self.cluster_name}")
312-
self.cm.resume_cluster(identifier=self.cluster_id)
321+
if self.operation:
322+
self.operation.resume()
313323
self.probe()
314324
return self
315325

@@ -322,7 +332,8 @@ def stop(self) -> "ManagedCluster":
322332
if self.cluster_id is None:
323333
raise DatabaseAddressMissingError("Need cluster identifier to stop cluster")
324334
logger.info(f"Stopping CrateDB Cloud Cluster: id={self.cluster_id}, name={self.cluster_name}")
325-
self.cm.suspend_cluster(identifier=self.cluster_id)
335+
if self.operation:
336+
self.operation.suspend()
326337
self.probe()
327338
return self
328339

@@ -348,7 +359,6 @@ def load_table(
348359
self.probe()
349360
target = target or TableAddress()
350361

351-
# FIXME: Accept id or name.
352362
if self.cluster_id is None:
353363
raise DatabaseAddressMissingError("Need cluster identifier to load table")
354364

0 commit comments

Comments
 (0)