CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

container-services.mddocs/

Azure Container Services

Complete Azure Container Services integration providing comprehensive container orchestration capabilities including Azure Container Instances management, Container Registry operations, and Container Volume handling for containerized workloads.

Capabilities

Azure Container Instances Hook

Primary interface for Azure Container Instances management, providing authenticated connections and container lifecycle operations.

class AzureContainerInstanceHook(BaseHook):
    """
    Hook for Azure Container Instances management.
    
    Provides methods for creating, managing, and monitoring container instances
    with support for various container configurations and networking options.
    """
    
    def get_conn(self) -> ContainerInstanceManagementClient:
        """
        Get authenticated Azure Container Instance management client.
        
        Returns:
            ContainerInstanceManagementClient: Container management client instance
        """
    
    def create_or_update(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_group: ContainerGroup
    ) -> ContainerGroup:
        """
        Create or update a container group.
        
        Args:
            resource_group_name (str): Name of the resource group
            container_group_name (str): Name of the container group
            container_group (ContainerGroup): Container group configuration
            
        Returns:
            ContainerGroup: Created or updated container group
        """
    
    def get_state_exitcode_details(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_name: str
    ) -> tuple[str, int, str]:
        """
        Get container state, exit code, and detailed information.
        
        Args:
            resource_group_name (str): Name of the resource group
            container_group_name (str): Name of the container group
            container_name (str): Name of the container
            
        Returns:
            tuple[str, int, str]: State, exit code, and details
        """
    
    def get_logs(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_name: str,
        tail: int | None = None
    ) -> str:
        """
        Get logs from a running or terminated container.
        
        Args:
            resource_group_name (str): Name of the resource group
            container_group_name (str): Name of the container group
            container_name (str): Name of the container
            tail (int | None): Number of log lines to retrieve from the end
            
        Returns:
            str: Container logs
        """
    
    def delete(
        self,
        resource_group_name: str,
        container_group_name: str
    ) -> None:
        """
        Delete a container group.
        
        Args:
            resource_group_name (str): Name of the resource group
            container_group_name (str): Name of the container group to delete
        """
    
    def exists(
        self,
        resource_group_name: str,
        container_group_name: str
    ) -> bool:
        """
        Check if a container group exists.
        
        Args:
            resource_group_name (str): Name of the resource group
            container_group_name (str): Name of the container group
            
        Returns:
            bool: True if container group exists, False otherwise
        """
    
    def get_state(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_name: str
    ) -> str:
        """
        Get the current state of a container.
        
        Args:
            resource_group_name (str): Name of the resource group
            container_group_name (str): Name of the container group
            container_name (str): Name of the container
            
        Returns:
            str: Current container state (Running, Terminated, Waiting, etc.)
        """
    
    def get_messages(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_name: str
    ) -> list[str]:
        """
        Get status messages from a container.
        
        Args:
            resource_group_name (str): Name of the resource group
            container_group_name (str): Name of the container group
            container_name (str): Name of the container
            
        Returns:
            list[str]: List of container status messages
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Container Instance connection.
        
        Returns:
            tuple[bool, str]: Success status and message
        """

Azure Container Registry Hook

Hook for Azure Container Registry operations providing image management and credential handling capabilities.

class AzureContainerRegistryHook(BaseHook):
    """
    Hook for Azure Container Registry operations.
    
    Provides methods for managing container registries, retrieving credentials,
    and handling image operations.
    """
    
    def get_conn(self) -> ImageRegistryCredential:
        """
        Get registry credentials for authentication.
        
        Returns:
            ImageRegistryCredential: Registry credentials for authentication
        """
    
    def get_registry_credentials(
        self,
        registry_name: str,
        resource_group_name: str
    ) -> dict[str, str]:
        """
        Get container registry admin credentials.
        
        Args:
            registry_name (str): Name of the container registry
            resource_group_name (str): Name of the resource group
            
        Returns:
            dict[str, str]: Registry credentials including username and passwords
        """
    
    def get_login_server(
        self,
        registry_name: str,
        resource_group_name: str
    ) -> str:
        """
        Get the login server URL for the container registry.
        
        Args:
            registry_name (str): Name of the container registry
            resource_group_name (str): Name of the resource group
            
        Returns:
            str: Login server URL
        """
    
    def list_repositories(
        self,
        registry_name: str,
        resource_group_name: str
    ) -> list[str]:
        """
        List all repositories in the container registry.
        
        Args:
            registry_name (str): Name of the container registry
            resource_group_name (str): Name of the resource group
            
        Returns:
            list[str]: List of repository names
        """
    
    def list_tags(
        self,
        registry_name: str,
        resource_group_name: str,
        repository_name: str
    ) -> list[str]:
        """
        List all tags for a specific repository.
        
        Args:
            registry_name (str): Name of the container registry
            resource_group_name (str): Name of the resource group
            repository_name (str): Name of the repository
            
        Returns:
            list[str]: List of image tags
        """
    
    def delete_repository(
        self,
        registry_name: str,
        resource_group_name: str,
        repository_name: str
    ) -> None:
        """
        Delete a repository from the container registry.
        
        Args:
            registry_name (str): Name of the container registry
            resource_group_name (str): Name of the resource group
            repository_name (str): Name of the repository to delete
        """
    
    def delete_tag(
        self,
        registry_name: str,
        resource_group_name: str,
        repository_name: str,
        tag: str
    ) -> None:
        """
        Delete a specific tag from a repository.
        
        Args:
            registry_name (str): Name of the container registry
            resource_group_name (str): Name of the resource group
            repository_name (str): Name of the repository
            tag (str): Tag to delete
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Container Registry connection.
        
        Returns:
            tuple[bool, str]: Success status and message
        """

Azure Container Volume Hook

Hook for Azure Container Volume management providing persistent storage capabilities for containers.

class AzureContainerVolumeHook(BaseHook):
    """
    Hook for Azure Container Volume management.
    
    Provides methods for creating and managing persistent volumes
    for Azure Container Instances with various storage backends.
    """
    
    def get_conn(self) -> Any:
        """
        Get authenticated connection for volume operations.
        
        Returns:
            Any: Volume management client
        """
    
    def create_file_share_volume(
        self,
        volume_name: str,
        share_name: str,
        storage_account_name: str,
        storage_account_key: str,
        read_only: bool = False
    ) -> Volume:
        """
        Create a volume backed by Azure File Share.
        
        Args:
            volume_name (str): Name of the volume
            share_name (str): Name of the Azure File Share
            storage_account_name (str): Storage account name
            storage_account_key (str): Storage account key
            read_only (bool): Whether the volume is read-only (default: False)
            
        Returns:
            Volume: Created volume configuration
        """
    
    def create_secret_volume(
        self,
        volume_name: str,
        secrets: dict[str, str]
    ) -> Volume:
        """
        Create a volume for storing secrets.
        
        Args:
            volume_name (str): Name of the volume
            secrets (dict[str, str]): Dictionary of secret files and content
            
        Returns:
            Volume: Created secret volume configuration
        """
    
    def create_empty_dir_volume(
        self,
        volume_name: str
    ) -> Volume:
        """
        Create an empty directory volume.
        
        Args:
            volume_name (str): Name of the volume
            
        Returns:
            Volume: Created empty directory volume configuration
        """
    
    def create_git_repo_volume(
        self,
        volume_name: str,
        repository_url: str,
        directory_name: str | None = None,
        revision: str | None = None
    ) -> Volume:
        """
        Create a volume from a Git repository.
        
        Args:
            volume_name (str): Name of the volume
            repository_url (str): URL of the Git repository
            directory_name (str | None): Directory to clone into
            revision (str | None): Specific revision to clone
            
        Returns:
            Volume: Created Git repository volume configuration
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Container Volume connection.
        
        Returns:
            tuple[bool, str]: Success status and message
        """

Container Services Operators

Execute Azure Container Services operations as Airflow tasks with comprehensive container management capabilities.

Azure Container Instances Operator

class AzureContainerInstancesOperator(BaseOperator):
    """
    Runs containers on Azure Container Instances.
    
    Supports creating and managing containerized workloads with custom
    configurations, networking, and volume mounting options.
    """
    
    def __init__(
        self,
        *,
        ci_conn_id: str = "azure_container_instance_default",
        registry_conn_id: str | None = None,
        resource_group: str,
        name: str,
        image: str,
        region: str = "West US",
        environment_variables: dict[str, str] | None = None,
        secured_variables: list[str] | None = None,
        volumes: list[Volume] | None = None,
        volume_mounts: list[VolumeMount] | None = None,
        memory_in_gb: float = 1.5,
        cpu: float = 1.0,
        gpu: GpuResource | None = None,
        command: list[str] | None = None,
        restart_policy: str = "Never",
        os_type: str = "Linux",
        ports: list[ContainerPort] | None = None,
        dns_name_label: str | None = None,
        ip_address_type: str = "Public",
        network_profile: ContainerGroupNetworkProfile | None = None,
        tags: dict[str, str] | None = None,
        **kwargs
    ):
        """
        Initialize Azure Container Instances operator.
        
        Args:
            ci_conn_id (str): Airflow connection ID for Container Instances
            registry_conn_id (str | None): Connection ID for container registry
            resource_group (str): Azure resource group name
            name (str): Container group name
            image (str): Container image to run
            region (str): Azure region (default: "West US")
            environment_variables (dict[str, str] | None): Environment variables
            secured_variables (list[str] | None): Secured environment variables
            volumes (list[Volume] | None): Volumes to mount
            volume_mounts (list[VolumeMount] | None): Volume mount configurations
            memory_in_gb (float): Memory allocation in GB (default: 1.5)
            cpu (float): CPU allocation (default: 1.0)
            gpu (GpuResource | None): GPU resource configuration
            command (list[str] | None): Command to run in container
            restart_policy (str): Restart policy (default: "Never")
            os_type (str): Operating system type (default: "Linux")
            ports (list[ContainerPort] | None): Port configurations
            dns_name_label (str | None): DNS name label for public IP
            ip_address_type (str): IP address type (default: "Public")
            network_profile (ContainerGroupNetworkProfile | None): Network profile
            tags (dict[str, str] | None): Resource tags
        """
    
    def execute(self, context: Context) -> str:
        """
        Execute container instance creation and management.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            str: Container execution results or status
        """
    
    def on_kill(self) -> None:
        """Clean up container resources on task termination."""

Usage Examples

Basic Container Execution

from airflow import DAG
from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def check_container_results(**context):
    """Check and process container execution results."""
    result = context['task_instance'].xcom_pull(task_ids='run_data_processing')
    print(f"Container execution result: {result}")

dag = DAG(
    'azure_container_workflow',
    default_args={
        'owner': 'container-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Azure Container Instances workflow',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Run data processing container
run_container = AzureContainerInstancesOperator(
    task_id='run_data_processing',
    ci_conn_id='azure_container_conn',
    resource_group='data-processing-rg',
    name='data-processor-{{ ds_nodash }}',
    image='myregistry.azurecr.io/data-processor:latest',
    region='East US',
    memory_in_gb=4.0,
    cpu=2.0,
    environment_variables={
        'INPUT_PATH': '/data/input',
        'OUTPUT_PATH': '/data/output',
        'PROCESSING_MODE': 'batch'
    },
    command=[
        'python',
        'process_data.py',
        '--input', '/data/input',
        '--output', '/data/output'
    ],
    restart_policy='Never',
    dag=dag
)

# Process results
check_results = PythonOperator(
    task_id='check_results',
    python_callable=check_container_results,
    dag=dag
)

run_container >> check_results

Advanced Container Configuration

from airflow import DAG
from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator
from airflow.providers.microsoft.azure.hooks.container_volume import AzureContainerVolumeHook
from azure.mgmt.containerinstance.models import (
    Volume, VolumeMount, ContainerPort, GpuResource,
    ContainerGroupNetworkProfile, ResourceRequests, ResourceRequirements
)
from datetime import datetime, timedelta

def setup_volumes():
    """Set up volumes for container workload."""
    volume_hook = AzureContainerVolumeHook(azure_container_volume_conn_id='volume_conn')
    
    # Create file share volume for persistent storage
    data_volume = volume_hook.create_file_share_volume(
        volume_name='data-volume',
        share_name='container-data',
        storage_account_name='mystorageaccount',
        storage_account_key='storage-key-here',
        read_only=False
    )
    
    # Create secret volume for configuration
    config_volume = volume_hook.create_secret_volume(
        volume_name='config-volume',
        secrets={
            'config.json': '{"database": "prod", "debug": false}',
            'secrets.env': 'DB_PASSWORD=secret123\nAPI_KEY=key456'
        }
    )
    
    return [data_volume, config_volume]

dag = DAG(
    'advanced_container_workflow',
    default_args={
        'owner': 'ml-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=3)
    },
    description='Advanced container workflow with GPU and volumes',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Configure volumes
volumes = [
    Volume(
        name='data-volume',
        azure_file={
            'share_name': 'ml-data',
            'storage_account_name': 'mlstorageaccount',
            'storage_account_key': 'storage-key'
        }
    ),
    Volume(
        name='model-volume',
        azure_file={
            'share_name': 'ml-models', 
            'storage_account_name': 'mlstorageaccount',
            'storage_account_key': 'storage-key',
            'read_only': True
        }
    )
]

# Configure volume mounts
volume_mounts = [
    VolumeMount(
        name='data-volume',
        mount_path='/data',
        read_only=False
    ),
    VolumeMount(
        name='model-volume',
        mount_path='/models',
        read_only=True
    )
]

# Configure GPU resources for ML workload
gpu_resource = GpuResource(
    count=1,
    sku='K80'
)

# Configure network ports
ports = [
    ContainerPort(port=8080, protocol='TCP'),
    ContainerPort(port=8443, protocol='TCP')
]

# Run ML training container with GPU
ml_training = AzureContainerInstancesOperator(
    task_id='run_ml_training',
    ci_conn_id='azure_container_conn',
    registry_conn_id='azure_container_registry_conn',
    resource_group='ml-training-rg',
    name='ml-trainer-{{ ds_nodash }}',
    image='mlregistry.azurecr.io/trainer:v2.1',
    region='East US',
    memory_in_gb=16.0,
    cpu=4.0,
    gpu=gpu_resource,
    volumes=volumes,
    volume_mounts=volume_mounts,
    ports=ports,
    environment_variables={
        'DATASET_PATH': '/data/training_set.csv',
        'MODEL_OUTPUT_PATH': '/data/models',
        'EPOCHS': '100',
        'BATCH_SIZE': '32',
        'LEARNING_RATE': '0.001'
    },
    secured_variables=['DB_PASSWORD', 'API_SECRET_KEY'],
    command=[
        'python',
        'train_model.py',
        '--config', '/config/training_config.json',
        '--data-path', '/data',
        '--model-path', '/models',
        '--gpu'
    ],
    restart_policy='Never',
    dns_name_label='ml-trainer-{{ ds_nodash }}',
    tags={
        'project': 'machine-learning',
        'environment': 'production',
        'cost-center': 'research'
    },
    dag=dag
)

# Run inference container
ml_inference = AzureContainerInstancesOperator(
    task_id='run_ml_inference',
    ci_conn_id='azure_container_conn',
    registry_conn_id='azure_container_registry_conn',
    resource_group='ml-inference-rg',
    name='ml-inference-{{ ds_nodash }}',
    image='mlregistry.azurecr.io/inference:v2.1',
    region='East US',
    memory_in_gb=8.0,
    cpu=2.0,
    volumes=[volumes[0]],  # Only data volume needed
    volume_mounts=[volume_mounts[0]],
    environment_variables={
        'MODEL_PATH': '/data/models/latest_model.pkl',
        'INPUT_PATH': '/data/inference_input.csv',
        'OUTPUT_PATH': '/data/inference_output.csv'
    },
    command=[
        'python',
        'run_inference.py',
        '--model', '/data/models/latest_model.pkl',
        '--input', '/data/inference_input.csv',
        '--output', '/data/inference_output.csv'
    ],
    restart_policy='Never',
    dag=dag
)

ml_training >> ml_inference

Container Registry Integration

from airflow import DAG
from airflow.providers.microsoft.azure.hooks.container_registry import AzureContainerRegistryHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def manage_container_images():
    """Manage container registry images."""
    registry_hook = AzureContainerRegistryHook(
        azure_container_registry_conn_id='registry_conn'
    )
    
    registry_name = 'myregistry'
    resource_group = 'container-rg'
    
    # Get registry credentials
    credentials = registry_hook.get_registry_credentials(
        registry_name=registry_name,
        resource_group_name=resource_group
    )
    print(f"Registry username: {credentials['username']}")
    
    # Get login server
    login_server = registry_hook.get_login_server(
        registry_name=registry_name,
        resource_group_name=resource_group
    )
    print(f"Login server: {login_server}")
    
    # List repositories
    repositories = registry_hook.list_repositories(
        registry_name=registry_name,
        resource_group_name=resource_group
    )
    print(f"Found repositories: {repositories}")
    
    # List tags for each repository
    for repo in repositories:
        tags = registry_hook.list_tags(
            registry_name=registry_name,
            resource_group_name=resource_group,
            repository_name=repo
        )
        print(f"Repository {repo} tags: {tags}")
        
        # Clean up old tags (keep only latest 5)
        if len(tags) > 5:
            old_tags = sorted(tags)[:-5]  # Keep latest 5
            for old_tag in old_tags:
                try:
                    registry_hook.delete_tag(
                        registry_name=registry_name,
                        resource_group_name=resource_group,
                        repository_name=repo,
                        tag=old_tag
                    )
                    print(f"Deleted old tag: {repo}:{old_tag}")
                except Exception as e:
                    print(f"Failed to delete tag {old_tag}: {e}")

def monitor_container_execution():
    """Monitor running container instances."""
    from airflow.providers.microsoft.azure.hooks.container_instance import AzureContainerInstanceHook
    
    aci_hook = AzureContainerInstanceHook(azure_container_instance_conn_id='aci_conn')
    
    resource_group = 'container-rg'
    container_group = 'data-processor'
    container_name = 'processor'
    
    # Check if container exists
    if aci_hook.exists(resource_group, container_group):
        # Get container state
        state = aci_hook.get_state(resource_group, container_group, container_name)
        print(f"Container state: {state}")
        
        # Get logs if container is running or terminated
        if state in ['Running', 'Terminated']:
            logs = aci_hook.get_logs(
                resource_group,
                container_group,
                container_name,
                tail=100
            )
            print(f"Container logs:\n{logs}")
        
        # Get detailed state information
        state, exit_code, details = aci_hook.get_state_exitcode_details(
            resource_group,
            container_group,
            container_name
        )
        print(f"State: {state}, Exit Code: {exit_code}, Details: {details}")
        
        # Get status messages
        messages = aci_hook.get_messages(resource_group, container_group, container_name)
        for message in messages:
            print(f"Status message: {message}")

dag = DAG(
    'container_registry_management',
    default_args={
        'owner': 'devops-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=2)
    },
    description='Container registry and instance management',
    schedule_interval=timedelta(hours=12),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

manage_images = PythonOperator(
    task_id='manage_registry_images',
    python_callable=manage_container_images,
    dag=dag
)

monitor_containers = PythonOperator(
    task_id='monitor_containers',
    python_callable=monitor_container_execution,
    dag=dag
)

manage_images >> monitor_containers

Connection Configuration

Container Instances Connection (azure_container_instance)

Configure Azure Container Instances connections in Airflow:

# Connection configuration for Container Instances
{
    "conn_id": "azure_container_instance_default",
    "conn_type": "azure_container_instance",
    "extra": {
        "subscriptionId": "your-subscription-id",
        "tenantId": "your-tenant-id",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret"
    }
}

Container Registry Connection (azure_container_registry)

Configure Azure Container Registry connections in Airflow:

# Connection configuration for Container Registry
{
    "conn_id": "azure_container_registry_default",
    "conn_type": "azure_container_registry",
    "login": "myregistry",  # Registry name
    "host": "myregistry.azurecr.io",  # Registry URL
    "extra": {
        "subscriptionId": "your-subscription-id",
        "resourceGroupName": "your-resource-group",
        "tenantId": "your-tenant-id",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret"
    }
}

Container Volume Connection (azure_container_volume)

Configure Azure Container Volume connections in Airflow:

# Connection configuration for Container Volume
{
    "conn_id": "azure_container_volume_default",
    "conn_type": "azure_container_volume",
    "extra": {
        "tenantId": "your-tenant-id",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret"
    }
}

Authentication Methods

All container services support multiple authentication methods:

  1. Service Principal Authentication:

    extra = {
        "tenantId": "your-tenant-id",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret"
    }
  2. Managed Identity Authentication:

    extra = {
        "managedIdentityClientId": "your-managed-identity-client-id"
    }
  3. Azure CLI Authentication:

    extra = {
        "use_azure_cli": True
    }

Error Handling

Common Exception Patterns

from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError
from azure.mgmt.containerinstance.models import ContainerGroup
from airflow.providers.microsoft.azure.hooks.container_instance import AzureContainerInstanceHook

def robust_container_operations():
    """Demonstrate error handling patterns for container operations."""
    hook = AzureContainerInstanceHook(azure_container_instance_conn_id='aci_conn')
    
    resource_group = 'my-rg'
    container_group_name = 'my-container-group'
    
    try:
        # Check if container group exists before creating
        if hook.exists(resource_group, container_group_name):
            print("Container group already exists, deleting first...")
            hook.delete(resource_group, container_group_name)
            # Wait for deletion to complete
            import time
            time.sleep(30)
        
        # Create container group
        container_group = ContainerGroup(
            location='East US',
            containers=[],
            os_type='Linux'
        )
        
        result = hook.create_or_update(
            resource_group_name=resource_group,
            container_group_name=container_group_name,
            container_group=container_group
        )
        print(f"Container group created: {result.name}")
        
    except ResourceNotFoundError as e:
        print(f"Resource not found: {e}")
        # Handle missing resource group or other dependencies
        
    except ResourceExistsError as e:
        print(f"Resource already exists: {e}")
        # Handle existing resources
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        # Cleanup on failure
        try:
            hook.delete(resource_group, container_group_name)
        except:
            pass  # Ignore cleanup errors
        raise

Connection Testing

def test_container_connections():
    """Test all container service connections."""
    
    # Test Container Instances connection
    try:
        aci_hook = AzureContainerInstanceHook(azure_container_instance_conn_id='aci_conn')
        success, message = aci_hook.test_connection()
        print(f"Container Instances: {message}")
    except Exception as e:
        print(f"Container Instances connection failed: {e}")
    
    # Test Container Registry connection
    try:
        acr_hook = AzureContainerRegistryHook(azure_container_registry_conn_id='acr_conn')
        success, message = acr_hook.test_connection()
        print(f"Container Registry: {message}")
    except Exception as e:
        print(f"Container Registry connection failed: {e}")
    
    # Test Container Volume connection
    try:
        volume_hook = AzureContainerVolumeHook(azure_container_volume_conn_id='volume_conn')
        success, message = volume_hook.test_connection()
        print(f"Container Volume: {message}")
    except Exception as e:
        print(f"Container Volume connection failed: {e}")

Performance Considerations

Optimizing Container Workloads

def optimize_container_performance():
    """Demonstrate performance optimization techniques."""
    
    # Use appropriate resource sizing
    memory_optimized_config = {
        'memory_in_gb': 8.0,  # Adequate memory for workload
        'cpu': 2.0,           # Balanced CPU allocation
    }
    
    # Configure restart policies appropriately
    batch_job_config = {
        'restart_policy': 'Never',  # For batch jobs
    }
    
    long_running_config = {
        'restart_policy': 'Always',  # For services
    }
    
    # Use local SSD storage for temporary files
    volume_config = [
        Volume(
            name='temp-volume',
            empty_dir={}  # Uses local SSD storage
        )
    ]
    
    # Pre-pull images for faster startup
    # This would be done in the registry/image management process
    
    return {
        'optimized_resources': memory_optimized_config,
        'batch_config': batch_job_config,
        'service_config': long_running_config,
        'volume_config': volume_config
    }

def implement_container_monitoring():
    """Implement comprehensive container monitoring."""
    hook = AzureContainerInstanceHook(azure_container_instance_conn_id='aci_conn')
    
    def monitor_container_health(resource_group: str, container_group: str, container_name: str):
        """Monitor container health and performance."""
        
        # Get current state
        state = hook.get_state(resource_group, container_group, container_name)
        
        # Get recent logs for error detection
        logs = hook.get_logs(resource_group, container_group, container_name, tail=50)
        
        # Check for error patterns in logs
        error_keywords = ['ERROR', 'FATAL', 'Exception', 'Failed']
        errors_found = []
        
        for line in logs.split('\n'):
            for keyword in error_keywords:
                if keyword in line:
                    errors_found.append(line.strip())
        
        # Get detailed status information
        state, exit_code, details = hook.get_state_exitcode_details(
            resource_group, container_group, container_name
        )
        
        health_status = {
            'state': state,
            'exit_code': exit_code,
            'details': details,
            'errors_found': errors_found,
            'healthy': state == 'Running' and len(errors_found) == 0
        }
        
        return health_status
    
    return monitor_container_health

This comprehensive documentation covers all Azure Container Services capabilities in the Apache Airflow Microsoft Azure Provider, including Container Instances, Container Registry, and Container Volume management with practical usage patterns and optimization techniques.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure

docs

azure-batch.md

azure-data-explorer.md

azure-file-share.md

blob-storage.md

container-services.md

cosmos-db.md

data-factory.md

data-lake-storage.md

data-transfers.md

index.md

microsoft-graph.md

powerbi.md

service-bus.md

synapse-analytics.md

tile.json