Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions torchx/cli/cmd_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import argparse
import logging

from torchx.cli.cmd_base import SubCommand
from torchx.runner import get_runner

logger: logging.Logger = logging.getLogger(__name__)


class CmdDelete(SubCommand):
def add_arguments(self, subparser: argparse.ArgumentParser) -> None:
subparser.add_argument(
"app_handle",
type=str,
help="torchx app handle (e.g. local://session-name/app-id)",
)

def run(self, args: argparse.Namespace) -> None:
app_handle = args.app_handle
runner = get_runner()
runner.delete(app_handle)
2 changes: 2 additions & 0 deletions torchx/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from torchx.cli.cmd_base import SubCommand
from torchx.cli.cmd_cancel import CmdCancel
from torchx.cli.cmd_configure import CmdConfigure
from torchx.cli.cmd_delete import CmdDelete
from torchx.cli.cmd_describe import CmdDescribe
from torchx.cli.cmd_list import CmdList
from torchx.cli.cmd_log import CmdLog
Expand All @@ -37,6 +38,7 @@ def get_default_sub_cmds() -> Dict[str, SubCommand]:
"builtins": CmdBuiltins(),
"cancel": CmdCancel(),
"configure": CmdConfigure(),
"delete": CmdDelete(),
"describe": CmdDescribe(),
"list": CmdList(),
"log": CmdLog(),
Expand Down
28 changes: 28 additions & 0 deletions torchx/cli/test/cmd_delete_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import argparse
import unittest
from unittest.mock import MagicMock, patch

from torchx.cli.cmd_delete import CmdDelete


class CmdDeleteTest(unittest.TestCase):
@patch("torchx.runner.api.Runner.delete")
def test_run(self, delete: MagicMock) -> None:
parser = argparse.ArgumentParser()
cmd_delete = CmdDelete()
cmd_delete.add_arguments(parser)

args = parser.parse_args(["foo://session/id"])
cmd_delete.run(args)

self.assertEqual(delete.call_count, 1)
delete.assert_called_with("foo://session/id")
10 changes: 10 additions & 0 deletions torchx/runner/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,16 @@ def cancel(self, app_handle: AppHandle) -> None:
if status is not None and not status.is_terminal():
scheduler.cancel(app_id)

def delete(self, app_handle: AppHandle) -> None:
"""
Deletes the application from the scheduler.
"""
scheduler, scheduler_backend, app_id = self._scheduler_app_id(app_handle)
with log_event("delete", scheduler_backend, app_id):
status = self.status(app_handle)
if status is not None:
scheduler.delete(app_id)

def stop(self, app_handle: AppHandle) -> None:
"""
See method ``cancel``.
Expand Down
4 changes: 4 additions & 0 deletions torchx/runner/test/api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,10 @@ def test_cancel(self, _) -> None:
with self.get_runner() as runner:
self.assertIsNone(runner.cancel("local_dir://test_session/unknown_app_id"))

def test_delete(self, _) -> None:
with self.get_runner() as runner:
self.assertIsNone(runner.delete("local_dir://test_session/unknown_app_id"))

def test_stop(self, _) -> None:
with self.get_runner() as runner:
self.assertIsNone(runner.stop("local_dir://test_session/unknown_app_id"))
Expand Down
22 changes: 22 additions & 0 deletions torchx/schedulers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,28 @@ def cancel(self, app_id: str) -> None:
# do nothing if the app does not exist
return

def delete(self, app_id: str) -> None:
"""
Deletes the job information for the specified ``app_id`` from the
scheduler's data-plane. Basically "deep-purging" the job from the
scheduler's data-plane. Calling this API on a "live" job (e.g in a
non-terminal status such as PENDING or RUNNING) cancels the job.

Note that this API is only relevant for schedulers for which its
data-plane persistently stores the "JobDefinition" (which is often
versioned). AWS Batch and Kubernetes are examples of such schedulers.
On these schedulers, a finished job may fall out of the data-plane
(e.g. really old finished jobs get deleted) but the JobDefinition is
typically permanently stored. In this case, calling
:py:meth:`~cancel` would not delete the job definition.

In schedulers with no such feature (e.g. SLURM)
:py:meth:`~delete` is the same as :py:meth:`~cancel`, which is the
default implementation. Hence implementors of such schedulers need not
override this method.
"""
self.cancel(app_id)

def log_iter(
self,
app_id: str,
Expand Down
37 changes: 37 additions & 0 deletions torchx/schedulers/kubernetes_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,16 @@ class KubernetesScheduler(
$ torchx status kubernetes://torchx_user/1234
...

**Cancellation**

Canceling a job aborts it while preserving the job spec for inspection
and cloning via kubectl apply. Use the delete command to remove the job entirely:

.. code-block:: bash

$ torchx cancel kubernetes://namespace/jobname # abort, preserves spec
$ torchx delete kubernetes://namespace/jobname # delete completely

**Config Options**

.. runopts::
Expand Down Expand Up @@ -818,6 +828,33 @@ def _validate(self, app: AppDef, scheduler: str, cfg: KubernetesOpts) -> None:
pass

def _cancel_existing(self, app_id: str) -> None:
"""
Abort a Volcano job while preserving the spec for inspection.
"""
namespace, name = app_id.split(":")
vcjob = self._custom_objects_api().get_namespaced_custom_object(
group="batch.volcano.sh",
version="v1alpha1",
namespace=namespace,
plural="jobs",
name=name,
)
vcjob["status"]["state"]["phase"] = "Aborted"
self._custom_objects_api().replace_namespaced_custom_object_status(
group="batch.volcano.sh",
version="v1alpha1",
namespace=namespace,
plural="jobs",
name=name,
body=vcjob,
)

def delete(self, app_id: str) -> None:
"""
Delete a Volcano job completely from the cluster.
"""
if not self.exists(app_id):
return
Comment on lines +856 to +857
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're going to have to templetize delete() so that this runs for all implementation of delete() otherwise each sub-class is going to have to check this.

this behavior also needs to be documented at the parent level (e.g. that calling delete on an app_id that has fallen out of the scheduler's dataplane does nothing and the user has to manually track down any dangling resources related to the job manually)

namespace, name = app_id.split(":")
self._custom_objects_api().delete_namespaced_custom_object(
group="batch.volcano.sh",
Expand Down
38 changes: 35 additions & 3 deletions torchx/schedulers/test/kubernetes_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,11 +800,26 @@ def test_runopts(self) -> None:
},
)

@patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
def test_cancel_existing(self, delete_namespaced_custom_object: MagicMock) -> None:
@patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object")
@patch("kubernetes.client.CustomObjectsApi.replace_namespaced_custom_object_status")
def test_cancel_existing(
self,
replace_namespaced_custom_object_status: MagicMock,
get_namespaced_custom_object: MagicMock,
) -> None:
scheduler = create_scheduler("test")
get_namespaced_custom_object.return_value = {
"status": {"state": {"phase": "Running"}}
}
scheduler._cancel_existing("testnamespace:testjob")
call = delete_namespaced_custom_object.call_args
get_namespaced_custom_object.assert_called_once_with(
group="batch.volcano.sh",
version="v1alpha1",
namespace="testnamespace",
plural="jobs",
name="testjob",
)
call = replace_namespaced_custom_object_status.call_args
args, kwargs = call
self.assertEqual(
kwargs,
Expand All @@ -814,9 +829,26 @@ def test_cancel_existing(self, delete_namespaced_custom_object: MagicMock) -> No
"namespace": "testnamespace",
"plural": "jobs",
"name": "testjob",
"body": {"status": {"state": {"phase": "Aborted"}}},
},
)

@patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
@patch("torchx.schedulers.kubernetes_scheduler.KubernetesScheduler.exists")
def test_delete(
self, exists: MagicMock, delete_namespaced_custom_object: MagicMock
) -> None:
scheduler = create_scheduler("test")
exists.return_value = True
scheduler.delete("testnamespace:testjob")
delete_namespaced_custom_object.assert_called_once_with(
group="batch.volcano.sh",
version="v1alpha1",
namespace="testnamespace",
plural="jobs",
name="testjob",
)

@patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object")
def test_list(self, list_namespaced_custom_object: MagicMock) -> None:
with patch(
Expand Down
Loading