Apache Airflow Providers CNCF Kubernetes

A comprehensive Kubernetes integration provider for Apache Airflow that enables running workflows and tasks on Kubernetes clusters. This provider offers operators for managing pods and jobs, hooks for API interactions, sensors for monitoring workloads, triggers for asynchronous operations, decorators for task creation, and executors for running Airflow tasks as Kubernetes pods.

Package Information

  • Package Name: apache-airflow-providers-cncf-kubernetes
  • Language: Python
  • Installation: pip install apache-airflow-providers-cncf-kubernetes
  • Minimum Airflow Version: 2.10.0 or later

Core Imports

# Main provider package
import airflow.providers.cncf.kubernetes

# Core operators
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Hooks for API interactions
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, AsyncKubernetesHook

# Sensors for monitoring
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

# Decorators for task creation
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task
from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task

# Executors
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor

Basic Usage

Running a Task in a Kubernetes Pod

from datetime import datetime
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Define the DAG
dag = DAG(
    'kubernetes_pod_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
)

# Run a simple task in a Kubernetes pod
task = KubernetesPodOperator(
    task_id='hello_kubernetes',
    name='hello-pod',
    namespace='default',
    image='python:3.9-slim',
    cmds=['python', '-c'],
    arguments=['print("Hello from Kubernetes!")'],
    dag=dag
)

Using Kubernetes Task Decorator

from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default'
)
def process_data():
    # Task logic runs inside the pod; any imports used here must exist in the image
    return "Processing complete"

# Use within a DAG definition (calling the decorated function creates the task)
result = process_data()

Managing Kubernetes Jobs

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

job_task = KubernetesJobOperator(
    task_id='data_processing_job',
    name='data-job',
    namespace='default',
    image='data-processor:latest',
    cmds=['python', 'process.py'],
    dag=dag
)

Architecture

The provider is organized around several key components:

  • Operators: Execute tasks on Kubernetes (pods, jobs, resources, Spark applications)
  • Hooks: Provide low-level API access to Kubernetes clusters (sync and async)
  • Sensors: Monitor Kubernetes workloads for completion or state changes
  • Triggers: Enable asynchronous monitoring of pods and jobs
  • Decorators: Simplify task creation with Python decorators
  • Executors: Run entire Airflow task execution on Kubernetes infrastructure
  • Utilities: Support pod lifecycle management, resource conversion, and configuration

Capabilities

Pod Operations

Execute individual tasks in Kubernetes pods with full lifecycle management, including creation, monitoring, log retrieval, and cleanup.

class KubernetesPodOperator(BaseOperator):
    def __init__(
        self,
        image: str,
        name: str | None = None,
        namespace: str = "default",
        cmds: list[str] | None = None,
        arguments: list[str] | None = None,
        **kwargs
    ): ...
    
    def execute(self, context: Context) -> Any: ...
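
For example, a pod-backed task can stream container logs and request explicit resources. A minimal sketch: the image, namespace, and resource values are illustrative, and dag is the object defined under Basic Usage.

from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

compute_task = KubernetesPodOperator(
    task_id='compute',
    name='compute-pod',
    namespace='default',
    image='python:3.9-slim',
    cmds=['python', '-c'],
    arguments=['print("computing")'],
    labels={'app': 'example'},
    get_logs=True,  # stream container logs into the Airflow task log
    container_resources=k8s.V1ResourceRequirements(
        requests={'cpu': '500m', 'memory': '512Mi'}
    ),
    dag=dag
)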

Pod Operations

Job Management

Create, monitor, and manage Kubernetes Jobs with support for batch processing, parallel execution, and job lifecycle operations.

class KubernetesJobOperator(KubernetesPodOperator):
    def __init__(
        self,
        name: str,
        image: str,
        namespace: str = "default",
        **kwargs
    ): ...
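
A hedged sketch of a batch run: the image name is illustrative, and the job-level knobs shown (parallelism, completions, backoff_limit, wait_until_job_complete) are assumed to be available in your provider version.

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

batch_job = KubernetesJobOperator(
    task_id='batch_processing_job',
    name='batch-job',
    namespace='default',
    image='data-processor:latest',
    cmds=['python', 'process.py'],
    parallelism=3,                 # run up to three pods concurrently
    completions=3,                 # the Job succeeds after three successful pods
    backoff_limit=2,               # retry failed pods at most twice
    wait_until_job_complete=True,  # keep the Airflow task running until the Job finishes
    dag=dag
)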

Job Management

Kubernetes API Integration

Connect to Kubernetes clusters with comprehensive API access, supporting both synchronous and asynchronous operations.

class KubernetesHook(BaseHook):
    def get_conn(self): ...
    def create_custom_object(self, group: str, version: str, plural: str, body: dict, namespace: str | None = None): ...
    def get_pod(self, name: str, namespace: str): ...
    def create_job(self, job: V1Job): ...
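
Hooks are typically used from Python tasks or custom operators. A minimal sketch, assuming a Kubernetes connection named kubernetes_default (the hook's default) and the pod names from the examples above:

from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

hook = KubernetesHook(conn_id='kubernetes_default')

api_client = hook.get_conn()     # kubernetes.client.ApiClient built from the connection
core_v1 = hook.core_v1_client    # CoreV1Api for direct API calls
pod = hook.get_pod(name='hello-pod', namespace='default')
print(pod.status.phase)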

API Integration

Resource Management

Create, update, and delete Kubernetes resources from YAML manifests or programmatic definitions.

class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
    def __init__(
        self,
        yaml_conf: str | None = None,
        custom_resource_definition: dict | None = None,
        **kwargs
    ): ...
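
For instance, a resource can be created and later removed from an inline manifest. A sketch under the assumption that KubernetesDeleteResourceOperator from the same module is the matching delete operator; the PersistentVolumeClaim manifest is illustrative.

from airflow.providers.cncf.kubernetes.operators.resource import (
    KubernetesCreateResourceOperator,
    KubernetesDeleteResourceOperator,
)

pvc_manifest = """
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: example-pvc
spec:
  accessModes: [ReadWriteOnce]
  resources:
    requests:
      storage: 1Gi
"""

create_pvc = KubernetesCreateResourceOperator(
    task_id='create_pvc',
    yaml_conf=pvc_manifest,   # inline YAML manifest
    namespace='default',
    dag=dag
)

delete_pvc = KubernetesDeleteResourceOperator(
    task_id='delete_pvc',
    yaml_conf=pvc_manifest,
    namespace='default',
    dag=dag
)

create_pvc >> delete_pvc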

Resource Management

Spark on Kubernetes

Deploy and manage Spark applications on Kubernetes clusters with custom resource definitions and monitoring.

class SparkKubernetesOperator(KubernetesPodOperator):
    def __init__(
        self,
        application_file: str,
        namespace: str = "default",
        **kwargs
    ): ...
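
A sketch that submits a SparkApplication manifest; it assumes the Spark Operator (and its SparkApplication CRD) is installed in the cluster and that spark_pi.yaml is a manifest shipped alongside the DAG.

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

spark_pi = SparkKubernetesOperator(
    task_id='spark_pi',
    application_file='spark_pi.yaml',  # SparkApplication manifest, resolved relative to the DAG folder
    namespace='spark-jobs',
    dag=dag
)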

Spark Integration

Task Decorators

Create Kubernetes tasks using Python decorators with automatic pod configuration and execution.

def kubernetes_task(
    image: str,
    namespace: str = "default",
    name: str | None = None,
    **kwargs
): ...

def kubernetes_cmd_task(
    image: str,
    cmds: list[str],
    namespace: str = "default",
    **kwargs
): ...
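
The same decorators are also exposed through Airflow's task decorator factory as task.kubernetes (and, in recent provider versions, task.kubernetes_cmd). A minimal sketch using the factory form; the image and in_cluster values are illustrative:

from airflow.decorators import task

@task.kubernetes(
    image='python:3.9-slim',
    namespace='default',
    in_cluster=True  # use the in-cluster service account; set False to rely on a kubeconfig
)
def transform_data():
    # Only the function body and its imports are shipped to the pod,
    # so every import must be available in the image.
    return {'rows': 42}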

Task Decorators

Monitoring and Sensors

Monitor Kubernetes workloads with sensors that check application status and wait for completion conditions.

class SparkKubernetesSensor(BaseSensorOperator):
    def __init__(
        self,
        application_name: str,
        namespace: str = "default",
        **kwargs
    ): ...
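
A sketch that blocks until a SparkApplication reaches a terminal state; the application and namespace names are illustrative, and attach_log is assumed to be available for copying the driver log into the task log.

from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

wait_for_spark = SparkKubernetesSensor(
    task_id='wait_for_spark_pi',
    application_name='spark-pi',   # metadata.name of the SparkApplication
    namespace='spark-jobs',
    attach_log=True,
    dag=dag
)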

Monitoring

Executors

Run Airflow tasks on Kubernetes infrastructure with the KubernetesExecutor and hybrid LocalKubernetesExecutor.

class KubernetesExecutor(BaseExecutor):
    def start(self): ...
    def sync(self): ...
    def end(self): ...
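
Executors are selected through Airflow configuration (for example, executor = KubernetesExecutor under [core] in airflow.cfg) rather than in DAG code. With the hybrid LocalKubernetesExecutor, individual tasks opt into pod execution via their queue; the sketch below assumes the default queue name from the kubernetes executor settings.

from airflow.operators.python import PythonOperator

def report():
    print("hello")

# Default queue: runs on the local half of LocalKubernetesExecutor
local_step = PythonOperator(task_id='local_step', python_callable=report, dag=dag)

# The configured kubernetes queue (default "kubernetes") routes this task to its own pod
pod_step = PythonOperator(task_id='pod_step', python_callable=report, queue='kubernetes', dag=dag)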

Executors

Types

# Pod event enumeration
class PodEventType(Enum):
    """Type of Events emitted by kubernetes pod."""
    WARNING = "Warning"
    NORMAL = "Normal"

# Container state enumeration  
class ContainerState(str, Enum):
    """
    Possible container states.
    
    See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
    """
    WAITING = "waiting"
    RUNNING = "running"
    TERMINATED = "terminated"
    FAILED = "failed"
    UNDEFINED = "undefined"

# Execution mode for callbacks
class ExecutionMode(str, Enum):
    """Enum class for execution mode."""
    SYNC = "sync"
    ASYNC = "async"

# Pod phase constants
class PodPhase:
    """Pod phase constants for lifecycle management."""
    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"

# Actions to take when pod finishes
class OnFinishAction(str, Enum):
    """Actions to take when pod finishes execution."""
    DELETE_POD = "delete_pod"
    KEEP_POD = "keep_pod"
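
These enums back operator parameters; for example, on_finish_action on KubernetesPodOperator accepts OnFinishAction values or their string forms. A sketch that keeps the pod around for inspection, reusing the dag from Basic Usage:

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

debug_task = KubernetesPodOperator(
    task_id='debug_task',
    name='debug-pod',
    namespace='default',
    image='python:3.9-slim',
    cmds=['python', '-c'],
    arguments=['print("inspect me later")'],
    on_finish_action='keep_pod',  # string form of OnFinishAction.KEEP_POD
    dag=dag
)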

Exception Types

# Pod operation exceptions
class PodMutationHookException(AirflowException):
    """Raised when exception happens during Pod Mutation Hook execution."""
    ...

class PodReconciliationError(AirflowException):
    """Raised when an error is encountered while trying to merge pod configs."""
    ...

class PodReattachFailure(AirflowException):
    """When we expect to be able to find a pod but cannot."""
    ...

class PodCredentialsExpiredFailure(AirflowException):
    """When pod fails to refresh credentials."""
    ...

# Pod manager exceptions  
class PodLaunchFailedException(AirflowException):
    """When pod launching fails in KubernetesPodOperator."""
    ...

class PodLaunchTimeoutException(AirflowException):
    """When pod does not leave the Pending phase within specified timeout."""
    ...

class PodNotFoundException(AirflowException):
    """Expected pod does not exist in kube-api."""
    ...

# Resource operation exceptions
class FailToDeleteError(Exception):
    """For handling error if an error occurred when handling a yaml file during deletion of the resource."""
    ...