Job Management

Job is the fundamental object of high performance workload; this document provides the definition of Job in Volcano.

The definition of Job follow Kuberentes’s style, e.g. Status, Spec; the follow sections will only describe the major functions of Job.

Multiple Pod Template

As most jobs of high performance workload include different type of tasks, e.g. TensorFlow (ps/worker), Spark (driver/executor); Job introduces taskSpecs to support multiple pod template, defined as follow. The Policies will describe in Error Handling section.

// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
    ...

    // Tasks specifies the task specification of Job
    // +optional
    Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"`
}

// TaskSpec specifies the task specification of Job
type TaskSpec struct {
    // Name specifies the name of task
    Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`

    // Replicas specifies the replicas of this TaskSpec in Job
    Replicas int32 `json:"replicas,omitempty" protobuf:"bytes,2,opt,name=replicas"`

    // Specifies the pod that will be created for this TaskSpec
    // when executing a Job
    Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,3,opt,name=template"`

    // Specifies the lifecycle of tasks
    // +optional
    Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}

JobController will create Pods based on the templates and replicas in spec.tasks; the controlled OwnerReference of Pod will be set to the Job. The following is an example YAML with multiple pod template.

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  tasks:
  - name: "ps"
    replicas: 2
    template:
      spec:
        containers:
        - name: ps
          image: ps-img
  - name: "worker"
    replicas: 5
    template:
      spec: 
        containers:
        - name: worker
          image: worker-img

Job Input/Output

Most of high performance workload will handle data which is considering as input/output of a Job. The following types are introduced for Job’s input/output.

type VolumeSpec struct {
	MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`

	// defined the PVC name
	// + optional
	VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"`

	// VolumeClaim defines the PVC used by the VolumeSpec.
	// + optional
	VolumeClaim *PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,3,opt,name=claim"`
}

type JobSpec struct{
    ...

    // The volumes mount on Job
    // +optional
    Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,1,opt,name=volumes"`
}

The Volumes of Job can be nil which means user will manage data themselves. If *VolumeSpec.volumeClaim is nil and *VolumeSpec.volumeClaimName is nil or not exist in PersistentVolumeClaim´╝îemptyDir volume will be used for each Task/Pod.

Conditions and Phases

The following phases are introduced to give a simple, high-level summary of where the Job is in its lifecycle; and the conditions array, the reason and message field contain more detail about the job’s status.

type JobPhase string

const (
    // Pending is the phase that job is pending in the queue, waiting for scheduling decision
    Pending JobPhase = "Pending"
    // Aborting is the phase that job is aborted, waiting for releasing pods
    Aborting JobPhase = "Aborting"
    // Aborted is the phase that job is aborted by user or error handling
    Aborted JobPhase = "Aborted"
    // Running is the phase that minimal available tasks of Job are running
    Running JobPhase = "Running"
    // Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating
    Restarting JobPhase = "Restarting"
    // Completed is the phase that all tasks of Job are completed successfully
    Completed JobPhase = "Completed"
    // Terminating is the phase that the Job is terminated, waiting for releasing pods
    Terminating JobPhase = "Terminating"
    // Teriminated is the phase that the job is finished unexpected, e.g. events
    Teriminated JobPhase = "Terminated"
)

// JobState contains details for the current state of the job.
type JobState struct {
    // The phase of Job.
    // +optional
    Phase JobPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase"`

    // Unique, one-word, CamelCase reason for the phase's last transition.
    // +optional
    Reason string `json:"reason,omitempty" protobuf:"bytes,2,opt,name=reason"`

    // Human-readable message indicating details about last transition.
    // +optional
    Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"`
}

// JobStatus represents the current state of a Job
type JobStatus struct {
    // Current state of Job.
    State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"`

    ......
}

The following table shows available transactions between different phases. The phase can not transfer to the target phase if the cell is empty.

From \ To Pending Aborted Running Completed Terminated
Pending * * *
Aborted * *
Running * * * *
Completed *
Terminated *

Restarting, Aborting and Terminating are temporary states to avoid race condition, e.g. there’ll be several PodeEvictedEvents because of TerminateJobAction which should not be handled again.

Error Handling

After Job was created in system, there’ll be several events related to the Job, e.g. Pod succeeded, Pod failed; and some events are critical to the Job, e.g. Pod of MPIJob failed. So LifecyclePolicy is introduced to handle different events based on user’s configuration.

// Event is the type of Event related to the Job
type Event string

const (
    // AllEvents means all event
    AllEvents             Event = "*"
    // PodFailedEvent is triggered if Pod was failed
    PodFailedEvent        Event = "PodFailed"
    // PodEvictedEvent is triggered if Pod was deleted
    PodEvictedEvent       Event = "PodEvicted"
    // These below are several events can lead to job 'Unknown'
    // 1. Task Unschedulable, this is triggered when part of
    //    pods can't be scheduled while some are already running in gang-scheduling case.
    JobUnknownEvent Event = "Unknown"

    // OutOfSyncEvent is triggered if Pod/Job were updated
    OutOfSyncEvent Event = "OutOfSync"
    // CommandIssuedEvent is triggered if a command is raised by user
    CommandIssuedEvent Event = "CommandIssued"
    // TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
    TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the type of event handling 
type Action string

const (
    // AbortJobAction if this action is set, the whole job will be aborted:
    // all Pod of Job will be evicted, and no Pod will be recreated
    AbortJobAction Action = "AbortJob"
    // RestartJobAction if this action is set, the whole job will be restarted
    RestartJobAction Action = "RestartJob"
    // TerminateJobAction if this action is set, the whole job wil be terminated
    // and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
    TerminateJobAction Action = "TerminateJob"
    // CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
    CompleteJobAction Action = "CompleteJob"

    // ResumeJobAction is the action to resume an aborted job.
    ResumeJobAction Action = "ResumeJob"
    // SyncJobAction is the action to sync Job/Pod status.
    SyncJobAction Action = "SyncJob"
)

// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
    Event  Event  `json:"event,omitempty" protobuf:"bytes,1,opt,name=event"`
    Action Action `json:"action,omitempty" protobuf:"bytes,2,opt,name=action"`
    Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
}

Both JobSpec and TaskSpec include lifecycle policy: the policies in JobSpec are the default policy if no policies in TaskSpec; the policies in TaskSpec will overwrite defaults.

// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
    ... 

    // Specifies the default lifecycle of tasks
    // +optional
    Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`

    // Tasks specifies the task specification of Job
    // +optional
    Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,6,opt,name=tasks"`
}

// TaskSpec specifies the task specification of Job
type TaskSpec struct {
    ...

    // Specifies the lifecycle of tasks
    // +optional
    Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}

The following examples demonstrate the usage of LifecyclePolicy for job and task.

For the training job of machine learning framework, the whole job should be restarted if any task was failed or evicted. To simplify the configuration, a job level LifecyclePolicy is set as follows. As no LifecyclePolicy is set for any task, all tasks will use the policies in spec.policies.

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  # If any event here, restart the whole job.
  policies:
  - event: *
    action: RestartJob
  tasks:
  - name: "ps"
    replicas: 1
    template:
      spec:
        containers:
        - name: ps
          image: ps-img
  - name: "worker"
    replicas: 5
    template:  
      spec: 
        containers:
        - name: worker
          image: worker-img
  ...

Some BigData framework (e.g. Spark) may have different requirements. Take Spark as example, the whole job will be restarted if ‘driver’ tasks failed and only restart the task if ‘executor’ tasks failed. OnFailure restartPolicy is set for executor and RestartJob is set for driver spec.tasks.policies as follow.

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  tasks:
  - name: "driver"
    replicas: 1
    policies:
    - event: *
      action: RestartJob
    template:
      spec:
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:  
      spec: 
        containers:
        - name: executor
          image: executor-img
        restartPolicy: OnFailure

Features Interaction

Admission Controller

The following validations must be included to make sure expected behaviours:

  • spec.minAvailable <= sum(spec.taskSpecs.replicas)
  • no duplicated name in spec.taskSpecs array
  • no duplicated event handler in LifecyclePolicy array, both job policies and task policies

CoScheduling

CoScheduling (or Gang-scheduling) is required by most of high performance workload, e.g. TF training job, MPI job. The spec.minAvailable is used to identify how many pods will be scheduled together. The default value of spec.minAvailable is summary of spec.tasks.replicas. The admission controller web hook will check spec.minAvailable against the summary of spec.tasks.replicas; the job creation will be rejected if spec.minAvailable > sum(spec.tasks.replicas). If spec.minAvailable < sum(spec.tasks.replicas), the pod of spec.tasks will be created randomly; refer to Task Priority with Job section on how to create tasks in order.

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  # minAvailable to run job
  minAvailable: 6
  tasks:
  - name: "ps"
    replicas: 1
    template:
      spec:
        containers:
        - name: "ps"
          image: "ps-img"
  - name: "worker"
    replicas: 5
    template:
      spec: 
        containers:
        - name: "worker"
          image: "worker-img"

Task Priority within Job

In addition to multiple pod template, the priority of each task maybe different. PriorityClass of PodTemplate is reused to define the priority of task within a job. This’s an example to run spark job: 1 driver with 5 executors, the driver’s priority is master-pri which is higher than normal pods; as spec.minAvailable is 3, the scheduler will make sure one driver with 2 executors will be scheduled if not enough resources.

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  minAvailable: 3
  tasks:
  - name: "driver"
    replicas: 1
    template:
      spec:
        priorityClass: "master-pri"
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec: 
        containers:
        - name: executor
          image: executor-img

NOTE: although scheduler will make sure high priority pods with job will be scheduled firstly, there’s still a race condition between different kubelets that low priority pod maybe launched early; the job/task dependency will be introduced later to handle such kind of race condition.

Resource sharing between Job

By default, the spec.minAvailable is set to the summary of spec.tasks.replicas; if it’s set to a smaller value, the pod beyond spec.minAvailable will share resource between jobs.

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  minAvailable: 3
  tasks:
  - name: "driver"
    replicas: 1
    template:
      spec:
        priorityClass: "master-pri"
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec: 
        containers:
        - name: executor
          image: executor-img

Plugins for Job

As many jobs of AI frame, e.g. TensorFlow, MPI, Mxnet, need set env, pods communicate, ssh sign in without password. We provide Job api plugins to give users a better focus on core business. Now we have three plugins, every plugin has parameters, if not provided, we use default.

  • env: set VK_TASK_INDEX to each container, is a index for giving the identity to container.
  • svc: create Serivce and *.host to enable pods communicate.
  • ssh: sign in ssh without password, e.g. use command mpirun or mpiexec.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: mpi-job
spec:
  minAvailable: 2
  schedulerName: scheduler
  policies:
  - event: PodEvicted
    action: RestartJob
  plugins:
    ssh: []
    env: []
    svc: []
  tasks:
  - replicas: 1
    name: mpimaster
    template:
      spec:
        containers:
          image: mpi-image
          name: mpimaster
  - replicas: 2
    name: mpiworker
    template: 
      spec:
        containers:
          image: mpi-image
          name: mpiworker