CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kubernetes-asyncio

Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations

Pending
Overview
Eval results
Files

docs/utils.md

Utils

YAML processing utilities for creating and applying Kubernetes resources from YAML files or Python dictionaries, with full async support. These utilities provide convenient functions for deploying applications and managing resources declaratively.

Capabilities

YAML Processing Functions

Core functions for creating Kubernetes resources from YAML definitions.

async def create_from_yaml(k8s_client, yaml_file, verbose=False, namespace="default", **kwargs):
    """
    Create Kubernetes resources from a YAML file.

    Reads a YAML file containing one or more Kubernetes resource definitions
    (a multi-document file separates them with ``---``) and creates them in
    the cluster using the appropriate API calls.

    Parameters:
    - k8s_client: ApiClient, authenticated Kubernetes API client
    - yaml_file: str, path to the YAML file containing resource definitions
    - verbose: bool, enable verbose output showing creation details
      (default: False)
    - namespace: str, namespace used for resources that do not specify one
      (default: "default")
    - **kwargs: additional keyword arguments passed to the create operations.
      The two options below are not named in the signature and are supplied
      this way:
      - dry_run: str, dry run mode ('All', 'Server', or None)
        NOTE(review): the Kubernetes API documents only 'All' for its
        dryRun parameter - confirm that 'Server' is accepted.
      - field_manager: str, field manager name for server-side apply

    Returns:
    - list: Created resource objects

    Raises:
    - FailToCreateError: When resource creation fails
    - FileNotFoundError: When the YAML file doesn't exist
    - yaml.YAMLError: When YAML parsing fails
    """

async def create_from_dict(k8s_client, data, verbose=False, namespace="default", **kwargs):
    """
    Create Kubernetes resources from a dictionary or a list of dictionaries.

    Takes the Python dictionary representation(s) of Kubernetes resources
    and creates them in the cluster.

    Parameters:
    - k8s_client: ApiClient, authenticated Kubernetes API client
    - data: dict or list, resource definition(s) as dictionaries
    - verbose: bool, enable verbose output showing creation details
      (default: False)
    - namespace: str, namespace used for resources that do not specify one
      (default: "default")
    - **kwargs: additional keyword arguments passed to the create operations.
      The two options below are not named in the signature and are supplied
      this way:
      - dry_run: str, dry run mode ('All', 'Server', or None)
        NOTE(review): the Kubernetes API documents only 'All' for its
        dryRun parameter - confirm that 'Server' is accepted.
      - field_manager: str, field manager name for server-side apply

    Returns:
    - list: Created resource objects

    Raises:
    - FailToCreateError: When resource creation fails
    - ValueError: When the data format is invalid
    """

async def create_from_yaml_single_item(k8s_client, yaml_object, verbose=False, 
                                      namespace="default", **kwargs):
    """
    Create a single Kubernetes resource from one parsed resource definition.

    Helper function for creating individual resources, used internally
    by create_from_yaml and create_from_dict.

    Parameters:
    - k8s_client: ApiClient, authenticated Kubernetes API client
    - yaml_object: dict, a single resource definition
    - verbose: bool, enable verbose output (default: False)
    - namespace: str, namespace used if the resource does not specify one
      (default: "default")
    - **kwargs: additional keyword arguments passed to the create operation

    Returns:
    - object: Created resource object

    Raises:
    - FailToCreateError: When resource creation fails
    """

Exception Classes

Specialized exceptions for YAML processing and resource creation errors.

class FailToCreateError(Exception):
    """
    Exception raised when Kubernetes resource creation fails.

    Carries the API exceptions collected from the failed create operations,
    giving callers detailed information about which resources failed and
    the reasons for failure.
    """

    def __init__(self, api_exceptions):
        """
        Initialize with the list of API exceptions from failed operations.

        Parameters:
        - api_exceptions: list, API exceptions from failed create operations
        """
        # Public attribute: callers iterate over it to inspect each
        # individual failure.
        self.api_exceptions = api_exceptions

    @property
    def message(self):
        """Formatted error message describing all failures."""

    def __str__(self):
        """String representation of the error."""

Usage Examples

Basic YAML File Application

import asyncio
from kubernetes_asyncio import client, config, utils

async def apply_yaml_file():
    """Create the resources defined in deployment.yaml and list what was created.

    Creation failures and a missing file are reported on stdout; the API
    client is closed in every case.
    """
    await config.load_config()
    api_client = client.ApiClient()

    try:
        # Create every resource defined in the YAML file
        created_resources = await utils.create_from_yaml(
            k8s_client=api_client,
            yaml_file="deployment.yaml",
            verbose=True,
            namespace="default",
        )
    except utils.FailToCreateError as e:
        # The exception carries one entry per failed create operation
        print(f"Failed to create resources: {e}")
        for api_exception in e.api_exceptions:
            print(f"  - {api_exception}")
    except FileNotFoundError:
        print("YAML file not found")
    else:
        # Success path: only reached when nothing above raised
        print(f"Successfully created {len(created_resources)} resources:")
        for resource in created_resources:
            print(f"  - {resource.kind}: {resource.metadata.name}")
    finally:
        await api_client.close()

asyncio.run(apply_yaml_file())

Multi-Document YAML File

# Example contents of multi-document.yaml: a Namespace, a Deployment and a
# Service, separated by '---' document markers. It is shown here as a string
# literal; save it under that file name before running the example that
# reads "multi-document.yaml".
"""
apiVersion: v1
kind: Namespace
metadata:
  name: myapp
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp-deployment
  namespace: myapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
    spec:
      containers:
      - name: myapp
        image: nginx:latest
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: myapp-service
  namespace: myapp
spec:
  selector:
    app: myapp
  ports:
  - port: 80
    targetPort: 80
  type: ClusterIP
"""

async def apply_multi_document():
    """Create every resource in multi-document.yaml and list what was created.

    Each created resource is printed together with its namespace; resources
    without a namespace are labelled 'cluster-wide'. Creation failures are
    reported on stdout, and the API client is closed in every case.
    """
    await config.load_config()
    api_client = client.ApiClient()

    try:
        # Create all documents contained in the multi-document YAML file
        resources = await utils.create_from_yaml(
            k8s_client=api_client,
            yaml_file="multi-document.yaml",
            verbose=True
        )

        print("Created resources:")
        for resource in resources:
            # Cluster-scoped objects (such as the Namespace) carry no
            # namespace. `or` covers both a missing attribute and one that
            # is present but None; a plain getattr default only covers the
            # former and would print "None" for the latter.
            namespace = getattr(resource.metadata, 'namespace', None) or 'cluster-wide'
            print(f"  - {resource.kind}/{resource.metadata.name} in {namespace}")

    except utils.FailToCreateError as e:
        print("Some resources failed to create:")
        for exception in e.api_exceptions:
            print(f"  - Error: {exception}")
    finally:
        await api_client.close()

asyncio.run(apply_multi_document())

Creating from Dictionary

async def create_from_dict_example():
    """Create a ConfigMap, a Secret and a Deployment from Python dictionaries.

    The Deployment's container takes DATABASE_URL from the ConfigMap and
    API_KEY from the Secret that are submitted alongside it.
    """
    await config.load_config()
    api_client = client.ApiClient()

    # Build each resource on its own, then submit them together.
    config_map = {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": "app-config", "namespace": "default"},
        "data": {
            "database_url": "postgresql://localhost:5432/mydb",
            "debug": "true",
        },
    }

    secret = {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": "app-secrets", "namespace": "default"},
        "type": "Opaque",
        # Values are base64 encoded
        "data": {
            "password": "cGFzc3dvcmQ=",  # "password"
            "api_key": "YWJjZGVmZw==",   # "abcdefg"
        },
    }

    # Environment variables resolved from the ConfigMap and Secret above
    database_url_env = {
        "name": "DATABASE_URL",
        "valueFrom": {
            "configMapKeyRef": {"name": "app-config", "key": "database_url"},
        },
    }
    api_key_env = {
        "name": "API_KEY",
        "valueFrom": {
            "secretKeyRef": {"name": "app-secrets", "key": "api_key"},
        },
    }
    container = {
        "name": "myapp",
        "image": "myapp:latest",
        "env": [database_url_env, api_key_env],
    }

    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "myapp", "namespace": "default"},
        "spec": {
            "replicas": 2,
            "selector": {"matchLabels": {"app": "myapp"}},
            "template": {
                "metadata": {"labels": {"app": "myapp"}},
                "spec": {"containers": [container]},
            },
        },
    }

    resources = [config_map, secret, deployment]

    try:
        created = await utils.create_from_dict(
            k8s_client=api_client,
            data=resources,
            verbose=True,
            namespace="default",
        )
    except utils.FailToCreateError as e:
        print(f"Creation failed: {e}")
    else:
        print(f"Successfully created {len(created)} resources")
    finally:
        await api_client.close()

asyncio.run(create_from_dict_example())

Dry Run Operations

async def dry_run_example():
    """Validate a Deployment manifest with a dry-run request.

    The manifest is written to a temporary file because create_from_yaml
    takes a file path. The request is sent with dry_run="All", so the
    resources are validated without being created. The temporary file is
    removed and the API client closed whether or not the dry run succeeds.
    """
    # Imported locally so the snippet is self-contained.
    import os
    import tempfile

    await config.load_config()

    deployment_yaml = """
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-deployment
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: test
  template:
    metadata:
      labels:
        app: test
    spec:
      containers:
      - name: test
        image: nginx:latest
"""

    temp_file = None
    api_client = client.ApiClient()

    try:
        # delete=False keeps the file on disk after the `with` block closes
        # it, so it can be reopened by path; it is removed in `finally`.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            temp_file = f.name
            f.write(deployment_yaml)

        # Perform dry run to validate without creating
        resources = await utils.create_from_yaml(
            k8s_client=api_client,
            yaml_file=temp_file,
            dry_run="All",  # Server-side dry run: processed by the API server, nothing is persisted
            verbose=True
        )

        print("Dry run successful - resources would be created:")
        for resource in resources:
            print(f"  - {resource.kind}: {resource.metadata.name}")

    except utils.FailToCreateError as e:
        print("Dry run failed - issues found:")
        for exception in e.api_exceptions:
            print(f"  - {exception}")
    finally:
        # Nested so that a failure while deleting the file can never skip
        # closing the client.
        try:
            if temp_file is not None:
                os.unlink(temp_file)
        finally:
            await api_client.close()

asyncio.run(dry_run_example())

Error Handling and Recovery

async def error_handling_example():
    """Submit a mix of valid and invalid resources and report the failures.

    FailToCreateError.api_exceptions holds the exceptions of the failed
    create operations only, so each entry is reported as a failure. It does
    not say which resources succeeded, and nothing is rolled back.
    """
    await config.load_config()
    api_client = client.ApiClient()

    # Resource definitions with intentional errors for demonstration
    problematic_resources = [
        {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "valid-pod",
                "namespace": "default"
            },
            "spec": {
                "containers": [{
                    "name": "nginx",
                    "image": "nginx:latest"
                }]
            }
        },
        {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "invalid-pod",
                "namespace": "nonexistent-namespace"  # This will fail
            },
            "spec": {
                "containers": [{
                    "name": "nginx",
                    "image": "nginx:latest"
                }]
            }
        },
        {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {
                "name": "invalid-deployment",
                "namespace": "default"
            },
            "spec": {
                # Missing required selector - this will fail
                "replicas": 1,
                "template": {
                    "metadata": {
                        "labels": {"app": "test"}
                    },
                    "spec": {
                        "containers": [{
                            "name": "nginx",
                            "image": "nginx:latest"
                        }]
                    }
                }
            }
        }
    ]

    try:
        created = await utils.create_from_dict(
            k8s_client=api_client,
            data=problematic_resources,
            verbose=True
        )

        print(f"Created {len(created)} resources successfully")

    except utils.FailToCreateError as e:
        print("Some resources failed to create:")

        # Every entry is a failure: the list contains no None placeholders
        # for successful resources, and an entry's position is not the
        # position of the resource in the submitted list.
        failures = list(e.api_exceptions)
        for i, exception in enumerate(failures, start=1):
            print(f"  ✗ Failure {i}: {exception}")

        print(
            f"\nSummary: {len(failures)} failure(s) reported for "
            f"{len(problematic_resources)} submitted resources"
        )

        # Nothing is rolled back on failure; clean up manually if needed
        print("Any successfully created resources are still in the cluster")

    finally:
        await api_client.close()

asyncio.run(error_handling_example())

Advanced Usage with Custom Parameters

async def advanced_usage():
    """Create a Deployment with resource limits and a custom field manager.

    On success, prints the name, replica count and container image of the
    created Deployment; on failure, prints the creation error. The API
    client is closed in every case.
    """
    await config.load_config()
    api_client = client.ApiClient()

    # Container with explicit resource requests and limits
    app_container = {
        "name": "app",
        "image": "myapp:v2.0.0",
        "resources": {
            "requests": {"memory": "256Mi", "cpu": "200m"},
            "limits": {"memory": "512Mi", "cpu": "500m"},
        },
    }

    pod_template = {
        "metadata": {"labels": {"app": "advanced-app"}},
        "spec": {"containers": [app_container]},
    }

    deployment_config = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "advanced-app", "namespace": "production"},
        "spec": {
            "replicas": 5,
            "selector": {"matchLabels": {"app": "advanced-app"}},
            "template": pod_template,
        },
    }

    try:
        # field_manager names this tool for server-side apply tracking
        created = await utils.create_from_dict(
            k8s_client=api_client,
            data=deployment_config,
            verbose=True,
            namespace="production",
            field_manager="my-deployment-tool",
        )
    except utils.FailToCreateError as e:
        print(f"Deployment failed: {e}")
    else:
        deployment = created[0]
        print(f"Created deployment: {deployment.metadata.name}")
        print(f"  Replicas: {deployment.spec.replicas}")
        print(f"  Image: {deployment.spec.template.spec.containers[0].image}")
    finally:
        await api_client.close()

asyncio.run(advanced_usage())

Install with Tessl CLI

npx tessl i tessl/pypi-kubernetes-asyncio

docs

client-apis.md

configuration.md

dynamic-client.md

index.md

leader-election.md

streaming.md

utils.md

watch.md

tile.json