Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .github/workflows/integration_gpu_cluster_create.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
group: nightly-test-cluster-group-gpu
cancel-in-progress: false
env:
GPU_CLUSTER_NAME: nightly-xpk-b200
GPU_CLUSTER_NAME: nightly-xpk-h100
WORKLOAD_NAME: xpktest-gpu-nightly-${{ github.run_attempt }}
steps:
- uses: actions/download-artifact@v4
Expand Down Expand Up @@ -55,21 +55,21 @@ jobs:

# 4. Set Env Var for the host (GitHub Runner)
echo "GOOGLE_APPLICATION_CREDENTIALS=$HOME/.config/gcloud/application_default_credentials.json" >> $GITHUB_ENV
- name: Create an XPK Cluster with 1 x b200 GPU
run: xpk cluster create --cluster $GPU_CLUSTER_NAME --device-type=b200-8 --zone=asia-northeast1-b --default-pool-cpu-machine-type=n1-standard-16 --spot
- name: Create an XPK Cluster with 1 x h100 GPU
run: xpk cluster create --cluster $GPU_CLUSTER_NAME --device-type=h100-mega-80gb-8 --zone=asia-southeast1-b --default-pool-cpu-machine-type=e2-standard-8 --spot
- name: Authenticate Docker
run: gcloud auth configure-docker --quiet
- name: Run a base-docker-image workload
run: xpk workload create --cluster $GPU_CLUSTER_NAME --workload $WORKLOAD_NAME --docker-image='nvidia/cuda:12.1.0-base-ubuntu22.04' --command "nvidia-smi" --zone=asia-northeast1-b --device-type=b200-8
run: xpk workload create --cluster $GPU_CLUSTER_NAME --workload $WORKLOAD_NAME --docker-image='nvidia/cuda:12.1.0-base-ubuntu22.04' --command "nvidia-smi" --zone=asia-southeast1-b --device-type=h100-mega-80gb-8
- name: List out the workloads on the cluster
run: xpk workload list --cluster $GPU_CLUSTER_NAME --zone=asia-northeast1-b
run: xpk workload list --cluster $GPU_CLUSTER_NAME --zone=asia-southeast1-b
- name: Wait for workload completion and confirm it succeeded
run: xpk workload list --cluster $GPU_CLUSTER_NAME --zone=asia-northeast1-b --wait-for-job-completion $WORKLOAD_NAME --timeout 600
run: xpk workload list --cluster $GPU_CLUSTER_NAME --zone=asia-southeast1-b --wait-for-job-completion $WORKLOAD_NAME --timeout 600
- name: Delete the workload on the cluster
run: xpk workload delete --workload $WORKLOAD_NAME --cluster $GPU_CLUSTER_NAME --zone=asia-northeast1-b
run: xpk workload delete --workload $WORKLOAD_NAME --cluster $GPU_CLUSTER_NAME --zone=asia-southeast1-b
- name: Delete the cluster created
if: always()
run: xpk cluster delete --cluster $GPU_CLUSTER_NAME --zone=asia-northeast1-b --force
run: xpk cluster delete --cluster $GPU_CLUSTER_NAME --zone=asia-southeast1-b --force
- name: Upload cluster nodepool creation log
if: always()
uses: actions/upload-artifact@v4
Expand Down
6 changes: 3 additions & 3 deletions src/xpk/core/blueprint/blueprint_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def generate_a3_mega_blueprint(
source="modules/management/kubectl-apply",
use=["gke_cluster"],
settings={
"jobset": {"install": True, "version": "v0.7.2"},
"jobset": {"install": True, "version": "v0.8.1"},
"apply_manifests": [{
"source": f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'
}],
Expand Down Expand Up @@ -634,7 +634,7 @@ def generate_a3_ultra_blueprint(
source="modules/management/kubectl-apply",
use=[cluster_id],
settings={
"jobset": {"install": True, "version": "v0.7.2"},
"jobset": {"install": True, "version": "v0.8.1"},
"apply_manifests": [
{"source": nccl_installer_path},
{"source": mlgru_disable_path},
Expand Down Expand Up @@ -917,7 +917,7 @@ def generate_a4_blueprint(
source="modules/management/kubectl-apply",
use=[cluster_id],
settings={
"jobset": {"install": True, "version": "v0.7.2"},
"jobset": {"install": True, "version": "v0.8.1"},
"apply_manifests": [
{"source": nccl_installer_path},
{
Expand Down
2 changes: 1 addition & 1 deletion src/xpk/core/blueprint/testing/data/a3_mega.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ deployment_groups:
settings:
jobset:
install: true
version: v0.7.2
version: v0.8.1
apply_manifests:
- source: $(ghpc_stage("xpk-gke-a3-megagpu"))/storage_crd.yaml

Expand Down
2 changes: 1 addition & 1 deletion src/xpk/core/blueprint/testing/data/a3_mega_spot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ deployment_groups:
settings:
jobset:
install: true
version: v0.7.2
version: v0.8.1
apply_manifests:
- source: $(ghpc_stage("xpk-gke-a3-megagpu"))/storage_crd.yaml

Expand Down
2 changes: 1 addition & 1 deletion src/xpk/core/blueprint/testing/data/a3_ultra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ deployment_groups:
settings:
jobset:
install: true
version: v0.7.2
version: v0.8.1
apply_manifests:
- source: $(ghpc_stage("xpk-gke-a3-ultra"))/nccl-installer.yaml
- source: $(ghpc_stage("xpk-gke-a3-ultra"))/mlgru-disable.yaml
Expand Down
2 changes: 1 addition & 1 deletion src/xpk/core/blueprint/testing/data/a4.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ deployment_groups:
settings:
jobset:
install: true
version: v0.7.2
version: v0.8.1
apply_manifests:
- source: $(ghpc_stage("xpk-gke-a4"))/nccl-rdma-installer-a4.yaml
- source: $(ghpc_stage("xpk-gke-a4"))/storage_crd.yaml
Expand Down
229 changes: 124 additions & 105 deletions src/xpk/core/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,87 +14,13 @@
limitations under the License.
"""

import json
import re
from ..utils.console import xpk_exit, xpk_print
from .commands import run_command_for_value
from .gcloud_context import get_cluster_location


def workload_list_awk_command(filter_key) -> str:
  """Build the awk pipeline fragment for a workload-list filter.

  Args:
    filter_key: awk boolean expression selecting the rows to keep.

  Returns:
    A shell snippet (" | awk ...") that prints the header row (NR == 1)
    plus every data row matching filter_key.
  """
  awk_program = 'NR == 1 || ' + filter_key + ' {print $0}'
  return " | awk -e '" + awk_program + "'"


def determine_workload_list_filter_by_status(args) -> str:
  """Translate the --filter-by-status value into an awk pipeline fragment.

  Args:
    args: user provided arguments for running the command.

  Returns:
    the argument needed to filter by status of jobs in workload list.

  Raises:
    RuntimeError: if args.filter_by_status is not a recognized filter name.
  """

  def _awk(condition) -> str:
    # Same shape as workload_list_awk_command: keep the header plus matches.
    return f" | awk -e 'NR == 1 || {condition} {{print $0}}'"

  # Column positions in the output produced by the workload list command.
  status_col = '$7'
  running_vms_col = '$5'
  status_verbose_col = '$9'

  requested = args.filter_by_status
  if requested == 'EVERYTHING':
    return ''
  if requested == 'RUNNING':
    # Running: status Admitted or Evicted with a positive running-VM count.
    return _awk(
        f'({status_col} ~ "Admitted|Evicted" && {running_vms_col} ~'
        f' /^[0-9]+$/ && {running_vms_col} > 0)'
    )
  if requested == 'QUEUED':
    # Queued: status Admitted/Evicted/QuotaReserved with no VMs running yet.
    return _awk(
        f'({status_col} ~ "Admitted|Evicted|QuotaReserved" &&'
        f' ({running_vms_col} ~ "<none>" || {running_vms_col} == 0))'
    )
  if requested == 'FINISHED':
    return _awk(f'{status_col} == "Finished"')
  if requested == 'FAILED':
    # Failed: status Finished and the verbose reason mentions "failed".
    return _awk(
        f'({status_col} == "Finished" && {status_verbose_col} ~ "failed")'
    )
  if requested == 'SUCCESSFUL':
    # Successful: status Finished and the verbose reason mentions "finished".
    return _awk(
        f'({status_col} == "Finished" && {status_verbose_col} ~ "finished")'
    )
  raise RuntimeError(f'Can not find filter type: {args.filter_by_status}')


def determine_workload_list_filter_by_job(args) -> str:
  """Translate the --filter-by-job value into an awk pipeline fragment.

  Args:
    args: user provided arguments for running the command.

  Returns:
    the argument needed to filter job names from workload list
  """
  requested_job = getattr(args, 'filter_by_job', None)
  if requested_job is None:
    return ''
  # Column 1 of the workload list output holds the jobset name.
  return workload_list_awk_command(f'$1 ~ "{requested_job}"')


def get_workload_list(args) -> tuple[int, str]:
"""Function to get the list of the workloads in the cluster.

Expand All @@ -105,34 +31,108 @@ def get_workload_list(args) -> tuple[int, str]:
return_code: 0 if successful and 1 otherwise.
return_value: workloads in the cluster matching the criteria.
"""
columns = {
'Jobset Name': '.metadata.ownerReferences[0].name',
'Created Time': '.metadata.creationTimestamp',
'Priority': '.spec.priorityClassName',
'TPU VMs Needed': '.spec.podSets[0].count',
'TPU VMs Running/Ran': '.status.admission.podSetAssignments[-1].count',
'TPU VMs Done': '.status.reclaimablePods[0].count',
'Status': '.status.conditions[-1].type',
'Status Message': '.status.conditions[-1].message',
'Status Time': '.status.conditions[-1].lastTransitionTime',
}
s = ','.join([key + ':' + value for key, value in columns.items()])

workload_list_filter_status_cmd = determine_workload_list_filter_by_status(
args
)
workload_list_filter_job_cmd = determine_workload_list_filter_by_job(args)
command = (
f'kubectl get workloads --ignore-not-found -o=custom-columns="{s}" '
f'{workload_list_filter_status_cmd} {workload_list_filter_job_cmd}'
)
command = 'kubectl get workloads --ignore-not-found -o json'

task = f'List Jobs with filter-by-status={args.filter_by_status}'
if hasattr(args, 'filter_by_job'):
if hasattr(args, 'filter_by_job') and args.filter_by_job:
task += f' with filter-by-job={args.filter_by_job}'

return_code, return_value = run_command_for_value(command, task)
return return_code, return_value
if return_code != 0:
return return_code, return_value

try:
if not return_value.strip():
workloads_json = {'items': []}
else:
workloads_json = json.loads(return_value)
except json.JSONDecodeError:
return 1, f"Failed to parse kubectl output as JSON: {return_value}"

items = workloads_json.get('items', [])

rows = []
headers = [
'Jobset Name',
'Created Time',
'Priority',
'TPU VMs Needed',
'TPU VMs Running/Ran',
'TPU VMs Done',
'Status',
'Status Message',
'Status Time',
]

for item in items:
metadata = item.get('metadata', {})
spec = item.get('spec', {})
status = item.get('status', {})

owner_refs = metadata.get('ownerReferences', [])
jobset_name = owner_refs[0].get('name', '<none>') if owner_refs else '<none>'

if hasattr(args, 'filter_by_job') and args.filter_by_job:
if not re.search(args.filter_by_job, jobset_name):
continue

created_time = metadata.get('creationTimestamp', '<none>')
priority = spec.get('priorityClassName', '<none>')

pod_sets = spec.get('podSets', [])
tpu_vms_needed = str(pod_sets[0].get('count', '<none>')) if pod_sets else '<none>'

admission = status.get('admission', {})
pod_set_assignments = admission.get('podSetAssignments', [])
tpu_vms_running = str(pod_set_assignments[-1].get('count', '<none>')) if pod_set_assignments else '<none>'

reclaimable = status.get('reclaimablePods', [])
tpu_vms_done = str(reclaimable[0].get('count', '<none>')) if reclaimable else '<none>'

conditions = status.get('conditions', [])
if conditions:
latest_cond = sorted(conditions, key=lambda c: c.get('lastTransitionTime', ''))[-1]
cond_type = latest_cond.get('type', '<none>')
cond_msg = latest_cond.get('message', '<none>')
cond_time = latest_cond.get('lastTransitionTime', '<none>')
else:
cond_type = '<none>'
cond_msg = '<none>'
cond_time = '<none>'

keep = False
filter_status = getattr(args, 'filter_by_status', 'EVERYTHING')
if filter_status == 'EVERYTHING':
keep = True
elif filter_status == 'RUNNING':
if cond_type in ('Admitted', 'Evicted') and tpu_vms_running.isdigit() and int(tpu_vms_running) > 0:
keep = True
elif filter_status == 'QUEUED':
if cond_type in ('Admitted', 'Evicted', 'QuotaReserved') and (tpu_vms_running == '<none>' or tpu_vms_running == '0'):
keep = True
elif filter_status == 'FINISHED':
if cond_type == 'Finished':
keep = True
elif filter_status == 'FAILED':
if cond_type == 'Finished' and 'failed' in cond_msg.lower():
keep = True
elif filter_status == 'SUCCESSFUL':
if cond_type == 'Finished' and ('finished' in cond_msg.lower() or 'success' in cond_msg.lower()):
keep = True
else:
return 1, f"Can not find filter type: {filter_status}"

if keep:
rows.append([jobset_name, created_time, priority, tpu_vms_needed, tpu_vms_running, tpu_vms_done, cond_type, cond_msg, cond_time])

all_data = [headers] + rows
col_widths = [max(len(str(item)) for item in col) for col in zip(*all_data)]

lines = []
for row in all_data:
lines.append(' '.join(str(item).ljust(width) for item, width in zip(row, col_widths)))

return 0, '\n'.join(lines)


def check_if_workload_exists(args) -> bool:
Expand Down Expand Up @@ -199,7 +199,7 @@ def wait_for_job_completion(args) -> int:
f'{timeout_val}s' if timeout_val != -1 else 'max timeout (1 week)'
)
wait_cmd = (
"kubectl wait --for jsonpath='.status.conditions[-1].type'=Finished"
'kubectl wait --for=condition=Finished'
f' workload {full_workload_name} --timeout={timeout_val}s'
)
return_code, return_value = run_command_for_value(
Expand Down Expand Up @@ -227,19 +227,38 @@ def wait_for_job_completion(args) -> int:
)
status_cmd = (
f'kubectl get jobset {args.workload} -o'
" jsonpath='{.status.conditions[-1].type}'"
" jsonpath='{.status.conditions[?(@.type==\"Completed\")].status}'"
)
return_code, return_value = run_command_for_value(
status_cmd, 'Get jobset status'
)
if return_code != 0:
xpk_print(f'Get workload status request returned ERROR {return_code}')
return return_code
xpk_print(f'Your workload finished with status: {return_value}')
if return_value != 'Completed':
xpk_print('Your workload did not complete successfully')
return 125
return 0

if return_value == 'True':
xpk_print(f'Your workload finished with Completed status: {return_value}')
return 0

# If not True, check for Failed condition to output a cleaner message
failed_status_cmd = (
f'kubectl get jobset {args.workload} -o'
" jsonpath='{.status.conditions[?(@.type==\"Failed\")].status}'"
)
failed_return_code, failed_return_value = run_command_for_value(
failed_status_cmd, 'Get jobset failed status'
)
if failed_return_code != 0:
xpk_print(f'Get workload failed status request returned ERROR {failed_return_code}')
return failed_return_code

if failed_return_value == 'True':
xpk_print(f'Your workload failed with Failed status: {failed_return_value}')
else:
xpk_print('Your workload finished without a Completed or Failed status')

xpk_print('Your workload did not complete successfully')
return 125


GCP_NAME_FILTER_VALUE_REGEX = re.compile(r'[a-z0-9\-]+')
Expand Down
Loading