Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Execute tasks in Kubernetes pods with comprehensive lifecycle management, monitoring, and resource control. Pod operations provide the foundation for running containerized workloads in Airflow workflows.
Execute tasks in Kubernetes pods with full control over container specifications, resource limits, and execution parameters.
class KubernetesPodOperator(BaseOperator):
    """
    Execute a task in a Kubernetes Pod.

    All constructor parameters are keyword-only. The types and defaults
    listed here mirror the ``__init__`` signature below.

    Args:
        image (str, optional): Docker image to run
        name (str, optional): Name of the pod. Auto-generated if not provided
        namespace (str, optional): Kubernetes namespace. Default: None
        cmds (list[str], optional): Container command to execute
        arguments (list[str], optional): Arguments for the container command
        ports (list[V1ContainerPort], optional): Container ports to expose
        volume_mounts (list[V1VolumeMount], optional): Volume mounts for the pod
        volumes (list[V1Volume], optional): Volumes to attach to the pod
        env_vars (list[V1EnvVar] | dict[str, str], optional): Environment variables
        secrets (list[Secret], optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount as volumes
        labels (dict[str, str], optional): Pod labels
        startup_timeout_seconds (int): Time to wait for pod startup. Default: 120
        get_logs (bool): Whether to retrieve pod logs. Default: True
        log_events_on_failure (bool): Log pod events on failure. Default: False
        is_delete_operator_pod (bool, optional): Delete pod after execution.
            Default: None. NOTE(review): overlaps with ``on_finish_action``;
            confirm which one takes precedence.
        hostnetwork (bool): Use host networking. Default: False
        dnspolicy (str, optional): DNS policy for the pod
        schedulername (str, optional): Scheduler name
        full_pod_spec (V1Pod, optional): Complete pod specification
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (V1Affinity, optional): Pod affinity rules
        tolerations (list[V1Toleration], optional): Pod tolerations
        security_context (V1PodSecurityContext | dict, optional): Pod security context
        image_pull_policy (str, optional): Image pull policy. Default: None
        image_pull_secrets (list[V1LocalObjectReference], optional): Image pull secrets
        service_account_name (str, optional): Service account name
        hostpid (bool): Use host PID namespace. Default: False.
            NOTE(review): not present in the signature below; confirm.
        hostipc (bool): Use host IPC namespace. Default: False.
            NOTE(review): not present in the signature below; confirm.
        priority_class_name (str, optional): Priority class name
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        do_xcom_push (bool): Push return value to XCom. Default: False
        pod_template_file (str, optional): Path to pod template file
        pod_template_dict (dict, optional): Pod template as dictionary
        random_name_suffix (bool): Add random suffix to pod name. Default: True
        reattach_on_restart (bool): Reattach to existing pod on restart. Default: True
        on_finish_action (str): Action when pod finishes. Default: 'delete_pod'
        termination_grace_period (int, optional): Grace period for pod termination
        active_deadline_seconds (int, optional): Pod active deadline
        log_pod_spec_on_failure (bool): Log pod spec on failure. Default: True
        container_logs (str | bool, optional): Container log selection. Default: None
        wait_until_job_completion (bool): Wait for job completion. Default: True.
            NOTE(review): not present in the signature below; confirm.

    Additional parameters in the signature (descriptions are derived from
    the parameter names and types only; confirm details against the
    provider documentation):
        kubernetes_conn_id (str, optional): Kubernetes connection ID.
            Default: 'kubernetes_default'
        env_from (list[V1EnvFromSource], optional): Environment variable sources
        startup_check_interval_seconds (int): Startup check interval. Default: 5
        schedule_timeout_seconds (int, optional): Schedule timeout
        base_container_name (str, optional): Name of the base container
        base_container_status_polling_interval (float): Base container
            status polling interval. Default: 1
        init_container_logs (str | bool, optional): Init container log selection
        annotations (dict, optional): Pod annotations
        container_resources (V1ResourceRequirements, optional): Container
            resource requests and limits
        automount_service_account_token (bool, optional): Automount the
            service account token
        host_aliases (list[V1HostAlias], optional): Host aliases
        container_security_context (V1SecurityContext | dict, optional):
            Container security context
        dns_config (V1PodDNSConfig, optional): Pod DNS configuration
        hostname (str, optional): Pod hostname
        subdomain (str, optional): Pod subdomain
        init_containers (list[V1Container], optional): Init containers
        pod_runtime_info_envs (list[V1EnvVar], optional): Pod runtime info
            environment variables
        skip_on_exit_code (int, optional): Exit code that triggers a skip
        deferrable (bool): Enable async (deferrable) execution. Default: False
        poll_interval (float): Poll interval. Default: 2
        termination_message_policy (str): Termination message policy.
            Default: 'File'
        callbacks (list, optional): Callbacks
        progress_callback (Callable[[str], None], optional): Progress callback
        logging_interval (int, optional): Logging interval
        trigger_kwargs (dict, optional): Trigger keyword arguments
        **kwargs: Additional keyword arguments; the usage examples pass
            ``task_id`` and ``dag`` this way.
    """

    def __init__(
        self,
        *,
        kubernetes_conn_id: str | None = "kubernetes_default",
        namespace: str | None = None,
        image: str | None = None,
        name: str | None = None,
        random_name_suffix: bool = True,
        cmds: list[str] | None = None,
        arguments: list[str] | None = None,
        ports: list[V1ContainerPort] | None = None,
        volume_mounts: list[V1VolumeMount] | None = None,
        volumes: list[V1Volume] | None = None,
        env_vars: list[V1EnvVar] | dict[str, str] | None = None,
        env_from: list[V1EnvFromSource] | None = None,
        secrets: list[Secret] | None = None,
        in_cluster: bool | None = None,
        cluster_context: str | None = None,
        labels: dict | None = None,
        reattach_on_restart: bool = True,
        startup_timeout_seconds: int = 120,
        startup_check_interval_seconds: int = 5,
        schedule_timeout_seconds: int | None = None,
        get_logs: bool = True,
        base_container_name: str | None = None,
        base_container_status_polling_interval: float = 1,
        init_container_logs: str | bool | None = None,
        container_logs: str | bool | None = None,
        image_pull_policy: str | None = None,
        annotations: dict | None = None,
        container_resources: V1ResourceRequirements | None = None,
        affinity: V1Affinity | None = None,
        config_file: str | None = None,
        node_selector: dict | None = None,
        image_pull_secrets: list[V1LocalObjectReference] | None = None,
        service_account_name: str | None = None,
        automount_service_account_token: bool | None = None,
        hostnetwork: bool = False,
        host_aliases: list[V1HostAlias] | None = None,
        tolerations: list[V1Toleration] | None = None,
        security_context: V1PodSecurityContext | dict | None = None,
        container_security_context: V1SecurityContext | dict | None = None,
        dnspolicy: str | None = None,
        dns_config: V1PodDNSConfig | None = None,
        hostname: str | None = None,
        subdomain: str | None = None,
        schedulername: str | None = None,
        full_pod_spec: V1Pod | None = None,
        init_containers: list[V1Container] | None = None,
        log_events_on_failure: bool = False,
        do_xcom_push: bool = False,
        pod_template_file: str | None = None,
        pod_template_dict: dict | None = None,
        priority_class_name: str | None = None,
        pod_runtime_info_envs: list[V1EnvVar] | None = None,
        termination_grace_period: int | None = None,
        configmaps: list[str] | None = None,
        skip_on_exit_code: int | None = None,
        deferrable: bool = False,
        poll_interval: float = 2,
        log_pod_spec_on_failure: bool = True,
        on_finish_action: str = "delete_pod",
        is_delete_operator_pod: bool | None = None,
        termination_message_policy: str = "File",
        active_deadline_seconds: int | None = None,
        callbacks: list | None = None,
        progress_callback: Callable[[str], None] | None = None,
        logging_interval: int | None = None,
        trigger_kwargs: dict | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Execute the pod operation."""
        ...

    def execute_sync(self, context: Context) -> Any:
        """Execute pod synchronously."""
        ...

    def execute_async(self, context: Context) -> Any:
        """Execute pod asynchronously."""
        ...

    def find_pod(self, context: Context) -> V1Pod | None:
        """Find an existing pod.

        Returns:
            V1Pod | None: The pod, or None.
        """
        ...

    def get_or_create_pod(self, context: Context) -> V1Pod:
        """Get existing pod or create a new one."""
        ...

    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> V1Pod:
        """Wait for pod to start running.

        Args:
            pod (V1Pod): Pod to wait for
            startup_timeout (int): Time to wait for startup. Default: 120
        """
        ...

    def await_pod_completion(self, pod: V1Pod) -> V1Pod:
        """Wait for pod to complete execution."""
        ...

    def extract_xcom(self, pod: V1Pod) -> str:
        """Extract XCom value from pod."""
        ...

    def cleanup(self, pod: V1Pod | None, remote_pod: V1Pod | None) -> None:
        """Clean up pod resources."""
        ...

    def build_pod_request_obj(self, context: Context | None = None) -> V1Pod:
        """Build Kubernetes pod request object."""
        ...


Enumeration of Kubernetes pod event types for monitoring pod lifecycle changes.
class PodEventType(Enum):
    """Pod event types for monitoring.

    Each member's value is the same string as its name.
    """

    # NOTE(review): confirm these member names/values against the installed
    # provider version before matching on them.
    ADDED = "ADDED"
    MODIFIED = "MODIFIED"
    DELETED = "DELETED"


Exception classes for pod operation error handling.
class PodReattachFailure(AirflowException):
    """Raised when pod reattachment fails.

    Subclass of AirflowException.
    """
    ...
class PodCredentialsExpiredFailure(AirflowException):
    """Raised when pod credentials have expired.

    Subclass of AirflowException.
    """
    ...


from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
# Simple pod execution: run a one-line Python command in the
# 'python:3.9-slim' image.
# NOTE: `dag` is not defined in this snippet; it is assumed to be a DAG
# object created elsewhere in the DAG file.
simple_task = KubernetesPodOperator(
    task_id='simple_pod',
    name='simple-pod',
    namespace='default',
    image='python:3.9-slim',
    cmds=['python', '-c'],
    arguments=['print("Hello from Kubernetes!")'],
    dag=dag
)

from kubernetes.client import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource
# Pod with persistent volume: attach the claim 'data-pvc' as the volume
# 'data-volume' and mount it at /data, then list its contents.
# The volume and the volume mount are linked by the shared name 'data-volume'.
volume_task = KubernetesPodOperator(
    task_id='volume_pod',
    name='volume-pod',
    namespace='default',
    image='ubuntu:20.04',
    cmds=['sh', '-c'],
    arguments=['echo "Data processing" && ls -la /data'],
    volumes=[
        V1Volume(
            name='data-volume',
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                claim_name='data-pvc'
            )
        )
    ],
    volume_mounts=[
        V1VolumeMount(
            name='data-volume',
            mount_path='/data'
        )
    ],
    dag=dag
)

from kubernetes.client import V1EnvVar
from airflow.providers.cncf.kubernetes.secret import Secret
# Pod with secrets and env vars: two plain environment variables plus two
# Secret objects, one declared with 'env' and one with 'volume'.
# NOTE(review): the positional Secret arguments are presumably
# (deploy type, deploy target, secret name[, key]) — confirm against the
# Secret class before copying.
secure_task = KubernetesPodOperator(
    task_id='secure_pod',
    name='secure-pod',
    namespace='default',
    image='myapp:latest',
    env_vars=[
        V1EnvVar(name='ENV', value='production'),
        V1EnvVar(name='DEBUG', value='false')
    ],
    secrets=[
        Secret('env', 'DB_PASSWORD', 'db-secret', 'password'),
        Secret('volume', '/etc/certs', 'tls-secret')
    ],
    dag=dag
)

from kubernetes.client import V1ResourceRequirements
# Pod with resource constraints: requests of 100m CPU / 128Mi memory and
# limits of 500m CPU / 512Mi memory, passed through `container_resources`.
resource_task = KubernetesPodOperator(
    task_id='resource_pod',
    name='resource-pod',
    namespace='default',
    image='cpu-intensive:latest',
    container_resources=V1ResourceRequirements(
        requests={'cpu': '100m', 'memory': '128Mi'},
        limits={'cpu': '500m', 'memory': '512Mi'}
    ),
    dag=dag
)

from kubernetes.client import V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement
# Pod with node affinity: combines a simple `node_selector` label match
# (disktype=ssd) with a required node-affinity rule that the node label
# 'kubernetes.io/arch' is In ['amd64'].
affinity_task = KubernetesPodOperator(
    task_id='affinity_pod',
    name='affinity-pod',
    namespace='default',
    image='workload:latest',
    node_selector={'disktype': 'ssd'},
    affinity=V1Affinity(
        node_affinity=V1NodeAffinity(
            required_during_scheduling_ignored_during_execution=V1NodeSelector(
                node_selector_terms=[
                    V1NodeSelectorTerm(
                        match_expressions=[
                            V1NodeSelectorRequirement(
                                key='kubernetes.io/arch',
                                operator='In',
                                values=['amd64']
                            )
                        ]
                    )
                ]
            )
        )
    ),
    dag=dag
)

# Async pod execution with deferral
# `deferrable` defaults to False in the operator signature; this example
# turns it on.
async_task = KubernetesPodOperator(
    task_id='async_pod',
    name='async-pod',
    namespace='default',
    image='long-running:latest',
    deferrable=True,  # Enable async execution
    dag=dag
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes