Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete Azure Container Services integration providing comprehensive container orchestration capabilities including Azure Container Instances management, Container Registry operations, and Container Volume handling for containerized workloads.
Primary interface for Azure Container Instances management, providing authenticated connections and container lifecycle operations.
class AzureContainerInstanceHook(BaseHook):
    """
    Hook wrapping the Azure Container Instances management API.

    Exposes container-group lifecycle operations: create/update, state and
    log inspection, deletion, and status-message retrieval.
    """

    def get_conn(self) -> ContainerInstanceManagementClient:
        """
        Return an authenticated Container Instances management client.

        :return: client used for all container-group operations
        """

    def create_or_update(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_group: ContainerGroup
    ) -> ContainerGroup:
        """
        Create a container group, or update it if it already exists.

        :param resource_group_name: resource group that owns the group
        :param container_group_name: name of the container group
        :param container_group: desired container-group configuration
        :return: the container group as created or updated by Azure
        """

    def get_state_exitcode_details(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_name: str
    ) -> tuple[str, int, str]:
        """
        Return state, exit code and detail text for one container.

        :param resource_group_name: resource group that owns the group
        :param container_group_name: name of the container group
        :param container_name: container within the group
        :return: ``(state, exit_code, details)``
        """

    def get_logs(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_name: str,
        tail: int | None = None
    ) -> str:
        """
        Fetch logs from a running or terminated container.

        :param resource_group_name: resource group that owns the group
        :param container_group_name: name of the container group
        :param container_name: container within the group
        :param tail: when given, only this many trailing lines are returned
        :return: the container log text
        """

    def delete(
        self,
        resource_group_name: str,
        container_group_name: str
    ) -> None:
        """
        Delete an entire container group.

        :param resource_group_name: resource group that owns the group
        :param container_group_name: container group to remove
        """

    def exists(
        self,
        resource_group_name: str,
        container_group_name: str
    ) -> bool:
        """
        Tell whether a container group exists.

        :param resource_group_name: resource group that owns the group
        :param container_group_name: container group to look up
        :return: ``True`` when the group is present, ``False`` otherwise
        """

    def get_state(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_name: str
    ) -> str:
        """
        Return the current state of a single container.

        :param resource_group_name: resource group that owns the group
        :param container_group_name: name of the container group
        :param container_name: container within the group
        :return: state string such as ``Running``, ``Terminated`` or ``Waiting``
        """

    def get_messages(
        self,
        resource_group_name: str,
        container_group_name: str,
        container_name: str
    ) -> list[str]:
        """
        Collect status messages reported for a container.

        :param resource_group_name: resource group that owns the group
        :param container_group_name: name of the container group
        :param container_name: container within the group
        :return: list of status-message strings
        """
def test_connection(self) -> tuple[bool, str]:
"""
Test the Azure Container Instance connection.
Returns:
tuple[bool, str]: Success status and message
"""Hook for Azure Container Registry operations providing image management and credential handling capabilities.
class AzureContainerRegistryHook(BaseHook):
    """
    Hook wrapping Azure Container Registry operations.

    Covers credential retrieval, login-server resolution, and
    repository/tag listing and deletion.
    """

    def get_conn(self) -> ImageRegistryCredential:
        """
        Return registry credentials usable for image-pull authentication.

        :return: credential object for the configured registry
        """

    def get_registry_credentials(
        self,
        registry_name: str,
        resource_group_name: str
    ) -> dict[str, str]:
        """
        Fetch the registry's admin credentials.

        :param registry_name: container registry to query
        :param resource_group_name: resource group that owns the registry
        :return: mapping holding the admin username and passwords
        """

    def get_login_server(
        self,
        registry_name: str,
        resource_group_name: str
    ) -> str:
        """
        Resolve the registry's login server URL.

        :param registry_name: container registry to query
        :param resource_group_name: resource group that owns the registry
        :return: login server URL
        """

    def list_repositories(
        self,
        registry_name: str,
        resource_group_name: str
    ) -> list[str]:
        """
        List every repository hosted in the registry.

        :param registry_name: container registry to query
        :param resource_group_name: resource group that owns the registry
        :return: repository names
        """

    def list_tags(
        self,
        registry_name: str,
        resource_group_name: str,
        repository_name: str
    ) -> list[str]:
        """
        List the tags of one repository.

        :param registry_name: container registry to query
        :param resource_group_name: resource group that owns the registry
        :param repository_name: repository whose tags are listed
        :return: image tag names
        """

    def delete_repository(
        self,
        registry_name: str,
        resource_group_name: str,
        repository_name: str
    ) -> None:
        """
        Remove a whole repository from the registry.

        :param registry_name: container registry to modify
        :param resource_group_name: resource group that owns the registry
        :param repository_name: repository to delete
        """

    def delete_tag(
        self,
        registry_name: str,
        resource_group_name: str,
        repository_name: str,
        tag: str
    ) -> None:
        """
        Remove a single tag from a repository.

        :param registry_name: container registry to modify
        :param resource_group_name: resource group that owns the registry
        :param repository_name: repository holding the tag
        :param tag: tag to delete
        """
def test_connection(self) -> tuple[bool, str]:
"""
Test the Azure Container Registry connection.
Returns:
tuple[bool, str]: Success status and message
"""Hook for Azure Container Volume management providing persistent storage capabilities for containers.
class AzureContainerVolumeHook(BaseHook):
    """
    Hook wrapping Azure Container Volume management.

    Builds ``Volume`` definitions backed by Azure File Shares, secrets,
    empty directories, or Git repositories.
    """

    def get_conn(self) -> Any:
        """
        Return an authenticated client for volume operations.

        :return: volume management client
        """

    def create_file_share_volume(
        self,
        volume_name: str,
        share_name: str,
        storage_account_name: str,
        storage_account_key: str,
        read_only: bool = False
    ) -> Volume:
        """
        Build a volume backed by an Azure File Share.

        :param volume_name: name the volume will be mounted under
        :param share_name: Azure File Share to mount
        :param storage_account_name: owning storage account
        :param storage_account_key: access key for the storage account
        :param read_only: mount read-only when ``True`` (default ``False``)
        :return: the file-share volume definition
        """

    def create_secret_volume(
        self,
        volume_name: str,
        secrets: dict[str, str]
    ) -> Volume:
        """
        Build a volume that exposes secret values as files.

        :param volume_name: name the volume will be mounted under
        :param secrets: mapping of file name to secret content
        :return: the secret volume definition
        """

    def create_empty_dir_volume(
        self,
        volume_name: str
    ) -> Volume:
        """
        Build an empty scratch-directory volume.

        :param volume_name: name the volume will be mounted under
        :return: the empty-directory volume definition
        """

    def create_git_repo_volume(
        self,
        volume_name: str,
        repository_url: str,
        directory_name: str | None = None,
        revision: str | None = None
    ) -> Volume:
        """
        Build a volume populated from a Git repository.

        :param volume_name: name the volume will be mounted under
        :param repository_url: repository to clone
        :param directory_name: optional target directory for the clone
        :param revision: optional revision to check out
        :return: the Git-repository volume definition
        """
def test_connection(self) -> tuple[bool, str]:
"""
Test the Azure Container Volume connection.
Returns:
tuple[bool, str]: Success status and message
"""Execute Azure Container Services operations as Airflow tasks with comprehensive container management capabilities.
class AzureContainerInstancesOperator(BaseOperator):
    """
    Operator that runs a container on Azure Container Instances.

    Supports custom resource sizing (CPU, memory, GPU), environment
    variables, volumes, networking and restart policies.
    """

    def __init__(
        self,
        *,
        ci_conn_id: str = "azure_container_instance_default",
        registry_conn_id: str | None = None,
        resource_group: str,
        name: str,
        image: str,
        region: str = "West US",
        environment_variables: dict[str, str] | None = None,
        secured_variables: list[str] | None = None,
        volumes: list[Volume] | None = None,
        volume_mounts: list[VolumeMount] | None = None,
        memory_in_gb: float = 1.5,
        cpu: float = 1.0,
        gpu: GpuResource | None = None,
        command: list[str] | None = None,
        restart_policy: str = "Never",
        os_type: str = "Linux",
        ports: list[ContainerPort] | None = None,
        dns_name_label: str | None = None,
        ip_address_type: str = "Public",
        network_profile: ContainerGroupNetworkProfile | None = None,
        tags: dict[str, str] | None = None,
        **kwargs
    ):
        """
        Configure the container run.

        :param ci_conn_id: Airflow connection ID for Container Instances
        :param registry_conn_id: optional connection ID for a private registry
        :param resource_group: Azure resource group to create the group in
        :param name: container group name
        :param image: container image reference to run
        :param region: Azure region (default ``"West US"``)
        :param environment_variables: plain environment variables
        :param secured_variables: names of environment variables to secure
        :param volumes: volumes to attach to the group
        :param volume_mounts: mount points for the attached volumes
        :param memory_in_gb: memory allocation in GB (default ``1.5``)
        :param cpu: CPU allocation (default ``1.0``)
        :param gpu: optional GPU resource request
        :param command: command to execute inside the container
        :param restart_policy: container restart policy (default ``"Never"``)
        :param os_type: operating system type (default ``"Linux"``)
        :param ports: ports to expose
        :param dns_name_label: DNS label for the public IP, when any
        :param ip_address_type: IP address type (default ``"Public"``)
        :param network_profile: optional network profile for the group
        :param tags: Azure resource tags
        """

    def execute(self, context: Context) -> str:
        """
        Create the container group and drive it to completion.

        :param context: Airflow task context
        :return: container execution result or status
        """
def on_kill(self) -> None:
"""Clean up container resources on task termination."""from airflow import DAG
from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def check_container_results(**context):
    """Fetch the data-processing task's XCom result and log it."""
    ti = context['task_instance']
    result = ti.xcom_pull(task_ids='run_data_processing')
    print(f"Container execution result: {result}")
# DAG runs every 6 hours; retries give transient Azure failures a second chance.
dag = DAG(
    'azure_container_workflow',
    default_args={
        'owner': 'container-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Azure Container Instances workflow',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False
)
# Run data processing container
run_container = AzureContainerInstancesOperator(
    task_id='run_data_processing',
    ci_conn_id='azure_container_conn',
    resource_group='data-processing-rg',
    name='data-processor-{{ ds_nodash }}',  # templated: unique group name per run date
    image='myregistry.azurecr.io/data-processor:latest',
    region='East US',
    memory_in_gb=4.0,
    cpu=2.0,
    environment_variables={
        'INPUT_PATH': '/data/input',
        'OUTPUT_PATH': '/data/output',
        'PROCESSING_MODE': 'batch'
    },
    command=[
        'python',
        'process_data.py',
        '--input', '/data/input',
        '--output', '/data/output'
    ],
    restart_policy='Never',  # batch job: do not restart on exit
    dag=dag
)
# Process results
check_results = PythonOperator(
    task_id='check_results',
    python_callable=check_container_results,
    dag=dag
)
run_container >> check_results

from airflow import DAG
from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator
from airflow.providers.microsoft.azure.hooks.container_volume import AzureContainerVolumeHook
from azure.mgmt.containerinstance.models import (
Volume, VolumeMount, ContainerPort, GpuResource,
ContainerGroupNetworkProfile, ResourceRequests, ResourceRequirements
)
from datetime import datetime, timedelta
def setup_volumes():
    """Set up volumes for container workload.

    Builds a file-share volume for persistent data plus a secret volume
    for configuration, and returns both.
    """
    # NOTE(review): this helper is defined but not referenced by the tasks
    # below (they build Volume objects directly) — presumably shown as the
    # hook-based alternative; confirm intent.
    volume_hook = AzureContainerVolumeHook(azure_container_volume_conn_id='volume_conn')
    # Create file share volume for persistent storage
    data_volume = volume_hook.create_file_share_volume(
        volume_name='data-volume',
        share_name='container-data',
        storage_account_name='mystorageaccount',
        storage_account_key='storage-key-here',
        read_only=False
    )
    # Create secret volume for configuration
    config_volume = volume_hook.create_secret_volume(
        volume_name='config-volume',
        secrets={
            'config.json': '{"database": "prod", "debug": false}',
            'secrets.env': 'DB_PASSWORD=secret123\nAPI_KEY=key456'
        }
    )
    return [data_volume, config_volume]
# Daily ML workflow: GPU training followed by CPU-only inference.
dag = DAG(
    'advanced_container_workflow',
    default_args={
        'owner': 'ml-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=3)
    },
    description='Advanced container workflow with GPU and volumes',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)
# Configure volumes
volumes = [
    Volume(
        name='data-volume',
        azure_file={
            'share_name': 'ml-data',
            'storage_account_name': 'mlstorageaccount',
            'storage_account_key': 'storage-key'
        }
    ),
    Volume(
        name='model-volume',
        azure_file={
            'share_name': 'ml-models',
            'storage_account_name': 'mlstorageaccount',
            'storage_account_key': 'storage-key',
            'read_only': True  # models are consumed, never written here
        }
    )
]
# Configure volume mounts (names must match the Volume definitions above)
volume_mounts = [
    VolumeMount(
        name='data-volume',
        mount_path='/data',
        read_only=False
    ),
    VolumeMount(
        name='model-volume',
        mount_path='/models',
        read_only=True
    )
]
# Configure GPU resources for ML workload
gpu_resource = GpuResource(
    count=1,
    sku='K80'
)
# Configure network ports
ports = [
    ContainerPort(port=8080, protocol='TCP'),
    ContainerPort(port=8443, protocol='TCP')
]
# Run ML training container with GPU
ml_training = AzureContainerInstancesOperator(
    task_id='run_ml_training',
    ci_conn_id='azure_container_conn',
    registry_conn_id='azure_container_registry_conn',
    resource_group='ml-training-rg',
    name='ml-trainer-{{ ds_nodash }}',  # templated: unique name per run date
    image='mlregistry.azurecr.io/trainer:v2.1',
    region='East US',
    memory_in_gb=16.0,
    cpu=4.0,
    gpu=gpu_resource,
    volumes=volumes,
    volume_mounts=volume_mounts,
    ports=ports,
    environment_variables={
        'DATASET_PATH': '/data/training_set.csv',
        'MODEL_OUTPUT_PATH': '/data/models',
        'EPOCHS': '100',
        'BATCH_SIZE': '32',
        'LEARNING_RATE': '0.001'
    },
    secured_variables=['DB_PASSWORD', 'API_SECRET_KEY'],
    command=[
        'python',
        'train_model.py',
        '--config', '/config/training_config.json',
        '--data-path', '/data',
        '--model-path', '/models',
        '--gpu'
    ],
    restart_policy='Never',
    dns_name_label='ml-trainer-{{ ds_nodash }}',
    tags={
        'project': 'machine-learning',
        'environment': 'production',
        'cost-center': 'research'
    },
    dag=dag
)
# Run inference container
ml_inference = AzureContainerInstancesOperator(
    task_id='run_ml_inference',
    ci_conn_id='azure_container_conn',
    registry_conn_id='azure_container_registry_conn',
    resource_group='ml-inference-rg',
    name='ml-inference-{{ ds_nodash }}',
    image='mlregistry.azurecr.io/inference:v2.1',
    region='East US',
    memory_in_gb=8.0,
    cpu=2.0,
    volumes=[volumes[0]],  # Only data volume needed
    volume_mounts=[volume_mounts[0]],
    environment_variables={
        'MODEL_PATH': '/data/models/latest_model.pkl',
        'INPUT_PATH': '/data/inference_input.csv',
        'OUTPUT_PATH': '/data/inference_output.csv'
    },
    command=[
        'python',
        'run_inference.py',
        '--model', '/data/models/latest_model.pkl',
        '--input', '/data/inference_input.csv',
        '--output', '/data/inference_output.csv'
    ],
    restart_policy='Never',
    dag=dag
)
ml_training >> ml_inference

from airflow import DAG
from airflow.providers.microsoft.azure.hooks.container_registry import AzureContainerRegistryHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def manage_container_images():
    """Manage container registry images.

    Prints registry credentials/login server, lists repositories and tags,
    then best-effort deletes all but the five "latest" tags per repository.
    """
    registry_hook = AzureContainerRegistryHook(
        azure_container_registry_conn_id='registry_conn'
    )
    registry_name = 'myregistry'
    resource_group = 'container-rg'
    # Get registry credentials
    credentials = registry_hook.get_registry_credentials(
        registry_name=registry_name,
        resource_group_name=resource_group
    )
    print(f"Registry username: {credentials['username']}")
    # Get login server
    login_server = registry_hook.get_login_server(
        registry_name=registry_name,
        resource_group_name=resource_group
    )
    print(f"Login server: {login_server}")
    # List repositories
    repositories = registry_hook.list_repositories(
        registry_name=registry_name,
        resource_group_name=resource_group
    )
    print(f"Found repositories: {repositories}")
    # List tags for each repository
    for repo in repositories:
        tags = registry_hook.list_tags(
            registry_name=registry_name,
            resource_group_name=resource_group,
            repository_name=repo
        )
        print(f"Repository {repo} tags: {tags}")
        # Clean up old tags (keep only latest 5)
        if len(tags) > 5:
            # NOTE(review): "latest" relies on lexicographic sort — only
            # correct if tag names sort chronologically (zero-padded or
            # date-based); confirm against the tagging scheme.
            old_tags = sorted(tags)[:-5]  # Keep latest 5
            for old_tag in old_tags:
                try:
                    registry_hook.delete_tag(
                        registry_name=registry_name,
                        resource_group_name=resource_group,
                        repository_name=repo,
                        tag=old_tag
                    )
                    print(f"Deleted old tag: {repo}:{old_tag}")
                except Exception as e:
                    # Best-effort cleanup: log and continue with the next tag.
                    print(f"Failed to delete tag {old_tag}: {e}")
def monitor_container_execution():
    """Monitor running container instances.

    Checks existence, then prints state, recent logs, exit-code details
    and status messages for one container.
    """
    # Function-scope import: resolved only when the task actually runs.
    from airflow.providers.microsoft.azure.hooks.container_instance import AzureContainerInstanceHook
    aci_hook = AzureContainerInstanceHook(azure_container_instance_conn_id='aci_conn')
    resource_group = 'container-rg'
    container_group = 'data-processor'
    container_name = 'processor'
    # Check if container exists
    if aci_hook.exists(resource_group, container_group):
        # Get container state
        state = aci_hook.get_state(resource_group, container_group, container_name)
        print(f"Container state: {state}")
        # Get logs if container is running or terminated
        if state in ['Running', 'Terminated']:
            logs = aci_hook.get_logs(
                resource_group,
                container_group,
                container_name,
                tail=100
            )
            print(f"Container logs:\n{logs}")
        # Get detailed state information (rebinds `state` from the
        # exit-code query)
        state, exit_code, details = aci_hook.get_state_exitcode_details(
            resource_group,
            container_group,
            container_name
        )
        print(f"State: {state}, Exit Code: {exit_code}, Details: {details}")
        # Get status messages
        messages = aci_hook.get_messages(resource_group, container_group, container_name)
        for message in messages:
            print(f"Status message: {message}")
# Twice-daily maintenance DAG: registry cleanup then instance monitoring.
dag = DAG(
    'container_registry_management',
    default_args={
        'owner': 'devops-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=2)
    },
    description='Container registry and instance management',
    schedule_interval=timedelta(hours=12),
    start_date=datetime(2024, 1, 1),
    catchup=False
)
manage_images = PythonOperator(
    task_id='manage_registry_images',
    python_callable=manage_container_images,
    dag=dag
)
monitor_containers = PythonOperator(
    task_id='monitor_containers',
    python_callable=monitor_container_execution,
    dag=dag
)
manage_images >> monitor_containers

Container Instances (conn type: azure_container_instance). Configure Azure Container Instances connections in Airflow:
# Connection configuration for Container Instances
{
"conn_id": "azure_container_instance_default",
"conn_type": "azure_container_instance",
"extra": {
"subscriptionId": "your-subscription-id",
"tenantId": "your-tenant-id",
"clientId": "your-client-id",
"clientSecret": "your-client-secret"
}
}

Container Registry (conn type: azure_container_registry). Configure Azure Container Registry connections in Airflow:
# Connection configuration for Container Registry
{
"conn_id": "azure_container_registry_default",
"conn_type": "azure_container_registry",
"login": "myregistry", # Registry name
"host": "myregistry.azurecr.io", # Registry URL
"extra": {
"subscriptionId": "your-subscription-id",
"resourceGroupName": "your-resource-group",
"tenantId": "your-tenant-id",
"clientId": "your-client-id",
"clientSecret": "your-client-secret"
}
}

Container Volume (conn type: azure_container_volume). Configure Azure Container Volume connections in Airflow:
# Connection configuration for Container Volume
{
"conn_id": "azure_container_volume_default",
"conn_type": "azure_container_volume",
"extra": {
"tenantId": "your-tenant-id",
"clientId": "your-client-id",
"clientSecret": "your-client-secret"
}
}

All container services support multiple authentication methods:
Service Principal Authentication:
extra = {
"tenantId": "your-tenant-id",
"clientId": "your-client-id",
"clientSecret": "your-client-secret"
}

Managed Identity Authentication:
extra = {
"managedIdentityClientId": "your-managed-identity-client-id"
}

Azure CLI Authentication:
extra = {
"use_azure_cli": True
}

from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError
from azure.mgmt.containerinstance.models import ContainerGroup
from airflow.providers.microsoft.azure.hooks.container_instance import AzureContainerInstanceHook
def robust_container_operations():
    """Demonstrate error handling patterns for container operations.

    Shows an existence check before create, targeted Azure exception
    handling, and best-effort cleanup before re-raising unexpected errors.
    """
    hook = AzureContainerInstanceHook(azure_container_instance_conn_id='aci_conn')
    resource_group = 'my-rg'
    container_group_name = 'my-container-group'
    try:
        # Check if container group exists before creating
        if hook.exists(resource_group, container_group_name):
            print("Container group already exists, deleting first...")
            hook.delete(resource_group, container_group_name)
            # Wait for deletion to complete
            import time
            time.sleep(30)
        # Create container group
        container_group = ContainerGroup(
            location='East US',
            containers=[],
            os_type='Linux'
        )
        result = hook.create_or_update(
            resource_group_name=resource_group,
            container_group_name=container_group_name,
            container_group=container_group
        )
        print(f"Container group created: {result.name}")
    except ResourceNotFoundError as e:
        print(f"Resource not found: {e}")
        # Handle missing resource group or other dependencies
    except ResourceExistsError as e:
        print(f"Resource already exists: {e}")
        # Handle existing resources
    except Exception as e:
        print(f"Unexpected error: {e}")
        # Cleanup on failure
        try:
            hook.delete(resource_group, container_group_name)
        except Exception:
            # Best-effort cleanup only. Fixed: a bare "except:" here would
            # also swallow SystemExit/KeyboardInterrupt.
            pass
        raise

def test_container_connections():
"""Test all container service connections."""
# Test Container Instances connection
try:
aci_hook = AzureContainerInstanceHook(azure_container_instance_conn_id='aci_conn')
success, message = aci_hook.test_connection()
print(f"Container Instances: {message}")
except Exception as e:
print(f"Container Instances connection failed: {e}")
# Test Container Registry connection
try:
acr_hook = AzureContainerRegistryHook(azure_container_registry_conn_id='acr_conn')
success, message = acr_hook.test_connection()
print(f"Container Registry: {message}")
except Exception as e:
print(f"Container Registry connection failed: {e}")
# Test Container Volume connection
try:
volume_hook = AzureContainerVolumeHook(azure_container_volume_conn_id='volume_conn')
success, message = volume_hook.test_connection()
print(f"Container Volume: {message}")
except Exception as e:
print(f"Container Volume connection failed: {e}")def optimize_container_performance():
"""Demonstrate performance optimization techniques."""
# Use appropriate resource sizing
memory_optimized_config = {
'memory_in_gb': 8.0, # Adequate memory for workload
'cpu': 2.0, # Balanced CPU allocation
}
# Configure restart policies appropriately
batch_job_config = {
'restart_policy': 'Never', # For batch jobs
}
long_running_config = {
'restart_policy': 'Always', # For services
}
# Use local SSD storage for temporary files
volume_config = [
Volume(
name='temp-volume',
empty_dir={} # Uses local SSD storage
)
]
# Pre-pull images for faster startup
# This would be done in the registry/image management process
return {
'optimized_resources': memory_optimized_config,
'batch_config': batch_job_config,
'service_config': long_running_config,
'volume_config': volume_config
}
def implement_container_monitoring():
    """Implement comprehensive container monitoring.

    Builds and returns a health-check closure bound to a shared hook.
    """
    hook = AzureContainerInstanceHook(azure_container_instance_conn_id='aci_conn')
    def monitor_container_health(resource_group: str, container_group: str, container_name: str):
        """Monitor container health and performance."""
        # Get current state
        state = hook.get_state(resource_group, container_group, container_name)
        # Get recent logs for error detection
        logs = hook.get_logs(resource_group, container_group, container_name, tail=50)
        # Check for error patterns in logs
        error_keywords = ['ERROR', 'FATAL', 'Exception', 'Failed']
        errors_found = []
        for line in logs.split('\n'):
            for keyword in error_keywords:
                # A line matching several keywords is recorded once per keyword.
                if keyword in line:
                    errors_found.append(line.strip())
        # Get detailed status information (rebinds `state` with the value
        # reported alongside the exit code)
        state, exit_code, details = hook.get_state_exitcode_details(
            resource_group, container_group, container_name
        )
        health_status = {
            'state': state,
            'exit_code': exit_code,
            'details': details,
            'errors_found': errors_found,
            'healthy': state == 'Running' and len(errors_found) == 0
        }
        return health_status
    return monitor_container_health

This comprehensive documentation covers all Azure Container Services capabilities in the Apache Airflow Microsoft Azure Provider, including Container Instances, Container Registry, and Container Volume management with practical usage patterns and optimization techniques.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure