Microsoft Azure Container Service Management Client Library for Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Node pool snapshot management supports backup, restore, and template-creation scenarios, as well as consistent deployments that enable disaster recovery, infrastructure-as-code patterns, and standardized cluster configurations. Snapshots capture the exact state of node pools for reliable reproduction and operational consistency.
Create, update, and delete node pool snapshots for backup and template scenarios.
def create_or_update(
    resource_group_name: str,
    resource_name: str,
    parameters: Snapshot
) -> Snapshot:
    """
    Creates or updates a node pool snapshot.

    Args:
        resource_group_name: Azure resource group name
        resource_name: Snapshot name
        parameters: Complete snapshot configuration

    Returns:
        Snapshot: Created or updated snapshot
    """
def delete(resource_group_name: str, resource_name: str) -> None:
    """
    Deletes a snapshot.

    Args:
        resource_group_name: Azure resource group name
        resource_name: Snapshot name
    """
def update_tags(
    resource_group_name: str,
    resource_name: str,
    parameters: TagsObject
) -> Snapshot:
    """
    Updates snapshot tags for organization and management.

    Args:
        resource_group_name: Azure resource group name
        resource_name: Snapshot name
        parameters: Tags to update

    Returns:
        Snapshot: Updated snapshot with new tags
    """


# List and retrieve snapshots for backup management and template discovery.
def list() -> ItemPaged[Snapshot]:
    """
    Lists all snapshots in the subscription.

    Returns:
        ItemPaged[Snapshot]: Paginated snapshot list across all resource groups
    """
def list_by_resource_group(resource_group_name: str) -> ItemPaged[Snapshot]:
    """
    Lists snapshots in a specific resource group.

    Args:
        resource_group_name: Azure resource group name

    Returns:
        ItemPaged[Snapshot]: Paginated snapshot list for the resource group
    """
def get(resource_group_name: str, resource_name: str) -> Snapshot:
    """
    Gets a specific snapshot with complete configuration details.

    Args:
        resource_group_name: Azure resource group name
        resource_name: Snapshot name

    Returns:
        Snapshot: Complete snapshot configuration and metadata
    """


class Snapshot:
    """
    Node pool snapshot definition with configuration and metadata.

    Attributes:
        location: Azure region where snapshot is stored
        tags: Resource tags for organization
        creation_data: Source information for snapshot creation
        snapshot_type: Type of snapshot ('NodePool')
        kubernetes_version: Kubernetes version captured in snapshot (read-only)
        node_image_version: Node image version captured (read-only)
        vm_size: VM size of nodes in snapshot (read-only)
        os_type: Operating system type (read-only)
        os_sku: Operating system SKU (read-only)
        enable_fips: FIPS compliance setting (read-only)
        system_data: System metadata (read-only)
        id: Resource ID (read-only)
        name: Snapshot name (read-only)
        type: Resource type (read-only)
    """

    # Caller-supplied fields.
    location: str
    tags: Optional[Dict[str, str]]
    creation_data: Optional[CreationData]
    snapshot_type: Optional[str]

    # Fields documented above as read-only.
    kubernetes_version: Optional[str]
    node_image_version: Optional[str]
    vm_size: Optional[str]
    os_type: Optional[str]
    os_sku: Optional[str]
    enable_fips: Optional[bool]
    system_data: Optional[SystemData]
    id: Optional[str]
    name: Optional[str]
    type: Optional[str]
class CreationData:
    """
    Snapshot creation metadata identifying the source.

    Attributes:
        source_resource_id: Resource ID of source agent pool
    """

    source_resource_id: Optional[str]
class TagsObject:
    """
    Resource tags for organization and management.

    Attributes:
        tags: Dictionary of key-value tag pairs
    """

    tags: Optional[Dict[str, str]]


from azure.identity import DefaultAzureCredential
from azure.mgmt.containerservice import ContainerServiceClient
from azure.mgmt.containerservice.models import Snapshot, CreationData
# Authenticate and build the management client.
# NOTE: `subscription_id` is not assigned in this snippet; it must already be
# in scope before this line runs.
credential = DefaultAzureCredential()
client = ContainerServiceClient(credential, subscription_id)

# Create snapshot of existing agent pool
source_agent_pool_id = "/subscriptions/sub-id/resourceGroups/myRG/providers/Microsoft.ContainerService/managedClusters/myCluster/agentPools/nodepool1"

snapshot = Snapshot(
    tags={
        "environment": "production",
        "backup-date": "2024-01-15",
        "cluster": "myCluster",
        "purpose": "disaster-recovery",
    },
    creation_data=CreationData(source_resource_id=source_agent_pool_id),
    location="eastus",
)

# Submit the snapshot under the target resource group and snapshot name.
result = client.snapshots.create_or_update(
    resource_group_name="myResourceGroup",
    resource_name="nodepool1-backup-20240115",
    parameters=snapshot,
)

# Report the name and the versions recorded on the returned snapshot.
print(f"Snapshot created: {result.name}")
print(f"Kubernetes version: {result.kubernetes_version}")
print(f"Node image version: {result.node_image_version}")

# Create template snapshots for different environments
environments = ["dev", "staging", "prod"]

# Per-environment source cluster, agent pool and base tags.
configurations = {
    "dev": {
        "cluster": "dev-cluster",
        "agent_pool": "nodepool1",
        "tags": {"environment": "dev", "cost-center": "engineering"}
    },
    "staging": {
        "cluster": "staging-cluster",
        "agent_pool": "nodepool1",
        "tags": {"environment": "staging", "cost-center": "qa"}
    },
    "prod": {
        "cluster": "prod-cluster",
        "agent_pool": "nodepool1",
        "tags": {"environment": "prod", "cost-center": "operations"}
    }
}

for env in environments:
    config = configurations[env]
    # Resource ID of the agent pool the template is captured from.
    source_id = f"/subscriptions/sub-id/resourceGroups/{env}-rg/providers/Microsoft.ContainerService/managedClusters/{config['cluster']}/agentPools/{config['agent_pool']}"
    template_snapshot = Snapshot(
        location="eastus",
        creation_data=CreationData(source_resource_id=source_id),
        tags={
            # Environment tags first, then the template markers.
            **config["tags"],
            "template": "true",
            "created-date": "2024-01-15"
        }
    )
    result = client.snapshots.create_or_update(
        f"{env}-rg",
        f"{env}-nodepool-template",
        template_snapshot
    )
    print(f"Template snapshot created for {env}: {result.name}")

# List all snapshots in subscription
print("All snapshots:")
for snapshot in client.snapshots.list():
    print(f" {snapshot.name} - {snapshot.location}")
    # Tags are optional on a snapshot; only report them when present.
    if snapshot.tags:
        env = snapshot.tags.get("environment", "unknown")
        purpose = snapshot.tags.get("purpose", "unknown")
        print(f" Environment: {env}, Purpose: {purpose}")

# List snapshots in specific resource group
prod_snapshots = client.snapshots.list_by_resource_group("prod-rg")
print("\nProduction snapshots:")
for snapshot in prod_snapshots:
    print(f" {snapshot.name}")
    print(f" K8s version: {snapshot.kubernetes_version}")
    print(f" Node image: {snapshot.node_image_version}")
    print(f" VM size: {snapshot.vm_size}")

# Get detailed snapshot information
detailed_snapshot = client.snapshots.get("prod-rg", "prod-nodepool-template")
print(f"\nSnapshot details for {detailed_snapshot.name}:")
print(f" OS Type: {detailed_snapshot.os_type}")
print(f" OS SKU: {detailed_snapshot.os_sku}")
print(f" FIPS enabled: {detailed_snapshot.enable_fips}")
if detailed_snapshot.creation_data:
    print(f" Source: {detailed_snapshot.creation_data.source_resource_id}")

import datetime
from azure.mgmt.containerservice.models import TagsObject
def create_automated_backup(client, cluster_rg, cluster_name, agent_pool_name, location="eastus"):
    """Create automated daily backup of agent pool.

    The snapshot is named ``<cluster>-<pool>-backup-<YYYYMMDD>`` using today's
    date and is written to the resource group ``<cluster_rg>-backups``.

    Relies on module-level ``subscription_id``, ``Snapshot`` and
    ``CreationData`` being in scope.

    Args:
        client: Client exposing ``snapshots.create_or_update``.
        cluster_rg: Resource group of the source cluster.
        cluster_name: Name of the source managed cluster.
        agent_pool_name: Name of the agent pool to snapshot.
        location: Region for the snapshot. Defaults to ``"eastus"``.

    Returns:
        The created snapshot, or ``None`` if creation raised an exception.
    """
    today = datetime.date.today()
    backup_name = f"{cluster_name}-{agent_pool_name}-backup-{today.strftime('%Y%m%d')}"
    source_id = f"/subscriptions/{subscription_id}/resourceGroups/{cluster_rg}/providers/Microsoft.ContainerService/managedClusters/{cluster_name}/agentPools/{agent_pool_name}"

    backup_snapshot = Snapshot(
        location=location,
        creation_data=CreationData(source_resource_id=source_id),
        tags={
            "backup-type": "automated",
            "backup-date": today.isoformat(),
            "cluster": cluster_name,
            "agent-pool": agent_pool_name,
            "retention": "30-days"
        }
    )

    try:
        result = client.snapshots.create_or_update(
            f"{cluster_rg}-backups",
            backup_name,
            backup_snapshot
        )
    except Exception as e:
        # Best-effort by design: one failed backup is reported and must not
        # stop the caller from backing up the remaining pools.
        print(f"Backup failed: {e}")
        return None
    else:
        print(f"Automated backup created: {result.name}")
        return result
# Create backups for multiple clusters
clusters = [
    {"rg": "prod-rg", "name": "prod-cluster", "pools": ["system", "apps"]},
    {"rg": "staging-rg", "name": "staging-cluster", "pools": ["nodepool1"]}
]
for cluster in clusters:
    for pool in cluster["pools"]:
        create_automated_backup(
            client,
            cluster["rg"],
            cluster["name"],
            pool
        )


def restore_from_snapshot(client, snapshot_rg, snapshot_name, target_cluster_rg,
                          target_cluster, new_pool_name, count=3, mode="User"):
    """Create new agent pool from snapshot configuration.

    Fetches the snapshot, copies its VM size, OS type, OS SKU and Kubernetes
    version into a new agent pool definition, and references the snapshot via
    ``creation_data``. Relies on ``CreationData`` being in scope.

    Args:
        client: Client exposing ``snapshots.get`` and
            ``agent_pools.begin_create_or_update``.
        snapshot_rg: Resource group containing the snapshot.
        snapshot_name: Name of the snapshot to restore from.
        target_cluster_rg: Resource group of the target cluster.
        target_cluster: Name of the target managed cluster.
        new_pool_name: Name for the agent pool to create.
        count: Node count for the new pool. Defaults to 3.
        mode: Agent pool mode. Defaults to ``"User"``.

    Returns:
        The result of the completed create-or-update operation.
    """
    from azure.mgmt.containerservice.models import AgentPool

    # Get snapshot details
    snapshot = client.snapshots.get(snapshot_rg, snapshot_name)

    # Create agent pool configuration based on snapshot
    restored_pool = AgentPool(
        count=count,
        vm_size=snapshot.vm_size,
        os_type=snapshot.os_type,
        os_sku=snapshot.os_sku,
        mode=mode,
        orchestrator_version=snapshot.kubernetes_version,
        # Reference the snapshot for node image. Use the resource ID reported
        # on the fetched snapshot rather than rebuilding it from a global
        # subscription id that may not be defined.
        creation_data=CreationData(source_resource_id=snapshot.id)
    )

    # Create the agent pool and block until the operation completes.
    operation = client.agent_pools.begin_create_or_update(
        target_cluster_rg,
        target_cluster,
        new_pool_name,
        restored_pool
    )
    result = operation.result()
    print(f"Agent pool restored from snapshot: {result.name}")
    return result
# Restore production configuration to staging
restore_from_snapshot(
    client,
    "prod-rg",
    "prod-nodepool-template",
    "staging-rg",
    "staging-cluster",
    "restored-nodepool"
)

from azure.mgmt.containerservice.models import TagsObject
# Update snapshot tags for better organization
update_tags = TagsObject(
    tags={
        "environment": "production",
        "backup-type": "manual",
        "retention": "90-days",
        "compliance": "required",
        "owner": "platform-team"
    }
)
updated_snapshot = client.snapshots.update_tags(
    "prod-rg",
    "important-snapshot",
    update_tags
)
print(f"Tags updated for snapshot: {updated_snapshot.name}")

# Bulk tag updates for compliance
compliance_tags = TagsObject(
    tags={
        "compliance-reviewed": "2024-01-15",
        "gdpr-compliant": "true",
        "retention-policy": "applied"
    }
)
snapshots = client.snapshots.list_by_resource_group("compliance-rg")
for snapshot in snapshots:
    if "prod" in snapshot.name:
        # NOTE(review): update_tags may replace the whole tag set rather than
        # merge into it - confirm against the service. Sending the existing
        # tags plus the compliance tags gives the same result either way.
        merged_tags = TagsObject(
            tags={**(snapshot.tags or {}), **compliance_tags.tags}
        )
        client.snapshots.update_tags(
            "compliance-rg",
            snapshot.name,
            merged_tags
        )
        print(f"Compliance tags applied to {snapshot.name}")

# Create snapshots across multiple regions for disaster recovery
regions = ["eastus", "westus", "northeurope"]
critical_clusters = ["prod-cluster-1", "prod-cluster-2"]

# Computed once so every snapshot in this run carries the same date.
dr_backup_date = datetime.date.today().isoformat()

for region in regions:
    for cluster in critical_clusters:
        # Create snapshot in each region
        regional_snapshot = Snapshot(
            location=region,
            creation_data=CreationData(
                source_resource_id=f"/subscriptions/{subscription_id}/resourceGroups/{region}-rg/providers/Microsoft.ContainerService/managedClusters/{cluster}/agentPools/nodepool1"
            ),
            tags={
                "region": region,
                "cluster": cluster,
                "dr-purpose": "cross-region-backup",
                "backup-date": dr_backup_date
            }
        )
        result = client.snapshots.create_or_update(
            f"{region}-dr-rg",
            f"{cluster}-{region}-dr-snapshot",
            regional_snapshot
        )
        print(f"DR snapshot created: {result.name} in {region}")

import datetime
def cleanup_old_snapshots(client, resource_group, retention_days=30):
    """Remove snapshots older than retention period.

    A snapshot is deleted only when all of the following hold:

    * it has a ``backup-date`` tag in ``YYYY-MM-DD`` form,
    * that date is earlier than today minus ``retention_days``,
    * it is not tagged ``template=true`` or ``retention=permanent``.

    Snapshots with a missing or unparseable ``backup-date`` are skipped, so
    one malformed tag does not abort the whole run. A failed delete is
    reported and the loop continues.

    Args:
        client: Client exposing ``snapshots.list_by_resource_group`` and
            ``snapshots.delete``.
        resource_group: Resource group whose snapshots are examined.
        retention_days: Age in days beyond which a snapshot is removed.

    Returns:
        int: Number of snapshots deleted.
    """
    cutoff_date = datetime.date.today() - datetime.timedelta(days=retention_days)
    deleted_count = 0

    for snapshot in client.snapshots.list_by_resource_group(resource_group):
        tags = snapshot.tags or {}
        raw_date = tags.get("backup-date")
        if raw_date is None:
            continue

        try:
            backup_date = datetime.datetime.strptime(raw_date, "%Y-%m-%d").date()
        except (TypeError, ValueError):
            print(f"Skipping {snapshot.name}: unparseable backup-date tag {raw_date!r}")
            continue

        if backup_date >= cutoff_date:
            continue

        # Check if it's not a template or important snapshot
        if tags.get("template") == "true" or tags.get("retention") == "permanent":
            continue

        try:
            client.snapshots.delete(resource_group, snapshot.name)
        except Exception as e:
            # Best-effort: report the failure and keep processing the rest.
            print(f"Failed to delete {snapshot.name}: {e}")
        else:
            print(f"Deleted old snapshot: {snapshot.name}")
            deleted_count += 1

    print(f"Cleanup completed. Deleted {deleted_count} old snapshots.")
    return deleted_count
# Run cleanup for backup resource groups
backup_rgs = ["prod-backups-rg", "staging-backups-rg"]
for rg in backup_rgs:
    cleanup_old_snapshots(client, rg, retention_days=30)

from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
# Handle lookup of a snapshot that may not exist.
# ResourceNotFoundError is listed before the more general HttpResponseError.
try:
    snapshot = client.snapshots.get("myRG", "nonexistent-snapshot")
except ResourceNotFoundError:
    print("Snapshot not found")
except HttpResponseError as e:
    print(f"HTTP error: {e.status_code}")

# Handle snapshot creation failures
try:
    result = client.snapshots.create_or_update("myRG", "new-snapshot", snapshot)
except HttpResponseError as e:
    # Check the specific error code first so a generic status-code branch
    # cannot mask it.
    if "SourceAgentPoolNotFound" in str(e):
        print("Source agent pool no longer exists")
    elif e.status_code == 400:
        print("Invalid snapshot configuration")
    elif e.status_code == 409:
        print("Snapshot with this name already exists")
    else:
        print(f"Snapshot creation failed: {e}")

# Handle deletion errors
try:
    client.snapshots.delete("myRG", "snapshot-to-delete")
except HttpResponseError as e:
    if e.status_code == 409:
        print("Snapshot is currently being used and cannot be deleted")
    else:
        print(f"Deletion failed: {e}")

# Consistent naming for better management
def generate_snapshot_name(cluster_name, agent_pool, purpose, date=None):
    """Generate consistent snapshot names.

    Args:
        cluster_name: Name of the cluster the snapshot belongs to.
        agent_pool: Name of the agent pool.
        purpose: Short purpose label, e.g. ``"backup"``.
        date: Date to embed in the name. Defaults to today.

    Returns:
        str: ``<cluster_name>-<agent_pool>-<purpose>-<YYYYMMDD>``.
    """
    if date is None:
        date = datetime.date.today()
    date_str = date.strftime("%Y%m%d")
    return f"{cluster_name}-{agent_pool}-{purpose}-{date_str}"
# Examples
backup_name = generate_snapshot_name(
    cluster_name="prod-cluster", agent_pool="nodepool1", purpose="backup"
)
template_name = generate_snapshot_name(
    cluster_name="gold-cluster", agent_pool="system", purpose="template"
)
dr_name = generate_snapshot_name(
    cluster_name="prod-cluster", agent_pool="nodepool1", purpose="dr"
)

# Comprehensive tagging for snapshot management
standard_tags = {
    # Provenance and schedule.
    "created-by": "platform-automation",
    "backup-schedule": "daily",
    "retention-policy": "30-days",
    # Ownership and accounting.
    "environment": "production",
    "cost-center": "platform",
    # Governance and contact.
    "compliance": "sox-required",
    "notification": "platform-team@company.com",
}
# Apply tags based on snapshot type
# Extra tags added on top of the base tags, keyed by snapshot purpose.
_PURPOSE_TAGS = {
    "backup": {"retention": "30-days", "automated": "true"},
    "template": {"retention": "permanent", "template": "true"},
    "dr": {"retention": "90-days", "disaster-recovery": "true"},
}


def get_snapshot_tags(purpose, environment, cluster_name):
    """Get appropriate tags for snapshot.

    Args:
        purpose: Snapshot purpose. ``"backup"``, ``"template"`` and ``"dr"``
            add purpose-specific tags; any other value adds none.
        environment: Value for the ``environment`` tag.
        cluster_name: Value for the ``cluster`` tag.

    Returns:
        dict: A new tag dictionary with ``purpose``, ``environment``,
        ``cluster`` and ``created-date`` (today, ISO format), plus any
        purpose-specific tags.
    """
    base_tags = {
        "purpose": purpose,
        "environment": environment,
        "cluster": cluster_name,
        "created-date": datetime.date.today().isoformat()
    }
    base_tags.update(_PURPOSE_TAGS.get(purpose, {}))
    return base_tags


# Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-containerservice