Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No evaluation scenarios have been run yet.
Monitor Kubernetes workloads with sensors that check application status and wait for completion conditions. Monitoring capabilities include Spark application tracking and custom resource status checking.
Monitor Spark applications running on Kubernetes clusters and wait for specific status conditions or completion.
class SparkKubernetesSensor(BaseSensorOperator):
"""
Checks sparkApplication object in kubernetes cluster.
Args:
application_name (str): Name of the Spark application to monitor
namespace (str): Kubernetes namespace. Defaults to 'default'
kubernetes_conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
attach_log (bool): Attach application logs to sensor output. Default: False
api_group (str): API group for Spark operator. Default: 'sparkoperator.k8s.io'
api_version (str): API version for Spark operator. Default: 'v1beta2'
poke_interval (int): Interval between checks in seconds. Default: 60
timeout (int): Timeout in seconds. Default: 60*60*24*7 (7 days)
mode (str): Sensor mode ('poke' or 'reschedule'). Default: 'poke'
exponential_backoff (bool): Use exponential backoff. Default: False
max_wait (int): Maximum wait time between pokes. Default: 60
soft_fail (bool): Mark task as SKIPPED on failure. Default: False
"""
def __init__(
self,
application_name: str,
namespace: str = "default",
kubernetes_conn_id: str = "kubernetes_default",
attach_log: bool = False,
api_group: str = "sparkoperator.k8s.io",
api_version: str = "v1beta2",
poke_interval: int = 60,
timeout: int = 60 * 60 * 24 * 7,
mode: str = "poke",
exponential_backoff: bool = False,
max_wait: int = 60,
soft_fail: bool = False,
**kwargs
): ...
def poke(self, context: Context) -> bool:
"""
Check Spark application status.
Returns:
bool: True if application has completed successfully, False otherwise
"""
...
def get_hook(self) -> KubernetesHook:
"""
Get Kubernetes hook for cluster access.
Returns:
KubernetesHook: Configured Kubernetes hook
"""
...Asynchronous monitoring of Kubernetes Pod completion with triggers for deferrable execution.
class KubernetesPodTrigger(BaseTrigger):
"""
Trigger for monitoring Kubernetes Pod completion.
Args:
pod_name (str): Name of the pod to monitor
pod_namespace (str): Namespace of the pod
cluster_context (str, optional): Kubernetes cluster context
config_file (str, optional): Path to kubeconfig file
in_cluster (bool): Use in-cluster configuration. Default: True
get_logs (bool): Retrieve pod logs. Default: True
startup_timeout (int): Startup timeout in seconds. Default: 120
delete_on_finish (bool): Delete pod when finished. Default: True
poll_interval (int): Polling interval in seconds. Default: 10
logging_level (str): Logging level. Default: 'INFO'
"""
def __init__(
self,
pod_name: str,
pod_namespace: str,
cluster_context: str | None = None,
config_file: str | None = None,
in_cluster: bool = True,
get_logs: bool = True,
startup_timeout: int = 120,
delete_on_finish: bool = True,
poll_interval: int = 10,
logging_level: str = "INFO",
**kwargs
): ...
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Monitor pod execution asynchronously.
Yields:
TriggerEvent: Events indicating pod status changes
"""
...Asynchronous monitoring of Kubernetes Job completion status with triggers for deferrable execution.
class KubernetesJobTrigger(BaseTrigger):
"""
Trigger for monitoring Kubernetes Job completion.
Args:
job_name (str): Name of the job to monitor
job_namespace (str): Namespace of the job
cluster_context (str, optional): Kubernetes cluster context
config_file (str, optional): Path to kubeconfig file
in_cluster (bool): Use in-cluster configuration. Default: True
get_logs (bool): Retrieve job logs. Default: True
startup_timeout (int): Startup timeout in seconds. Default: 120
poll_interval (int): Polling interval in seconds. Default: 10
logging_level (str): Logging level. Default: 'INFO'
"""
def __init__(
self,
job_name: str,
job_namespace: str,
cluster_context: str | None = None,
config_file: str | None = None,
in_cluster: bool = True,
get_logs: bool = True,
startup_timeout: int = 120,
poll_interval: int = 10,
logging_level: str = "INFO",
**kwargs
): ...
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Monitor job execution asynchronously.
Yields:
TriggerEvent: Events indicating job status changes
"""
...Enumeration of possible container states for monitoring and status checking.
class ContainerState(Enum):
    """Possible states of a container, as reported while monitoring a pod."""

    WAITING = "waiting"
    RUNNING = "running"
    TERMINATED = "terminated"

from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
# Monitor Spark application completion
spark_monitor = SparkKubernetesSensor(
    task_id='wait_for_spark_completion',
    application_name='data-processing-job',
    namespace='spark',
    kubernetes_conn_id='kubernetes_default',
    poke_interval=30,
    timeout=3600,  # 1 hour timeout
    dag=dag
)

# Monitor with detailed logging
spark_monitor_with_logs = SparkKubernetesSensor(
    task_id='monitor_spark_with_logs',
    application_name='ml-training-job',
    namespace='ml',
    attach_log=True,  # Attach Spark application logs
    poke_interval=60,
    timeout=7200,  # 2 hours for training job
    mode='poke',
    dag=dag
)

# Monitor a Spark application managed by a custom operator CRD
custom_spark_monitor = SparkKubernetesSensor(
    task_id='monitor_custom_spark',
    application_name='custom-analytics-job',
    namespace='analytics',
    api_group='analytics.company.com',  # Custom API group
    api_version='v1alpha1',  # Custom API version
    poke_interval=45,
    timeout=5400,  # 90 minutes
    dag=dag
)

# Monitor with exponential backoff to reduce API calls
efficient_monitor = SparkKubernetesSensor(
    task_id='efficient_spark_monitor',
    application_name='batch-processing-job',
    namespace='batch',
    poke_interval=10,  # Start with 10 seconds
    exponential_backoff=True,
    max_wait=300,  # Max 5 minutes between checks
    timeout=10800,  # 3 hours total
    dag=dag
)

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
# Pod with async monitoring using triggers
async_pod = KubernetesPodOperator(
    task_id='async_pod_task',
    name='long-running-pod',
    namespace='default',
    image='long-runner:latest',
    deferrable=True,  # Enable async monitoring
    startup_timeout_seconds=300,
    dag=dag
)

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
# Job with custom monitoring configuration
monitored_job = KubernetesJobOperator(
    task_id='monitored_batch_job',
    name='batch-processor',
    namespace='batch',
    image='batch-processor:latest',
    deferrable=True,
    get_logs=True,
    startup_timeout_seconds=180,
    dag=dag
)

from airflow import DAG
from datetime import datetime

dag = DAG(
    'spark_monitoring_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

# Launch multiple Spark applications
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

# Data extraction job
extract_job = SparkKubernetesOperator(
    task_id='launch_extract_job',
    application_file='/configs/extract-spark.yaml',
    namespace='spark',
    dag=dag
)

# Data transformation job
transform_job = SparkKubernetesOperator(
    task_id='launch_transform_job',
    application_file='/configs/transform-spark.yaml',
    namespace='spark',
    dag=dag
)

# Model training job
training_job = SparkKubernetesOperator(
    task_id='launch_training_job',
    application_file='/configs/training-spark.yaml',
    namespace='spark',
    dag=dag
)

# Monitor each application with a sensor sized to its expected runtime
extract_monitor = SparkKubernetesSensor(
    task_id='monitor_extract',
    application_name='data-extract-job',
    namespace='spark',
    poke_interval=30,
    timeout=1800,  # 30 minutes
    dag=dag
)

transform_monitor = SparkKubernetesSensor(
    task_id='monitor_transform',
    application_name='data-transform-job',
    namespace='spark',
    poke_interval=45,
    timeout=3600,  # 1 hour
    dag=dag
)

training_monitor = SparkKubernetesSensor(
    task_id='monitor_training',
    application_name='model-training-job',
    namespace='spark',
    attach_log=True,  # Get training logs
    poke_interval=120,
    timeout=14400,  # 4 hours
    dag=dag
)

# Set up pipeline dependencies: each launch is followed by its monitor
extract_job >> extract_monitor >> transform_job >> transform_monitor >> training_job >> training_monitor

# Monitor with soft fail for optional jobs
optional_monitor = SparkKubernetesSensor(
task_id='monitor_optional_job',
application_name='optional-analytics-job',
namespace='analytics',
poke_interval=60,
timeout=1800,
soft_fail=True, # Mark as SKIPPED if job fails
dag=dag
)
# Continue pipeline even if optional job fails
from airflow.operators.dummy import DummyOperator
continue_pipeline = DummyOperator(
task_id='continue_pipeline',
dag=dag
)
optional_monitor >> continue_pipelinedef monitor_custom_resource(**context):
"""Monitor custom Kubernetes resource status."""
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
hook = KubernetesHook(conn_id='kubernetes_default')
# Monitor custom resource
try:
custom_object = hook.get_custom_object(
group='apps.example.com',
version='v1',
plural='myapps',
name='my-application',
namespace='default'
)
status = custom_object.get('status', {})
phase = status.get('phase', 'Unknown')
print(f"Custom resource status: {phase}")
# Check if ready
conditions = status.get('conditions', [])
ready_condition = next(
(c for c in conditions if c.get('type') == 'Ready'),
None
)
if ready_condition and ready_condition.get('status') == 'True':
print("Custom resource is ready")
return True
else:
print("Custom resource not ready yet")
return False
except Exception as e:
print(f"Error monitoring custom resource: {e}")
return False
from airflow.sensors.python import PythonSensor
custom_resource_sensor = PythonSensor(
task_id='monitor_custom_resource',
python_callable=monitor_custom_resource,
poke_interval=30,
timeout=600,
dag=dag
)def check_pod_health(**context):
"""Check pod health status."""
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
hook = KubernetesHook(conn_id='kubernetes_default')
try:
pod = hook.get_pod(name='health-check-pod', namespace='monitoring')
# Check pod phase
phase = pod.status.phase
print(f"Pod phase: {phase}")
if phase == 'Running':
# Check container readiness
container_statuses = pod.status.container_statuses or []
all_ready = all(status.ready for status in container_statuses)
if all_ready:
print("All containers are ready")
return True
else:
print("Some containers are not ready")
return False
elif phase == 'Succeeded':
print("Pod completed successfully")
return True
elif phase == 'Failed':
print("Pod failed")
raise Exception("Pod execution failed")
else:
print(f"Pod is in {phase} state, waiting...")
return False
except Exception as e:
print(f"Error checking pod health: {e}")
raise
health_monitor = PythonSensor(
task_id='health_check_monitor',
python_callable=check_pod_health,
poke_interval=15,
timeout=300,
dag=dag
)def monitor_resource_usage(**context):
"""Monitor Kubernetes resource usage."""
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from kubernetes.client import ApiException
hook = KubernetesHook(conn_id='kubernetes_default')
client = hook.get_conn()
try:
# Get metrics from metrics server (requires metrics-server)
from kubernetes.client.api import custom_objects_api
metrics_client = custom_objects_api.CustomObjectsApi(client)
# Get pod metrics
pod_metrics = metrics_client.list_namespaced_custom_object(
group='metrics.k8s.io',
version='v1beta1',
namespace='default',
plural='pods'
)
total_cpu = 0
total_memory = 0
pod_count = 0
for pod in pod_metrics['items']:
pod_name = pod['metadata']['name']
if pod_name.startswith('data-processing'):
containers = pod['containers']
for container in containers:
usage = container['usage']
cpu = usage['cpu']
memory = usage['memory']
# Convert to numeric values (simplified)
cpu_numeric = float(cpu.replace('n', '')) / 1000000000 # nanocores to cores
memory_numeric = float(memory.replace('Ki', '')) / 1024 # Ki to Mi
total_cpu += cpu_numeric
total_memory += memory_numeric
pod_count += 1
print(f"Resource usage - Pods: {pod_count}, CPU: {total_cpu:.2f} cores, Memory: {total_memory:.2f} Mi")
# Check if usage is within acceptable limits
cpu_limit = 10.0 # 10 cores
memory_limit = 20480 # 20 Gi in Mi
return total_cpu < cpu_limit and total_memory < memory_limit
except ApiException as e:
if e.status == 404:
print("Metrics server not available")
return True # Skip check if metrics not available
else:
raise
resource_monitor = PythonSensor(
task_id='resource_usage_monitor',
python_callable=monitor_resource_usage,
poke_interval=120, # Check every 2 minutes
timeout=1800, # 30 minutes
dag=dag
)Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes