Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
---
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Deploy and manage Spark applications on Kubernetes clusters using custom resource definitions, monitoring sensors, and automated lifecycle management.
Deploy Spark applications on Kubernetes using SparkApplication custom resources with comprehensive configuration and monitoring.
class SparkKubernetesOperator(KubernetesPodOperator):
"""
Start Spark job on Kubernetes.
Args:
application_file (str): Path to Spark application YAML file
namespace (str): Kubernetes namespace. Defaults to 'default'
kubernetes_conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
api_group (str): API group for Spark operator. Default: 'sparkoperator.k8s.io'
api_version (str): API version for Spark operator. Default: 'v1beta2'
**kwargs: Additional arguments passed to KubernetesPodOperator
"""
def __init__(
self,
application_file: str,
namespace: str = "default",
kubernetes_conn_id: str = "kubernetes_default",
api_group: str = "sparkoperator.k8s.io",
api_version: str = "v1beta2",
**kwargs
): ...
def execute(self, context: Context) -> Any:
"""Execute the Spark application."""
...Monitor Spark applications running on Kubernetes and wait for completion or specific status conditions.
class SparkKubernetesSensor(BaseSensorOperator):
"""
Checks sparkApplication object in kubernetes cluster.
Args:
application_name (str): Name of the Spark application to monitor
namespace (str): Kubernetes namespace. Defaults to 'default'
kubernetes_conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
attach_log (bool): Attach application logs to sensor output. Default: False
api_group (str): API group for Spark operator. Default: 'sparkoperator.k8s.io'
api_version (str): API version for Spark operator. Default: 'v1beta2'
poke_interval (int): Interval between checks in seconds. Default: 60
timeout (int): Timeout in seconds. Default: 60*60*24*7 (7 days)
"""
def __init__(
self,
application_name: str,
namespace: str = "default",
kubernetes_conn_id: str = "kubernetes_default",
attach_log: bool = False,
api_group: str = "sparkoperator.k8s.io",
api_version: str = "v1beta2",
poke_interval: int = 60,
timeout: int = 60 * 60 * 24 * 7,
**kwargs
): ...
def poke(self, context: Context) -> bool:
"""Check Spark application status."""
...
def get_hook(self) -> KubernetesHook:
"""Get Kubernetes hook."""
...Advanced Spark job management with custom object definitions and lifecycle control.
class SparkJobSpec:
    """Configuration container describing a single Spark job.

    Args:
        job_id (str): Unique identifier for the Spark job.
        conn_id (str): Connection ID for the Kubernetes cluster.
        image (str): Docker image for the Spark application.
        app_name (str): Name of the Spark application.
        namespace (str): Kubernetes namespace.
        spec (dict): Spark application specification.
    """

    def __init__(
        self,
        job_id: str,
        conn_id: str,
        image: str,
        app_name: str,
        namespace: str,
        spec: dict,
    ): ...
class KubernetesSpec:
    """Kubernetes-side settings for a Spark job.

    Args:
        image (str): Docker image.
        namespace (str): Kubernetes namespace.
        delete_on_termination (bool): Delete resources on termination. Default: True.
    """

    def __init__(
        self,
        image: str,
        namespace: str,
        delete_on_termination: bool = True,
    ): ...
class SparkResources:
    """Driver and executor resource requirements for a Spark job.

    Args:
        driver_resources (dict): Driver resource specifications.
        executor_resources (dict): Executor resource specifications.
    """

    def __init__(
        self,
        driver_resources: dict,
        executor_resources: dict,
    ): ...
class CustomObjectStatus:
    """Status snapshot of a Kubernetes custom object.

    Args:
        object (dict): Kubernetes object payload.
        status (str): Current status value.
        message (str): Status message.
    """

    # NOTE: parameter name `object` shadows the builtin; kept unchanged so
    # keyword callers (object=...) remain compatible.
    def __init__(self, object: dict, status: str, message: str): ...
class CustomObjectLauncher(LoggingMixin):
    """Launch and manage custom Kubernetes objects (e.g. SparkApplication resources).

    Args:
        kube_client (ApiClient): Kubernetes API client.
        group (str): API group of the custom resource.
        version (str): API version of the custom resource.
        plural (str): Plural resource name (e.g. 'sparkapplications').
        namespace (str): Kubernetes namespace.
    """

    def __init__(
        self,
        kube_client: ApiClient,
        group: str,
        version: str,
        plural: str,
        namespace: str,
    ): ...

    def start_spark_job(
        self,
        image: str,
        app_name: str,
        job_spec: dict,
        namespace: str,
    ) -> str:
        """Start the Spark job; returns a job identifier string."""
        ...

    def delete_spark_job(self, job_id: str, namespace: str) -> None:
        """Delete the Spark job."""
        ...

    def check_spark_job_completion(
        self,
        job_id: str,
        namespace: str,
    ) -> CustomObjectStatus:
        """Check and return the job's completion status."""
        ...
def should_retry_start_spark_job(exception: Exception) -> bool:
    """Decide whether a failed Spark job start should be retried.

    Args:
        exception: Exception raised while starting the job.

    Returns:
        bool: True if the start operation should be retried.
    """
    ...

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
# Deploy a Spark application defined in a YAML file
spark_job = SparkKubernetesOperator(
    task_id='run_spark_job',
    application_file='/path/to/spark-application.yaml',
    namespace='spark',
    kubernetes_conn_id='kubernetes_default',
    dag=dag,
)

# Create Spark application YAML content
spark_app_yaml = """
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: data-processing-job
  namespace: spark
spec:
  type: Python
  mode: cluster
  image: spark:3.4.0-python3
  imagePullPolicy: IfNotPresent
  mainApplicationFile: s3a://my-bucket/spark-jobs/data_processor.py
  sparkVersion: 3.4.0
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-driver
    labels:
      version: 3.4.0
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: access-key
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: secret-key
  executor:
    cores: 1
    instances: 2
    memory: 512m
    labels:
      version: 3.4.0
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: access-key
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: secret-key
  deps:
    packages:
      - org.apache.hadoop:hadoop-aws:3.3.4
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      jmxExporterJar: /prometheus/jmx_prometheus_javaagent-0.17.2.jar
      port: 8090
"""

# Write the YAML to a temporary file, then deploy from it.
# delete=False is deliberate: the operator reads the file later at task run time.
import tempfile
import os

with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
    f.write(spark_app_yaml)
    yaml_file_path = f.name

spark_job = SparkKubernetesOperator(
    task_id='data_processing_spark_job',
    application_file=yaml_file_path,
    namespace='spark',
    dag=dag,
)

from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
# Wait for the Spark application to complete
spark_sensor = SparkKubernetesSensor(
    task_id='wait_for_spark_completion',
    application_name='data-processing-job',
    namespace='spark',
    kubernetes_conn_id='kubernetes_default',
    attach_log=True,
    poke_interval=30,
    timeout=3600,  # 1 hour timeout
    dag=dag,
)

# Chain operations
spark_job >> spark_sensor

from airflow.providers.cncf.kubernetes.operators.custom_object_launcher import (
    SparkJobSpec,
    CustomObjectLauncher,
    SparkResources,
)
def launch_custom_spark_job(**context):
    """Launch an ML-training SparkApplication via CustomObjectLauncher; return the job id."""
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

    # Obtain a Kubernetes client from the Airflow connection
    hook = KubernetesHook(conn_id='kubernetes_default')
    kube_client = hook.get_conn()

    # Launcher bound to SparkApplication custom resources in the 'spark' namespace
    launcher = CustomObjectLauncher(
        kube_client=kube_client,
        group='sparkoperator.k8s.io',
        version='v1beta2',
        plural='sparkapplications',
        namespace='spark',
    )

    # Spark job specification for the training run
    spark_spec = {
        'type': 'Python',
        'mode': 'cluster',
        'image': 'spark:3.4.0-python3',
        'mainApplicationFile': 's3a://my-bucket/jobs/ml_training.py',
        'sparkVersion': '3.4.0',
        'driver': {
            'cores': 2,
            'memory': '2g',
            'serviceAccount': 'spark-driver',
        },
        'executor': {
            'cores': 2,
            'instances': 4,
            'memory': '4g',
        },
        'arguments': [
            '--input-path', 's3a://my-bucket/data/training/',
            '--output-path', 's3a://my-bucket/models/',
            '--model-type', 'random-forest',
        ],
    }

    # Start the job; the returned id is pushed to XCom for downstream tasks
    job_id = launcher.start_spark_job(
        image='spark:3.4.0-python3',
        app_name='ml-training-job',
        job_spec=spark_spec,
        namespace='spark',
    )
    return job_id

# Launch the custom Spark job from a PythonOperator
from airflow.operators.python import PythonOperator

custom_spark_launcher = PythonOperator(
    task_id='launch_custom_spark',
    python_callable=launch_custom_spark_job,
    dag=dag,
)

# Complete Spark workflow with dependencies
from airflow import DAG
from datetime import datetime

dag = DAG(
    'spark_workflow',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)

# Data preparation Spark job
data_prep_job = SparkKubernetesOperator(
    task_id='data_preparation',
    application_file='/configs/data-prep-spark.yaml',
    namespace='spark',
    dag=dag,
)

# Data processing Spark job
data_process_job = SparkKubernetesOperator(
    task_id='data_processing',
    application_file='/configs/data-process-spark.yaml',
    namespace='spark',
    dag=dag,
)

# Model training Spark job
model_training_job = SparkKubernetesOperator(
    task_id='model_training',
    application_file='/configs/model-training-spark.yaml',
    namespace='spark',
    dag=dag,
)

# One sensor per job, waiting for completion
prep_sensor = SparkKubernetesSensor(
    task_id='wait_prep_completion',
    application_name='data-prep-job',
    namespace='spark',
    poke_interval=30,
    dag=dag,
)

process_sensor = SparkKubernetesSensor(
    task_id='wait_process_completion',
    application_name='data-process-job',
    namespace='spark',
    poke_interval=30,
    dag=dag,
)

training_sensor = SparkKubernetesSensor(
    task_id='wait_training_completion',
    application_name='model-training-job',
    namespace='spark',
    poke_interval=60,
    timeout=7200,  # 2 hour timeout for training
    dag=dag,
)

# Wire the pipeline: each job is followed by its completion sensor
data_prep_job >> prep_sensor >> data_process_job >> process_sensor >> model_training_job >> training_sensor

def generate_spark_config(**context):
    """Generate a dynamic Spark configuration based on the execution date."""
    # NOTE(review): `execution_date` is deprecated in newer Airflow releases in
    # favour of `logical_date` — confirm against the targeted Airflow version.
    execution_date = context['execution_date']

    # Configuration derived from the run date
    spark_config = {
        'apiVersion': 'sparkoperator.k8s.io/v1beta2',
        'kind': 'SparkApplication',
        'metadata': {
            'name': f'dynamic-job-{execution_date.strftime("%Y%m%d")}',
            'namespace': 'spark',
        },
        'spec': {
            'type': 'Python',
            'mode': 'cluster',
            'image': 'spark:3.4.0-python3',
            'mainApplicationFile': 's3a://my-bucket/jobs/dynamic_processor.py',
            'arguments': [
                '--date', execution_date.strftime('%Y-%m-%d'),
                '--input-path', f's3a://my-bucket/data/{execution_date.year}/{execution_date.month:02d}/',
                '--output-path', f's3a://my-bucket/processed/{execution_date.strftime("%Y-%m-%d")}/',
            ],
            'driver': {
                'cores': 1,
                'memory': '1g',
            },
            'executor': {
                'cores': 2,
                'instances': execution_date.weekday() + 2,  # more executors toward the weekend
                'memory': '2g',
            },
        },
    }

    # Persist the config to a temp file; the path is returned via XCom
    import tempfile
    import yaml

    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
        yaml.dump(spark_config, f)
    return f.name

# Dynamic Spark job driven by the generated config
from airflow.operators.python import PythonOperator

generate_config = PythonOperator(
    task_id='generate_spark_config',
    python_callable=generate_spark_config,
    dag=dag,
)

dynamic_spark_job = SparkKubernetesOperator(
    task_id='dynamic_spark_job',
    application_file="{{ ti.xcom_pull(task_ids='generate_spark_config') }}",
    namespace='spark',
    dag=dag,
)
generate_config >> dynamic_spark_jobdef cleanup_spark_resources(**context):
"""Clean up Spark application resources after completion."""
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
hook = KubernetesHook(conn_id='kubernetes_default')
# Delete completed Spark applications older than 1 day
import datetime
cutoff_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
# Get custom objects
custom_objects = hook.get_conn().list_namespaced_custom_object(
group='sparkoperator.k8s.io',
version='v1beta2',
namespace='spark',
plural='sparkapplications'
)
for app in custom_objects['items']:
app_name = app['metadata']['name']
creation_time = app['metadata']['creationTimestamp']
status = app.get('status', {}).get('applicationState', {}).get('state', '')
# Parse creation time
from datetime import datetime as dt
created = dt.strptime(creation_time, '%Y-%m-%dT%H:%M:%SZ')
# Delete if completed and old
if status in ['COMPLETED', 'FAILED'] and created < cutoff_date:
hook.delete_custom_object(
group='sparkoperator.k8s.io',
version='v1beta2',
plural='sparkapplications',
name=app_name,
namespace='spark'
)
print(f"Deleted Spark application: {app_name}")
cleanup_task = PythonOperator(
task_id='cleanup_spark_apps',
python_callable=cleanup_spark_resources,
dag=dag
)Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes