Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
—
Quality: pending — not yet evaluated against best practices.
Impact: pending — no eval scenarios have been run.
Create, monitor, and manage Kubernetes Jobs for batch processing, parallel execution, and complex workload orchestration. Jobs provide reliable execution with automatic retry, parallel processing, and completion tracking.
Execute batch workloads as Kubernetes Jobs with support for parallel processing, completion tracking, and automatic cleanup.
class KubernetesJobOperator(KubernetesPodOperator):
"""
Executes a Kubernetes Job.
Args:
name (str): Name of the job
image (str): Docker image to run
namespace (str): Kubernetes namespace. Defaults to 'default'
parallelism (int, optional): Number of parallel pods
completions (int, optional): Number of successful completions needed
backoff_limit (int, optional): Number of retries before marking job failed
ttl_seconds_after_finished (int, optional): TTL for cleanup after completion
active_deadline_seconds (int, optional): Maximum duration for job execution
suspend (bool): Whether to suspend the job. Default: False
manual_selector (bool): Manually manage pod selector. Default: False
completion_mode (str, optional): Completion mode ('NonIndexed' or 'Indexed')
pod_failure_policy (dict, optional): Pod failure policy configuration
**kwargs: Additional arguments passed to KubernetesPodOperator
"""
def __init__(
self,
name: str,
image: str,
namespace: str = "default",
parallelism: int | None = None,
completions: int | None = None,
backoff_limit: int | None = None,
ttl_seconds_after_finished: int | None = None,
active_deadline_seconds: int | None = None,
suspend: bool = False,
manual_selector: bool = False,
completion_mode: str | None = None,
pod_failure_policy: dict | None = None,
**kwargs
): ...
def execute(self, context: Context) -> Any:
"""Execute the job."""
...
def build_job_request_obj(self, context: Context) -> V1Job:
"""Build Kubernetes job request object."""
...
def create_job_request_obj(self, context: Context) -> V1Job:
"""Create job from pod template."""
...Delete existing Kubernetes Jobs with optional cleanup of associated pods and resources.
class KubernetesDeleteJobOperator(BaseOperator):
"""
Delete a Kubernetes Job.
Args:
name (str): Name of the job to delete
namespace (str): Kubernetes namespace. Defaults to 'default'
cluster_context (str, optional): Kubernetes cluster context
config_file (str, optional): Path to kubeconfig file
in_cluster (bool, optional): Use in-cluster configuration
delete_on_success (bool): Delete job on successful completion. Default: True
delete_on_failure (bool): Delete job on failure. Default: False
propagation_policy (str): Deletion propagation policy. Default: 'Background'
"""
def __init__(
self,
name: str,
namespace: str = "default",
cluster_context: str | None = None,
config_file: str | None = None,
in_cluster: bool | None = None,
delete_on_success: bool = True,
delete_on_failure: bool = False,
propagation_policy: str = "Background",
**kwargs
): ...
def execute(self, context: Context) -> Any:
"""Delete the job."""
...Update existing Kubernetes Jobs with new specifications or configuration changes.
class KubernetesPatchJobOperator(BaseOperator):
"""
Patch a Kubernetes Job.
Args:
name (str): Name of the job to patch
namespace (str): Kubernetes namespace. Defaults to 'default'
body (dict): Patch body as dictionary
cluster_context (str, optional): Kubernetes cluster context
config_file (str, optional): Path to kubeconfig file
in_cluster (bool, optional): Use in-cluster configuration
"""
def __init__(
self,
name: str,
namespace: str,
body: dict,
cluster_context: str | None = None,
config_file: str | None = None,
in_cluster: bool | None = None,
**kwargs
): ...
def execute(self, context: Context) -> Any:
"""Patch the job."""
...Asynchronous monitoring of Kubernetes Job completion status with triggers for deferrable execution.
class KubernetesJobTrigger(BaseTrigger):
    """Asynchronously monitor a Kubernetes Job until it completes.

    Used by deferrable operators to watch job status without occupying a
    worker slot.

    Args:
        job_name (str): Name of the job to monitor.
        job_namespace (str): Namespace the job runs in.
        cluster_context (str, optional): Kubernetes cluster context.
        config_file (str, optional): Path to a kubeconfig file.
        in_cluster (bool): Use in-cluster configuration. Default: True.
        get_logs (bool): Retrieve job logs. Default: True.
        startup_timeout (int): Startup timeout in seconds. Default: 120.
    """

    def __init__(
        self,
        job_name: str,
        job_namespace: str,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool = True,
        get_logs: bool = True,
        startup_timeout: int = 120,
        **kwargs
    ): ...

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Yield trigger events as the job's execution is monitored."""
        ...

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
# Simple batch job: one pod, one successful completion required.
batch_job = KubernetesJobOperator(
    task_id='batch_processing',
    name='data-processor',
    namespace='default',
    image='batch-processor:latest',
    cmds=['python', 'process_batch.py'],
    completions=1,
    parallelism=1,
    dag=dag,
)

# Parallel processing job
parallel_job = KubernetesJobOperator(
    task_id='parallel_processing',
    name='parallel-processor',
    namespace='default',
    image='parallel-processor:latest',
    cmds=['python', 'parallel_process.py'],
    completions=10,   # job succeeds after 10 successful pod completions
    parallelism=3,    # at most 3 pods run concurrently
    backoff_limit=2,  # each pod may be retried twice before the job fails
    dag=dag,
)

# Job with automatic cleanup
cleanup_job = KubernetesJobOperator(
    task_id='cleanup_job',
    name='cleanup-processor',
    namespace='default',
    image='cleanup:latest',
    ttl_seconds_after_finished=3600,  # garbage-collect the job 1 hour after it finishes
    active_deadline_seconds=1800,     # hard cap of 30 minutes of execution
    dag=dag,
)

# Job with custom failure handling
failure_policy_job = KubernetesJobOperator(
    task_id='resilient_job',
    name='resilient-processor',
    namespace='default',
    image='resilient:latest',
    backoff_limit=5,
    pod_failure_policy={
        'rules': [
            {
                # Fail the whole job immediately on these exit codes.
                'action': 'FailJob',
                'onExitCodes': {
                    # NOTE(review): Airflow's KubernetesPodOperator names its
                    # container 'base' by default, so the original value
                    # 'main' would never match — confirm against the pod
                    # spec if the container name is customized.
                    'containerName': 'base',
                    'operator': 'In',
                    'values': [1, 2, 3]
                }
            },
            {
                # Don't count pod disruptions (e.g. node drain, preemption)
                # against the backoff limit.
                'action': 'Ignore',
                'onPodConditions': [
                    {
                        'type': 'DisruptionTarget'
                    }
                ]
            }
        ]
    },
    dag=dag,
)

from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator
# Clean up a completed job.
delete_job = KubernetesDeleteJobOperator(
    task_id='delete_completed_job',
    name='data-processor',
    namespace='default',
    delete_on_success=True,
    propagation_policy='Foreground',  # block until dependent pods are deleted
    dag=dag,
)

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
# Job monitored via the deferrable (async) path, freeing the worker slot.
monitored_job = KubernetesJobOperator(
    task_id='monitored_job',
    name='monitored-processor',
    namespace='default',
    image='processor:latest',
    deferrable=True,             # monitor asynchronously with a trigger
    get_logs=True,
    log_events_on_failure=True,
    dag=dag,
)

# Indexed job for array processing
indexed_job = KubernetesJobOperator(
    task_id='indexed_processing',
    name='indexed-processor',
    namespace='default',
    image='array-processor:latest',
    cmds=['python', 'process_index.py'],
    completions=10,
    parallelism=3,
    completion_mode='Indexed',  # each pod receives a distinct completion index
    # NOTE: in 'Indexed' mode Kubernetes already injects a
    # JOB_COMPLETION_INDEX env var into every pod; this explicit
    # downward-API mapping is shown for illustration.
    env_vars=[
        V1EnvVar(
            name='JOB_COMPLETION_INDEX',
            value_from=V1EnvVarSource(
                field_ref=V1ObjectFieldSelector(
                    # Downward-API subscript syntax uses single quotes
                    # (the original double quotes are not valid fieldPath).
                    field_path="metadata.annotations['batch.kubernetes.io/job-completion-index']"
                )
            )
        )
    ],
    dag=dag,
)

from airflow.providers.cncf.kubernetes.operators.job import KubernetesPatchJobOperator
# Suspend a running job
patch_job = KubernetesPatchJobOperator(
task_id='suspend_job',
name='long-running-job',
namespace='default',
body={
'spec': {
'suspend': True
}
},
dag=dag
)Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes