
tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution


Resource Management

Create and delete Kubernetes resources from YAML manifests or programmatic definitions. The resource management operators let a DAG create objects, optionally wait for them to become ready, and delete them again when they are no longer needed.

Capabilities

Resource Base Operations

Base class providing common functionality for Kubernetes resource operations.

class KubernetesResourceBaseOperator(BaseOperator):
    """
    Base class for Kubernetes resource operations.
    
    Args:
        namespace (str): Kubernetes namespace. Defaults to 'default'
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Airflow connection ID. Default: 'kubernetes_default'
    """
    def __init__(
        self,
        namespace: str = "default",
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        conn_id: str = "kubernetes_default",
        **kwargs
    ): ...

Resource Creation

Create Kubernetes resources from YAML files, dictionaries, or custom resource definitions.

class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
    """
    Create Kubernetes resources from YAML or Python dictionaries.
    
    Args:
        yaml_conf (str, optional): YAML configuration as string
        yaml_conf_file (str, optional): Path to YAML configuration file
        custom_resource_definition (dict, optional): Custom resource definition as dict
        namespace (str): Target namespace, inherited from KubernetesResourceBaseOperator. Defaults to 'default'
        custom_objects (list[dict], optional): List of custom objects to create
        namespaced (bool): Whether resources are namespaced. Default: True
        wait_until_ready (bool): Wait for resources to be ready. Default: False
        wait_timeout (int): Timeout for waiting in seconds. Default: 300
    """
    def __init__(
        self,
        yaml_conf: str | None = None,
        yaml_conf_file: str | None = None,
        custom_resource_definition: dict | None = None,
        custom_objects: list[dict] | None = None,
        namespaced: bool = True,
        wait_until_ready: bool = False,
        wait_timeout: int = 300,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Create the Kubernetes resources."""
        ...

Resource Deletion

Delete Kubernetes resources by name, by label or field selector, or from YAML specifications.

class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
    """
    Delete Kubernetes resources.
    
    Args:
        yaml_conf (str, optional): YAML configuration for resources to delete
        yaml_conf_file (str, optional): Path to YAML file with resources to delete
        custom_resource_definition (dict, optional): Custom resource definition to delete
        custom_objects (list[dict], optional): List of custom objects to delete
        api_version (str, optional): API version of resources to delete
        kind (str, optional): Kind of resources to delete
        name (str, optional): Name of specific resource to delete
        label_selector (str, optional): Label selector for bulk deletion
        field_selector (str, optional): Field selector for resource selection
        grace_period_seconds (int, optional): Grace period for deletion
        propagation_policy (str): Deletion propagation policy: 'Foreground', 'Background', or 'Orphan'. Default: 'Background'
        wait_for_completion (bool): Wait for deletion to complete. Default: True
        wait_timeout (int): Timeout for waiting in seconds. Default: 300
    """
    def __init__(
        self,
        yaml_conf: str | None = None,
        yaml_conf_file: str | None = None,
        custom_resource_definition: dict | None = None,
        custom_objects: list[dict] | None = None,
        api_version: str | None = None,
        kind: str | None = None,
        name: str | None = None,
        label_selector: str | None = None,
        field_selector: str | None = None,
        grace_period_seconds: int | None = None,
        propagation_policy: str = "Background",
        wait_for_completion: bool = True,
        wait_timeout: int = 300,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Delete the Kubernetes resources."""
        ...
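Kubernetes accepts three values for propagation_policy, and they differ in how dependents (for example, the ReplicaSets and Pods owned by a Deployment) are handled: Foreground deletes the dependents before the owner is removed, Background removes the owner immediately and lets the garbage collector delete the dependents afterwards, and Orphan leaves the dependents in place. The sketch below is a minimal pre-flight check, assuming you want to catch a typo when the DAG is parsed rather than when the API server rejects the request; check_propagation_policy is a hypothetical helper, not part of the provider.

```python
# Values accepted by the Kubernetes DeleteOptions.propagationPolicy field.
# Hypothetical helper for illustration; not part of the Airflow provider.
VALID_PROPAGATION_POLICIES = {"Orphan", "Background", "Foreground"}


def check_propagation_policy(policy: str) -> str:
    """Return the policy unchanged if it is a known value, else raise ValueError."""
    if policy not in VALID_PROPAGATION_POLICIES:
        raise ValueError(
            f"propagation_policy must be one of "
            f"{sorted(VALID_PROPAGATION_POLICIES)}, got {policy!r}"
        )
    return policy


# Usage: validate before passing the value to the operator.
policy = check_propagation_policy("Foreground")
```

The check is case-sensitive on purpose, since the values are written in that exact form in the Kubernetes API.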

Usage Examples

Creating Resources from YAML File

from airflow.providers.cncf.kubernetes.operators.resource import KubernetesCreateResourceOperator

# Create resources from a YAML file (these examples assume an existing `dag` object)
create_from_file = KubernetesCreateResourceOperator(
    task_id='create_resources',
    yaml_conf_file='/path/to/resources.yaml',
    namespace='my-namespace',
    wait_until_ready=True,
    wait_timeout=300,
    dag=dag
)

Creating Resources from YAML String

# Multi-document YAML configuration as a string (documents separated by ---)
yaml_config = """
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: default
data:
  database_url: "postgresql://localhost:5432/mydb"
  debug: "false"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
      - name: app
        image: my-app:latest
        ports:
        - containerPort: 8080
        envFrom:
        - configMapRef:
            name: my-config
"""

create_from_yaml = KubernetesCreateResourceOperator(
    task_id='create_from_yaml',
    yaml_conf=yaml_config,
    namespace='default',
    dag=dag
)
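Manifests kept as Python dictionaries can be turned into a multi-document yaml_conf string without a YAML library, because JSON documents of this shape are also valid YAML. This is a sketch that assumes the operator parses yaml_conf with a standard YAML loader (such as PyYAML's safe_load_all) that splits documents on `---`; to_yaml_conf is a hypothetical helper.

```python
import json


def to_yaml_conf(*manifests: dict) -> str:
    """Join manifests into one multi-document string.

    Hypothetical helper. Each document is emitted as JSON, which YAML
    loaders such as PyYAML accept for plain mappings like these, so no
    third-party dependency is needed to build the string.
    """
    return "\n---\n".join(json.dumps(m, indent=2) for m in manifests)


config_map = {
    "apiVersion": "v1",
    "kind": "ConfigMap",
    "metadata": {"name": "my-config", "namespace": "default"},
    # Quoted JSON strings stay strings, so "false" is not read as a boolean.
    "data": {"database_url": "postgresql://localhost:5432/mydb", "debug": "false"},
}
service = {
    "apiVersion": "v1",
    "kind": "Service",
    "metadata": {"name": "my-service", "namespace": "default"},
    "spec": {"selector": {"app": "my-app"}, "ports": [{"port": 80, "targetPort": 8080}]},
}

yaml_config = to_yaml_conf(config_map, service)
```

The resulting yaml_config can then be passed as yaml_conf in the same way as the hand-written string above.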

Creating Custom Resources

# A custom resource (an Argo CD Application) defined as a dict;
# the Argo CD CRDs must already be installed in the cluster
custom_resource = {
    'apiVersion': 'argoproj.io/v1alpha1',
    'kind': 'Application',
    'metadata': {
        'name': 'my-application',
        'namespace': 'argocd'
    },
    'spec': {
        'source': {
            'repoURL': 'https://github.com/my-org/my-app',
            'path': 'k8s',
            'targetRevision': 'HEAD'
        },
        'destination': {
            'server': 'https://kubernetes.default.svc',
            'namespace': 'default'
        },
        'syncPolicy': {
            'automated': {
                'prune': True,
                'selfHeal': True
            }
        }
    }
}

create_custom_resource = KubernetesCreateResourceOperator(
    task_id='create_argocd_app',
    custom_resource_definition=custom_resource,
    namespace='argocd',
    dag=dag
)
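Unlike core kinds, a custom resource is addressed through its API group and version, both packed into apiVersion. The sketch below shows that split; split_api_version is a hypothetical helper. Note that a client such as the Kubernetes Python client's CustomObjectsApi also needs the resource's plural name (applications for this example), which cannot be derived reliably from kind.

```python
def split_api_version(api_version: str) -> tuple[str, str]:
    """Split a Kubernetes apiVersion into (group, version).

    Hypothetical helper. Core resources such as ConfigMap use a bare
    version ("v1") and belong to the empty core group; everything else
    is written as "<group>/<version>".
    """
    group, _, version = api_version.rpartition("/")
    return group, version


group, version = split_api_version("argoproj.io/v1alpha1")
```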

Creating Multiple Custom Objects

# Multiple objects defined as dicts (core kinds such as Secret and Service)
custom_objects = [
    {
        'apiVersion': 'v1',
        'kind': 'Secret',
        'metadata': {
            'name': 'db-credentials',
            'namespace': 'default'
        },
        'type': 'Opaque',
        'data': {
            'username': 'dXNlcm5hbWU=',  # base64 encoded
            'password': 'cGFzc3dvcmQ='   # base64 encoded
        }
    },
    {
        'apiVersion': 'v1',
        'kind': 'Service',
        'metadata': {
            'name': 'my-service',
            'namespace': 'default'
        },
        'spec': {
            'selector': {
                'app': 'my-app'
            },
            'ports': [
                {
                    'port': 80,
                    'targetPort': 8080
                }
            ]
        }
    }
]

create_multiple = KubernetesCreateResourceOperator(
    task_id='create_multiple_objects',
    custom_objects=custom_objects,
    namespace='default',
    dag=dag
)
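Values under a Secret's data field must be base64-encoded; in the example above they decode to username and password. Base64 is an encoding, not encryption. A minimal sketch for producing those values from plain text follows; encode_secret_data is a hypothetical helper. Kubernetes also accepts plain-text values under stringData and encodes them itself.

```python
import base64


def encode_secret_data(values: dict[str, str]) -> dict[str, str]:
    """Base64-encode plain-text values for a Secret's `data` field (hypothetical helper)."""
    return {
        key: base64.b64encode(value.encode("utf-8")).decode("ascii")
        for key, value in values.items()
    }


secret_data = encode_secret_data({"username": "username", "password": "password"})
```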

Deleting Resources by Name

from airflow.providers.cncf.kubernetes.operators.resource import KubernetesDeleteResourceOperator

# Delete specific resource by name
delete_deployment = KubernetesDeleteResourceOperator(
    task_id='delete_deployment',
    api_version='apps/v1',
    kind='Deployment',
    name='my-app',
    namespace='default',
    wait_for_completion=True,
    dag=dag
)

Deleting Resources by Label Selector

# Delete resources by label selector
delete_by_labels = KubernetesDeleteResourceOperator(
    task_id='delete_by_labels',
    api_version='v1',
    kind='Pod',
    label_selector='app=my-app,environment=staging',
    namespace='default',
    propagation_policy='Foreground',
    dag=dag
)
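A label selector is a comma-separated list of requirements that must all match, so app=my-app,environment=staging selects only objects carrying both labels. When the labels live in a dict (for example, one shared with the manifests that created the objects), a helper such as the hypothetical build_label_selector below keeps the selector in sync with them. Equality-based field selectors such as status.phase=Failed use the same comma-separated key=value syntax.

```python
def build_label_selector(labels: dict[str, str]) -> str:
    """Render equality-based requirements as a Kubernetes selector string.

    Hypothetical helper. Keys are sorted so the output is deterministic;
    Kubernetes ANDs the comma-separated requirements together.
    """
    return ",".join(f"{key}={value}" for key, value in sorted(labels.items()))


selector = build_label_selector({"environment": "staging", "app": "my-app"})
```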

Deleting Resources from YAML

# Delete resources defined in YAML file
delete_from_yaml = KubernetesDeleteResourceOperator(
    task_id='delete_from_yaml',
    yaml_conf_file='/path/to/resources-to-delete.yaml',
    namespace='default',
    grace_period_seconds=30,
    dag=dag
)

Deleting Custom Resources

# Delete custom resource
delete_custom = KubernetesDeleteResourceOperator(
    task_id='delete_custom_resource',
    custom_resource_definition={
        'apiVersion': 'argoproj.io/v1alpha1',
        'kind': 'Application',
        'metadata': {
            'name': 'my-application',
            'namespace': 'argocd'
        }
    },
    namespace='argocd',
    dag=dag
)

Resource Cleanup with Dependencies

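# Complete resource lifecycle example
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.resource import (
    KubernetesCreateResourceOperator,
    KubernetesDeleteResourceOperator,
)

dag = DAG(
    'resource_lifecycle',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # `schedule_interval` is deprecated since Airflow 2.4
    catchup=False
)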

# Create resources
create_resources = KubernetesCreateResourceOperator(
    task_id='create_app_resources',
    yaml_conf_file='/path/to/app-resources.yaml',
    namespace='production',
    wait_until_ready=True,
    wait_timeout=600,
    dag=dag
)

# ... application processing tasks ...

# Clean up resources; trigger_rule='all_done' runs cleanup even if upstream tasks fail
cleanup_resources = KubernetesDeleteResourceOperator(
    task_id='cleanup_resources',
    yaml_conf_file='/path/to/app-resources.yaml',
    namespace='production',
    wait_for_completion=True,
    trigger_rule='all_done',
    dag=dag
)

# Set dependencies
create_resources >> cleanup_resources

Advanced Resource Creation with Waiting

# Create resources and wait for readiness
create_with_wait = KubernetesCreateResourceOperator(
    task_id='create_and_wait',
    yaml_conf="""
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: database
  namespace: default
spec:
  serviceName: database
  replicas: 3
  selector:
    matchLabels:
      app: database
  template:
    metadata:
      labels:
        app: database
    spec:
      containers:
      - name: postgres
        image: postgres:13
        env:
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: password
        volumeMounts:
        - name: data
          mountPath: /var/lib/postgresql/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
""",
    namespace='default',
    wait_until_ready=True,
    wait_timeout=900,  # Wait up to 15 minutes for StatefulSet to be ready
    dag=dag
)
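This page does not say how the operator decides that a resource is ready. Conceptually, wait_until_ready combined with wait_timeout amounts to a poll loop with a deadline. The sketch below assumes that readiness for a StatefulSet means status.readyReplicas has reached spec.replicas (which defaults to 1); wait_until and statefulset_ready are hypothetical helpers, not provider API.

```python
import time
from typing import Callable


def wait_until(is_ready: Callable[[], bool], timeout: float, interval: float = 5.0) -> None:
    """Call is_ready() until it returns True; raise TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while not is_ready():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"resource not ready after {timeout} seconds")
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))


def statefulset_ready(obj: dict) -> bool:
    """True when every desired replica of a StatefulSet object reports ready."""
    desired = obj.get("spec", {}).get("replicas", 1)
    return obj.get("status", {}).get("readyReplicas", 0) >= desired
```

In a real task, is_ready would re-read the object from the API server on each call and pass it to statefulset_ready; wait_timeout=900 above corresponds to timeout=900 here.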

Field Selector Deletion

# Delete resources using field selector
delete_failed_pods = KubernetesDeleteResourceOperator(
    task_id='delete_failed_pods',
    api_version='v1',
    kind='Pod',
    field_selector='status.phase=Failed',
    namespace='default',
    propagation_policy='Background',
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes
