diff --git a/sky/cli.py b/sky/cli.py index 435efb95c79..68f64ec7166 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -23,6 +23,7 @@ listed in "sky --help". Take care to put logically connected commands close to each other. """ +import collections import copy import datetime import functools @@ -3413,7 +3414,7 @@ def _list_to_str(lst): # TODO(zhwu,romilb): We should move most of these kubernetes related # queries into the backend, especially behind the server. - def _get_kubernetes_realtime_gpu_table( + def _get_kubernetes_realtime_gpu_tables( context: Optional[str] = None, name_filter: Optional[str] = None, quantity_filter: Optional[int] = None): @@ -3423,15 +3424,14 @@ def _get_kubernetes_realtime_gpu_table( else: qty_header = 'REQUESTABLE_QTY_PER_NODE' free_header = 'TOTAL_FREE_GPUS' - realtime_gpu_table = log_utils.create_table( - ['GPU', qty_header, 'TOTAL_GPUS', free_header]) - realtime_gpu_availability_list = sdk.stream_and_get( + + realtime_gpu_availability_lists = sdk.stream_and_get( sdk.realtime_kubernetes_gpu_availability( context=context, name_filter=name_filter, quantity_filter=quantity_filter)) - if not realtime_gpu_availability_list: - err_msg = 'No GPUs found in Kubernetes cluster. ' + if not realtime_gpu_availability_lists: + err_msg = 'No GPUs found in any allowed Kubernetes cluster. ' debug_msg = 'To further debug, run: sky check ' if name_filter is not None: gpu_info_msg = f' {name_filter!r}' @@ -3439,26 +3439,52 @@ def _get_kubernetes_realtime_gpu_table( gpu_info_msg += (' with requested quantity' f' {quantity_filter}') err_msg = (f'Resources{gpu_info_msg} not found ' - 'in Kubernetes cluster. ') + 'in any allowed Kubernetes cluster. ') debug_msg = ('To show available accelerators on kubernetes,' ' run: sky show-gpus --cloud kubernetes ') full_err_msg = (err_msg + kubernetes_constants.NO_GPU_HELP_MESSAGE + debug_msg) raise ValueError(full_err_msg) no_permissions_str = '' - for realtime_gpu_availability in sorted(realtime_gpu_availability_list): - gpu_availability = models.RealtimeGpuAvailability( - *realtime_gpu_availability) - available_qty = (gpu_availability.available - if gpu_availability.available != -1 else - no_permissions_str) - realtime_gpu_table.add_row([ - gpu_availability.gpu, - _list_to_str(gpu_availability.counts), - gpu_availability.capacity, - available_qty, - ]) - return realtime_gpu_table + realtime_gpu_infos = [] + total_gpu_info: Dict[str, List[int]] = collections.defaultdict( + lambda: [0, 0]) + + for (ctx, availability_list) in realtime_gpu_availability_lists: + realtime_gpu_table = log_utils.create_table( + ['GPU', qty_header, 'TOTAL_GPUS', free_header]) + for realtime_gpu_availability in sorted(availability_list): + gpu_availability = models.RealtimeGpuAvailability( + *realtime_gpu_availability) + available_qty = (gpu_availability.available + if gpu_availability.available != -1 else + no_permissions_str) + realtime_gpu_table.add_row([ + gpu_availability.gpu, + _list_to_str(gpu_availability.counts), + gpu_availability.capacity, + available_qty, + ]) + gpu = gpu_availability.gpu + capacity = gpu_availability.capacity + # we want total, so skip permission denied. + available = max(gpu_availability.available, 0) + if capacity > 0: + total_gpu_info[gpu][0] += capacity + total_gpu_info[gpu][1] += available + realtime_gpu_infos.append((ctx, realtime_gpu_table)) + + # display an aggregated table for all contexts + # if there are more than one contexts with GPUs + if len(realtime_gpu_infos) > 1: + total_realtime_gpu_table = log_utils.create_table( + ['GPU', 'TOTAL_GPUS', free_header]) + for gpu, stats in total_gpu_info.items(): + total_realtime_gpu_table.add_row([gpu, stats[0], stats[1]]) + else: + total_realtime_gpu_table = None + + return realtime_gpu_infos, total_realtime_gpu_table def _format_kubernetes_node_info(context: Optional[str]): node_table = log_utils.create_table( @@ -3479,7 +3505,7 @@ def _format_kubernetes_node_info(context: Optional[str]): 'Kubernetes per node accelerator availability ') if nodes_info.hint: k8s_per_node_acc_message += nodes_info.hint - return (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + return (f'{colorama.Fore.LIGHTMAGENTA_EX}{colorama.Style.NORMAL}' f'{k8s_per_node_acc_message}' f'{colorama.Style.RESET_ALL}\n' f'{node_table.get_string()}') @@ -3516,8 +3542,7 @@ def _output() -> Generator[str, None, None]: # If --cloud kubernetes is not specified, we want to catch # the case where no GPUs are available on the cluster and # print the warning at the end. - k8s_realtime_table = _get_kubernetes_realtime_gpu_table( - context) + k8s_realtime_infos, total_table = _get_kubernetes_realtime_gpu_tables(context) # pylint: disable=line-too-long except ValueError as e: if not cloud_is_kubernetes: # Make it a note if cloud is not kubernetes @@ -3525,13 +3550,24 @@ def _output() -> Generator[str, None, None]: k8s_messages += str(e) else: print_section_titles = True - context_str = f'(Context: {context})' if context else '' - yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Kubernetes GPUs {context_str}' - f'{colorama.Style.RESET_ALL}\n') - yield from k8s_realtime_table.get_string() - yield '\n\n' - yield _format_kubernetes_node_info(context) + + # print total table + if total_table is not None: + yield (f'{colorama.Fore.GREEN}{colorama.Style.BRIGHT}' + 'Total Kubernetes GPUs' + f'{colorama.Style.RESET_ALL}\n') + yield from total_table.get_string() + yield '\n-----\n\n' + + # print individual infos. + for (ctx, k8s_realtime_table) in k8s_realtime_infos: + context_str = f'(Context: {ctx})' if ctx else '' + yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Kubernetes GPUs {context_str}' + f'{colorama.Style.RESET_ALL}\n') + yield from k8s_realtime_table.get_string() + yield '\n\n' + yield _format_kubernetes_node_info(ctx) + '\n-----\n\n' if kubernetes_autoscaling: k8s_messages += ( '\n' + kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE) @@ -3620,13 +3656,29 @@ def _output() -> Generator[str, None, None]: # Print section title if not showing all and instead a specific # accelerator is requested print_section_titles = True - yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Kubernetes GPUs{colorama.Style.RESET_ALL}\n') # TODO(romilb): Show filtered per node GPU availability here as well try: - k8s_realtime_table = _get_kubernetes_realtime_gpu_table( - name_filter=name, quantity_filter=quantity) - yield from k8s_realtime_table.get_string() + k8s_realtime_infos, total_table = _get_kubernetes_realtime_gpu_tables( # pylint: disable=line-too-long + context=region, + name_filter=name, + quantity_filter=quantity) + + # print total table + if total_table is not None: + yield (f'{colorama.Fore.GREEN}{colorama.Style.BRIGHT}' + 'Total Kubernetes GPUs' + f'{colorama.Style.RESET_ALL}\n') + yield from total_table.get_string() + yield '\n-----\n\n' + + # print individual tables + for (ctx, k8s_realtime_table) in k8s_realtime_infos: + context_str = f'(Context: {ctx})' if ctx else '' + yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Kubernetes GPUs {context_str}' + f'{colorama.Style.RESET_ALL}\n') + yield from k8s_realtime_table.get_string() + yield '\n\n' except ValueError as e: # In the case of a specific accelerator, show the error message # immediately (e.g., "Resources H100 not found ...") diff --git a/sky/clouds/service_catalog/kubernetes_catalog.py b/sky/clouds/service_catalog/kubernetes_catalog.py index d9a0344fb3f..f881d803a5f 100644 --- a/sky/clouds/service_catalog/kubernetes_catalog.py +++ b/sky/clouds/service_catalog/kubernetes_catalog.py @@ -261,16 +261,16 @@ def _list_accelerators( accelerators_available = accelerator_count - allocated_qty - # Initialize the entry if it doesn't exist yet - if accelerator_name not in total_accelerators_available: - total_accelerators_available[accelerator_name] = 0 - if accelerators_available >= min_quantity_filter: quantized_availability = min_quantity_filter * ( accelerators_available // min_quantity_filter) - total_accelerators_available[accelerator_name] = ( - total_accelerators_available.get(accelerator_name, 0) + - quantized_availability) + if quantized_availability > 0: + # only increment when quantized availability is positive + # to avoid assertion errors checking keyset sizes in + # core.py _realtime_kubernetes_gpu_availability_single + total_accelerators_available[accelerator_name] = ( + total_accelerators_available.get( + accelerator_name, 0) + quantized_availability) result = [] diff --git a/sky/core.py b/sky/core.py index d0b01537382..2e5e4900ca9 100644 --- a/sky/core.py +++ b/sky/core.py @@ -1012,22 +1012,62 @@ def realtime_kubernetes_gpu_availability( context: Optional[str] = None, name_filter: Optional[str] = None, quantity_filter: Optional[int] = None -) -> List[models.RealtimeGpuAvailability]: - - counts, capacity, available = service_catalog.list_accelerator_realtime( - gpus_only=True, - clouds='kubernetes', - name_filter=name_filter, - region_filter=context, - quantity_filter=quantity_filter, - case_sensitive=False) - assert (set(counts.keys()) == set(capacity.keys()) == set( - available.keys())), (f'Keys of counts ({list(counts.keys())}), ' - f'capacity ({list(capacity.keys())}), ' - f'and available ({list(available.keys())}) ' - 'must be same.') - if len(counts) == 0: - err_msg = 'No GPUs found in Kubernetes cluster. ' +) -> List[Tuple[str, List[models.RealtimeGpuAvailability]]]: + + if context is None: + context_list = clouds.Kubernetes.existing_allowed_contexts() + else: + context_list = [context] + + def _realtime_kubernetes_gpu_availability_single( + context: Optional[str] = None, + name_filter: Optional[str] = None, + quantity_filter: Optional[int] = None + ) -> List[models.RealtimeGpuAvailability]: + counts, capacity, available = service_catalog.list_accelerator_realtime( + gpus_only=True, + clouds='kubernetes', + name_filter=name_filter, + region_filter=context, + quantity_filter=quantity_filter, + case_sensitive=False) + assert (set(counts.keys()) == set(capacity.keys()) == set( + available.keys())), (f'Keys of counts ({list(counts.keys())}), ' + f'capacity ({list(capacity.keys())}), ' + f'and available ({list(available.keys())}) ' + 'must be the same.') + realtime_gpu_availability_list: List[ + models.RealtimeGpuAvailability] = [] + + for gpu, _ in sorted(counts.items()): + realtime_gpu_availability_list.append( + models.RealtimeGpuAvailability( + gpu, + counts.pop(gpu), + capacity[gpu], + available[gpu], + )) + return realtime_gpu_availability_list + + availability_lists: List[Tuple[str, + List[models.RealtimeGpuAvailability]]] = [] + cumulative_count = 0 + parallel_queried = subprocess_utils.run_in_parallel( + lambda ctx: _realtime_kubernetes_gpu_availability_single( + context=ctx, + name_filter=name_filter, + quantity_filter=quantity_filter), context_list) + + for ctx, queried in zip(context_list, parallel_queried): + cumulative_count += len(queried) + if len(queried) == 0: + # don't add gpu results for clusters that don't have any + logger.debug(f'No gpus found in k8s cluster {ctx}') + continue + availability_lists.append((ctx, queried)) + + if cumulative_count == 0: + err_msg = 'No GPUs found in any Kubernetes clusters. ' debug_msg = 'To further debug, run: sky check ' if name_filter is not None: gpu_info_msg = f' {name_filter!r}' @@ -1035,24 +1075,13 @@ def realtime_kubernetes_gpu_availability( gpu_info_msg += (' with requested quantity' f' {quantity_filter}') err_msg = (f'Resources{gpu_info_msg} not found ' - 'in Kubernetes cluster. ') + 'in Kubernetes clusters. ') debug_msg = ('To show available accelerators on kubernetes,' ' run: sky show-gpus --cloud kubernetes ') full_err_msg = (err_msg + kubernetes_constants.NO_GPU_HELP_MESSAGE + debug_msg) raise ValueError(full_err_msg) - - realtime_gpu_availability_list: List[models.RealtimeGpuAvailability] = [] - - for gpu, _ in sorted(counts.items()): - realtime_gpu_availability_list.append( - models.RealtimeGpuAvailability( - gpu, - counts.pop(gpu), - capacity[gpu], - available[gpu], - )) - return realtime_gpu_availability_list + return availability_lists # =================