Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
npx @tessl/cli install tessl/pypi-apache-airflow-providers-cncf-kubernetes@10.7.0

A comprehensive Kubernetes integration provider for Apache Airflow that enables running workflows and tasks on Kubernetes clusters. This provider offers operators for managing pods and jobs, hooks for API interactions, sensors for monitoring workloads, triggers for asynchronous operations, decorators for task creation, and executors for running Airflow tasks as Kubernetes pods.
pip install apache-airflow-providers-cncf-kubernetes

# Main provider package
import airflow.providers.cncf.kubernetes
# Core operators
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
# Hooks for API interactions
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, AsyncKubernetesHook
# Sensors for monitoring
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
# Decorators for task creation
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task
from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task
# Executors
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor

from datetime import datetime
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
# Define the DAG
dag = DAG(
    'kubernetes_pod_example',
    start_date=datetime(2023, 1, 1),
    # `schedule` is the current name of the deprecated `schedule_interval`
    # argument (which Airflow 3 no longer accepts); None keeps the same
    # "no automatic schedule" setting the example used before.
    schedule=None,
    catchup=False,
)

# Run a simple task in a Kubernetes pod
task = KubernetesPodOperator(
    task_id='hello_kubernetes',
    name='hello-pod',
    namespace='default',
    image='python:3.9-slim',
    cmds=['python', '-c'],
    arguments=['print("Hello from Kubernetes!")'],
    dag=dag,
)

from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task
@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
)
def process_data():
    """Example task function wrapped by the ``kubernetes_task`` decorator."""
    # The example performs its import inside the function body.
    import pandas as pd

    # Task logic here
    return "Processing complete"


# Use in DAG
result = process_data()

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
job_task = KubernetesJobOperator(
task_id='data_processing_job',
name='data-job',
namespace='default',
image='data-processor:latest',
cmds=['python', 'process.py'],
dag=dag
)The provider is organized around several key components:
Execute individual tasks in Kubernetes pods with full lifecycle management, including creation, monitoring, log retrieval, and cleanup.
class KubernetesPodOperator(BaseOperator):
def __init__(
self,
image: str,
name: str | None = None,
namespace: str = "default",
cmds: list[str] | None = None,
arguments: list[str] | None = None,
**kwargs
): ...
def execute(self, context: Context) -> Any: ...Create, monitor, and manage Kubernetes Jobs with support for batch processing, parallel execution, and job lifecycle operations.
class KubernetesJobOperator(KubernetesPodOperator):
def __init__(
self,
name: str,
image: str,
namespace: str = "default",
**kwargs
): ...Connect to Kubernetes clusters with comprehensive API access, supporting both synchronous and asynchronous operations.
class KubernetesHook(BaseHook):
def get_conn(self): ...
def create_custom_object(self, group: str, version: str, plural: str, body: dict, namespace: str | None = None): ...
def get_pod(self, name: str, namespace: str): ...
def create_job(self, job: V1Job): ...Create, update, and delete Kubernetes resources from YAML manifests or programmatic definitions.
class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
def __init__(
self,
yaml_conf: str | None = None,
custom_resource_definition: dict | None = None,
**kwargs
): ...Deploy and manage Spark applications on Kubernetes clusters with custom resource definitions and monitoring.
class SparkKubernetesOperator(KubernetesPodOperator):
def __init__(
self,
application_file: str,
namespace: str = "default",
**kwargs
): ...Create Kubernetes tasks using Python decorators with automatic pod configuration and execution.
def kubernetes_task(
image: str,
namespace: str = "default",
name: str | None = None,
**kwargs
): ...
def kubernetes_cmd_task(
image: str,
cmds: list[str],
namespace: str = "default",
**kwargs
): ...Monitor Kubernetes workloads with sensors that check application status and wait for completion conditions.
class SparkKubernetesSensor(BaseSensorOperator):
def __init__(
self,
application_name: str,
namespace: str = "default",
**kwargs
): ...Run Airflow tasks on Kubernetes infrastructure with the KubernetesExecutor and hybrid LocalKubernetesExecutor.
class KubernetesExecutor(BaseExecutor):
    """Signature stub for the executor; method bodies are elided in this listing."""

    def start(self): ...

    def sync(self): ...

    def end(self): ...


# Pod event enumeration
class PodEventType(Enum):
    """Type of Events emitted by kubernetes pod."""

    WARNING = "Warning"
    NORMAL = "Normal"
# Container state enumeration
class ContainerState(str, Enum):
    """
    Possible container states.

    See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
    """

    WAITING = "waiting"
    RUNNING = "running"
    TERMINATED = "terminated"
    FAILED = "failed"
    UNDEFINED = "undefined"
# Execution mode for callbacks
class ExecutionMode(str, Enum):
    """Enum class for execution mode."""

    SYNC = "sync"
    ASYNC = "async"
# Pod phase constants
class PodPhase:
    """Pod phase constants for lifecycle management."""

    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"
# Actions to take when pod finishes
class OnFinishAction(str, Enum):
    """Actions to take when pod finishes execution."""

    DELETE_POD = "delete_pod"
    KEEP_POD = "keep_pod"


# Pod operation exceptions
class PodMutationHookException(AirflowException):
    """Raised when exception happens during Pod Mutation Hook execution."""

    ...
class PodReconciliationError(AirflowException):
    """Raised when an error is encountered while trying to merge pod configs."""

    ...
class PodReattachFailure(AirflowException):
    """When we expect to be able to find a pod but cannot."""

    ...
class PodCredentialsExpiredFailure(AirflowException):
    """When pod fails to refresh credentials."""

    ...
# Pod manager exceptions
class PodLaunchFailedException(AirflowException):
    """When pod launching fails in KubernetesPodOperator."""

    ...
class PodLaunchTimeoutException(AirflowException):
    """When pod does not leave the Pending phase within specified timeout."""

    ...
class PodNotFoundException(AirflowException):
    """Expected pod does not exist in kube-api."""

    ...
# Resource operation exceptions
class FailToDeleteError(Exception):
    """For handling error if an error occurred when handling a yaml file during deletion of the resource."""

    ...