2727See the
2828`Volcano Quickstart <https://github.com/volcano-sh/volcano>`_
2929for more information.
30+
31+ Pod Overlay
32+ ===========
33+
You can overlay arbitrary Kubernetes Pod fields (both ``spec`` and ``metadata``) on the
generated pods using the ``pod`` scheduler argument.
36+
37+ The overlay can be provided as a dict or YAML file path:
38+
39+ .. code:: bash
40+
41+ # Inline dict
42+ torchx run --scheduler kubernetes \\
43+ --scheduler_args 'pod={"spec":{"nodeSelector":{"gpu":"true"}}}' \\
44+ my_component.py
45+
46+ # From YAML file
47+ torchx run --scheduler kubernetes \\
48+ --scheduler_args pod=pod_overlay.yaml \\
49+ my_component.py
50+
51+ Example ``pod_overlay.yaml``:
52+
53+ .. code:: yaml
54+
55+ spec:
56+ nodeSelector:
57+ node.kubernetes.io/instance-type: p4d.24xlarge
58+ tolerations:
59+ - key: nvidia.com/gpu
60+ operator: Exists
61+ effect: NoSchedule
62+ affinity:
63+ podAntiAffinity:
64+ requiredDuringSchedulingIgnoredDuringExecution:
65+ - labelSelector:
66+ matchExpressions:
67+ - key: app
68+ operator: In
69+ values: [trainer]
70+ topologyKey: kubernetes.io/hostname
71+
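The overlay is deep-merged into each generated pod: nested mappings are merged key by
key, preserving existing fields and adding or overriding the specified ones. Any other
value, including lists such as ``tolerations``, replaces the generated value entirely.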
3074"""
3175
3276import json
4589 Tuple ,
4690 TYPE_CHECKING ,
4791 TypedDict ,
92+ Union ,
4893)
4994
5095import torchx
97142RESERVED_MILLICPU = 100
98143RESERVED_MEMMB = 1024
99144
145+
146+ def _load_pod_overlay (pod : Union [str , Dict [str , Any ]]) -> Dict [str , Any ]:
147+ """Load pod overlay from dict or YAML file path."""
148+ if isinstance (pod , str ):
149+ try :
150+ with open (pod ) as f :
151+ return yaml .safe_load (f ) or {}
152+ except Exception as e :
153+ raise ValueError (f"Failed to load pod overlay from file { pod } : { e } " ) from e
154+ elif isinstance (pod , dict ):
155+ return pod
156+ else :
157+ raise ValueError (f"pod must be a dict or file path string, got { type (pod )} " )
158+
159+
def _apply_pod_overlay(pod: "V1Pod", overlay: Dict[str, Any]) -> None:
    """Merge ``overlay`` into ``pod`` in place.

    The pod is serialized to a plain dict, the overlay is merged into that dict,
    and the result is deserialized back into a ``V1Pod`` whose ``spec`` and
    ``metadata`` are then assigned onto the original ``pod`` object.

    Merge rule: when both the existing value and the overlay value for a key are
    dicts, they are merged recursively; in every other case (including lists)
    the overlay value replaces the existing one.

    Args:
        pod: The pod object to update. Only ``pod.spec`` and ``pod.metadata``
            are reassigned.
        overlay: Mapping merged on top of the serialized pod.
            NOTE(review): keys presumably need to match the serialized field
            names produced by the Kubernetes client - confirm.
    """
    from kubernetes import client

    def _merge_into(target: Dict[str, Any], patch: Dict[str, Any]) -> None:
        for name, patch_value in patch.items():
            current = target.get(name)
            both_are_dicts = isinstance(patch_value, dict) and isinstance(
                current, dict
            )
            if not both_are_dicts:
                # Missing key, scalar, list, or type mismatch: overlay wins.
                target[name] = patch_value
                continue
            _merge_into(current, patch_value)

    api_client = client.ApiClient()
    serialized = api_client.sanitize_for_serialization(pod)
    _merge_into(serialized, overlay)

    # Name-mangled private method of ApiClient, used to turn the merged plain
    # dict back into a typed V1Pod.
    rebuilt = api_client._ApiClient__deserialize(serialized, "V1Pod")
    pod.spec = rebuilt.spec
    pod.metadata = rebuilt.metadata
179+
180+
100181RETRY_POLICIES : Mapping [str , Iterable [Mapping [str , str ]]] = {
101182 RetryPolicy .REPLICA : [],
102183 RetryPolicy .APPLICATION : [
@@ -369,6 +450,7 @@ def app_to_resource(
369450 queue : str ,
370451 service_account : Optional [str ],
371452 priority_class : Optional [str ] = None ,
453+ pod_overlay : Optional [Dict [str , Any ]] = None ,
372454) -> Dict [str , object ]:
373455 """
374456 app_to_resource creates a volcano job kubernetes resource definition from
@@ -402,6 +484,8 @@ def app_to_resource(
402484 replica_role .env ["TORCHX_IMAGE" ] = replica_role .image
403485
404486 pod = role_to_pod (name , replica_role , service_account )
487+ if pod_overlay :
488+ _apply_pod_overlay (pod , pod_overlay )
405489 pod .metadata .labels .update (
406490 pod_labels (
407491 app = app ,
@@ -471,6 +555,7 @@ class KubernetesOpts(TypedDict, total=False):
471555 image_repo : Optional [str ]
472556 service_account : Optional [str ]
473557 priority_class : Optional [str ]
558+ pod : Union [str , Dict [str , Any ]]
474559
475560
476561class KubernetesScheduler (
@@ -636,7 +721,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[KubernetesJob]) -> str:
636721 else :
637722 raise
638723
639- return f' { namespace } :{ resp [" metadata" ][ " name" ] } '
724+ return f" { namespace } :{ resp [' metadata' ][ ' name' ] } "
640725
641726 def _submit_dryrun (
642727 self , app : AppDef , cfg : KubernetesOpts
@@ -658,7 +743,12 @@ def _submit_dryrun(
658743 priority_class , str
659744 ), "priority_class must be a str"
660745
661- resource = app_to_resource (app , queue , service_account , priority_class )
746+ pod = cfg .get ("pod" )
747+ pod_overlay = _load_pod_overlay (pod ) if pod else None
748+
749+ resource = app_to_resource (
750+ app , queue , service_account , priority_class , pod_overlay
751+ )
662752 req = KubernetesJob (
663753 resource = resource ,
664754 images_to_push = images_to_push ,
@@ -703,6 +793,11 @@ def _run_opts(self) -> runopts:
703793 type_ = str ,
704794 help = "The name of the PriorityClass to set on the job specs" ,
705795 )
796+ opts .add (
797+ "pod" ,
798+ type_ = Union [str , dict ],
799+ help = "Pod overlay as dict or YAML file path to merge with generated pod specs" ,
800+ )
706801 return opts
707802
708803 def describe (self , app_id : str ) -> Optional [DescribeAppResponse ]:
0 commit comments