Microsoft Azure Authorization Management Client Library for Python providing RBAC, PIM, and access control capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Role assignment metrics and operational monitoring capabilities for analyzing authorization usage patterns and performance. These operations provide insights into RBAC usage, PIM metrics, and overall authorization health across Azure resources.
Analyze role assignment patterns, usage statistics, and governance metrics across Azure subscriptions and resources.
def get_metrics_for_subscription(subscription_id: str) -> RoleAssignmentMetrics:
"""
Get role assignment metrics for a subscription.
Parameters:
- subscription_id: Azure subscription ID to get metrics for
Returns:
RoleAssignmentMetrics object with usage statistics and patterns
"""
def get_metrics_for_management_group(management_group_id: str) -> RoleAssignmentMetrics:
"""
Get role assignment metrics for a management group.
Parameters:
- management_group_id: Management group ID to get metrics for
Returns:
RoleAssignmentMetrics object with aggregated metrics across the management group
"""Discover and monitor available Azure Authorization operations and their metadata for API usage analysis.
def list() -> Iterator[Operation]:
"""
List all available operations in the Azure Authorization resource provider.
Returns:
Iterator of Operation objects describing available authorization operations
"""from azure.mgmt.authorization import AuthorizationManagementClient
from azure.identity import DefaultAzureCredential
# Initialize client
credential = DefaultAzureCredential()
client = AuthorizationManagementClient(
credential=credential,
subscription_id="your-subscription-id"
)
# Get role assignment metrics for subscription
metrics = client.role_assignment_metrics.get_metrics_for_subscription(
subscription_id="your-subscription-id"
)
print("Role Assignment Metrics:")
print(f"Total Assignments: {metrics.total_role_assignments}")
print(f"Total Principals: {metrics.total_principals}")
print(f"Total Roles: {metrics.total_role_definitions}")
print(f"Privileged Assignments: {metrics.privileged_role_assignments}")
# Analyze assignment distribution
if metrics.role_assignment_breakdown:
print("\nRole Assignment Breakdown:")
for role_name, count in metrics.role_assignment_breakdown.items():
print(f" {role_name}: {count} assignments")
# Check for potential issues
if metrics.excessive_permissions_count > 0:
print(f"\nWarning: {metrics.excessive_permissions_count} assignments with excessive permissions")
if metrics.stale_assignments_count > 0:
print(f"Warning: {metrics.stale_assignments_count} potentially stale assignments")# List all available authorization operations
operations = client.operations.list()
print("Available Authorization Operations:")
for operation in operations:
print(f"Operation: {operation.name}")
print(f"Display Name: {operation.display.operation}")
print(f"Resource: {operation.display.resource}")
print(f"Provider: {operation.display.provider}")
print(f"Description: {operation.display.description}")
print("---")
# Filter operations by category
rbac_operations = [op for op in operations if 'roleAssignments' in op.name]
pim_operations = [op for op in operations if 'schedule' in op.name.lower()]
review_operations = [op for op in operations if 'accessReview' in op.name]
print(f"\nRBAC Operations: {len(rbac_operations)}")
print(f"PIM Operations: {len(pim_operations)}")
print(f"Access Review Operations: {len(review_operations)}")# Analyze metrics across a management group hierarchy
management_group_id = "your-management-group-id"
try:
mg_metrics = client.role_assignment_metrics.get_metrics_for_management_group(
management_group_id=management_group_id
)
print(f"Management Group Metrics for: {management_group_id}")
print(f"Total Child Subscriptions: {mg_metrics.subscription_count}")
print(f"Total Role Assignments: {mg_metrics.total_role_assignments}")
print(f"Inherited Assignments: {mg_metrics.inherited_assignments}")
print(f"Direct Assignments: {mg_metrics.direct_assignments}")
# Analyze consistency across subscriptions
if mg_metrics.subscription_metrics:
print("\nPer-Subscription Breakdown:")
for sub_metric in mg_metrics.subscription_metrics:
print(f" Subscription {sub_metric.subscription_id}:")
print(f" Assignments: {sub_metric.assignment_count}")
print(f" Unique Principals: {sub_metric.principal_count}")
except Exception as e:
print(f"Error retrieving management group metrics: {e}")from datetime import datetime, timedelta
# Generate a comprehensive governance report
def generate_governance_report(subscription_id):
"""Generate a comprehensive governance report for a subscription."""
# Get metrics
metrics = client.role_assignment_metrics.get_metrics_for_subscription(subscription_id)
# Get current assignments for detailed analysis
assignments = list(client.role_assignments.list_for_subscription())
# Analyze assignment age and patterns
now = datetime.utcnow()
old_assignments = []
privileged_assignments = []
privileged_roles = [
"Owner", "Contributor", "User Access Administrator",
"Security Administrator", "Global Administrator"
]
for assignment in assignments:
# Check for old assignments (example: older than 90 days)
if assignment.created_on and (now - assignment.created_on) > timedelta(days=90):
old_assignments.append(assignment)
# Check for privileged roles
role_name = get_role_name(assignment.role_definition_id) # Helper function
if role_name in privileged_roles:
privileged_assignments.append(assignment)
# Generate report
report = {
"subscription_id": subscription_id,
"report_date": now.isoformat(),
"summary": {
"total_assignments": len(assignments),
"old_assignments": len(old_assignments),
"privileged_assignments": len(privileged_assignments),
"compliance_score": calculate_compliance_score(metrics, assignments)
},
"recommendations": generate_recommendations(metrics, assignments)
}
return report
def get_role_name(role_definition_id):
"""Helper function to get role name from role definition ID."""
try:
role_def = client.role_definitions.get_by_id(role_definition_id)
return role_def.role_name
except:
return "Unknown"
def calculate_compliance_score(metrics, assignments):
"""Calculate a simple compliance score based on best practices."""
score = 100
# Deduct points for potential issues
if metrics.excessive_permissions_count > 0:
score -= min(20, metrics.excessive_permissions_count * 2)
if metrics.stale_assignments_count > 0:
score -= min(15, metrics.stale_assignments_count * 1)
# Check for too many owners
owner_count = len([a for a in assignments if "Owner" in get_role_name(a.role_definition_id)])
if owner_count > 5:
score -= min(10, (owner_count - 5) * 2)
return max(0, score)
def generate_recommendations(metrics, assignments):
"""Generate actionable recommendations based on analysis."""
recommendations = []
if metrics.stale_assignments_count > 0:
recommendations.append({
"priority": "High",
"category": "Access Review",
"description": f"Review {metrics.stale_assignments_count} stale role assignments",
"action": "Conduct access review to remove unused assignments"
})
if metrics.excessive_permissions_count > 0:
recommendations.append({
"priority": "Medium",
"category": "Least Privilege",
"description": f"{metrics.excessive_permissions_count} assignments may have excessive permissions",
"action": "Review and apply principle of least privilege"
})
return recommendations
# Generate and display report
report = generate_governance_report("your-subscription-id")
print(f"Governance Report for {report['subscription_id']}")
print(f"Compliance Score: {report['summary']['compliance_score']}/100")
print("\nRecommendations:")
for rec in report['recommendations']:
print(f"- [{rec['priority']}] {rec['description']}")
print(f" Action: {rec['action']}")class RoleAssignmentMetrics:
subscription_id: Optional[str]
management_group_id: Optional[str]
total_role_assignments: Optional[int]
total_principals: Optional[int]
total_role_definitions: Optional[int]
privileged_role_assignments: Optional[int]
excessive_permissions_count: Optional[int]
stale_assignments_count: Optional[int]
role_assignment_breakdown: Optional[Dict[str, int]]
principal_type_breakdown: Optional[Dict[str, int]]
created_date: Optional[datetime]
last_updated: Optional[datetime]
class ManagementGroupMetrics(RoleAssignmentMetrics):
subscription_count: Optional[int]
inherited_assignments: Optional[int]
direct_assignments: Optional[int]
subscription_metrics: Optional[List[SubscriptionMetric]]
class SubscriptionMetric:
subscription_id: Optional[str]
assignment_count: Optional[int]
principal_count: Optional[int]
role_count: Optional[int]
last_modified: Optional[datetime]class Operation:
name: Optional[str]
display: Optional[OperationDisplay]
origin: Optional[str]
is_data_action: Optional[bool]
class OperationDisplay:
provider: Optional[str]
resource: Optional[str]
operation: Optional[str]
description: Optional[str]
class OperationListResult:
value: Optional[List[Operation]]
next_link: Optional[str]class MetricCategory:
ROLE_ASSIGNMENTS = "RoleAssignments"
PRINCIPALS = "Principals"
ROLE_DEFINITIONS = "RoleDefinitions"
PERMISSIONS = "Permissions"
COMPLIANCE = "Compliance"
class AssignmentStatus:
ACTIVE = "Active"
STALE = "Stale"
EXCESSIVE = "Excessive"
COMPLIANT = "Compliant"
class PrincipalTypeBreakdown:
USER = "User"
GROUP = "Group"
SERVICE_PRINCIPAL = "ServicePrincipal"
MANAGED_IDENTITY = "ManagedIdentity"class OperationOrigin:
USER = "user"
SYSTEM = "system"
USER_SYSTEM = "user,system"
class OperationCategory:
RBAC = "rbac"
PIM = "pim"
ACCESS_REVIEWS = "accessReviews"
ALERTS = "alerts"
METRICS = "metrics"Role assignment metrics can be resource-intensive for large environments:
# For large subscriptions, consider pagination and filtering
def get_metrics_efficiently(subscription_id, max_assignments=1000):
"""Get metrics efficiently for large subscriptions."""
# First, get a sample of assignments to estimate scope
sample_assignments = list(client.role_assignments.list_for_subscription(
filter=f"$top={min(100, max_assignments)}"
))
if len(sample_assignments) < 100:
# Small subscription, get full metrics
return client.role_assignment_metrics.get_metrics_for_subscription(subscription_id)
else:
# Large subscription, get metrics with sampling
print(f"Large subscription detected ({len(sample_assignments)}+ assignments)")
print("Using sampling approach for metrics collection...")
# Custom metrics calculation with sampling
return calculate_sampled_metrics(subscription_id, sample_assignments)
def calculate_sampled_metrics(subscription_id, sample_assignments):
"""Calculate approximate metrics using sampling."""
# Implementation would extrapolate from sample data
# This is a simplified example
estimated_total = len(sample_assignments) * 10 # Rough estimate
return {
"subscription_id": subscription_id,
"estimated_total_assignments": estimated_total,
"sample_size": len(sample_assignments),
"note": "Metrics calculated using sampling due to large dataset"
}Common exceptions with metrics operations:
from azure.core.exceptions import ResourceNotFoundError, BadRequestError
try:
metrics = client.role_assignment_metrics.get_metrics_for_subscription(subscription_id)
except ResourceNotFoundError:
print("Subscription not found or no metrics available")
except BadRequestError:
print("Invalid subscription ID or request parameters")
except Exception as e:
print(f"Unexpected error retrieving metrics: {e}")
try:
operations = list(client.operations.list())
except Exception as e:
print(f"Error listing operations: {e}")# Example: Send metrics to Azure Monitor (requires azure-monitor-ingestion)
def send_metrics_to_monitor(metrics):
"""Send role assignment metrics to Azure Monitor."""
from azure.monitor.ingestion import LogsIngestionClient
# This is a conceptual example
monitor_client = LogsIngestionClient(endpoint="...", credential=credential)
# Prepare metrics for ingestion
log_data = {
"TimeGenerated": datetime.utcnow().isoformat(),
"SubscriptionId": metrics.subscription_id,
"TotalAssignments": metrics.total_role_assignments,
"PrivilegedAssignments": metrics.privileged_role_assignments,
"ComplianceScore": calculate_compliance_score(metrics, [])
}
# Send to custom log table
monitor_client.upload(
rule_id="your-data-collection-rule",
stream_name="Custom-AuthorizationMetrics_CL",
logs=[log_data]
)Metrics can be exported for Power BI reporting:
import json
import pandas as pd
def export_metrics_for_powerbi(subscription_ids):
"""Export metrics in format suitable for Power BI."""
all_metrics = []
for sub_id in subscription_ids:
try:
metrics = client.role_assignment_metrics.get_metrics_for_subscription(sub_id)
all_metrics.append({
"SubscriptionId": sub_id,
"TotalAssignments": metrics.total_role_assignments,
"TotalPrincipals": metrics.total_principals,
"PrivilegedAssignments": metrics.privileged_role_assignments,
"StaleAssignments": metrics.stale_assignments_count,
"LastUpdated": metrics.last_updated.isoformat() if metrics.last_updated else None
})
except Exception as e:
print(f"Error getting metrics for {sub_id}: {e}")
# Convert to DataFrame and export
df = pd.DataFrame(all_metrics)
df.to_csv("authorization_metrics.csv", index=False)
df.to_json("authorization_metrics.json", orient="records", indent=2)
return dfInstall with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-authorization