Microsoft Azure Kusto Management Client Library for Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Monitoring capabilities including operation results tracking, SKU information retrieval, and management of attached database configurations for database sharing scenarios. This enables operational oversight and resource planning for Kusto deployments.
Operations to discover available management operations and their capabilities.
def list(**kwargs) -> Iterable[Operation]:
"""
List available operations for the Kusto resource provider.
Returns:
Iterable of Operation objects describing available operations
"""Operations to track the status and results of long-running operations.
def get(
location: str,
operation_id: str,
**kwargs
) -> OperationResult:
"""
Get the result of a long-running operation.
Parameters:
- location: Azure region where the operation was initiated
- operation_id: Unique identifier of the operation
Returns:
OperationResult with operation status and details
"""Operations to retrieve operation results based on location for geographically distributed operations.
def get(
location: str,
operation_id: str,
**kwargs
) -> OperationResult:
"""
Get the result of a location-based long-running operation.
Parameters:
- location: Azure region where the operation was initiated
- operation_id: Unique identifier of the operation
Returns:
OperationResult with operation status and details
"""Operations to discover available SKUs and their capabilities for capacity planning.
def list(**kwargs) -> Iterable[SkuDescription]:
"""
List available SKUs for Kusto clusters.
Returns:
Iterable of SkuDescription objects with SKU details and availability
"""Management of attached database configurations that enable database sharing between clusters.
def get(
resource_group_name: str,
cluster_name: str,
attached_database_configuration_name: str,
**kwargs
) -> AttachedDatabaseConfiguration:
"""
Get an attached database configuration.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- attached_database_configuration_name: Name of the configuration
Returns:
AttachedDatabaseConfiguration object
"""
def begin_create_or_update(
resource_group_name: str,
cluster_name: str,
attached_database_configuration_name: str,
parameters: AttachedDatabaseConfiguration,
**kwargs
) -> LROPoller[AttachedDatabaseConfiguration]:
"""
Create or update an attached database configuration.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- attached_database_configuration_name: Name of the configuration
- parameters: AttachedDatabaseConfiguration with settings
Returns:
LROPoller for the long-running operation returning AttachedDatabaseConfiguration
"""
def begin_delete(
resource_group_name: str,
cluster_name: str,
attached_database_configuration_name: str,
**kwargs
) -> LROPoller[None]:
"""
Delete an attached database configuration.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- attached_database_configuration_name: Name of the configuration
Returns:
LROPoller for the long-running delete operation
"""
def list_by_cluster(
resource_group_name: str,
cluster_name: str,
**kwargs
) -> Iterable[AttachedDatabaseConfiguration]:
"""
List attached database configurations for a cluster.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
Returns:
Iterable of AttachedDatabaseConfiguration objects
"""
def check_name_availability(
resource_group_name: str,
cluster_name: str,
resource_name: AttachedDatabaseConfigurationsCheckNameRequest,
**kwargs
) -> CheckNameResult:
"""
Check if an attached database configuration name is available.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- resource_name: CheckNameRequest with name to validate
Returns:
CheckNameResult indicating availability
"""# Start a long-running operation (e.g., cluster creation)
poller = client.clusters.begin_create_or_update(
resource_group_name="my-resource-group",
cluster_name="my-cluster",
parameters=cluster_params
)
# Get operation details
operation_id = poller._operation.resource().name
location = "East US"
# Check operation status
operation_result = client.operations_results.get(
location=location,
operation_id=operation_id
)
print(f"Operation Status: {operation_result.status}")
print(f"Operation Name: {operation_result.name}")
if operation_result.error:
print(f"Operation Error: {operation_result.error.message}")# Get all available SKUs
available_skus = list(client.skus.list())
print("Available Kusto Cluster SKUs:")
for sku in available_skus:
print(f"- {sku.name}: {sku.tier}")
print(f" Locations: {', '.join(sku.locations)}")
print(f" Capabilities: {len(sku.capabilities)} features")
for capability in sku.capabilities:
print(f" {capability.name}: {capability.value}")
print()from azure.mgmt.kusto.models import (
AttachedDatabaseConfiguration,
DefaultPrincipalsModificationKind,
TableLevelSharingProperties
)
# Configure table-level sharing
table_sharing = TableLevelSharingProperties(
tables_to_include=["Table1", "Table2", "Table3"],
tables_to_exclude=[],
external_tables_to_include=["ExternalTable1"],
external_tables_to_exclude=[],
materialized_views_to_include=["MaterializedView1"],
materialized_views_to_exclude=[],
functions_to_include=["Function1", "Function2"],
functions_to_exclude=[]
)
# Create attached database configuration
attached_config = AttachedDatabaseConfiguration(
location="East US",
database_name="SharedDatabase",
cluster_resource_id="/subscriptions/sub-id/resourceGroups/source-rg/providers/Microsoft.Kusto/clusters/source-cluster",
default_principals_modification_kind=DefaultPrincipalsModificationKind.UNION,
table_level_sharing_properties=table_sharing,
database_name_prefix="Shared_"
)
# Create the configuration
poller = client.attached_database_configurations.begin_create_or_update(
resource_group_name="my-resource-group",
cluster_name="my-follower-cluster",
attached_database_configuration_name="shared-database-config",
parameters=attached_config
)
config = poller.result()
print(f"Attached database configuration created: {config.name}")
print(f"Attached databases: {', '.join(config.attached_database_names)}")# List all available operations
operations = list(client.operations.list())
print("Available Kusto Management Operations:")
for operation in operations:
print(f"- {operation.name}")
print(f" Display Name: {operation.display.operation}")
print(f" Description: {operation.display.description}")
print(f" Provider: {operation.display.provider}")
print(f" Resource: {operation.display.resource}")
print()# Get cluster details for health monitoring
cluster = client.clusters.get(
resource_group_name="my-resource-group",
cluster_name="my-cluster"
)
print(f"Cluster Health Status:")
print(f"- Name: {cluster.name}")
print(f"- State: {cluster.state}")
print(f"- Provisioning State: {cluster.provisioning_state}")
print(f"- URI: {cluster.uri}")
print(f"- Data Ingestion URI: {cluster.data_ingestion_uri}")
print(f"- SKU: {cluster.sku.name} ({cluster.sku.capacity} instances)")
# List follower databases
followers = list(client.clusters.list_follower_databases(
resource_group_name="my-resource-group",
cluster_name="my-cluster"
))
print(f"- Follower Databases: {len(followers)}")
for follower in followers:
print(f" - {follower.cluster_resource_id} -> {follower.database_name}")class Operation:
"""Represents an available management operation."""
name: str # Operation name
is_data_action: bool # Whether operation is a data action
display: OperationDisplay # Display information
origin: str # Operation origin
properties: dict # Additional properties
class OperationDisplay:
"""Display information for an operation."""
provider: str # Resource provider name
resource: str # Resource type
operation: str # Operation name
description: str # Operation description
class OperationResult:
"""Result of a long-running operation."""
# Read-only properties
id: str # Operation ID
name: str # Operation name
status: str # Operation status
start_time: datetime # Operation start time
end_time: datetime # Operation end time
percent_complete: float # Completion percentage
properties: dict # Operation-specific properties
error: ErrorDetail # Error details if operation failed
class SkuDescription:
"""Description of an available SKU."""
resource_type: str # Resource type
name: str # SKU name
tier: str # SKU tier
locations: List[str] # Available locations
location_info: List[SkuLocationInfoItem] # Location-specific information
capabilities: List[ResourceSkuCapabilities] # SKU capabilities
restrictions: List[dict] # SKU restrictions
class ResourceSkuCapabilities:
"""Capabilities of a resource SKU."""
name: str # Capability name
value: str # Capability value
class SkuLocationInfoItem:
"""Location-specific SKU information."""
location: str # Azure region
zones: List[str] # Available zones
zone_details: List[ResourceSkuZoneDetails] # Zone-specific details
class AttachedDatabaseConfiguration:
"""Configuration for sharing databases between clusters."""
# Read-only properties
id: str # Resource ID
name: str # Configuration name
type: str # Resource type
provisioning_state: ProvisioningState # Provisioning state
attached_database_names: List[str] # Names of attached databases
# Configuration properties
location: str # Azure region
database_name: str # Database name pattern (* for all)
cluster_resource_id: str # Source cluster resource ID
default_principals_modification_kind: DefaultPrincipalsModificationKind # Principal handling
table_level_sharing_properties: TableLevelSharingProperties # Table sharing config
database_name_override: str # Override database name
database_name_prefix: str # Prefix for database names
class TableLevelSharingProperties:
"""Table-level sharing configuration."""
tables_to_include: List[str] # Tables to include
tables_to_exclude: List[str] # Tables to exclude
external_tables_to_include: List[str] # External tables to include
external_tables_to_exclude: List[str] # External tables to exclude
materialized_views_to_include: List[str] # Materialized views to include
materialized_views_to_exclude: List[str] # Materialized views to exclude
functions_to_include: List[str] # Functions to include
functions_to_exclude: List[str] # Functions to exclude
class AttachedDatabaseConfigurationsCheckNameRequest:
"""Request to check attached database configuration name availability."""
name: str # Name to check
type: str # Resource type
from enum import Enum
class DefaultPrincipalsModificationKind(str, Enum):
"""Default principals modification behavior."""
UNION = "Union" # Combine source and target principals
REPLACE = "Replace" # Replace target principals with source
NONE = "None" # No principal modificationfrom azure.core.exceptions import HttpResponseError
import time
def monitor_long_running_operation(poller, operation_name):
"""Monitor a long-running operation with progress reporting."""
print(f"Starting {operation_name}...")
try:
while not poller.done():
print(f"Operation in progress... Status: {poller.status()}")
time.sleep(30) # Wait 30 seconds between checks
result = poller.result()
print(f"{operation_name} completed successfully")
return result
except HttpResponseError as e:
print(f"{operation_name} failed: {e.message}")
# Get detailed error information
if hasattr(e, 'error') and e.error:
print(f"Error Code: {e.error.code}")
print(f"Error Details: {e.error.message}")
if hasattr(e.error, 'details') and e.error.details:
for detail in e.error.details:
print(f"Additional Info: {detail.code} - {detail.message}")
raise
# Usage example
poller = client.clusters.begin_create_or_update(
resource_group_name="my-resource-group",
cluster_name="my-cluster",
parameters=cluster_params
)
cluster = monitor_long_running_operation(poller, "Cluster Creation")def perform_cluster_health_check(client, resource_group_name, cluster_name):
"""Perform comprehensive health check on a Kusto cluster."""
health_status = {
"cluster_accessible": False,
"provisioning_state": None,
"cluster_state": None,
"databases": [],
"data_connections": [],
"issues": []
}
try:
# Check cluster status
cluster = client.clusters.get(
resource_group_name=resource_group_name,
cluster_name=cluster_name
)
health_status["cluster_accessible"] = True
health_status["provisioning_state"] = cluster.provisioning_state
health_status["cluster_state"] = cluster.state
# Check if cluster is in a healthy state
if cluster.state != "Running":
health_status["issues"].append(f"Cluster state is {cluster.state}, expected Running")
if cluster.provisioning_state != "Succeeded":
health_status["issues"].append(f"Provisioning state is {cluster.provisioning_state}, expected Succeeded")
# Check databases
databases = list(client.databases.list_by_cluster(
resource_group_name=resource_group_name,
cluster_name=cluster_name
))
for db in databases:
db_status = {
"name": db.name,
"provisioning_state": db.provisioning_state,
"healthy": db.provisioning_state == "Succeeded"
}
health_status["databases"].append(db_status)
if not db_status["healthy"]:
health_status["issues"].append(f"Database {db.name} provisioning state is {db.provisioning_state}")
# Check data connections for each database
try:
connections = list(client.data_connections.list_by_database(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=db.name
))
for conn in connections:
conn_status = {
"database": db.name,
"name": conn.name,
"kind": conn.kind,
"provisioning_state": conn.provisioning_state,
"healthy": conn.provisioning_state == "Succeeded"
}
health_status["data_connections"].append(conn_status)
if not conn_status["healthy"]:
health_status["issues"].append(f"Data connection {conn.name} in {db.name} provisioning state is {conn.provisioning_state}")
except HttpResponseError as e:
health_status["issues"].append(f"Failed to check data connections for {db.name}: {e.message}")
except HttpResponseError as e:
health_status["issues"].append(f"Failed to access cluster: {e.message}")
return health_status
# Usage example
health = perform_cluster_health_check(client, "my-resource-group", "my-cluster")
print(f"Cluster Health: {'HEALTHY' if not health['issues'] else 'ISSUES FOUND'}")
if health["issues"]:
print("Issues found:")
for issue in health["issues"]:
print(f"- {issue}")Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-kusto