
tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution


docs/monitoring.md

Monitoring

Monitor Kubernetes workloads with sensors and triggers that check application status and wait for completion conditions. Capabilities include Spark application tracking, asynchronous pod and job monitoring, and custom resource status checking.

Capabilities

Spark Kubernetes Sensor

Monitor Spark applications running on Kubernetes clusters and wait for specific status conditions or completion.

class SparkKubernetesSensor(BaseSensorOperator):
    """
    Checks a sparkApplication object in a Kubernetes cluster.
    
    Args:
        application_name (str): Name of the Spark application to monitor
        namespace (str): Kubernetes namespace. Defaults to 'default'
        kubernetes_conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        attach_log (bool): Attach application logs to sensor output. Default: False
        api_group (str): API group for Spark operator. Default: 'sparkoperator.k8s.io'
        api_version (str): API version for Spark operator. Default: 'v1beta2'
        poke_interval (int): Interval between checks in seconds. Default: 60
        timeout (int): Timeout in seconds. Default: 60*60*24*7 (7 days)
        mode (str): Sensor mode ('poke' or 'reschedule'). Default: 'poke'
        exponential_backoff (bool): Use exponential backoff. Default: False
        max_wait (int): Maximum wait time between pokes. Default: 60
        soft_fail (bool): Mark the task as SKIPPED instead of FAILED when the sensor fails or times out. Default: False
    """
    def __init__(
        self,
        application_name: str,
        namespace: str = "default",
        kubernetes_conn_id: str = "kubernetes_default",
        attach_log: bool = False,
        api_group: str = "sparkoperator.k8s.io",
        api_version: str = "v1beta2",
        poke_interval: int = 60,
        timeout: int = 60 * 60 * 24 * 7,
        mode: str = "poke",
        exponential_backoff: bool = False,
        max_wait: int = 60,
        soft_fail: bool = False,
        **kwargs
    ): ...

    def poke(self, context: Context) -> bool:
        """
        Check Spark application status.
        
        Returns:
            bool: True if application has completed successfully, False otherwise
        """
        ...

    def get_hook(self) -> KubernetesHook:
        """
        Get Kubernetes hook for cluster access.
        
        Returns:
            KubernetesHook: Configured Kubernetes hook
        """
        ...

Pod Triggers

Asynchronous monitoring of Kubernetes Pod completion with triggers for deferrable execution.

class KubernetesPodTrigger(BaseTrigger):
    """
    Trigger for monitoring Kubernetes Pod completion.
    
    Args:
        pod_name (str): Name of the pod to monitor
        pod_namespace (str): Namespace of the pod
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool): Use in-cluster configuration. Default: True
        get_logs (bool): Retrieve pod logs. Default: True
        startup_timeout (int): Startup timeout in seconds. Default: 120
        delete_on_finish (bool): Delete pod when finished. Default: True
        poll_interval (int): Polling interval in seconds. Default: 10
        logging_level (str): Logging level. Default: 'INFO'
    """
    def __init__(
        self,
        pod_name: str,
        pod_namespace: str,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool = True,
        get_logs: bool = True,
        startup_timeout: int = 120,
        delete_on_finish: bool = True,
        poll_interval: int = 10,
        logging_level: str = "INFO",
        **kwargs
    ): ...

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Monitor pod execution asynchronously.
        
        Yields:
            TriggerEvent: Events indicating pod status changes
        """
        ...
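In practice, `run()` is consumed by Airflow's triggerer process: a deferred task resumes as soon as the generator yields its first `TriggerEvent`. A self-contained sketch of that consumption loop, with stubs standing in for `KubernetesPodTrigger` and `TriggerEvent` (the real classes live in `airflow.providers.cncf.kubernetes.triggers.pod` and `airflow.triggers.base`; the stub's polling loop is purely illustrative):

```python
import asyncio

class TriggerEvent:
    """Minimal stand-in for airflow.triggers.base.TriggerEvent."""
    def __init__(self, payload):
        self.payload = payload

class StubPodTrigger:
    """Stub mimicking KubernetesPodTrigger.run(): poll until the pod
    reaches a terminal state, then yield a single terminal event."""
    def __init__(self, poll_interval=0.01, polls_until_done=3):
        self.poll_interval = poll_interval
        self.polls_until_done = polls_until_done

    async def run(self):
        for _ in range(self.polls_until_done):
            # In the real trigger this would be an async pod-status check.
            await asyncio.sleep(self.poll_interval)
        yield TriggerEvent({"status": "success", "namespace": "default"})

async def wait_for_first_event(trigger):
    # The triggerer resumes the deferred task on the first event.
    async for event in trigger.run():
        return event.payload

payload = asyncio.run(wait_for_first_event(StubPodTrigger()))
print(payload["status"])  # → success
```

Because the trigger runs in the triggerer rather than on a worker, the worker slot is freed for the entire wait, which is what makes `deferrable=True` cheaper than a long-running `poke` loop.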

Job Triggers

Asynchronous monitoring of Kubernetes Job completion status with triggers for deferrable execution.

class KubernetesJobTrigger(BaseTrigger):
    """
    Trigger for monitoring Kubernetes Job completion.
    
    Args:
        job_name (str): Name of the job to monitor
        job_namespace (str): Namespace of the job  
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool): Use in-cluster configuration. Default: True
        get_logs (bool): Retrieve job logs. Default: True
        startup_timeout (int): Startup timeout in seconds. Default: 120
        poll_interval (int): Polling interval in seconds. Default: 10
        logging_level (str): Logging level. Default: 'INFO'
    """
    def __init__(
        self,
        job_name: str,
        job_namespace: str,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool = True,
        get_logs: bool = True,
        startup_timeout: int = 120,
        poll_interval: int = 10,
        logging_level: str = "INFO",
        **kwargs
    ): ...

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Monitor job execution asynchronously.
        
        Yields:
            TriggerEvent: Events indicating job status changes
        """
        ...

Container State Enumeration

Enumeration of possible container states for monitoring and status checking.

class ContainerState(Enum):
    """Container state enumeration for monitoring."""
    WAITING = "waiting"
    RUNNING = "running" 
    TERMINATED = "terminated"
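One way to use the enumeration is to classify the `state` field of a pod's `containerStatuses` entries, which carries exactly one of the keys `waiting`, `running`, or `terminated`. A minimal sketch using plain dictionaries in place of the Kubernetes client models (the enum is redefined locally so the snippet is self-contained, and `classify` is a hypothetical helper, not part of the provider):

```python
from enum import Enum

class ContainerState(Enum):
    WAITING = "waiting"
    RUNNING = "running"
    TERMINATED = "terminated"

def classify(state: dict) -> ContainerState:
    """Map a containerStatuses[].state dict to a ContainerState member.

    Exactly one of the keys 'waiting', 'running', 'terminated' is set,
    mirroring the shape of the Kubernetes API object.
    """
    for member in ContainerState:
        if state.get(member.value) is not None:
            return member
    raise ValueError(f"unrecognized container state: {state!r}")

# A terminated container, as the API would report it:
print(classify({"terminated": {"exitCode": 0}}).value)  # → terminated
```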

Usage Examples

Basic Spark Application Monitoring

from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

# Monitor Spark application completion
spark_monitor = SparkKubernetesSensor(
    task_id='wait_for_spark_completion',
    application_name='data-processing-job',
    namespace='spark',
    kubernetes_conn_id='kubernetes_default',
    poke_interval=30,
    timeout=3600,  # 1 hour timeout
    dag=dag
)

Spark Monitoring with Log Attachment

# Monitor with detailed logging
spark_monitor_with_logs = SparkKubernetesSensor(
    task_id='monitor_spark_with_logs',
    application_name='ml-training-job',
    namespace='ml',
    attach_log=True,  # Attach Spark application logs
    poke_interval=60,
    timeout=7200,  # 2 hours for training job
    mode='poke',
    dag=dag
)

Custom Spark Operator Monitoring

# Monitor custom Spark operator
custom_spark_monitor = SparkKubernetesSensor(
    task_id='monitor_custom_spark',
    application_name='custom-analytics-job',
    namespace='analytics',
    api_group='analytics.company.com',  # Custom API group
    api_version='v1alpha1',             # Custom API version
    poke_interval=45,
    timeout=5400,  # 90 minutes
    dag=dag
)

Exponential Backoff Monitoring

# Monitor with exponential backoff to reduce API calls
efficient_monitor = SparkKubernetesSensor(
    task_id='efficient_spark_monitor',
    application_name='batch-processing-job',
    namespace='batch',
    poke_interval=10,        # Start with 10 seconds
    exponential_backoff=True,
    max_wait=300,           # Max 5 minutes between checks
    timeout=10800,          # 3 hours total
    dag=dag
)

Asynchronous Pod Monitoring

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Pod with async monitoring using triggers
async_pod = KubernetesPodOperator(
    task_id='async_pod_task',
    name='long-running-pod',
    namespace='default',
    image='long-runner:latest',
    deferrable=True,  # Enable async monitoring
    startup_timeout_seconds=300,
    dag=dag
)

Job Monitoring with Custom Polling

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Job with custom monitoring configuration
monitored_job = KubernetesJobOperator(
    task_id='monitored_batch_job',
    name='batch-processor',
    namespace='batch',
    image='batch-processor:latest',
    deferrable=True,
    get_logs=True,
    startup_timeout_seconds=180,
    dag=dag
)

Multi-Application Monitoring Pipeline

from airflow import DAG
from datetime import datetime

dag = DAG(
    'spark_monitoring_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False
)

# Launch multiple Spark applications
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

# Data extraction job
extract_job = SparkKubernetesOperator(
    task_id='launch_extract_job',
    application_file='/configs/extract-spark.yaml',
    namespace='spark',
    dag=dag
)

# Data transformation job  
transform_job = SparkKubernetesOperator(
    task_id='launch_transform_job',
    application_file='/configs/transform-spark.yaml',
    namespace='spark',
    dag=dag
)

# Model training job
training_job = SparkKubernetesOperator(
    task_id='launch_training_job',
    application_file='/configs/training-spark.yaml',
    namespace='spark',
    dag=dag
)

# Monitor each application
extract_monitor = SparkKubernetesSensor(
    task_id='monitor_extract',
    application_name='data-extract-job',
    namespace='spark',
    poke_interval=30,
    timeout=1800,  # 30 minutes
    dag=dag
)

transform_monitor = SparkKubernetesSensor(
    task_id='monitor_transform',
    application_name='data-transform-job',
    namespace='spark',
    poke_interval=45,
    timeout=3600,  # 1 hour
    dag=dag
)

training_monitor = SparkKubernetesSensor(
    task_id='monitor_training',
    application_name='model-training-job',
    namespace='spark',
    attach_log=True,  # Get training logs
    poke_interval=120,
    timeout=14400,  # 4 hours
    dag=dag
)

# Set up pipeline dependencies
extract_job >> extract_monitor >> transform_job >> transform_monitor >> training_job >> training_monitor

Conditional Monitoring with Soft Fail

# Monitor with soft fail for optional jobs
optional_monitor = SparkKubernetesSensor(
    task_id='monitor_optional_job',
    application_name='optional-analytics-job',
    namespace='analytics',
    poke_interval=60,
    timeout=1800,
    soft_fail=True,  # Mark as SKIPPED if job fails
    dag=dag
)

# Continue pipeline even if optional job fails
from airflow.operators.empty import EmptyOperator

continue_pipeline = EmptyOperator(
    task_id='continue_pipeline',
    dag=dag
)

optional_monitor >> continue_pipeline

Custom Resource Monitoring

def monitor_custom_resource(**context):
    """Monitor custom Kubernetes resource status."""
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
    
    hook = KubernetesHook(conn_id='kubernetes_default')
    
    # Monitor custom resource
    try:
        custom_object = hook.get_custom_object(
            group='apps.example.com',
            version='v1',
            plural='myapps',
            name='my-application',
            namespace='default'
        )
        
        status = custom_object.get('status', {})
        phase = status.get('phase', 'Unknown')
        
        print(f"Custom resource status: {phase}")
        
        # Check if ready
        conditions = status.get('conditions', [])
        ready_condition = next(
            (c for c in conditions if c.get('type') == 'Ready'),
            None
        )
        
        if ready_condition and ready_condition.get('status') == 'True':
            print("Custom resource is ready")
            return True
        else:
            print("Custom resource not ready yet")
            return False
            
    except Exception as e:
        print(f"Error monitoring custom resource: {e}")
        return False

from airflow.sensors.python import PythonSensor

custom_resource_sensor = PythonSensor(
    task_id='monitor_custom_resource',
    python_callable=monitor_custom_resource,
    poke_interval=30,
    timeout=600,
    dag=dag
)

Health Check Monitoring

def check_pod_health(**context):
    """Check pod health status."""
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
    
    hook = KubernetesHook(conn_id='kubernetes_default')
    
    try:
        pod = hook.get_pod(name='health-check-pod', namespace='monitoring')
        
        # Check pod phase
        phase = pod.status.phase
        print(f"Pod phase: {phase}")
        
        if phase == 'Running':
            # Check container readiness
            container_statuses = pod.status.container_statuses or []
            all_ready = all(status.ready for status in container_statuses)
            
            if all_ready:
                print("All containers are ready")
                return True
            else:
                print("Some containers are not ready")
                return False
        elif phase == 'Succeeded':
            print("Pod completed successfully")
            return True
        elif phase == 'Failed':
            print("Pod failed")
            raise Exception("Pod execution failed")
        else:
            print(f"Pod is in {phase} state, waiting...")
            return False
            
    except Exception as e:
        print(f"Error checking pod health: {e}")
        raise

health_monitor = PythonSensor(
    task_id='health_check_monitor',
    python_callable=check_pod_health,
    poke_interval=15,
    timeout=300,
    dag=dag
)

Resource Usage Monitoring

def monitor_resource_usage(**context):
    """Monitor Kubernetes resource usage."""
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
    from kubernetes.client import ApiException
    
    hook = KubernetesHook(conn_id='kubernetes_default')
    client = hook.get_conn()
    
    try:
        # Get metrics from metrics server (requires metrics-server)
        from kubernetes.client.api import custom_objects_api
        metrics_client = custom_objects_api.CustomObjectsApi(client)
        
        # Get pod metrics
        pod_metrics = metrics_client.list_namespaced_custom_object(
            group='metrics.k8s.io',
            version='v1beta1',
            namespace='default',
            plural='pods'
        )
        
        total_cpu = 0
        total_memory = 0
        pod_count = 0
        
        for pod in pod_metrics['items']:
            pod_name = pod['metadata']['name']
            if pod_name.startswith('data-processing'):
                containers = pod['containers']
                for container in containers:
                    usage = container['usage']
                    cpu = usage['cpu']
                    memory = usage['memory']
                    
                    # Convert to numeric values (simplified)
                    cpu_numeric = float(cpu.replace('n', '')) / 1000000000  # nanocores to cores
                    memory_numeric = float(memory.replace('Ki', '')) / 1024  # Ki to Mi
                    
                    total_cpu += cpu_numeric
                    total_memory += memory_numeric
                    pod_count += 1
        
        print(f"Resource usage - Pods: {pod_count}, CPU: {total_cpu:.2f} cores, Memory: {total_memory:.2f} Mi")
        
        # Check if usage is within acceptable limits
        cpu_limit = 10.0  # 10 cores
        memory_limit = 20480  # 20 Gi in Mi
        
        return total_cpu < cpu_limit and total_memory < memory_limit
        
    except ApiException as e:
        if e.status == 404:
            print("Metrics server not available")
            return True  # Skip check if metrics not available
        else:
            raise

resource_monitor = PythonSensor(
    task_id='resource_usage_monitor',
    python_callable=monitor_resource_usage,
    poke_interval=120,  # Check every 2 minutes
    timeout=1800,       # 30 minutes
    dag=dag
)
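The unit conversion in `monitor_resource_usage` above is deliberately simplified: metrics-server may report CPU in `n`, `u`, or `m` and memory in `Ki`, `Mi`, or `Gi`. A more general pair of parsers, hedged to the common suffixes only (these helpers are illustrative, not part of the provider):

```python
def parse_cpu(quantity: str) -> float:
    """Parse a Kubernetes CPU quantity into cores, e.g. '250m' -> 0.25."""
    suffixes = {"n": 1e-9, "u": 1e-6, "m": 1e-3}
    for suffix, factor in suffixes.items():
        if quantity.endswith(suffix):
            return float(quantity[: -len(suffix)]) * factor
    return float(quantity)  # plain cores

def parse_memory_mi(quantity: str) -> float:
    """Parse a Kubernetes memory quantity into MiB, e.g. '512Ki' -> 0.5."""
    suffixes = {"Ki": 1 / 1024, "Mi": 1.0, "Gi": 1024.0, "Ti": 1024.0 ** 2}
    for suffix, factor in suffixes.items():
        if quantity.endswith(suffix):
            return float(quantity[: -len(suffix)]) * factor
    return float(quantity) / (1024 ** 2)  # plain bytes

print(parse_cpu("250m"))         # → 0.25
print(parse_memory_mi("512Ki"))  # → 0.5
```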

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes
