Skip to content

[UX][k8s] show-gpus for all allowed contexts #5362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 87 additions & 35 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
listed in "sky --help". Take care to put logically connected commands close to
each other.
"""
import collections
import copy
import datetime
import functools
Expand Down Expand Up @@ -3413,7 +3414,7 @@ def _list_to_str(lst):

# TODO(zhwu,romilb): We should move most of these kubernetes related
# queries into the backend, especially behind the server.
def _get_kubernetes_realtime_gpu_table(
def _get_kubernetes_realtime_gpu_tables(
context: Optional[str] = None,
name_filter: Optional[str] = None,
quantity_filter: Optional[int] = None):
Expand All @@ -3423,42 +3424,67 @@ def _get_kubernetes_realtime_gpu_table(
else:
qty_header = 'REQUESTABLE_QTY_PER_NODE'
free_header = 'TOTAL_FREE_GPUS'
realtime_gpu_table = log_utils.create_table(
['GPU', qty_header, 'TOTAL_GPUS', free_header])
realtime_gpu_availability_list = sdk.stream_and_get(

realtime_gpu_availability_lists = sdk.stream_and_get(
sdk.realtime_kubernetes_gpu_availability(
context=context,
name_filter=name_filter,
quantity_filter=quantity_filter))
if not realtime_gpu_availability_list:
err_msg = 'No GPUs found in Kubernetes cluster. '
if not realtime_gpu_availability_lists:
err_msg = 'No GPUs found in any allowed Kubernetes cluster. '
debug_msg = 'To further debug, run: sky check '
if name_filter is not None:
gpu_info_msg = f' {name_filter!r}'
if quantity_filter is not None:
gpu_info_msg += (' with requested quantity'
f' {quantity_filter}')
err_msg = (f'Resources{gpu_info_msg} not found '
'in Kubernetes cluster. ')
'in any allowed Kubernetes cluster. ')
debug_msg = ('To show available accelerators on kubernetes,'
' run: sky show-gpus --cloud kubernetes ')
full_err_msg = (err_msg + kubernetes_constants.NO_GPU_HELP_MESSAGE +
debug_msg)
raise ValueError(full_err_msg)
no_permissions_str = '<no permissions>'
for realtime_gpu_availability in sorted(realtime_gpu_availability_list):
gpu_availability = models.RealtimeGpuAvailability(
*realtime_gpu_availability)
available_qty = (gpu_availability.available
if gpu_availability.available != -1 else
no_permissions_str)
realtime_gpu_table.add_row([
gpu_availability.gpu,
_list_to_str(gpu_availability.counts),
gpu_availability.capacity,
available_qty,
])
return realtime_gpu_table
realtime_gpu_infos = []
total_gpu_info: Dict[str, List[int]] = collections.defaultdict(
lambda: [0, 0])

for (ctx, availability_list) in realtime_gpu_availability_lists:
realtime_gpu_table = log_utils.create_table(
['GPU', qty_header, 'TOTAL_GPUS', free_header])
for realtime_gpu_availability in sorted(availability_list):
gpu_availability = models.RealtimeGpuAvailability(
*realtime_gpu_availability)
available_qty = (gpu_availability.available
if gpu_availability.available != -1 else
no_permissions_str)
realtime_gpu_table.add_row([
gpu_availability.gpu,
_list_to_str(gpu_availability.counts),
gpu_availability.capacity,
available_qty,
])
gpu = gpu_availability.gpu
capacity = gpu_availability.capacity
# For the aggregate total, count permission-denied entries (available == -1) as 0.
available = max(gpu_availability.available, 0)
if capacity > 0:
total_gpu_info[gpu][0] += capacity
total_gpu_info[gpu][1] += available
realtime_gpu_infos.append((ctx, realtime_gpu_table))

# Display an aggregated table for all contexts
# if there is more than one context with GPUs.
if len(realtime_gpu_infos) > 1:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious how this works given your example screenshot clearly works for cases where there is only 1 row for total GPU tables. I would expect this condition to be len(realtime_gpu_infos) > 0, or the condition to be based on total_gpu_info instead of realtime_gpu_infos - could you explain what the logic is here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the list realtime_gpu_infos is a list of tables, where each table corresponds to the GPU information of a single k8s cluster. Therefore, if there is only one (or zero) k8s cluster GPU table, then there is technically nothing to aggregate, as the resultant aggregate information and the one table will be identical. In my example screenshot above, even though there is only one GPU per k8s cluster, there are two k8s clusters, so we can aggregate the GPU information.

total_realtime_gpu_table = log_utils.create_table(
['GPU', 'TOTAL_GPUS', free_header])
for gpu, stats in total_gpu_info.items():
total_realtime_gpu_table.add_row([gpu, stats[0], stats[1]])
else:
total_realtime_gpu_table = None

return realtime_gpu_infos, total_realtime_gpu_table

def _format_kubernetes_node_info(context: Optional[str]):
node_table = log_utils.create_table(
Expand All @@ -3479,7 +3505,7 @@ def _format_kubernetes_node_info(context: Optional[str]):
'Kubernetes per node accelerator availability ')
if nodes_info.hint:
k8s_per_node_acc_message += nodes_info.hint
return (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
return (f'{colorama.Fore.LIGHTMAGENTA_EX}{colorama.Style.NORMAL}'
f'{k8s_per_node_acc_message}'
f'{colorama.Style.RESET_ALL}\n'
f'{node_table.get_string()}')
Expand Down Expand Up @@ -3516,22 +3542,32 @@ def _output() -> Generator[str, None, None]:
# If --cloud kubernetes is not specified, we want to catch
# the case where no GPUs are available on the cluster and
# print the warning at the end.
k8s_realtime_table = _get_kubernetes_realtime_gpu_table(
context)
k8s_realtime_infos, total_table = _get_kubernetes_realtime_gpu_tables(context) # pylint: disable=line-too-long
except ValueError as e:
if not cloud_is_kubernetes:
# Make it a note if cloud is not kubernetes
k8s_messages += 'Note: '
k8s_messages += str(e)
else:
print_section_titles = True
context_str = f'(Context: {context})' if context else ''
yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
f'Kubernetes GPUs {context_str}'
f'{colorama.Style.RESET_ALL}\n')
yield from k8s_realtime_table.get_string()
yield '\n\n'
yield _format_kubernetes_node_info(context)

# print total table
if total_table is not None:
yield (f'{colorama.Fore.GREEN}{colorama.Style.BRIGHT}'
'Total Kubernetes GPUs'
f'{colorama.Style.RESET_ALL}\n')
yield from total_table.get_string()
yield '\n-----\n\n'

# print individual infos.
for (ctx, k8s_realtime_table) in k8s_realtime_infos:
context_str = f'(Context: {ctx})' if ctx else ''
yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
f'Kubernetes GPUs {context_str}'
f'{colorama.Style.RESET_ALL}\n')
yield from k8s_realtime_table.get_string()
yield '\n\n'
yield _format_kubernetes_node_info(ctx) + '\n-----\n\n'
if kubernetes_autoscaling:
k8s_messages += (
'\n' + kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE)
Expand Down Expand Up @@ -3620,13 +3656,29 @@ def _output() -> Generator[str, None, None]:
# Print section title if not showing all and instead a specific
# accelerator is requested
print_section_titles = True
yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
f'Kubernetes GPUs{colorama.Style.RESET_ALL}\n')
# TODO(romilb): Show filtered per node GPU availability here as well
try:
k8s_realtime_table = _get_kubernetes_realtime_gpu_table(
name_filter=name, quantity_filter=quantity)
yield from k8s_realtime_table.get_string()
k8s_realtime_infos, total_table = _get_kubernetes_realtime_gpu_tables( # pylint: disable=line-too-long
context=region,
name_filter=name,
quantity_filter=quantity)

# print total table
if total_table is not None:
yield (f'{colorama.Fore.GREEN}{colorama.Style.BRIGHT}'
'Total Kubernetes GPUs'
f'{colorama.Style.RESET_ALL}\n')
yield from total_table.get_string()
yield '\n-----\n\n'

# print individual tables
for (ctx, k8s_realtime_table) in k8s_realtime_infos:
context_str = f'(Context: {ctx})' if ctx else ''
yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
f'Kubernetes GPUs {context_str}'
f'{colorama.Style.RESET_ALL}\n')
yield from k8s_realtime_table.get_string()
yield '\n\n'
except ValueError as e:
# In the case of a specific accelerator, show the error message
# immediately (e.g., "Resources H100 not found ...")
Expand Down
14 changes: 7 additions & 7 deletions sky/clouds/service_catalog/kubernetes_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,16 @@ def _list_accelerators(

accelerators_available = accelerator_count - allocated_qty

# Initialize the entry if it doesn't exist yet
if accelerator_name not in total_accelerators_available:
total_accelerators_available[accelerator_name] = 0

if accelerators_available >= min_quantity_filter:
quantized_availability = min_quantity_filter * (
accelerators_available // min_quantity_filter)
total_accelerators_available[accelerator_name] = (
total_accelerators_available.get(accelerator_name, 0) +
quantized_availability)
if quantized_availability > 0:
# only increment when quantized availability is positive
# to avoid assertion errors checking keyset sizes in
# core.py _realtime_kubernetes_gpu_availability_single
total_accelerators_available[accelerator_name] = (
total_accelerators_available.get(
accelerator_name, 0) + quantized_availability)

result = []

Expand Down
87 changes: 58 additions & 29 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,47 +1012,76 @@ def realtime_kubernetes_gpu_availability(
context: Optional[str] = None,
name_filter: Optional[str] = None,
quantity_filter: Optional[int] = None
) -> List[models.RealtimeGpuAvailability]:

counts, capacity, available = service_catalog.list_accelerator_realtime(
gpus_only=True,
clouds='kubernetes',
name_filter=name_filter,
region_filter=context,
quantity_filter=quantity_filter,
case_sensitive=False)
assert (set(counts.keys()) == set(capacity.keys()) == set(
available.keys())), (f'Keys of counts ({list(counts.keys())}), '
f'capacity ({list(capacity.keys())}), '
f'and available ({list(available.keys())}) '
'must be same.')
if len(counts) == 0:
err_msg = 'No GPUs found in Kubernetes cluster. '
) -> List[Tuple[str, List[models.RealtimeGpuAvailability]]]:

if context is None:
context_list = clouds.Kubernetes.existing_allowed_contexts()
else:
context_list = [context]

def _realtime_kubernetes_gpu_availability_single(
context: Optional[str] = None,
name_filter: Optional[str] = None,
quantity_filter: Optional[int] = None
) -> List[models.RealtimeGpuAvailability]:
counts, capacity, available = service_catalog.list_accelerator_realtime(
gpus_only=True,
clouds='kubernetes',
name_filter=name_filter,
region_filter=context,
quantity_filter=quantity_filter,
case_sensitive=False)
assert (set(counts.keys()) == set(capacity.keys()) == set(
available.keys())), (f'Keys of counts ({list(counts.keys())}), '
f'capacity ({list(capacity.keys())}), '
f'and available ({list(available.keys())}) '
'must be the same.')
realtime_gpu_availability_list: List[
models.RealtimeGpuAvailability] = []

for gpu, _ in sorted(counts.items()):
realtime_gpu_availability_list.append(
models.RealtimeGpuAvailability(
gpu,
counts.pop(gpu),
capacity[gpu],
available[gpu],
))
return realtime_gpu_availability_list

availability_lists: List[Tuple[str,
List[models.RealtimeGpuAvailability]]] = []
cumulative_count = 0
parallel_queried = subprocess_utils.run_in_parallel(
lambda ctx: _realtime_kubernetes_gpu_availability_single(
context=ctx,
name_filter=name_filter,
quantity_filter=quantity_filter), context_list)

for ctx, queried in zip(context_list, parallel_queried):
cumulative_count += len(queried)
if len(queried) == 0:
# Don't add GPU results for clusters that don't have any.
logger.debug(f'No gpus found in k8s cluster {ctx}')
continue
availability_lists.append((ctx, queried))

if cumulative_count == 0:
err_msg = 'No GPUs found in any Kubernetes clusters. '
debug_msg = 'To further debug, run: sky check '
if name_filter is not None:
gpu_info_msg = f' {name_filter!r}'
if quantity_filter is not None:
gpu_info_msg += (' with requested quantity'
f' {quantity_filter}')
err_msg = (f'Resources{gpu_info_msg} not found '
'in Kubernetes cluster. ')
'in Kubernetes clusters. ')
debug_msg = ('To show available accelerators on kubernetes,'
' run: sky show-gpus --cloud kubernetes ')
full_err_msg = (err_msg + kubernetes_constants.NO_GPU_HELP_MESSAGE +
debug_msg)
raise ValueError(full_err_msg)

realtime_gpu_availability_list: List[models.RealtimeGpuAvailability] = []

for gpu, _ in sorted(counts.items()):
realtime_gpu_availability_list.append(
models.RealtimeGpuAvailability(
gpu,
counts.pop(gpu),
capacity[gpu],
available[gpu],
))
return realtime_gpu_availability_list
return availability_lists


# =================
Expand Down