tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution


docs/job-management.md

Job Management

Create, monitor, and manage Kubernetes Jobs for batch processing, parallel execution, and complex workload orchestration. Jobs provide reliable execution with automatic retry, parallel processing, and completion tracking.

Capabilities

Job Execution

Execute batch workloads as Kubernetes Jobs with support for parallel processing, completion tracking, and automatic cleanup.

class KubernetesJobOperator(KubernetesPodOperator):
    """
    Executes a Kubernetes Job.
    
    Args:
        name (str): Name of the job
        image (str): Docker image to run
        namespace (str): Kubernetes namespace. Defaults to 'default'
        parallelism (int, optional): Number of parallel pods
        completions (int, optional): Number of successful completions needed
        backoff_limit (int, optional): Number of retries before marking job failed
        ttl_seconds_after_finished (int, optional): TTL for cleanup after completion
        active_deadline_seconds (int, optional): Maximum duration for job execution
        suspend (bool): Whether to suspend the job. Default: False
        manual_selector (bool): Manually manage pod selector. Default: False
        completion_mode (str, optional): Completion mode ('NonIndexed' or 'Indexed')
        pod_failure_policy (dict, optional): Pod failure policy configuration
        **kwargs: Additional arguments passed to KubernetesPodOperator
    """
    def __init__(
        self,
        name: str,
        image: str,
        namespace: str = "default",
        parallelism: int | None = None,
        completions: int | None = None,
        backoff_limit: int | None = None,
        ttl_seconds_after_finished: int | None = None,
        active_deadline_seconds: int | None = None,
        suspend: bool = False,
        manual_selector: bool = False,
        completion_mode: str | None = None,
        pod_failure_policy: dict | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Execute the job."""
        ...

    def build_job_request_obj(self, context: Context) -> V1Job:
        """Build Kubernetes job request object."""
        ...

    def create_job_request_obj(self, context: Context) -> V1Job:
        """Create job from pod template."""
        ...

Job Deletion

Delete existing Kubernetes Jobs with optional cleanup of associated pods and resources.

class KubernetesDeleteJobOperator(BaseOperator):
    """
    Delete a Kubernetes Job.
    
    Args:
        name (str): Name of the job to delete
        namespace (str): Kubernetes namespace. Defaults to 'default'
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        delete_on_success (bool): Delete job on successful completion. Default: True
        delete_on_failure (bool): Delete job on failure. Default: False
        propagation_policy (str): Deletion propagation policy. Default: 'Background'
    """
    def __init__(
        self,
        name: str,
        namespace: str = "default",
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        delete_on_success: bool = True,
        delete_on_failure: bool = False,
        propagation_policy: str = "Background",
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Delete the job."""
        ...

Job Patching

Update existing Kubernetes Jobs with new specifications or configuration changes.

class KubernetesPatchJobOperator(BaseOperator):
    """
    Patch a Kubernetes Job.
    
    Args:
        name (str): Name of the job to patch
        namespace (str): Kubernetes namespace (required)
        body (dict): Patch body as dictionary
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
    """
    def __init__(
        self,
        name: str,
        namespace: str,
        body: dict,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Patch the job."""
        ...

Job Triggers

Asynchronous monitoring of Kubernetes Job completion status with triggers for deferrable execution.

class KubernetesJobTrigger(BaseTrigger):
    """
    Trigger for monitoring Kubernetes Job completion.
    
    Args:
        job_name (str): Name of the job to monitor
        job_namespace (str): Namespace of the job
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool): Use in-cluster configuration. Default: True
        get_logs (bool): Retrieve job logs. Default: True
        startup_timeout (int): Startup timeout in seconds. Default: 120
    """
    def __init__(
        self,
        job_name: str,
        job_namespace: str,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool = True,
        get_logs: bool = True,
        startup_timeout: int = 120,
        **kwargs
    ): ...

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Monitor job execution asynchronously."""
        ...

Usage Examples

Basic Job Execution

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Simple batch job
batch_job = KubernetesJobOperator(
    task_id='batch_processing',
    name='data-processor',
    namespace='default',
    image='batch-processor:latest',
    cmds=['python', 'process_batch.py'],
    completions=1,
    parallelism=1,
    dag=dag
)

Parallel Job Processing

# Parallel processing job
parallel_job = KubernetesJobOperator(
    task_id='parallel_processing',
    name='parallel-processor',
    namespace='default',
    image='parallel-processor:latest',
    cmds=['python', 'parallel_process.py'],
    completions=10,  # Need 10 successful completions
    parallelism=3,   # Run 3 pods in parallel
    backoff_limit=2, # Allow 2 retries per pod
    dag=dag
)

Job with Cleanup Configuration

# Job with automatic cleanup
cleanup_job = KubernetesJobOperator(
    task_id='cleanup_job',
    name='cleanup-processor',
    namespace='default',
    image='cleanup:latest',
    ttl_seconds_after_finished=3600,  # Clean up after 1 hour
    active_deadline_seconds=1800,     # Max 30 minutes execution
    dag=dag
)

Job with Failure Policy

# Job with custom failure handling
failure_policy_job = KubernetesJobOperator(
    task_id='resilient_job',
    name='resilient-processor',
    namespace='default',
    image='resilient:latest',
    backoff_limit=5,
    pod_failure_policy={
        'rules': [
            {
                'action': 'FailJob',
                'onExitCodes': {
                    'containerName': 'main',
                    'operator': 'In',
                    'values': [1, 2, 3]
                }
            },
            {
                'action': 'Ignore',
                'onPodConditions': [
                    {
                        'type': 'DisruptionTarget'
                    }
                ]
            }
        ]
    },
    dag=dag
)
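
`pod_failure_policy` follows the Kubernetes `podFailurePolicy` schema: rules are evaluated in order, and the first matching rule decides the action. Nested dicts like the one above are easy to get subtly wrong, so a small helper can make construction less error-prone (the helper names here are ours, not provider API):

```python
def fail_on_exit_codes(container: str, codes: list[int]) -> dict:
    """Rule: fail the whole Job when `container` exits with one of `codes`."""
    return {
        "action": "FailJob",
        "onExitCodes": {"containerName": container, "operator": "In", "values": codes},
    }


def ignore_disruptions() -> dict:
    """Rule: don't count pod evictions/disruptions against backoff_limit."""
    return {"action": "Ignore", "onPodConditions": [{"type": "DisruptionTarget"}]}


# Equivalent to the literal dict passed to pod_failure_policy above.
policy = {"rules": [fail_on_exit_codes("main", [1, 2, 3]), ignore_disruptions()]}
```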

Job Deletion Task

from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator

# Clean up completed job
delete_job = KubernetesDeleteJobOperator(
    task_id='delete_completed_job',
    name='data-processor',
    namespace='default',
    delete_on_success=True,
    propagation_policy='Foreground',  # Wait for pods to be deleted
    dag=dag
)

Job Status Monitoring

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Job with monitoring
monitored_job = KubernetesJobOperator(
    task_id='monitored_job',
    name='monitored-processor',
    namespace='default',
    image='processor:latest',
    deferrable=True,  # Use async monitoring
    get_logs=True,
    log_events_on_failure=True,
    dag=dag
)

Indexed Job Processing

from kubernetes.client import V1EnvVar, V1EnvVarSource, V1ObjectFieldSelector

# Indexed job for array processing
indexed_job = KubernetesJobOperator(
    task_id='indexed_processing',
    name='indexed-processor',
    namespace='default',
    image='array-processor:latest',
    cmds=['python', 'process_index.py'],
    completions=10,
    parallelism=3,
    completion_mode='Indexed',  # Each pod gets a completion index
    env_vars=[
        V1EnvVar(
            name='JOB_COMPLETION_INDEX',
            value_from=V1EnvVarSource(
                field_ref=V1ObjectFieldSelector(
                    field_path='metadata.annotations["batch.kubernetes.io/job-completion-index"]'
                )
            )
        )
    ],
    dag=dag
)
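
In Indexed mode, each pod can use its completion index to claim a disjoint shard of the work. A sketch of what a worker script like `process_index.py` might do with the `JOB_COMPLETION_INDEX` value wired up above (the round-robin sharding scheme is illustrative):

```python
import os


def shard(items: list, index: int, total: int) -> list:
    """Return the slice of `items` assigned to completion index `index`
    out of `total` indexed completions (round-robin assignment)."""
    return [item for i, item in enumerate(items) if i % total == index]


# Each pod reads its own index from the env var set by the operator above.
index = int(os.environ.get("JOB_COMPLETION_INDEX", "0"))
work = shard(list(range(100)), index, total=10)
print(f"index {index} processes {len(work)} items")
```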

Job Patching Example

from airflow.providers.cncf.kubernetes.operators.job import KubernetesPatchJobOperator

# Suspend a running job
patch_job = KubernetesPatchJobOperator(
    task_id='suspend_job',
    name='long-running-job',
    namespace='default',
    body={
        'spec': {
            'suspend': True
        }
    },
    dag=dag
)
