CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

spark-integration.mddocs/

Spark Integration

Deploy and manage Spark applications on Kubernetes clusters using custom resource definitions, monitoring sensors, and automated lifecycle management.

Capabilities

Spark Kubernetes Operator

Deploy Spark applications on Kubernetes using SparkApplication custom resources with comprehensive configuration and monitoring.

class SparkKubernetesOperator(KubernetesPodOperator):
    """
    Start Spark job on Kubernetes.

    Submits a SparkApplication custom resource described by a YAML manifest
    through the configured Kubernetes connection.

    Args:
        application_file (str): Path to Spark application YAML file
        namespace (str): Kubernetes namespace. Defaults to 'default'
        kubernetes_conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        api_group (str): API group for Spark operator. Default: 'sparkoperator.k8s.io'
        api_version (str): API version for Spark operator. Default: 'v1beta2'
        **kwargs: Additional arguments passed to KubernetesPodOperator
    """
    def __init__(
        self,
        application_file: str,
        namespace: str = "default",
        kubernetes_conn_id: str = "kubernetes_default",
        api_group: str = "sparkoperator.k8s.io",
        api_version: str = "v1beta2",
        **kwargs
    ) -> None: ...

    def execute(self, context: Context) -> Any:
        """Execute the Spark application.

        Args:
            context: Airflow task execution context.

        Returns:
            Any: Result of the submission (implementation-defined).
        """
        ...

Spark Kubernetes Sensor

Monitor Spark applications running on Kubernetes and wait for completion or specific status conditions.

class SparkKubernetesSensor(BaseSensorOperator):
    """
    Checks sparkApplication object in kubernetes cluster.

    Polls the named SparkApplication custom resource at ``poke_interval``
    until the sensor condition is met or ``timeout`` elapses.

    Args:
        application_name (str): Name of the Spark application to monitor
        namespace (str): Kubernetes namespace. Defaults to 'default'
        kubernetes_conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        attach_log (bool): Attach application logs to sensor output. Default: False
        api_group (str): API group for Spark operator. Default: 'sparkoperator.k8s.io'
        api_version (str): API version for Spark operator. Default: 'v1beta2'
        poke_interval (int): Interval between checks in seconds. Default: 60
        timeout (int): Timeout in seconds. Default: 60*60*24*7 (7 days)
    """
    def __init__(
        self,
        application_name: str,
        namespace: str = "default",
        kubernetes_conn_id: str = "kubernetes_default",
        attach_log: bool = False,
        api_group: str = "sparkoperator.k8s.io",
        api_version: str = "v1beta2",
        poke_interval: int = 60,
        timeout: int = 60 * 60 * 24 * 7,
        **kwargs
    ) -> None: ...

    def poke(self, context: Context) -> bool:
        """Check Spark application status.

        Returns:
            bool: True when the monitored application has reached the
            state the sensor is waiting for.
        """
        ...

    def get_hook(self) -> KubernetesHook:
        """Get Kubernetes hook.

        Returns:
            KubernetesHook: Hook built from ``kubernetes_conn_id``.
        """
        ...

Custom Object Launcher

Advanced Spark job management with custom object definitions and lifecycle control.

class SparkJobSpec:
    """
    Specification for Spark job configuration.

    Plain value object bundling the identifiers and the raw ``spec``
    dictionary needed to submit a Spark job.

    Args:
        job_id (str): Unique identifier for the Spark job
        conn_id (str): Connection ID for Kubernetes cluster
        image (str): Docker image for Spark application
        app_name (str): Name of the Spark application
        namespace (str): Kubernetes namespace
        spec (dict): Spark application specification
    """
    def __init__(
        self,
        job_id: str,
        conn_id: str,
        image: str,
        app_name: str,
        namespace: str,
        spec: dict
    ) -> None: ...

class KubernetesSpec:
    """
    Kubernetes-specific configuration for Spark jobs.

    Args:
        image (str): Docker image
        namespace (str): Kubernetes namespace
        delete_on_termination (bool): Delete resources on termination.
            Default: True
    """
    def __init__(
        self,
        image: str,
        namespace: str,
        delete_on_termination: bool = True
    ) -> None: ...

class SparkResources:
    """
    Resource specifications for Spark jobs.

    Args:
        driver_resources (dict): Driver resource specifications
            (e.g. cores/memory; exact keys defined by the provider)
        executor_resources (dict): Executor resource specifications
    """
    def __init__(
        self,
        driver_resources: dict,
        executor_resources: dict
    ) -> None: ...

class CustomObjectStatus:
    """
    Status representation for custom objects.

    Args:
        object (dict): Kubernetes object
        status (str): Current status
        message (str): Status message
    """
    # NOTE(review): the 'object' parameter shadows the builtin of the same
    # name; kept as-is for interface compatibility with existing callers.
    def __init__(self, object: dict, status: str, message: str) -> None: ...

class CustomObjectLauncher(LoggingMixin):
    """
    Launcher for custom Kubernetes objects.

    Manages the lifecycle (start, delete, completion check) of custom
    resources — here, SparkApplications — via the Kubernetes
    custom-objects API.

    Args:
        kube_client (ApiClient): Kubernetes API client
        group (str): API group
        version (str): API version
        plural (str): Resource plural name
        namespace (str): Kubernetes namespace
    """
    def __init__(
        self,
        kube_client: ApiClient,
        group: str,
        version: str,
        plural: str,
        namespace: str
    ) -> None: ...

    def start_spark_job(
        self,
        image: str,
        app_name: str,
        job_spec: dict,
        namespace: str
    ) -> str:
        """Start Spark job.

        Returns:
            str: Identifier of the started job.
        """
        ...

    def delete_spark_job(self, job_id: str, namespace: str) -> None:
        """Delete Spark job."""
        ...

    def check_spark_job_completion(
        self,
        job_id: str,
        namespace: str
    ) -> CustomObjectStatus:
        """Check job completion status.

        Returns:
            CustomObjectStatus: Current status of the job's custom object.
        """
        ...

def should_retry_start_spark_job(exception: Exception) -> bool:
    """
    Determine if Spark job start should be retried.

    Args:
        exception: Exception that occurred during job start

    Returns:
        bool: True if operation should be retried
    """
    # NOTE(review): presumably True only for transient Kubernetes API
    # failures (conflicts, timeouts) — confirm against provider source.
    ...

Usage Examples

Basic Spark Application Deployment

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

# Deploy Spark application from YAML file.
# The operator reads the SparkApplication manifest at the given path and
# submits it via the 'kubernetes_default' connection into the 'spark'
# namespace. 'dag' is assumed to be defined earlier in the DAG file.
spark_job = SparkKubernetesOperator(
    task_id='run_spark_job',
    application_file='/path/to/spark-application.yaml',
    namespace='spark',
    kubernetes_conn_id='kubernetes_default',
    dag=dag
)

Spark Application with Custom Configuration

# Create Spark application YAML content.
# Inline SparkApplication manifest (apiVersion sparkoperator.k8s.io/v1beta2)
# for a Python job in cluster mode, with S3 credentials injected from a
# Kubernetes secret and Prometheus JMX metrics exposed.
spark_app_yaml = """
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: data-processing-job
  namespace: spark
spec:
  type: Python
  mode: cluster
  image: spark:3.4.0-python3
  imagePullPolicy: IfNotPresent
  mainApplicationFile: s3a://my-bucket/spark-jobs/data_processor.py
  sparkVersion: 3.4.0
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-driver
    labels:
      version: 3.4.0
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: access-key
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: secret-key
  executor:
    cores: 1
    instances: 2
    memory: 512m
    labels:
      version: 3.4.0
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: access-key
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: secret-key
  deps:
    packages:
      - org.apache.hadoop:hadoop-aws:3.3.4
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      jmxExporterJar: /prometheus/jmx_prometheus_javaagent-0.17.2.jar
      port: 8090
"""

# Write YAML to file and deploy
import tempfile
import os
# NOTE(review): 'os' is imported but unused in this snippet.

# delete=False keeps the file on disk so the operator can read it when the
# task runs. NOTE(review): nothing removes the temp file afterwards —
# confirm the intended cleanup strategy.
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
    f.write(spark_app_yaml)
    yaml_file_path = f.name

spark_job = SparkKubernetesOperator(
    task_id='data_processing_spark_job',
    application_file=yaml_file_path,
    namespace='spark',
    dag=dag
)

Monitoring Spark Application with Sensor

from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

# Monitor Spark application completion.
# application_name must match metadata.name of the submitted
# SparkApplication ('data-processing-job' in the manifest above).
spark_sensor = SparkKubernetesSensor(
    task_id='wait_for_spark_completion',
    application_name='data-processing-job',
    namespace='spark',
    kubernetes_conn_id='kubernetes_default',
    attach_log=True,   # include driver logs in the sensor's task log
    poke_interval=30,  # check every 30 seconds
    timeout=3600,  # 1 hour timeout
    dag=dag
)

# Chain operations: submit first, then wait for completion.
spark_job >> spark_sensor

Advanced Spark Job with Custom Launcher

from airflow.providers.cncf.kubernetes.operators.custom_object_launcher import (
    SparkJobSpec, CustomObjectLauncher, SparkResources
)

def launch_custom_spark_job(**context):
    """Submit an ML-training SparkApplication via CustomObjectLauncher.

    Builds a launcher from the default Kubernetes connection, assembles
    the job specification inline, starts the job, and returns its
    identifier (pushed to XCom by the wrapping PythonOperator).
    """
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

    # Kubernetes API client obtained through the Airflow connection.
    client = KubernetesHook(conn_id='kubernetes_default').get_conn()

    # Launcher targeting the SparkApplication custom resource.
    launcher = CustomObjectLauncher(
        kube_client=client,
        group='sparkoperator.k8s.io',
        version='v1beta2',
        plural='sparkapplications',
        namespace='spark',
    )

    # Driver/executor sizing and CLI arguments for the training run.
    driver_cfg = {
        'cores': 2,
        'memory': '2g',
        'serviceAccount': 'spark-driver',
    }
    executor_cfg = {
        'cores': 2,
        'instances': 4,
        'memory': '4g',
    }
    training_spec = {
        'type': 'Python',
        'mode': 'cluster',
        'image': 'spark:3.4.0-python3',
        'mainApplicationFile': 's3a://my-bucket/jobs/ml_training.py',
        'sparkVersion': '3.4.0',
        'driver': driver_cfg,
        'executor': executor_cfg,
        'arguments': [
            '--input-path', 's3a://my-bucket/data/training/',
            '--output-path', 's3a://my-bucket/models/',
            '--model-type', 'random-forest',
        ],
    }

    # Start the job and hand its id back to the caller.
    return launcher.start_spark_job(
        image='spark:3.4.0-python3',
        app_name='ml-training-job',
        job_spec=training_spec,
        namespace='spark',
    )

# Use PythonOperator to launch custom Spark job.
# The callable's return value (the job id) is pushed to XCom automatically.
from airflow.operators.python import PythonOperator

custom_spark_launcher = PythonOperator(
    task_id='launch_custom_spark',
    python_callable=launch_custom_spark_job,
    dag=dag
)

Spark Job with Dependencies

# Complete Spark workflow with dependencies:
# prepare -> wait -> process -> wait -> train -> wait.
from airflow import DAG
from datetime import datetime

# NOTE(review): 'schedule_interval' is deprecated in favor of 'schedule'
# since Airflow 2.4 — confirm the Airflow version this example targets.
dag = DAG(
    'spark_workflow',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

# Data preparation Spark job
data_prep_job = SparkKubernetesOperator(
    task_id='data_preparation',
    application_file='/configs/data-prep-spark.yaml',
    namespace='spark',
    dag=dag
)

# Data processing Spark job
data_process_job = SparkKubernetesOperator(
    task_id='data_processing',
    application_file='/configs/data-process-spark.yaml',
    namespace='spark',
    dag=dag
)

# Model training Spark job
model_training_job = SparkKubernetesOperator(
    task_id='model_training',
    application_file='/configs/model-training-spark.yaml',
    namespace='spark',
    dag=dag
)

# Monitor each job completion.
# NOTE(review): each application_name must match metadata.name inside the
# corresponding YAML manifest — verify against the /configs files.
prep_sensor = SparkKubernetesSensor(
    task_id='wait_prep_completion',
    application_name='data-prep-job',
    namespace='spark',
    poke_interval=30,
    dag=dag
)

process_sensor = SparkKubernetesSensor(
    task_id='wait_process_completion',
    application_name='data-process-job',
    namespace='spark',
    poke_interval=30,
    dag=dag
)

# Training can take much longer, so an explicit timeout is set here
# (the other sensors fall back to the documented 7-day default).
training_sensor = SparkKubernetesSensor(
    task_id='wait_training_completion',
    application_name='model-training-job',
    namespace='spark',
    poke_interval=60,
    timeout=7200,  # 2 hour timeout for training
    dag=dag
)

# Set dependencies
data_prep_job >> prep_sensor >> data_process_job >> process_sensor >> model_training_job >> training_sensor

Spark with Dynamic Configuration

def generate_spark_config(**context):
    """Generate dynamic Spark configuration based on execution date.

    Builds a SparkApplication manifest whose name, input/output paths and
    executor count vary with the run date, writes it to a temp YAML file,
    and returns the file path (pushed to XCom by the PythonOperator).
    """
    # NOTE(review): the 'execution_date' context key is deprecated in
    # Airflow 2.2+ in favor of 'logical_date' — confirm target version.
    execution_date = context['execution_date']
    
    # Dynamic configuration based on date
    spark_config = {
        'apiVersion': 'sparkoperator.k8s.io/v1beta2',
        'kind': 'SparkApplication',
        'metadata': {
            'name': f'dynamic-job-{execution_date.strftime("%Y%m%d")}',
            'namespace': 'spark'
        },
        'spec': {
            'type': 'Python',
            'mode': 'cluster',
            'image': 'spark:3.4.0-python3',
            'mainApplicationFile': 's3a://my-bucket/jobs/dynamic_processor.py',
            'arguments': [
                '--date', execution_date.strftime('%Y-%m-%d'),
                '--input-path', f's3a://my-bucket/data/{execution_date.year}/{execution_date.month:02d}/',
                '--output-path', f's3a://my-bucket/processed/{execution_date.strftime("%Y-%m-%d")}/'
            ],
            'driver': {
                'cores': 1,
                'memory': '1g'
            },
            'executor': {
                'cores': 2,
                # weekday() is 0 (Mon) .. 6 (Sun), so weekend runs get
                # the most executors (7-8).
                'instances': execution_date.weekday() + 2,  # More executors on weekends
                'memory': '2g'
            }
        }
    }
    
    # Write config to temporary file.
    # delete=False keeps the file so the downstream operator can read it;
    # NOTE(review): nothing ever removes it — confirm cleanup strategy.
    import tempfile
    import yaml
    
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
        yaml.dump(spark_config, f)
        return f.name

# Dynamic Spark job: the config-generation task returns the YAML path via
# XCom; the operator's templated application_file pulls it at runtime.
from airflow.operators.python import PythonOperator

generate_config = PythonOperator(
    task_id='generate_spark_config',
    python_callable=generate_spark_config,
    dag=dag
)

dynamic_spark_job = SparkKubernetesOperator(
    task_id='dynamic_spark_job',
    # Jinja template resolved at task runtime to the generated file path.
    application_file="{{ ti.xcom_pull(task_ids='generate_spark_config') }}",
    namespace='spark',
    dag=dag
)

generate_config >> dynamic_spark_job

Spark Job Cleanup

def cleanup_spark_resources(**context):
    """Clean up Spark application resources after completion.

    Lists SparkApplication custom objects in the 'spark' namespace and
    deletes those that are COMPLETED or FAILED and older than one day.

    Args:
        **context: Airflow task context (unused; kept for operator
            compatibility).
    """
    # Imports hoisted out of the loop; the original re-imported datetime
    # on every iteration. utcnow() is deprecated and returns a naive
    # datetime — use an aware UTC datetime throughout instead.
    from datetime import datetime, timedelta, timezone

    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

    hook = KubernetesHook(conn_id='kubernetes_default')

    # Delete completed Spark applications older than 1 day
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=1)

    # Get custom objects
    custom_objects = hook.get_conn().list_namespaced_custom_object(
        group='sparkoperator.k8s.io',
        version='v1beta2',
        namespace='spark',
        plural='sparkapplications'
    )

    for app in custom_objects['items']:
        app_name = app['metadata']['name']
        creation_time = app['metadata']['creationTimestamp']
        status = app.get('status', {}).get('applicationState', {}).get('state', '')

        # creationTimestamp is RFC 3339 UTC (e.g. '2023-01-01T00:00:00Z');
        # attach tzinfo so the comparison is aware-vs-aware.
        created = datetime.strptime(
            creation_time, '%Y-%m-%dT%H:%M:%SZ'
        ).replace(tzinfo=timezone.utc)

        # Delete if completed and old
        if status in ('COMPLETED', 'FAILED') and created < cutoff_date:
            hook.delete_custom_object(
                group='sparkoperator.k8s.io',
                version='v1beta2',
                plural='sparkapplications',
                name=app_name,
                namespace='spark'
            )
            print(f"Deleted Spark application: {app_name}")

# Schedule the cleanup callable as a regular task in the DAG.
cleanup_task = PythonOperator(
    task_id='cleanup_spark_apps',
    python_callable=cleanup_spark_resources,
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes

docs

api-integration.md

decorators.md

executors.md

index.md

job-management.md

monitoring.md

pod-operations.md

resource-management.md

spark-integration.md

tile.json