Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Create, update, and delete Kubernetes resources from YAML manifests or programmatic definitions. Resource management operators provide comprehensive lifecycle control over Kubernetes objects.
Base class providing common functionality for Kubernetes resource operations.
class KubernetesResourceBaseOperator(BaseOperator):
"""
Base class for Kubernetes resource operations.
Args:
namespace (str): Kubernetes namespace. Defaults to 'default'
cluster_context (str, optional): Kubernetes cluster context
config_file (str, optional): Path to kubeconfig file
in_cluster (bool, optional): Use in-cluster configuration
conn_id (str): Airflow connection ID. Default: 'kubernetes_default'
"""
def __init__(
self,
namespace: str = "default",
cluster_context: str | None = None,
config_file: str | None = None,
in_cluster: bool | None = None,
conn_id: str = "kubernetes_default",
**kwargs
): ...
Create Kubernetes resources from YAML files, dictionaries, or custom resource definitions.
class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
"""
Create Kubernetes resources from YAML.
Args:
yaml_conf (str, optional): YAML configuration as string
yaml_conf_file (str, optional): Path to YAML configuration file
custom_resource_definition (dict, optional): Custom resource definition as dict
namespace (str): Target namespace. Defaults to 'default'
custom_objects (list[dict], optional): List of custom objects to create
namespaced (bool): Whether resources are namespaced. Default: True
wait_until_ready (bool): Wait for resources to be ready. Default: False
wait_timeout (int): Timeout for waiting in seconds. Default: 300
"""
def __init__(
self,
yaml_conf: str | None = None,
yaml_conf_file: str | None = None,
custom_resource_definition: dict | None = None,
custom_objects: list[dict] | None = None,
namespaced: bool = True,
wait_until_ready: bool = False,
wait_timeout: int = 300,
**kwargs
): ...
def execute(self, context: Context) -> Any:
"""Create the Kubernetes resources."""
...
Delete Kubernetes resources by name, labels, or from YAML specifications.
class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
"""
Delete Kubernetes resources.
Args:
yaml_conf (str, optional): YAML configuration for resources to delete
yaml_conf_file (str, optional): Path to YAML file with resources to delete
custom_resource_definition (dict, optional): Custom resource definition to delete
custom_objects (list[dict], optional): List of custom objects to delete
api_version (str, optional): API version of resources to delete
kind (str, optional): Kind of resources to delete
name (str, optional): Name of specific resource to delete
label_selector (str, optional): Label selector for bulk deletion
field_selector (str, optional): Field selector for resource selection
grace_period_seconds (int, optional): Grace period for deletion
propagation_policy (str): Deletion propagation policy. Default: 'Background'
wait_for_completion (bool): Wait for deletion to complete. Default: True
wait_timeout (int): Timeout for waiting in seconds. Default: 300
"""
def __init__(
self,
yaml_conf: str | None = None,
yaml_conf_file: str | None = None,
custom_resource_definition: dict | None = None,
custom_objects: list[dict] | None = None,
api_version: str | None = None,
kind: str | None = None,
name: str | None = None,
label_selector: str | None = None,
field_selector: str | None = None,
grace_period_seconds: int | None = None,
propagation_policy: str = "Background",
wait_for_completion: bool = True,
wait_timeout: int = 300,
**kwargs
): ...
def execute(self, context: Context) -> Any:
"""Delete the Kubernetes resources."""
...
from airflow.providers.cncf.kubernetes.operators.resource import KubernetesCreateResourceOperator
# Create resources from YAML file
create_from_file = KubernetesCreateResourceOperator(
task_id='create_resources',
yaml_conf_file='/path/to/resources.yaml',
namespace='my-namespace',
wait_until_ready=True,
wait_timeout=300,
dag=dag
)
# YAML configuration as string
yaml_config = """
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: default
data:
  database_url: "postgresql://localhost:5432/mydb"
  debug: "false"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
      - name: app
        image: my-app:latest
        ports:
        - containerPort: 8080
        envFrom:
        - configMapRef:
            name: my-config
"""
create_from_yaml = KubernetesCreateResourceOperator(
task_id='create_from_yaml',
yaml_conf=yaml_config,
namespace='default',
dag=dag
)
# Custom resource definition
custom_resource = {
'apiVersion': 'argoproj.io/v1alpha1',
'kind': 'Application',
'metadata': {
'name': 'my-application',
'namespace': 'argocd'
},
'spec': {
'source': {
'repoURL': 'https://github.com/my-org/my-app',
'path': 'k8s',
'targetRevision': 'HEAD'
},
'destination': {
'server': 'https://kubernetes.default.svc',
'namespace': 'default'
},
'syncPolicy': {
'automated': {
'prune': True,
'selfHeal': True
}
}
}
}
create_custom_resource = KubernetesCreateResourceOperator(
task_id='create_argocd_app',
custom_resource_definition=custom_resource,
namespace='argocd',
dag=dag
)
# Multiple custom objects
custom_objects = [
{
'apiVersion': 'v1',
'kind': 'Secret',
'metadata': {
'name': 'db-credentials',
'namespace': 'default'
},
'type': 'Opaque',
'data': {
'username': 'dXNlcm5hbWU=', # base64 encoded
'password': 'cGFzc3dvcmQ=' # base64 encoded
}
},
{
'apiVersion': 'v1',
'kind': 'Service',
'metadata': {
'name': 'my-service',
'namespace': 'default'
},
'spec': {
'selector': {
'app': 'my-app'
},
'ports': [
{
'port': 80,
'targetPort': 8080
}
]
}
}
]
create_multiple = KubernetesCreateResourceOperator(
task_id='create_multiple_objects',
custom_objects=custom_objects,
namespace='default',
dag=dag
)
from airflow.providers.cncf.kubernetes.operators.resource import KubernetesDeleteResourceOperator
# Delete specific resource by name
delete_deployment = KubernetesDeleteResourceOperator(
task_id='delete_deployment',
api_version='apps/v1',
kind='Deployment',
name='my-app',
namespace='default',
wait_for_completion=True,
dag=dag
)
# Delete resources by label selector
delete_by_labels = KubernetesDeleteResourceOperator(
task_id='delete_by_labels',
api_version='v1',
kind='Pod',
label_selector='app=my-app,environment=staging',
namespace='default',
propagation_policy='Foreground',
dag=dag
)
# Delete resources defined in YAML file
delete_from_yaml = KubernetesDeleteResourceOperator(
task_id='delete_from_yaml',
yaml_conf_file='/path/to/resources-to-delete.yaml',
namespace='default',
grace_period_seconds=30,
dag=dag
)
# Delete custom resource
delete_custom = KubernetesDeleteResourceOperator(
task_id='delete_custom_resource',
custom_resource_definition={
'apiVersion': 'argoproj.io/v1alpha1',
'kind': 'Application',
'metadata': {
'name': 'my-application',
'namespace': 'argocd'
}
},
namespace='argocd',
dag=dag
)
# Complete resource lifecycle example
from airflow import DAG
from datetime import datetime
dag = DAG(
'resource_lifecycle',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
)
# Create resources
create_resources = KubernetesCreateResourceOperator(
task_id='create_app_resources',
yaml_conf_file='/path/to/app-resources.yaml',
namespace='production',
wait_until_ready=True,
wait_timeout=600,
dag=dag
)
# ... application processing tasks ...
# Clean up resources
cleanup_resources = KubernetesDeleteResourceOperator(
task_id='cleanup_resources',
yaml_conf_file='/path/to/app-resources.yaml',
namespace='production',
wait_for_completion=True,
dag=dag
)
# Set dependencies
create_resources >> cleanup_resources
# Create resources and wait for readiness
create_with_wait = KubernetesCreateResourceOperator(
task_id='create_and_wait',
yaml_conf="""
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: database
  namespace: default
spec:
  serviceName: database
  replicas: 3
  selector:
    matchLabels:
      app: database
  template:
    metadata:
      labels:
        app: database
    spec:
      containers:
      - name: postgres
        image: postgres:13
        env:
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: password
        volumeMounts:
        - name: data
          mountPath: /var/lib/postgresql/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
""",
namespace='default',
wait_until_ready=True,
wait_timeout=900, # Wait up to 15 minutes for StatefulSet to be ready
dag=dag
)
# Delete resources using field selector
delete_failed_pods = KubernetesDeleteResourceOperator(
task_id='delete_failed_pods',
api_version='v1',
kind='Pod',
field_selector='status.phase=Failed',
namespace='default',
propagation_policy='Background',
dag=dag
)
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes