Examples

demo.py

A small demo application illustrating the use of the Kubernetes-job API.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import logging
import os
import tempfile
import time
import yaml
import sys
from pathlib import Path
from kubernetes import client, config

from kubernetes_job import JobManager, is_completed
from funcs import add, calc_pi

logger = logging.getLogger()
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)

# retrieve cluster details from environment variables
host_url = os.environ.get("HOST_URL")
cacert = os.environ.get("CACERT")
token = os.environ.get("TOKEN")

configuration = None

if host_url:
    # initialize configuration for token authentication
    # this is the way to go if we're using a service account
    configuration = client.Configuration()
    configuration.api_key["authorization"] = token
    configuration.api_key_prefix['authorization'] = 'Bearer'
    configuration.host = host_url

    # configuration.ssl_ca_cert expects a file containing the certificates,
    # so we generate a temporary file to hold those
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as tf:
        tf.write(cacert)
        configuration.ssl_ca_cert = tf.name

else:
    # try to initialize from $HOME/.kube/config (eg. kubectl config file)
    config.load_kube_config()

# initialize the Kubernetes client
k8s_client = client.ApiClient(configuration=configuration)

# Path to worker configuration
yaml_path = Path(__file__).parent / 'job.yml'

# we're loading the yaml file here;
# we could also supply the path when initializing the JobManager
with Path(yaml_path).open() as f:
    yaml_spec = yaml.safe_load(f)

# initialize the job manager
manager = JobManager(k8s_client=k8s_client, k8s_job_spec=yaml_spec)

# create a new job
new_job = manager.create_job(calc_pi, 100, 1)
logging.info(f"Created job {new_job.metadata.name}")

# list all jobs
for job in manager.list_jobs():
    logging.info(f"Found: {job.metadata.name}")

# get the status of a job
job_status = manager.read_job(new_job)
while not is_completed(job_status):
    logging.info(f"Status: {job_status.status}")
    job_status = manager.read_job(new_job)
    time.sleep(5)

# clean up jobs
manager.clean_jobs()

# delete a job
# manager.delete_job(new_job)

job.yaml

An example of a job spec template.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
apiVersion: batch/v1
kind: Job
metadata:
  # job name; a unique id will be added when launching a new job based on this template
  name: kubernetes-job
spec:
  # The service account can be important if the security is restricted.
  # It may not be necessary if the JobManager is initialized with elevated credentials.
  serviceAccountName: kubernetes-job-service-account

  # Try 1 time to execute this job
  backoffLimit: 1

  # Active deadline (timeout), in a number of seconds.
  activeDeadlineSeconds: 3600

  # Clean up pods and logs after finishing the job
  # N.B. This Kuberetes feature is still in alpha, GKE does not support it yet!
  ttlSecondsAfterFinished: 3600

  template:
    spec:
      containers:
      - name: kubernetes-job

        # Change this image to an image containing your codebase
        image: registry.gitlab.com/roemer/kubernetes-job:latest

        # A startup command is automatically added by Kubernetes-job, so no need to set it here.
        # We may set one, though, to override startup behaviour.
        # Be sure to initialize the job runner then by spawning a `kubernetes-job` process.
        # cmd: kubernetes-job

      restartPolicy: Never

service_account.yaml

An example of the Kubernetes configuration needed to create a service account for job management.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kubernetes-job-service-account
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: kubernetes-job-service-role
rules:
  - apiGroups:
      - ""
      - "batch"
    resources:
      - jobs
    verbs:
      - get
      - list
      - watch
      - create
      - delete
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: kubernetes-job-service-account
  namespace: default
subjects:
  - kind: ServiceAccount
    name: kubernetes-job-service-account
    namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: kubernetes-job-service-role

To execute, run the following command:

kubectl apply -f service_account.yml