Skip to content

Implement PodSet inference for JobSet #316

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ rules:
- patch
- update
- watch
- apiGroups:
- jobset.x-k8s.io
resources:
- jobsets
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- kubeflow.org
resources:
Expand Down
16 changes: 14 additions & 2 deletions hack/e2e-util.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export IMAGE_KUBEFLOW_OPERATOR="docker.io/kubeflow/training-operator:v1-855e096"
export KUBERAY_VERSION=1.1.0
export IMAGE_KUBERAY_OPERATOR="quay.io/kuberay/operator:v1.1.1"

export JOBSET_VERSION=v0.7.3
export IMAGE_JOBSET_OPERATOR="registry.k8s.io/jobset/jobset:v0.7.3"

# These are small images used by the e2e tests.
# Pull and kind load to avoid long delays during testing
export IMAGE_ECHOSERVER="quay.io/project-codeflare/echo-server:1.0"
Expand Down Expand Up @@ -142,7 +145,7 @@ function check_prerequisites {
}

function pull_images {
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR}
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR} ${IMAGE_JOBSET_OPERATOR}
do
docker pull $image
if [ $? -ne 0 ]
Expand Down Expand Up @@ -238,7 +241,7 @@ function kind_up_cluster {
}

function kind_load_images {
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR}
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR} ${IMAGE_JOBSET_OPERATOR}
do
kind load docker-image ${image} ${CLUSTER_CONTEXT}
if [ $? -ne 0 ]
Expand Down Expand Up @@ -267,6 +270,15 @@ function configure_cluster {
echo -n "." && sleep 1;
done
echo ""

echo "Installing JobSet operator version $JOBSET_VERSION"
kubectl apply --server-side -f "https://github.com/kubernetes-sigs/jobset/releases/download/${JOBSET_VERSION}/manifests.yaml"
echo "Waiting for pods in the jobset namespace to become ready"
while [[ $(kubectl get pods -n jobset-system -o 'jsonpath={..status.conditions[?(@.type=="Ready")].status}' | tr ' ' '\n' | sort -u) != "True" ]]
do
echo -n "." && sleep 1;
done
echo ""
}

function wait_for_appwrapper_controller {
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/appwrapper/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type componentStatusSummary struct {
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/finalizers,verbs=update

// permission to edit wrapped resources: pods, services, jobs, podgroups, pytorchjobs, rayclusters
// permission to edit wrapped resources: pods, services, jobs, podgroups, pytorchjobs, rayclusters, jobsets

//+kubebuilder:rbac:groups="",resources=pods;services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=deployments;statefulsets,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -89,6 +89,7 @@ type componentStatusSummary struct {
//+kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=podgroups,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ray.io,resources=rayclusters;rayjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete

// Reconcile reconciles an appwrapper
// Please see [aw-states] for documentation of this method.
Expand Down
21 changes: 21 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,27 @@ func InferPodSets(obj *unstructured.Unstructured) ([]workloadv1beta2.AppWrapperP
}
podSets = append(podSets, workloadv1beta2.AppWrapperPodSet{Replicas: ptr.To(replicas), Path: "template.spec.template"})

case schema.GroupVersionKind{Group: "jobset.x-k8s.io", Version: "v1alpha2", Kind: "JobSet"}:
if jobs, err := getValueAtPath(obj.UnstructuredContent(), "template.spec.replicatedJobs"); err == nil {
if jobs, ok := jobs.([]interface{}); ok {
for i := range jobs {
jobSpecPrefix := fmt.Sprintf("template.spec.replicatedJobs[%v].", i)
// validate path to replica template
if _, err := getValueAtPath(obj.UnstructuredContent(), jobSpecPrefix+"template"); err == nil {
var replicas int32 = 1
if parallelism, err := GetReplicas(obj, jobSpecPrefix+"template.spec.parallelism"); err == nil {
replicas = parallelism
}
if completions, err := GetReplicas(obj, jobSpecPrefix+"template.spec.completions"); err == nil && completions < replicas {
replicas = completions
}
// infer replica count
podSets = append(podSets, workloadv1beta2.AppWrapperPodSet{Replicas: ptr.To(replicas), Path: jobSpecPrefix + "template.spec.template"})
}
}
}
}

case schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "PyTorchJob"}:
for _, replicaType := range []string{"Master", "Worker"} {
prefix := "template.spec.pytorchReplicaSpecs." + replicaType + "."
Expand Down
47 changes: 47 additions & 0 deletions samples/wrapped-jobset.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Sample AppWrapper that wraps a JobSet (jobset.x-k8s.io/v1alpha2).
# The wrapped JobSet declares two replicated jobs:
#   - "workers": 4 parallel pods (parallelism == completions == 4)
#   - "driver":  a single pod
# Every pod runs a busybox container that sleeps for 100 seconds and
# requests 100m CPU, so the sample is cheap to admit on small clusters.
apiVersion: workload.codeflare.dev/v1beta2
kind: AppWrapper
metadata:
  name: sample-jobset
  labels:
    # Submit through Kueue; assumes a LocalQueue named "default-queue"
    # exists in the namespace where this AppWrapper is created.
    kueue.x-k8s.io/queue-name: default-queue
spec:
  components:
  # No explicit podSets are declared: the controller infers them from the
  # JobSet's replicatedJobs (see InferPodSets in pkg/utils/utils.go).
  - template:
      apiVersion: jobset.x-k8s.io/v1alpha2
      kind: JobSet
      metadata:
        name: sample-jobset
      spec:
        replicatedJobs:
        - name: workers
          template:
            spec:
              parallelism: 4
              completions: 4
              # Fail fast: do not retry pods that error out.
              backoffLimit: 0
              template:
                spec:
                  restartPolicy: Never
                  containers:
                  - name: sleep
                    image: quay.io/project-codeflare/busybox:1.36
                    command: ["sh", "-c", "sleep 100"]
                    resources:
                      requests:
                        cpu: 100m
        - name: driver
          template:
            spec:
              parallelism: 1
              completions: 1
              # Fail fast: do not retry pods that error out.
              backoffLimit: 0
              template:
                spec:
                  restartPolicy: Never
                  containers:
                  - name: sleep
                    image: quay.io/project-codeflare/busybox:1.36
                    command: ["sh", "-c", "sleep 100"]
                    resources:
                      requests:
                        cpu: 100m
11 changes: 10 additions & 1 deletion test/e2e/appwrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,16 @@ var _ = Describe("AppWrapper E2E Test", func() {
})
})

// TODO: JobSets (would have to deploy JobSet controller on e2e test cluster)
Describe("Creation of JobSet GVKs", Label("Kueue", "Standalone"), func() {
It("JobSet", func() {
aw := createAppWrapper(ctx, jobset(500))
appwrappers = append(appwrappers, aw)
// TODO: Need dev versions of kueue/jobset to get correct handling of ownership
// Once those are released change the test to:
// Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
Eventually(AppWrapperPhase(ctx, aw), 15*time.Second).Should(Equal(workloadv1beta2.AppWrapperResuming))
})
})

Describe("Webhook Enforces AppWrapper Invariants", Label("Webhook"), func() {
Context("Structural Invariants", func() {
Expand Down
38 changes: 38 additions & 0 deletions test/e2e/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,44 @@ func stuckInitBatchjob(milliCPU int64) workloadv1beta2.AppWrapperComponent {
}
}

// jobsetYAML is a fmt template for a minimal JobSet manifest used by the
// e2e fixtures. It has two %v placeholders, filled in by jobset():
//  1. metadata.generateName prefix (e.g. "jobset-")
//  2. the CPU request of the single sleep container (a resource.Quantity)
// The JobSet runs one replicated job ("driver") with a single pod
// (parallelism == completions == 1) that sleeps for 100 seconds.
const jobsetYAML = `
apiVersion: jobset.x-k8s.io/v1alpha2
kind: JobSet
metadata:
  generateName: %v
spec:
  replicatedJobs:
  - name: driver
    template:
      spec:
        parallelism: 1
        completions: 1
        backoffLimit: 0
        template:
          spec:
            restartPolicy: Never
            containers:
            - name: sleep
              image: quay.io/project-codeflare/busybox:1.36
              command: ["sh", "-c", "sleep 100"]
              resources:
                requests:
                  cpu: %v
`

// jobset builds an AppWrapperComponent wrapping a single-pod JobSet whose
// sleep container requests milliCPU millicores. The component declares the
// PodSet path to the replicated job's pod template explicitly; the replica
// count is left for the controller to infer.
func jobset(milliCPU int64) workloadv1beta2.AppWrapperComponent {
	cpu := resource.NewMilliQuantity(milliCPU, resource.DecimalSI)
	rendered := fmt.Sprintf(jobsetYAML, "jobset-", cpu)

	jsonBytes, err := yaml.YAMLToJSON([]byte(rendered))
	Expect(err).NotTo(HaveOccurred())

	podSets := []workloadv1beta2.AppWrapperPodSet{
		{Path: "template.spec.replicatedJobs[0].template.spec.template"},
	}
	return workloadv1beta2.AppWrapperComponent{
		DeclaredPodSets: podSets,
		Template:        runtime.RawExtension{Raw: jsonBytes},
	}
}

// This is not a useful PyTorchJob:
// 1. Using a dummy busybox image to avoid pulling a large & rate-limited image from dockerhub
// 2. We avoid needing the injected sidecar (alpine:3.10 from dockerhub) by not specifying a Master
Expand Down