tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution


Pod Operations

Execute tasks in Kubernetes pods with comprehensive lifecycle management, monitoring, and resource control. Pod operations provide the foundation for running containerized workloads in Airflow workflows.

Capabilities

Pod Execution

KubernetesPodOperator runs a task in its own Kubernetes pod, with control over the container specification, resource limits, and execution parameters.

class KubernetesPodOperator(BaseOperator):
    """
    Execute a task in a Kubernetes Pod.
    
    Args:
        image (str): Docker image to run
        name (str, optional): Name of the pod. Auto-generated if not provided
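        namespace (str, optional): Kubernetes namespace. Defaults to None, in which case it is resolved at runtime (typically from the connection, falling back to 'default')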
        cmds (list[str], optional): Container command to execute
        arguments (list[str], optional): Arguments for the container command
        ports (list[V1ContainerPort], optional): Container ports to expose
        volume_mounts (list[V1VolumeMount], optional): Volume mounts for the pod
        volumes (list[V1Volume], optional): Volumes to attach to the pod
        env_vars (list[V1EnvVar] | dict[str, str], optional): Environment variables
        secrets (list[Secret], optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount as volumes
        labels (dict[str, str], optional): Pod labels
        startup_timeout_seconds (int): Time to wait for pod startup. Default: 120
        get_logs (bool): Whether to retrieve pod logs. Default: True
        log_events_on_failure (bool): Log pod events on failure. Default: False
        hostnetwork (bool): Use host networking. Default: False
        dnspolicy (str, optional): DNS policy for the pod
        schedulername (str, optional): Scheduler name
        full_pod_spec (V1Pod, optional): Complete pod specification
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (V1Affinity, optional): Pod affinity rules
        tolerations (list[V1Toleration], optional): Pod tolerations
        security_context (V1PodSecurityContext | dict, optional): Pod-level security context
        image_pull_policy (str, optional): Image pull policy. Defaults to None, which leaves the choice to Kubernetes
        image_pull_secrets (list[V1LocalObjectReference], optional): Image pull secrets
        service_account_name (str, optional): Service account name
        hostpid (bool): Use host PID namespace. Default: False
        hostipc (bool): Use host IPC namespace. Default: False
        priority_class_name (str, optional): Priority class name
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        do_xcom_push (bool): Push return value to XCom. Default: False
        pod_template_file (str, optional): Path to pod template file
        pod_template_dict (dict, optional): Pod template as dictionary
        random_name_suffix (bool): Add random suffix to pod name. Default: True
        reattach_on_restart (bool): Reattach to existing pod on restart. Default: True
        on_finish_action (str): What to do with the pod once it finishes: 'delete_pod', 'delete_succeeded_pod', or 'keep_pod'. Default: 'delete_pod'
        is_delete_operator_pod (bool, optional): Deprecated in favor of on_finish_action. Defaults to None
        termination_grace_period (int, optional): Grace period for pod termination
        active_deadline_seconds (int, optional): Pod active deadline
        log_pod_spec_on_failure (bool): Log pod spec on failure. Default: True
        container_logs (str | bool, optional): Containers whose logs are streamed; True streams all containers. Defaults to None
        wait_until_job_completion (bool): Wait for job completion. Default: True
    """
    def __init__(
        self,
        *,
        kubernetes_conn_id: str | None = "kubernetes_default",
        namespace: str | None = None,
        image: str | None = None,
        name: str | None = None,
        random_name_suffix: bool = True,
        cmds: list[str] | None = None,
        arguments: list[str] | None = None,
        ports: list[V1ContainerPort] | None = None,
        volume_mounts: list[V1VolumeMount] | None = None,
        volumes: list[V1Volume] | None = None,
        env_vars: list[V1EnvVar] | dict[str, str] | None = None,
        env_from: list[V1EnvFromSource] | None = None,
        secrets: list[Secret] | None = None,
        in_cluster: bool | None = None,
        cluster_context: str | None = None,
        labels: dict | None = None,
        reattach_on_restart: bool = True,
        startup_timeout_seconds: int = 120,
        startup_check_interval_seconds: int = 5,
        schedule_timeout_seconds: int | None = None,
        get_logs: bool = True,
        base_container_name: str | None = None,
        base_container_status_polling_interval: float = 1,
        init_container_logs: str | bool | None = None,
        container_logs: str | bool | None = None,
        image_pull_policy: str | None = None,
        annotations: dict | None = None,
        container_resources: V1ResourceRequirements | None = None,
        affinity: V1Affinity | None = None,
        config_file: str | None = None,
        node_selector: dict | None = None,
        image_pull_secrets: list[V1LocalObjectReference] | None = None,
        service_account_name: str | None = None,
        automount_service_account_token: bool | None = None,
        hostnetwork: bool = False,
        host_aliases: list[V1HostAlias] | None = None,
        tolerations: list[V1Toleration] | None = None,
        security_context: V1PodSecurityContext | dict | None = None,
        container_security_context: V1SecurityContext | dict | None = None,
        dnspolicy: str | None = None,
        dns_config: V1PodDNSConfig | None = None,
        hostname: str | None = None,
        subdomain: str | None = None,
        schedulername: str | None = None,
        full_pod_spec: V1Pod | None = None,
        init_containers: list[V1Container] | None = None,
        log_events_on_failure: bool = False,
        do_xcom_push: bool = False,
        pod_template_file: str | None = None,
        pod_template_dict: dict | None = None,
        priority_class_name: str | None = None,
        pod_runtime_info_envs: list[V1EnvVar] | None = None,
        termination_grace_period: int | None = None,
        configmaps: list[str] | None = None,
        skip_on_exit_code: int | None = None,
        deferrable: bool = False,
        poll_interval: float = 2,
        log_pod_spec_on_failure: bool = True,
        on_finish_action: str = "delete_pod",
        is_delete_operator_pod: bool | None = None,
        termination_message_policy: str = "File",
        active_deadline_seconds: int | None = None,
        callbacks: list | None = None,
        progress_callback: Callable[[str], None] | None = None,
        logging_interval: int | None = None,
        trigger_kwargs: dict | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Execute the pod operation."""
        ...

    def execute_sync(self, context: Context) -> Any:
        """Execute pod synchronously."""
        ...
        
    def execute_async(self, context: Context) -> Any:
        """Execute pod asynchronously."""
        ...

    def find_pod(self, context: Context) -> V1Pod | None:
        """Find an existing pod."""
        ...

    def get_or_create_pod(self, context: Context) -> V1Pod:
        """Get existing pod or create a new one."""
        ...

    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> V1Pod:
        """Wait for pod to start running."""
        ...

    def await_pod_completion(self, pod: V1Pod) -> V1Pod:
        """Wait for pod to complete execution."""
        ...

    def extract_xcom(self, pod: V1Pod) -> str:
        """Extract XCom value from pod."""
        ...

    def cleanup(self, pod: V1Pod | None, remote_pod: V1Pod | None) -> None:
        """Clean up pod resources."""
        ...

    def build_pod_request_obj(self, context: Context | None = None) -> V1Pod:
        """Build Kubernetes pod request object."""
        ...
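The method names above suggest a lifecycle of find-or-create, wait for start, wait for completion, then cleanup. The following is a minimal, stdlib-only sketch of that flow, not the provider's implementation: `FakePod` and `LifecycleSketch` are hypothetical stand-ins, the "cluster" is a plain dict, and the handling of `on_finish_action` values other than `'delete_pod'` reflects my understanding of the provider rather than anything stated in this page.

```python
from dataclasses import dataclass, field


@dataclass
class FakePod:
    """Stand-in for V1Pod; tracks only what this sketch needs."""
    name: str
    phase: str = "Pending"


@dataclass
class LifecycleSketch:
    """Hypothetical driver that mirrors the documented method names."""
    on_finish_action: str = "delete_pod"
    exit_ok: bool = True                           # simulated container outcome
    existing: dict = field(default_factory=dict)   # pods already in the "cluster"
    calls: list = field(default_factory=list)      # records the call order

    def find_pod(self, name):
        self.calls.append("find_pod")
        return self.existing.get(name)

    def get_or_create_pod(self, name):
        # Reattach to a pod left by an earlier attempt, otherwise create one.
        pod = self.find_pod(name)
        if pod is None:
            self.calls.append("create_pod")
            pod = self.existing[name] = FakePod(name)
        return pod

    def await_pod_start(self, pod):
        self.calls.append("await_pod_start")
        pod.phase = "Running"

    def await_pod_completion(self, pod):
        self.calls.append("await_pod_completion")
        pod.phase = "Succeeded" if self.exit_ok else "Failed"

    def cleanup(self, pod):
        self.calls.append("cleanup")
        delete = self.on_finish_action == "delete_pod" or (
            self.on_finish_action == "delete_succeeded_pod"
            and pod.phase == "Succeeded"
        )
        if delete:
            self.existing.pop(pod.name, None)

    def execute(self, name):
        pod = self.get_or_create_pod(name)
        try:
            self.await_pod_start(pod)
            self.await_pod_completion(pod)
        finally:
            # Cleanup runs even if one of the waits raised.
            self.cleanup(pod)
        return pod.phase


op = LifecycleSketch(on_finish_action="delete_succeeded_pod", exit_ok=False)
print(op.execute("demo-pod"), op.calls, list(op.existing))
```

In this run the simulated pod fails, so with `'delete_succeeded_pod'` it is left in place for inspection; with `'delete_pod'` it would be removed regardless of outcome.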

Pod Event Types

Enumeration of the event types that Kubernetes reports for a pod, used to tell routine events from warnings.

class PodEventType(Enum):
    """Type of events emitted by a Kubernetes pod."""
    WARNING = "Warning"
    NORMAL = "Normal"

Pod Exceptions

Exception classes for pod operation error handling.

class PodReattachFailure(AirflowException):
    """Raised when pod reattachment fails."""
    ...

class PodCredentialsExpiredFailure(AirflowException):
    """Raised when pod credentials have expired."""
    ...

Usage Examples

Basic Pod Execution

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Simple pod execution; `dag` is assumed to be an existing DAG object defined elsewhere
simple_task = KubernetesPodOperator(
    task_id='simple_pod',
    name='simple-pod',
    namespace='default',
    image='python:3.9-slim',
    cmds=['python', '-c'],
    arguments=['print("Hello from Kubernetes!")'],
    dag=dag
)
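The `name` and `random_name_suffix` parameters interact with Kubernetes naming rules: pod names must be valid DNS subdomain names. The sketch below shows one way a base name could be sanitized and given a random suffix. `make_pod_name` is a hypothetical helper, not the provider's own function, and it conservatively assumes the stricter 63-character DNS label form.

```python
import re
import secrets

MAX_LABEL_LEN = 63
DNS_LABEL = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")


def make_pod_name(base: str, random_suffix: bool = True) -> str:
    """Hypothetical helper: derive a DNS-1123-label-compliant pod name."""
    # Lowercase, then collapse anything outside [a-z0-9-] into a dash.
    name = re.sub(r"[^a-z0-9-]+", "-", base.lower()).strip("-")
    # An 8-hex-character suffix keeps names from repeated runs distinct.
    suffix = "-" + secrets.token_hex(4) if random_suffix else ""
    # Truncate the base so that base + suffix fits in 63 characters.
    name = name[: MAX_LABEL_LEN - len(suffix)].rstrip("-") + suffix
    if not DNS_LABEL.fullmatch(name):
        raise ValueError(f"cannot derive a valid pod name from {base!r}")
    return name


print(make_pod_name("Simple_Pod", random_suffix=False))  # simple-pod
print(make_pod_name("simple-pod"))  # simple-pod-<8 random hex characters>
```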

Pod with Volume Mounts

from kubernetes.client import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource

# Pod with persistent volume
volume_task = KubernetesPodOperator(
    task_id='volume_pod',
    name='volume-pod',
    namespace='default',
    image='ubuntu:20.04',
    cmds=['sh', '-c'],
    arguments=['echo "Data processing" && ls -la /data'],
    volumes=[
        V1Volume(
            name='data-volume',
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                claim_name='data-pvc'
            )
        )
    ],
    volume_mounts=[
        V1VolumeMount(
            name='data-volume',
            mount_path='/data'
        )
    ],
    dag=dag
)
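Each volume mount refers to a volume by name, so a typo in either list leaves the pod spec invalid. A small pre-flight check can catch that before the task is scheduled. This sketch uses plain dicts as stand-ins for `V1Volume` and `V1VolumeMount`, and `unmatched_mounts` is a hypothetical helper.

```python
def unmatched_mounts(volumes: list[dict], volume_mounts: list[dict]) -> list[str]:
    """Return the names of mounts that refer to no declared volume."""
    declared = {volume["name"] for volume in volumes}
    return [m["name"] for m in volume_mounts if m["name"] not in declared]


volumes = [{"name": "data-volume", "claim_name": "data-pvc"}]
good_mounts = [{"name": "data-volume", "mount_path": "/data"}]
bad_mounts = [{"name": "data-volumes", "mount_path": "/data"}]  # note the typo

print(unmatched_mounts(volumes, good_mounts))  # []
print(unmatched_mounts(volumes, bad_mounts))   # ['data-volumes']
```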

Pod with Environment Variables and Secrets

from kubernetes.client import V1EnvVar
from airflow.providers.cncf.kubernetes.secret import Secret

# Pod with secrets and env vars
secure_task = KubernetesPodOperator(
    task_id='secure_pod',
    name='secure-pod',
    namespace='default',
    image='myapp:latest',
    env_vars=[
        V1EnvVar(name='ENV', value='production'),
        V1EnvVar(name='DEBUG', value='false')
    ],
    secrets=[
        Secret('env', 'DB_PASSWORD', 'db-secret', 'password'),
        Secret('volume', '/etc/certs', 'tls-secret')
    ],
    dag=dag
)
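The constructor signature accepts `env_vars` either as a list of `V1EnvVar` objects or as a plain `dict[str, str]`. The sketch below illustrates how the two forms can be treated as equivalent. `EnvVar` is a stand-in dataclass for `V1EnvVar` and `normalize_env_vars` is a hypothetical helper, so this shows the idea rather than the provider's conversion code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvVar:
    """Stand-in for kubernetes.client.V1EnvVar (name and value only)."""
    name: str
    value: str


def normalize_env_vars(env_vars):
    """Accept a dict, a list of EnvVar, or None; always return a list of EnvVar."""
    if env_vars is None:
        return []
    if isinstance(env_vars, dict):
        # Environment values are strings in Kubernetes, so coerce them here.
        return [EnvVar(name=k, value=str(v)) for k, v in env_vars.items()]
    return list(env_vars)


as_dict = normalize_env_vars({"ENV": "production", "DEBUG": "false"})
as_list = normalize_env_vars([EnvVar("ENV", "production"), EnvVar("DEBUG", "false")])
print(as_dict == as_list)  # True
```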

Pod with Resource Limits

from kubernetes.client import V1ResourceRequirements

# Pod with resource constraints
resource_task = KubernetesPodOperator(
    task_id='resource_pod',
    name='resource-pod',
    namespace='default',
    image='cpu-intensive:latest',
    container_resources=V1ResourceRequirements(
        requests={'cpu': '100m', 'memory': '128Mi'},
        limits={'cpu': '500m', 'memory': '512Mi'}
    ),
    dag=dag
)
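Resource quantities such as `'100m'` (a tenth of a CPU core) and `'128Mi'` (128 × 2^20 bytes) are strings, so mistakes like a request larger than its limit are easy to miss. The following sketch parses a subset of the Kubernetes quantity suffixes and checks that every request fits within its limit. `parse_quantity` and `requests_within_limits` are hypothetical helpers written for this example; exponent notation is not handled.

```python
from decimal import Decimal

# Binary (Ki, Mi, ...) and decimal (m, k, M, ...) suffixes; a subset only.
_SUFFIXES = {
    "Ki": Decimal(2) ** 10, "Mi": Decimal(2) ** 20,
    "Gi": Decimal(2) ** 30, "Ti": Decimal(2) ** 40,
    "m": Decimal("0.001"), "k": Decimal(10) ** 3,
    "M": Decimal(10) ** 6, "G": Decimal(10) ** 9, "T": Decimal(10) ** 12,
}


def parse_quantity(quantity: str) -> Decimal:
    """Convert a quantity string such as '500m' or '512Mi' to a number."""
    # Try two-character suffixes first so 'Mi' is not mistaken for 'M'.
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            return Decimal(quantity[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(quantity)


def requests_within_limits(requests: dict, limits: dict) -> bool:
    """True if every requested resource that has a limit does not exceed it."""
    return all(
        parse_quantity(requests[key]) <= parse_quantity(limits[key])
        for key in requests
        if key in limits
    )


print(requests_within_limits(
    {"cpu": "100m", "memory": "128Mi"},
    {"cpu": "500m", "memory": "512Mi"},
))  # True
```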

Pod with Node Selection

from kubernetes.client import V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement

# Pod with node affinity
affinity_task = KubernetesPodOperator(
    task_id='affinity_pod',
    name='affinity-pod',
    namespace='default',
    image='workload:latest',
    node_selector={'disktype': 'ssd'},
    affinity=V1Affinity(
        node_affinity=V1NodeAffinity(
            required_during_scheduling_ignored_during_execution=V1NodeSelector(
                node_selector_terms=[
                    V1NodeSelectorTerm(
                        match_expressions=[
                            V1NodeSelectorRequirement(
                                key='kubernetes.io/arch',
                                operator='In',
                                values=['amd64']
                            )
                        ]
                    )
                ]
            )
        )
    ),
    dag=dag
)
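When both `node_selector` and a required node affinity are set, a node must satisfy both. Within the affinity, selector terms are ORed while the expressions inside one term are ANDed. The sketch below models that rule with plain dicts in place of the `V1*` objects. The helper names are hypothetical and only the `In`, `NotIn`, `Exists`, and `DoesNotExist` operators are covered.

```python
def matches_expression(labels: dict, expr: dict) -> bool:
    """Evaluate one match expression against a node's labels."""
    key, op, values = expr["key"], expr["operator"], expr.get("values", [])
    if op == "In":
        return labels.get(key) in values
    if op == "NotIn":
        # A node without the label also satisfies NotIn.
        return labels.get(key) not in values
    if op == "Exists":
        return key in labels
    if op == "DoesNotExist":
        return key not in labels
    raise ValueError(f"unsupported operator: {op}")


def node_is_eligible(labels: dict, node_selector: dict, terms: list) -> bool:
    """Combine node_selector (all pairs must match) with required affinity terms."""
    selector_ok = all(labels.get(k) == v for k, v in node_selector.items())
    # Terms are ORed; the expressions inside a single term are ANDed.
    affinity_ok = not terms or any(
        all(matches_expression(labels, expr) for expr in term) for term in terms
    )
    return selector_ok and affinity_ok


terms = [[{"key": "kubernetes.io/arch", "operator": "In", "values": ["amd64"]}]]
ssd_amd64 = {"disktype": "ssd", "kubernetes.io/arch": "amd64"}
ssd_arm64 = {"disktype": "ssd", "kubernetes.io/arch": "arm64"}

print(node_is_eligible(ssd_amd64, {"disktype": "ssd"}, terms))  # True
print(node_is_eligible(ssd_arm64, {"disktype": "ssd"}, terms))  # False
```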

Asynchronous Pod Execution

# Async pod execution with deferral
async_task = KubernetesPodOperator(
    task_id='async_pod',
    name='async-pod',
    namespace='default',
    image='long-running:latest',
    deferrable=True,  # Free the worker slot while the pod runs; requires a running triggerer
    dag=dag
)
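In deferrable mode the wait for the pod is handed to an asynchronous trigger instead of occupying a worker, and the `poll_interval` parameter in the signature suggests periodic polling. The sketch below shows that polling pattern with `asyncio` only. `wait_for_pod` and `fake_phase_source` are hypothetical, the pod phases are simulated, and none of this is the provider's trigger class.

```python
import asyncio

TERMINAL_PHASES = {"Succeeded", "Failed"}


async def wait_for_pod(get_phase, poll_interval: float = 2.0) -> str:
    """Poll `get_phase` until the pod reaches a terminal phase."""
    while True:
        phase = await get_phase()
        if phase in TERMINAL_PHASES:
            return phase
        # Sleeping yields control, so one event loop can watch many pods.
        await asyncio.sleep(poll_interval)


def fake_phase_source(phases):
    """Simulated API: each call reports the next phase in `phases`."""
    remaining = iter(phases)

    async def get_phase():
        return next(remaining)

    return get_phase


result = asyncio.run(
    wait_for_pod(
        fake_phase_source(["Pending", "Running", "Succeeded"]),
        poll_interval=0.01,
    )
)
print(result)  # Succeeded
```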

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes
