diff --git a/.github/workflows/integration_gpu_cluster_create.yaml b/.github/workflows/integration_gpu_cluster_create.yaml index b2de55c16..06fa1174f 100644 --- a/.github/workflows/integration_gpu_cluster_create.yaml +++ b/.github/workflows/integration_gpu_cluster_create.yaml @@ -27,7 +27,7 @@ jobs: group: nightly-test-cluster-group-gpu cancel-in-progress: false env: - GPU_CLUSTER_NAME: nightly-xpk-b200 + GPU_CLUSTER_NAME: nightly-xpk-h100 WORKLOAD_NAME: xpktest-gpu-nightly-${{ github.run_attempt }} steps: - uses: actions/download-artifact@v4 @@ -55,21 +55,21 @@ jobs: # 4. Set Env Var for the host (GitHub Runner) echo "GOOGLE_APPLICATION_CREDENTIALS=$HOME/.config/gcloud/application_default_credentials.json" >> $GITHUB_ENV - - name: Create an XPK Cluster with 1 x b200 GPU - run: xpk cluster create --cluster $GPU_CLUSTER_NAME --device-type=b200-8 --zone=asia-northeast1-b --default-pool-cpu-machine-type=n1-standard-16 --spot + - name: Create an XPK Cluster with 1 x h100 GPU + run: xpk cluster create --cluster $GPU_CLUSTER_NAME --device-type=h100-mega-80gb-8 --zone=asia-southeast1-b --default-pool-cpu-machine-type=e2-standard-8 --spot - name: Authenticate Docker run: gcloud auth configure-docker --quiet - name: Run a base-docker-image workload - run: xpk workload create --cluster $GPU_CLUSTER_NAME --workload $WORKLOAD_NAME --docker-image='nvidia/cuda:12.1.0-base-ubuntu22.04' --command "nvidia-smi" --zone=asia-northeast1-b --device-type=b200-8 + run: xpk workload create --cluster $GPU_CLUSTER_NAME --workload $WORKLOAD_NAME --docker-image='nvidia/cuda:12.1.0-base-ubuntu22.04' --command "nvidia-smi" --zone=asia-southeast1-b --device-type=h100-mega-80gb-8 - name: List out the workloads on the cluster - run: xpk workload list --cluster $GPU_CLUSTER_NAME --zone=asia-northeast1-b + run: xpk workload list --cluster $GPU_CLUSTER_NAME --zone=asia-southeast1-b - name: Wait for workload completion and confirm it succeeded - run: xpk workload list --cluster $GPU_CLUSTER_NAME --zone=asia-northeast1-b --wait-for-job-completion $WORKLOAD_NAME --timeout 600 + run: xpk workload list --cluster $GPU_CLUSTER_NAME --zone=asia-southeast1-b --wait-for-job-completion $WORKLOAD_NAME --timeout 600 - name: Delete the workload on the cluster - run: xpk workload delete --workload $WORKLOAD_NAME --cluster $GPU_CLUSTER_NAME --zone=asia-northeast1-b + run: xpk workload delete --workload $WORKLOAD_NAME --cluster $GPU_CLUSTER_NAME --zone=asia-southeast1-b - name: Delete the cluster created if: always() - run: xpk cluster delete --cluster $GPU_CLUSTER_NAME --zone=asia-northeast1-b --force + run: xpk cluster delete --cluster $GPU_CLUSTER_NAME --zone=asia-southeast1-b --force - name: Upload cluster nodepool creation log if: always() uses: actions/upload-artifact@v4 diff --git a/src/xpk/core/blueprint/blueprint_generator.py b/src/xpk/core/blueprint/blueprint_generator.py index c75c33f4f..d200640b0 100644 --- a/src/xpk/core/blueprint/blueprint_generator.py +++ b/src/xpk/core/blueprint/blueprint_generator.py @@ -257,7 +257,7 @@ def generate_a3_mega_blueprint( source="modules/management/kubectl-apply", use=["gke_cluster"], settings={ - "jobset": {"install": True, "version": "v0.7.2"}, + "jobset": {"install": True, "version": "v0.8.1"}, "apply_manifests": [{ "source": f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml' }], @@ -634,7 +634,7 @@ def generate_a3_ultra_blueprint( source="modules/management/kubectl-apply", use=[cluster_id], settings={ - "jobset": {"install": True, "version": "v0.7.2"}, + "jobset": {"install": True, "version": "v0.8.1"}, "apply_manifests": [ {"source": nccl_installer_path}, {"source": mlgru_disable_path}, @@ -917,7 +917,7 @@ def generate_a4_blueprint( source="modules/management/kubectl-apply", use=[cluster_id], settings={ - "jobset": {"install": True, "version": "v0.7.2"}, + "jobset": {"install": True, "version": "v0.8.1"}, "apply_manifests": [ {"source": nccl_installer_path}, { diff --git a/src/xpk/core/blueprint/testing/data/a3_mega.yaml b/src/xpk/core/blueprint/testing/data/a3_mega.yaml index 71462240f..b7a0abecd 100644 --- a/src/xpk/core/blueprint/testing/data/a3_mega.yaml +++ b/src/xpk/core/blueprint/testing/data/a3_mega.yaml @@ -109,7 +109,7 @@ deployment_groups: settings: jobset: install: true - version: v0.7.2 + version: v0.8.1 apply_manifests: - source: $(ghpc_stage("xpk-gke-a3-megagpu"))/storage_crd.yaml diff --git a/src/xpk/core/blueprint/testing/data/a3_mega_spot.yaml b/src/xpk/core/blueprint/testing/data/a3_mega_spot.yaml index e51f5e9f7..06083251e 100644 --- a/src/xpk/core/blueprint/testing/data/a3_mega_spot.yaml +++ b/src/xpk/core/blueprint/testing/data/a3_mega_spot.yaml @@ -105,7 +105,7 @@ deployment_groups: settings: jobset: install: true - version: v0.7.2 + version: v0.8.1 apply_manifests: - source: $(ghpc_stage("xpk-gke-a3-megagpu"))/storage_crd.yaml diff --git a/src/xpk/core/blueprint/testing/data/a3_ultra.yaml b/src/xpk/core/blueprint/testing/data/a3_ultra.yaml index 5fbdac986..8d955e155 100644 --- a/src/xpk/core/blueprint/testing/data/a3_ultra.yaml +++ b/src/xpk/core/blueprint/testing/data/a3_ultra.yaml @@ -153,7 +153,7 @@ deployment_groups: settings: jobset: install: true - version: v0.7.2 + version: v0.8.1 apply_manifests: - source: $(ghpc_stage("xpk-gke-a3-ultra"))/nccl-installer.yaml - source: $(ghpc_stage("xpk-gke-a3-ultra"))/mlgru-disable.yaml diff --git a/src/xpk/core/blueprint/testing/data/a4.yaml b/src/xpk/core/blueprint/testing/data/a4.yaml index 755a33430..6f3a2a4a4 100644 --- a/src/xpk/core/blueprint/testing/data/a4.yaml +++ b/src/xpk/core/blueprint/testing/data/a4.yaml @@ -165,7 +165,7 @@ deployment_groups: settings: jobset: install: true - version: v0.7.2 + version: v0.8.1 apply_manifests: - source: $(ghpc_stage("xpk-gke-a4"))/nccl-rdma-installer-a4.yaml - source: $(ghpc_stage("xpk-gke-a4"))/storage_crd.yaml diff --git a/src/xpk/core/workload.py b/src/xpk/core/workload.py index 7aba829af..35d61a752 100644 --- a/src/xpk/core/workload.py +++ b/src/xpk/core/workload.py @@ -14,87 +14,13 @@ limitations under the License. """ +import json import re from ..utils.console import xpk_exit, xpk_print from .commands import run_command_for_value from .gcloud_context import get_cluster_location -def workload_list_awk_command(filter_key) -> str: - """Function returns the awk command needed from the filter specified. - - Args: - filter_key: workload list filter to awk against - - Returns: - awk command to use in filtering workload list. - """ - - return f" | awk -e 'NR == 1 || {filter_key} {{print $0}}'" - - -def determine_workload_list_filter_by_status(args) -> str: - """Function to create the filtered view of workload list. - - Args: - args: user provided arguments for running the command. - - Returns: - the argument needed to filter by status of jobs in workload list. - """ - - # Argument positions related to columns created by workload list command. - status_arg = '$7' - running_vms_arg = '$5' - status_verbose_arg = '$9' - if args.filter_by_status == 'EVERYTHING': - return '' - elif args.filter_by_status == 'RUNNING': - # Running includes the status Admitted or Evicted, and when the number of - # vms running is > 0. - return workload_list_awk_command( - f'({status_arg} ~ "Admitted|Evicted" && {running_vms_arg} ~ /^[0-9]+$/' - f' && {running_vms_arg} > 0)' - ) - elif args.filter_by_status == 'QUEUED': - # Queued includes the status Admitted or Evicted, and when the number of - # vms running is 0. - return workload_list_awk_command( - f'({status_arg} ~ "Admitted|Evicted|QuotaReserved" &&' - f' ({running_vms_arg} ~ "" || {running_vms_arg} == 0))' - ) - elif args.filter_by_status == 'FINISHED': - return workload_list_awk_command(f'{status_arg} == "Finished"') - elif args.filter_by_status == 'FAILED': - # Failed includes the status Finished, and when the verbose reason is failed. - return workload_list_awk_command( - f'({status_arg} == "Finished" && {status_verbose_arg} ~ "failed")' - ) - elif args.filter_by_status == 'SUCCESSFUL': - # Failed includes the status Finished, and when the verbose reason is finished/success. - return workload_list_awk_command( - f'({status_arg} == "Finished" && {status_verbose_arg} ~ "finished")' - ) - raise RuntimeError(f'Can not find filter type: {args.filter_by_status}') - - -def determine_workload_list_filter_by_job(args) -> str: - """Function to filter view of workload list based on job name. - - Args: - args: user provided arguments for running the command. - - Returns: - the argument needed to filter job names from workload list - """ - # Argument positions related to columns created by workload list command. - if not hasattr(args, 'filter_by_job') or args.filter_by_job is None: - return '' - else: - job_name_arg = '$1' - return workload_list_awk_command(f'{job_name_arg} ~ "{args.filter_by_job}"') - - def get_workload_list(args) -> tuple[int, str]: """Function to get the list of the workloads in the cluster. @@ -105,34 +31,108 @@ def get_workload_list(args) -> tuple[int, str]: return_code: 0 if successful and 1 otherwise. return_value: workloads in the cluster matching the criteria. """ - columns = { - 'Jobset Name': '.metadata.ownerReferences[0].name', - 'Created Time': '.metadata.creationTimestamp', - 'Priority': '.spec.priorityClassName', - 'TPU VMs Needed': '.spec.podSets[0].count', - 'TPU VMs Running/Ran': '.status.admission.podSetAssignments[-1].count', - 'TPU VMs Done': '.status.reclaimablePods[0].count', - 'Status': '.status.conditions[-1].type', - 'Status Message': '.status.conditions[-1].message', - 'Status Time': '.status.conditions[-1].lastTransitionTime', - } - s = ','.join([key + ':' + value for key, value in columns.items()]) - - workload_list_filter_status_cmd = determine_workload_list_filter_by_status( - args - ) - workload_list_filter_job_cmd = determine_workload_list_filter_by_job(args) - command = ( - f'kubectl get workloads --ignore-not-found -o=custom-columns="{s}" ' - f'{workload_list_filter_status_cmd} {workload_list_filter_job_cmd}' - ) + command = 'kubectl get workloads --ignore-not-found -o json' task = f'List Jobs with filter-by-status={args.filter_by_status}' - if hasattr(args, 'filter_by_job'): + if hasattr(args, 'filter_by_job') and args.filter_by_job: task += f' with filter-by-job={args.filter_by_job}' return_code, return_value = run_command_for_value(command, task) - return return_code, return_value + if return_code != 0: + return return_code, return_value + + try: + if not return_value.strip(): + workloads_json = {'items': []} + else: + workloads_json = json.loads(return_value) + except json.JSONDecodeError: + return 1, f"Failed to parse kubectl output as JSON: {return_value}" + + items = workloads_json.get('items', []) + + rows = [] + headers = [ + 'Jobset Name', + 'Created Time', + 'Priority', + 'TPU VMs Needed', + 'TPU VMs Running/Ran', + 'TPU VMs Done', + 'Status', + 'Status Message', + 'Status Time', + ] + + for item in items: + metadata = item.get('metadata', {}) + spec = item.get('spec', {}) + status = item.get('status', {}) + + owner_refs = metadata.get('ownerReferences', []) + jobset_name = owner_refs[0].get('name', '') if owner_refs else '' + + if hasattr(args, 'filter_by_job') and args.filter_by_job: + if not re.search(args.filter_by_job, jobset_name): + continue + + created_time = metadata.get('creationTimestamp', '') + priority = spec.get('priorityClassName', '') + + pod_sets = spec.get('podSets', []) + tpu_vms_needed = str(pod_sets[0].get('count', '')) if pod_sets else '' + + admission = status.get('admission', {}) + pod_set_assignments = admission.get('podSetAssignments', []) + tpu_vms_running = str(pod_set_assignments[-1].get('count', '')) if pod_set_assignments else '' + + reclaimable = status.get('reclaimablePods', []) + tpu_vms_done = str(reclaimable[0].get('count', '')) if reclaimable else '' + + conditions = status.get('conditions', []) + if conditions: + latest_cond = sorted(conditions, key=lambda c: c.get('lastTransitionTime', ''))[-1] + cond_type = latest_cond.get('type', '') + cond_msg = latest_cond.get('message', '') + cond_time = latest_cond.get('lastTransitionTime', '') + else: + cond_type = '' + cond_msg = '' + cond_time = '' + + keep = False + filter_status = getattr(args, 'filter_by_status', 'EVERYTHING') + if filter_status == 'EVERYTHING': + keep = True + elif filter_status == 'RUNNING': + if cond_type in ('Admitted', 'Evicted') and tpu_vms_running.isdigit() and int(tpu_vms_running) > 0: + keep = True + elif filter_status == 'QUEUED': + if cond_type in ('Admitted', 'Evicted', 'QuotaReserved') and (tpu_vms_running == '' or tpu_vms_running == '0'): + keep = True + elif filter_status == 'FINISHED': + if cond_type == 'Finished': + keep = True + elif filter_status == 'FAILED': + if cond_type == 'Finished' and 'failed' in cond_msg.lower(): + keep = True + elif filter_status == 'SUCCESSFUL': + if cond_type == 'Finished' and ('finished' in cond_msg.lower() or 'success' in cond_msg.lower()): + keep = True + else: + return 1, f"Can not find filter type: {filter_status}" + + if keep: + rows.append([jobset_name, created_time, priority, tpu_vms_needed, tpu_vms_running, tpu_vms_done, cond_type, cond_msg, cond_time]) + + all_data = [headers] + rows + col_widths = [max(len(str(item)) for item in col) for col in zip(*all_data)] + + lines = [] + for row in all_data: + lines.append(' '.join(str(item).ljust(width) for item, width in zip(row, col_widths))) + + return 0, '\n'.join(lines) def check_if_workload_exists(args) -> bool: @@ -199,7 +199,7 @@ def wait_for_job_completion(args) -> int: f'{timeout_val}s' if timeout_val != -1 else 'max timeout (1 week)' ) wait_cmd = ( - "kubectl wait --for jsonpath='.status.conditions[-1].type'=Finished" + 'kubectl wait --for=condition=Finished' f' workload {full_workload_name} --timeout={timeout_val}s' ) return_code, return_value = run_command_for_value( @@ -227,7 +227,7 @@ def wait_for_job_completion(args) -> int: ) status_cmd = ( f'kubectl get jobset {args.workload} -o' - " jsonpath='{.status.conditions[-1].type}'" + " jsonpath='{.status.conditions[?(@.type==\"Completed\")].status}'" ) return_code, return_value = run_command_for_value( status_cmd, 'Get jobset status' @@ -235,11 +235,30 @@ def wait_for_job_completion(args) -> int: if return_code != 0: xpk_print(f'Get workload status request returned ERROR {return_code}') return return_code - xpk_print(f'Your workload finished with status: {return_value}') - if return_value != 'Completed': - xpk_print('Your workload did not complete successfully') - return 125 - return 0 + + if return_value == 'True': + xpk_print(f'Your workload finished with Completed status: {return_value}') + return 0 + + # If not True, check for Failed condition to output a cleaner message + failed_status_cmd = ( + f'kubectl get jobset {args.workload} -o' + " jsonpath='{.status.conditions[?(@.type==\"Failed\")].status}'" + ) + failed_return_code, failed_return_value = run_command_for_value( + failed_status_cmd, 'Get jobset failed status' + ) + if failed_return_code != 0: + xpk_print(f'Get workload failed status request returned ERROR {failed_return_code}') + return failed_return_code + + if failed_return_value == 'True': + xpk_print(f'Your workload failed with Failed status: {failed_return_value}') + else: + xpk_print('Your workload finished without a Completed or Failed status') + + xpk_print('Your workload did not complete successfully') + return 125 GCP_NAME_FILTER_VALUE_REGEX = re.compile(r'[a-z0-9\-]+')