CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/api-integration.md

API Integration

Connect to Kubernetes clusters with comprehensive API access supporting both synchronous and asynchronous operations. The hooks provide low-level access to Kubernetes resources and enable custom integrations.

Capabilities

Kubernetes Hook

Synchronous hook for interacting with the Kubernetes API, supporting cluster connections, resource management, and pod operations.

class KubernetesHook(BaseHook):
    """
    Creates a Kubernetes API connection with multiple configuration options.

    Every method on this class is synchronous; AsyncKubernetesHook provides
    the coroutine-based variant.

    Args:
        conn_id (str): Airflow connection ID. Default: 'kubernetes_default'
        client_configuration (Configuration, optional): Kubernetes client configuration
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        disable_verify_ssl (bool): Disable SSL verification. Default: False
        disable_tcp_keepalive (bool): Disable TCP keepalive. Default: False
    """
    def __init__(
        self,
        conn_id: str = "kubernetes_default",
        client_configuration: Configuration | None = None,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        disable_verify_ssl: bool = False,
        disable_tcp_keepalive: bool = False,
        **kwargs
    ): ...

    # --- Connection -------------------------------------------------------

    def get_conn(self) -> ApiClient:
        """Return a Kubernetes API client session."""
        ...

    def get_namespace(self) -> str:
        """Return the namespace defined in the Airflow connection."""
        ...

    def test_connection(self) -> tuple[bool, str]:
        """Test the connection and return a ``(success, message)`` pair."""
        ...

    # --- Custom objects ---------------------------------------------------

    def create_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        body: dict,
        namespace: str | None = None
    ) -> dict:
        """
        Create a custom object (an instance of a custom resource).

        ``group``, ``version`` and ``plural`` identify the resource type;
        ``body`` is the manifest of the object to create.
        """
        ...

    def get_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        name: str,
        namespace: str | None = None
    ) -> dict:
        """Read the custom object ``name`` of the given group/version/plural."""
        ...

    def delete_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        name: str,
        namespace: str | None = None,
        body: V1DeleteOptions | None = None
    ) -> dict:
        """
        Delete the custom object ``name`` of the given group/version/plural.

        ``body`` optionally carries delete options for the request.
        """
        ...

    # --- Pods -------------------------------------------------------------

    def get_pod(self, name: str, namespace: str) -> V1Pod:
        """Read a pod object from the Kubernetes API."""
        ...

    def get_namespaced_pod_list(
        self,
        label_selector: str | None = "",
        namespace: str | None = None,
        **kwargs
    ) -> list[V1Pod]:
        """List the pods in a namespace, optionally filtered by label selector."""
        ...

    def get_pod_logs(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ) -> str:
        """Retrieve a container's log from the specified pod as one string."""
        ...

    def get_pod_log_stream(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ) -> tuple[watch.Watch, Generator[str, None, None]]:
        """
        Retrieve a log stream for a container in a Kubernetes pod.

        Returns:
            A ``(watcher, lines)`` pair. Iterate ``lines`` to receive log
            lines; call ``watcher.stop()`` to end the stream.
        """
        ...

    # --- Jobs -------------------------------------------------------------

    def create_job(self, job: V1Job, **kwargs) -> V1Job:
        """Submit ``job`` to the cluster and return the created Job."""
        ...

    def get_job(self, job_name: str, namespace: str) -> V1Job:
        """Get the Job with the given name in the given namespace."""
        ...

    def get_job_status(self, job_name: str, namespace: str) -> V1JobStatus:
        """Get the status object of the named Job."""
        ...

    def wait_until_job_complete(
        self,
        job_name: str,
        namespace: str,
        timeout: int = 3600
    ) -> V1Job:
        """Block until the job is complete or failed and return the final Job."""
        ...

    def is_job_complete(self, job: V1Job) -> bool:
        """Return whether the job has reached a final state."""
        ...

    def is_job_failed(self, job: V1Job) -> bool:
        """Return whether the job failed."""
        ...

    def is_job_successful(self, job: V1Job) -> bool:
        """Return whether the job completed successfully."""
        ...

    # --- YAML -------------------------------------------------------------

    def apply_from_yaml_file(
        self,
        yaml_file: str | None = None,
        yaml_objects: list[dict] | None = None,
        verbose: bool = False,
        namespace: str = "default"
    ) -> list[dict]:
        """
        Submit resources to the cluster from a YAML file or parsed objects.

        Pass either ``yaml_file`` (path to a manifest) or ``yaml_objects``
        (manifests already parsed into dicts).
        """
        ...

Asynchronous Kubernetes Hook

Asynchronous hook for non-blocking Kubernetes operations, supporting concurrent resource management and monitoring.

class AsyncKubernetesHook(KubernetesHook):
    """
    Hook to use the Kubernetes SDK asynchronously.

    Accepts every KubernetesHook argument, plus:

    Args:
        config_dict (dict, optional): Configuration dictionary
    """
    def __init__(self, config_dict: dict | None = None, **kwargs): ...

    @asynccontextmanager
    async def get_conn(self) -> AsyncGenerator[ApiClient, None]:
        """
        Async context manager yielding an API client.

        Usage: ``async with hook.get_conn() as client: ...``
        """
        ...

    async def get_pod(self, name: str, namespace: str) -> V1Pod:
        """Read a pod object asynchronously."""
        ...

    async def delete_pod(self, name: str, namespace: str) -> V1Status:
        """Delete a pod asynchronously."""
        ...

    async def read_logs(
        self,
        name: str,
        namespace: str,
        container: str | None = None,
        follow: bool = False,
        **kwargs
    ) -> str:
        """Read the logs of a pod's container and return them as a string."""
        ...

    async def get_job_status(self, job_name: str, namespace: str) -> V1JobStatus:
        """Get a Job's status object asynchronously."""
        ...

    async def wait_until_job_complete(
        self,
        job_name: str,
        namespace: str,
        timeout: int = 3600,
        check_interval: int = 10
    ) -> V1Job:
        """Wait asynchronously until the job completes, polling every ``check_interval``."""
        ...

    async def wait_until_container_complete(
        self,
        pod_name: str,
        namespace: str,
        container_name: str | None = None,
        timeout: int = 3600
    ) -> None:
        """Wait until the container in the given pod has completed."""
        ...

    async def wait_until_container_started(
        self,
        pod_name: str,
        namespace: str,
        container_name: str | None = None,
        timeout: int = 120
    ) -> None:
        """Wait until the container in the given pod has started."""
        ...

Hook Constants

Constants used by the Kubernetes hooks for configuration and status checking.

# Resource loading constant.
# NOTE(review): value/format is not shown in these docs — confirm against the
# provider source before depending on its contents.
LOADING_KUBE_CONFIG_FILE_RESOURCE: str

# Job status condition types.
# Condition types treated as final for a Job — presumably "Complete"/"Failed";
# confirm. NOTE(review): upstream may define these as sets rather than lists;
# do not rely on ordering or list-only methods without checking.
JOB_FINAL_STATUS_CONDITION_TYPES: list[str]
# All Job condition types the hooks inspect — presumably a superset of the
# final types above; confirm.
JOB_STATUS_CONDITION_TYPES: list[str]

Usage Examples

Basic Hook Usage

from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

# Use an explicit kubeconfig file instead of in-cluster configuration
hook = KubernetesHook(
    conn_id='kubernetes_default',
    config_file='/path/to/kubeconfig',
    in_cluster=False,
)

# The underlying Kubernetes API client
api_client = hook.get_conn()

# test_connection() returns an (ok, message) pair
ok, detail = hook.test_connection()
print(f"Connection status: {ok}, Message: {detail}")

Pod Operations with Hook

# Get pod information
pod = hook.get_pod(name='my-pod', namespace='default')
print(f"Pod status: {pod.status.phase}")

# Get pod logs (the whole container log as a single string)
logs = hook.get_pod_logs(
    pod_name='my-pod',
    namespace='default',
    container='main-container'
)
print(logs)

# Stream pod logs. get_pod_log_stream() accepts no `follow` argument and
# returns a (watcher, generator) pair: iterate the generator for log lines,
# and stop the watcher when finished.
watcher, log_stream = hook.get_pod_log_stream(
    pod_name='my-pod',
    namespace='default',
)
try:
    for log_line in log_stream:
        print(log_line)
finally:
    watcher.stop()

Job Management with Hook

from kubernetes.client import (
    V1Container,
    V1Job,
    V1JobSpec,
    V1ObjectMeta,
    V1PodSpec,
    V1PodTemplateSpec,
)

# Create job specification. V1Job.metadata is a V1ObjectMeta; a plain dict
# breaks any code that reads job.metadata.name / job.metadata.namespace.
job_spec = V1Job(
    api_version='batch/v1',
    kind='Job',
    metadata=V1ObjectMeta(name='my-job', namespace='default'),
    spec=V1JobSpec(
        template=V1PodTemplateSpec(
            spec=V1PodSpec(
                containers=[
                    V1Container(
                        name='worker',
                        image='busybox:latest',
                        command=['sh', '-c', 'echo "Job completed"']
                    )
                ],
                restart_policy='Never'
            )
        )
    )
)

# Create and monitor job
created_job = hook.create_job(job_spec)
print(f"Job created: {created_job.metadata.name}")

# Wait for completion, reusing the created job's identity instead of
# repeating the literals
completed_job = hook.wait_until_job_complete(
    job_name=created_job.metadata.name,
    namespace=created_job.metadata.namespace,
    timeout=600
)

# Check job status
if hook.is_job_successful(completed_job):
    print("Job completed successfully")
elif hook.is_job_failed(completed_job):
    print("Job failed")

Custom Resource Operations

# Coordinates shared by every call on this custom resource type
myapp_coords = {
    'group': 'apps.example.com',
    'version': 'v1',
    'plural': 'myapps',
    'namespace': 'default',
}

# Manifest for one MyApp instance
custom_resource = {
    'apiVersion': 'apps.example.com/v1',
    'kind': 'MyApp',
    'metadata': {
        'name': 'my-app-instance',
        'namespace': 'default',
    },
    'spec': {
        'replicas': 3,
        'image': 'my-app:latest',
    },
}
instance_name = custom_resource['metadata']['name']

# Create, read back, then delete the instance
created_cr = hook.create_custom_object(body=custom_resource, **myapp_coords)
cr = hook.get_custom_object(name=instance_name, **myapp_coords)
hook.delete_custom_object(name=instance_name, **myapp_coords)

YAML Resource Management

# From a manifest file on disk
applied_resources = hook.apply_from_yaml_file(
    yaml_file='/path/to/resources.yaml',
    verbose=True,
    namespace='default',
)

# From manifests already parsed into Python dicts
config_map = {
    'apiVersion': 'v1',
    'kind': 'ConfigMap',
    'metadata': {'name': 'my-config', 'namespace': 'default'},
    'data': {'key': 'value'},
}
yaml_objects = [config_map]

hook.apply_from_yaml_file(yaml_objects=yaml_objects, namespace='default')

Asynchronous Operations

import asyncio
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook

async def async_operations():
    """Walk through the main AsyncKubernetesHook coroutines."""
    async_hook = AsyncKubernetesHook(
        conn_id='kubernetes_default',
        in_cluster=True
    )

    # The hook coroutines below take no client argument, so they do not need
    # to run inside get_conn(). Use `async with async_hook.get_conn() as
    # client:` only when you need the raw API client for unwrapped calls.

    # Wait for the container to start before inspecting it or reading logs
    await async_hook.wait_until_container_started(
        pod_name='my-pod',
        namespace='default',
        container_name='main',
        timeout=120
    )

    # Get pod asynchronously
    pod = await async_hook.get_pod(
        name='my-pod',
        namespace='default'
    )
    print(f"Pod status: {pod.status.phase}")

    # Read logs asynchronously
    logs = await async_hook.read_logs(
        name='my-pod',
        namespace='default',
        container='main',
        follow=False
    )
    print(logs)

    # Monitor job completion
    completed_job = await async_hook.wait_until_job_complete(
        job_name='my-job',
        namespace='default',
        timeout=600,
        check_interval=5
    )
    print(f"Job finished: {completed_job.metadata.name}")

# Run async operations
asyncio.run(async_operations())

Multi-cluster Configuration

# One kubeconfig file can describe several clusters; pick one per hook
# via its context name.
kubeconfig = '/home/user/.kube/config'

prod_hook = KubernetesHook(
    conn_id='k8s_prod',
    config_file=kubeconfig,
    cluster_context='production-cluster',
)

staging_hook = KubernetesHook(
    conn_id='k8s_staging',
    config_file=kubeconfig,
    cluster_context='staging-cluster',
)

# When Airflow runs inside the target cluster, use in-cluster configuration
in_cluster_hook = KubernetesHook(conn_id='k8s_in_cluster', in_cluster=True)

Connection Testing and Debugging

# Report reachability, including the failure detail when it fails
is_connected, message = hook.test_connection()
if is_connected:
    print("Successfully connected to Kubernetes cluster")
else:
    print(f"Connection failed: {message}")

# Namespace configured on the Airflow connection
namespace = hook.get_namespace()
print(f"Default namespace: {namespace}")

# Disabling SSL verification is for testing only — never against a
# production cluster
test_hook = KubernetesHook(
    conn_id='k8s_test',
    disable_tcp_keepalive=True,
    disable_verify_ssl=True,
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes

docs

api-integration.md

decorators.md

executors.md

index.md

job-management.md

monitoring.md

pod-operations.md

resource-management.md

spark-integration.md

tile.json