Source code for kubernetes_job.job_manager

import copy
import datetime
import logging
import os
import socket
from pathlib import Path
from typing import Iterator, Optional, Union

import yaml
from kubernetes import client
from kubernetes.client.models import V1Job, V1Status
from kubernetes.utils import create_from_dict

from .job_func_def import JobFuncDef, JobMeta

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Fallback name prefix used by _gen_id() when the job spec carries no name.
# NOTE(review): K8S_DEFAULT_NAMESPACE and K8S_ENV_VAR_NAME are referenced by
# JobManager but are not defined in the visible part of this module — confirm
# where they are declared.
K8S_CONTAINER_NAME = 'kubernetes-job'

# The job definition most recently loaded by JobManager.execute_job();
# stays None until execute_job() has successfully loaded one.
current_job: [JobFuncDef, None] = None

def _gen_id(name: str, dt: datetime.datetime) -> str:
    """
    Generate a job id from the name and the given datetime

    :param name: Name prefix; when empty or None, K8S_CONTAINER_NAME is used instead
    :param dt: Timestamp appended to the prefix, rendered as ``%Y%m%d%H%M%S%f``
        (microsecond resolution)
    :return: ``<prefix>-<timestamp>``
    """
    prefix = name or K8S_CONTAINER_NAME
    return f"{prefix}-{dt.strftime('%Y%m%d%H%M%S%f')}"

def job_name(job: Union[str, V1Job]) -> str:
    """
    Return the name of a job

    :param job: Job name, or a V1Job instance
    :return: ``job`` itself when it is already a string, otherwise ``job.metadata.name``
    """
    return job if isinstance(job, str) else job.metadata.name
def is_failed(job: V1Job) -> bool:
    """
    Return True if the job is failed

    :param job: V1Job instance
    :return: Truthiness of ``job.status.failed`` (None and 0 both count as not failed)
    """
    return bool(job.status.failed)
def is_succeeded(job: V1Job) -> bool:
    """
    Return True if the job has succeeded

    :param job: V1Job instance
    :return: Truthiness of ``job.status.succeeded`` (None and 0 both count as not succeeded)
    """
    return bool(job.status.succeeded)
def is_completed(job: V1Job) -> bool:
    """
    Return True if the job has completed (either failed or succeeded)

    :param job: V1Job instance
    :return: True when is_failed() or is_succeeded() holds for the job
    """
    return is_failed(job) or is_succeeded(job)
def is_active(job: V1Job) -> bool:
    """
    Return True if the job is active (running)

    :param job: V1Job instance
    :return: Truthiness of ``job.status.active`` (None and 0 both count as not active)
    """
    return bool(job.status.active)
def job_status(job: V1Job) -> str:
    """
    Return SUCCEEDED, FAILED, ACTIVE, or PENDING, depending on the status of the job

    The checks are ordered: a failed job reports FAILED even if it also has
    succeeded or active counts; PENDING is the fallback when none apply.

    :param job: V1Job instance
    :return: One of 'FAILED', 'SUCCEEDED', 'ACTIVE', 'PENDING'
    """
    if is_failed(job):
        return 'FAILED'
    if is_succeeded(job):
        return 'SUCCEEDED'
    if is_active(job):
        return 'ACTIVE'
    return 'PENDING'
class JobManager:
    """
    Kubernetes JobManager

    :param k8s_client: Kubernetes OpenAPI client
    :param k8s_job_spec: `dict` or path to YAML file containing the spec for the job worker
    :param namespace: Kubernetes k8s_namespace (defaults to ``K8S_DEFAULT_NAMESPACE``)
    """

    k8s_client = None
    k8s_job_spec = None
    k8s_namespace = None

    def __init__(self, k8s_client: client.ApiClient, k8s_job_spec: Union[dict, str],
                 namespace: str = K8S_DEFAULT_NAMESPACE):
        """
        Initialize the JobManager

        :param k8s_client: Kubernetes OpenAPI client
        :param k8s_job_spec: `dict` or path to YAML file containing the spec for the job worker
        :param namespace: Kubernetes k8s_namespace
        """
        self.k8s_client = k8s_client
        self.k8s_job_spec = k8s_job_spec
        self.k8s_namespace = namespace

    def create_job(self, func, *func_args, **func_kwargs) -> V1Job:
        """
        Create a job

        The job spec is taken from ``k8s_job_spec`` (a dict, or a YAML file that is
        re-read on every call), given a timestamped name, and extended with an
        environment variable holding the serialized function call.

        :param func: Function pointer
        :param func_args: Args to submit to the function
        :param func_kwargs: Kwargs to submit to the function
        :return: V1Job
        :raises ValueError: if the spec's apiVersion is not 'batch/v1' or its kind is not 'Job'
        """
        if isinstance(self.k8s_job_spec, dict):
            # Work on a private copy: the spec is modified below (name, command,
            # env), and mutating the caller's dict would make every subsequent
            # create_job() call stack another timestamp onto the name and
            # accumulate env entries.
            yml_document = copy.deepcopy(self.k8s_job_spec)
        else:
            with Path(self.k8s_job_spec).open() as f:
                yml_document = yaml.safe_load(f)

        # verify job spec compatibility
        if yml_document['apiVersion'] != 'batch/v1':
            raise ValueError(f"Expected apiVersion 'batch/v1', got '{yml_document['apiVersion']}'")
        if yml_document['kind'] != 'Job':
            raise ValueError(f"Expected kind 'Job', got '{yml_document['kind']}'")

        metadata = yml_document['metadata']
        # only the first container of the pod template is used as the job runner
        container = yml_document['spec']['template']['spec']['containers'][0]

        # set name by combining existing name (or default) with a timestamp
        dt_scheduled = datetime.datetime.utcnow()
        name = _gen_id(metadata.get('name'), dt_scheduled)
        metadata['name'] = name

        # set command; check if explicitly specified
        cmd = container.get('command')
        if cmd:
            logger.info(f"Job spec contains Docker command '{cmd}'; make sure this command calls 'kubernetes-job' to start the job runner.")
        else:
            container['command'] = ['kubernetes-job']

        # create metadata
        meta = JobMeta()
        meta.name = name
        meta.dt_scheduled = dt_scheduled
        meta.host = socket.gethostname()

        # serialize function call and add it as an environment variable
        job = JobFuncDef(func, args=func_args, kwargs=func_kwargs, meta=meta)
        # `or []` also covers a spec with an explicit empty `env:` key (None)
        env_vars = container.get('env') or []
        env_vars.append({
            'name': K8S_ENV_VAR_NAME,
            'value': job.dump(),
        })
        container['env'] = env_vars

        # create the job
        yml = yaml.dump(yml_document)
        logger.info(f"Creating K8s job '{name}'; calling function {func.__name__}")
        logger.debug(f"Spec:\n{yml}")

        api_response = create_from_dict(self.k8s_client, yml_document, namespace=self.k8s_namespace)
        return api_response[0]

    def delete_job(self, job: Union[str, V1Job], grace_period_seconds: int = 0,
                   propagation_policy: str = 'Background') -> V1Status:
        """
        Delete a Job

        :param job: Name or V1Job instance
        :param grace_period_seconds: (default: 0)
        :param propagation_policy: (default: 'Background')
        :return: V1Status
        """
        name = job_name(job)
        logger.debug(f"Deleting job '{name}'")

        api_instance = client.BatchV1Api(self.k8s_client)
        return api_instance.delete_namespaced_job(
            name,
            self.k8s_namespace,
            grace_period_seconds=grace_period_seconds,
            propagation_policy=propagation_policy,
        )

    def list_jobs(self, field_selector=None, label_selector=None) -> Iterator[V1Job]:
        """
        List job objects

        :param field_selector: A selector to restrict the list of returned objects by their fields. Defaults to everything.
        :param label_selector: A selector to restrict the list of returned objects by their labels. Defaults to everything.
        :return: Iterator of V1Job
        """
        api_instance = client.BatchV1Api(self.k8s_client)
        paging_token = None

        # retrieve all jobs in this k8s_namespace;
        # this may take several calls to the API
        while True:
            api_response = api_instance.list_namespaced_job(
                self.k8s_namespace,
                _continue=paging_token,
                field_selector=field_selector,
                label_selector=label_selector,
            )

            yield from api_response.items

            # an empty or missing _continue token means this was the last page
            paging_token = api_response.metadata._continue if api_response.metadata else None
            if not paging_token:
                break

    def read_job(self, job: Union[str, V1Job]) -> V1Job:
        """
        Read the status of the specified Job

        :param job: Name or V1Job instance
        :return: V1Job
        """
        name = job_name(job)
        api_instance = client.BatchV1Api(self.k8s_client)
        return api_instance.read_namespaced_job(name, namespace=self.k8s_namespace)

    def clean_jobs(self, field_selector=None, label_selector=None):
        """
        Clean up completed jobs

        Every job returned by list_jobs() that is_completed() (failed or
        succeeded) is deleted with delete_job()'s default settings.

        :param field_selector: A selector to restrict the list of returned objects by their fields. Defaults to everything.
        :param label_selector: A selector to restrict the list of returned objects by their labels. Defaults to everything.
        """
        for job in self.list_jobs(field_selector=field_selector, label_selector=label_selector):
            if is_completed(job):
                logger.debug(f"Cleaning up job '{job_name(job)}'; status: {job_status(job)}")
                self.delete_job(job)

    @staticmethod
    def execute_job(job_func_def: Optional[str] = None):
        """
        Execute the JobFuncDef specified in the func_spec

        The loaded definition is stored in the module-level ``current_job``
        before it is executed.

        :param job_func_def: Serialized job definition; when omitted, it is read
            from the environment variable named by ``K8S_ENV_VAR_NAME``
        :return: Job function return value (if any)
        :raises ValueError: if neither the argument nor the environment variable is set
        """
        logger.info("Executing default job")

        global current_job

        try:
            func_def = job_func_def or os.getenv(K8S_ENV_VAR_NAME)
            if not func_def:
                raise ValueError(f"Argument 'job_func_def' is missing, and environment var '{K8S_ENV_VAR_NAME}' is not set either.")

            current_job = JobFuncDef.load(func_def)
            return current_job.execute()
        except Exception as e:
            # log at the highest severity, then let the caller see the failure
            logger.critical(e)
            raise