Microsoft Azure Kusto Management Client Library for Python
npx @tessl/cli install tessl/pypi-azure-mgmt-kusto@3.4.0Microsoft Azure Kusto Management Client Library for Python provides comprehensive management of Azure Data Explorer (Kusto) resources through the Azure Resource Manager API. This library enables developers to programmatically create, configure, and manage Kusto clusters, databases, and related resources including database principal assignments, private endpoints, managed private endpoints, and data connections.
pip install azure-mgmt-kusto azure-identityfrom azure.mgmt.kusto import KustoManagementClient
from azure.identity import DefaultAzureCredentialFor async operations:
from azure.mgmt.kusto.aio import KustoManagementClient
from azure.identity.aio import DefaultAzureCredentialimport os
from azure.identity import DefaultAzureCredential
from azure.mgmt.kusto import KustoManagementClient
# Authentication using environment variables:
# AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET, AZURE_SUBSCRIPTION_ID
credential = DefaultAzureCredential()
subscription_id = os.getenv("AZURE_SUBSCRIPTION_ID")
# Create the management client
client = KustoManagementClient(
credential=credential,
subscription_id=subscription_id
)
# List all Kusto clusters in the subscription
clusters = list(client.clusters.list())
print(f"Found {len(clusters)} Kusto clusters")
# Get a specific cluster
cluster = client.clusters.get(
resource_group_name="my-resource-group",
cluster_name="my-kusto-cluster"
)
print(f"Cluster URI: {cluster.uri}")
# List databases in a cluster
databases = list(client.databases.list_by_cluster(
resource_group_name="my-resource-group",
cluster_name="my-kusto-cluster"
))
print(f"Found {len(databases)} databases")
# Close the client when done
client.close()The azure-mgmt-kusto library follows Azure SDK design patterns with a layered architecture:
Both synchronous and asynchronous versions are available, with the async client providing identical functionality using async/await patterns.
Complete lifecycle management of Kusto clusters including creation, configuration, scaling, start/stop operations, migration, network diagnostics, and SKU management.
class ClustersOperations:
def get(self, resource_group_name: str, cluster_name: str, **kwargs) -> Cluster: ...
def begin_create_or_update(self, resource_group_name: str, cluster_name: str, parameters: Cluster, **kwargs) -> LROPoller[Cluster]: ...
def begin_delete(self, resource_group_name: str, cluster_name: str, **kwargs) -> LROPoller[None]: ...
def list(self, **kwargs) -> Iterable[Cluster]: ...
def begin_start(self, resource_group_name: str, cluster_name: str, **kwargs) -> LROPoller[None]: ...
def begin_stop(self, resource_group_name: str, cluster_name: str, **kwargs) -> LROPoller[None]: ...Database lifecycle management within Kusto clusters including creation, configuration, principal management, and database sharing through attached database configurations.
class DatabasesOperations:
def get(self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs) -> Database: ...
def begin_create_or_update(self, resource_group_name: str, cluster_name: str, database_name: str, parameters: Database, **kwargs) -> LROPoller[Database]: ...
def list_by_cluster(self, resource_group_name: str, cluster_name: str, **kwargs) -> Iterable[Database]: ...
def add_principals(self, resource_group_name: str, cluster_name: str, database_name: str, database_principals_to_add: DatabasePrincipalListRequest, **kwargs) -> DatabasePrincipalListResult: ...Management of data ingestion connections from various Azure services including Event Hub, IoT Hub, Event Grid, and Cosmos DB for streaming data into Kusto databases.
class DataConnectionsOperations:
def get(self, resource_group_name: str, cluster_name: str, database_name: str, data_connection_name: str, **kwargs) -> DataConnection: ...
def begin_create_or_update(self, resource_group_name: str, cluster_name: str, database_name: str, data_connection_name: str, parameters: DataConnection, **kwargs) -> LROPoller[DataConnection]: ...
def list_by_database(self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs) -> Iterable[DataConnection]: ...
def begin_data_connection_validation(self, resource_group_name: str, cluster_name: str, database_name: str, parameters: DataConnectionValidation, **kwargs) -> LROPoller[DataConnectionValidationListResult]: ...Comprehensive security management including cluster and database-level principal assignments, private endpoints, managed private endpoints, and private link resources.
class ClusterPrincipalAssignmentsOperations:
def get(self, resource_group_name: str, cluster_name: str, principal_assignment_name: str, **kwargs) -> ClusterPrincipalAssignment: ...
def begin_create_or_update(self, resource_group_name: str, cluster_name: str, principal_assignment_name: str, parameters: ClusterPrincipalAssignment, **kwargs) -> LROPoller[ClusterPrincipalAssignment]: ...
def list(self, resource_group_name: str, cluster_name: str, **kwargs) -> Iterable[ClusterPrincipalAssignment]: ...
class DatabasePrincipalAssignmentsOperations:
def get(self, resource_group_name: str, cluster_name: str, database_name: str, principal_assignment_name: str, **kwargs) -> DatabasePrincipalAssignment: ...
def begin_create_or_update(self, resource_group_name: str, cluster_name: str, database_name: str, principal_assignment_name: str, parameters: DatabasePrincipalAssignment, **kwargs) -> LROPoller[DatabasePrincipalAssignment]: ...Management of KQL scripts for database automation and language extensions for custom query capabilities including Python, R, and other supported languages.
class ScriptsOperations:
def get(self, resource_group_name: str, cluster_name: str, database_name: str, script_name: str, **kwargs) -> Script: ...
def begin_create_or_update(self, resource_group_name: str, cluster_name: str, database_name: str, script_name: str, parameters: Script, **kwargs) -> LROPoller[Script]: ...
def list_by_database(self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs) -> Iterable[Script]: ...Monitoring capabilities including operation results tracking, SKU information retrieval, and sandbox custom image management for specialized compute scenarios.
class Operations:
def list(self, **kwargs) -> Iterable[Operation]: ...
class OperationsResultsOperations:
def get(self, location: str, operation_id: str, **kwargs) -> OperationResult: ...
class SkusOperations:
def list(self, **kwargs) -> Iterable[SkuDescription]: ...class KustoManagementClient:
"""Main management client for Azure Kusto services."""
def __init__(
self,
credential: TokenCredential,
subscription_id: str,
base_url: str = "https://management.azure.com",
**kwargs
): ...
def close(self) -> None: ...
class Cluster:
"""Represents a Kusto cluster resource."""
id: str # Read-only resource ID
name: str # Read-only cluster name
type: str # Read-only resource type
location: str # Resource location
tags: Dict[str, str] # Resource tags
sku: AzureSku # Cluster SKU configuration
zones: List[str] # Availability zones
identity: Identity # Managed identity
state: State # Cluster state
provisioning_state: ProvisioningState # Provisioning state
uri: str # Cluster URI
data_ingestion_uri: str # Data ingestion URI
class Database:
"""Base class for database resources."""
id: str # Read-only resource ID
name: str # Read-only database name
type: str # Read-only resource type
location: str # Resource location
kind: Kind # Database kind (ReadWrite or ReadOnlyFollowing)
class DataConnection:
"""Base class for data connection resources."""
id: str # Read-only resource ID
name: str # Read-only data connection name
type: str # Read-only resource type
location: str # Resource location
kind: DataConnectionKind # Data connection type
# Additional supporting types
class CheckNameRequest:
"""Request to check resource name availability."""
name: str # Name to check
type: str # Resource type
class CheckNameResult:
"""Result of name availability check."""
name_available: bool # Whether name is available
reason: str # Reason if not available
message: str # Additional message
class Identity:
"""Managed identity configuration."""
type: IdentityType # Identity type
principal_id: str # Principal ID (read-only)
tenant_id: str # Tenant ID (read-only)
user_assigned_identities: Dict[str, Any] # User assigned identities
class AzureSku:
"""Azure SKU configuration."""
name: AzureSkuName # SKU name
tier: AzureSkuTier # SKU tier
capacity: int # Instance capacity
class VirtualNetworkConfiguration:
"""Virtual network configuration."""
subnet_id: str # Subnet resource ID
engine_public_ip_id: str # Engine public IP resource ID
data_management_public_ip_id: str # Data management public IP resource ID
class KeyVaultProperties:
"""Key vault configuration for encryption."""
key_name: str # Key name
key_version: str # Key version
key_vault_uri: str # Key vault URI
user_identity: str # User identity for key access
class OptimizedAutoscale:
"""Auto-scaling configuration."""
version: int # Configuration version
is_enabled: bool # Whether auto-scaling is enabled
minimum: int # Minimum instance count
maximum: int # Maximum instance count
class AcceptedAudiences:
"""Accepted audience for authentication."""
value: str # Audience value (GUID or URL)
class ErrorDetail:
"""Error detail information."""
code: str # Error code
message: str # Error message
target: str # Error target
details: List['ErrorDetail'] # Additional error details
class OperationResult:
"""Result of a long-running operation."""
id: str # Operation ID
name: str # Operation name
status: str # Operation status
start_time: datetime # Start time
end_time: datetime # End time
percent_complete: float # Completion percentage
error: ErrorDetail # Error details if failed
class SkuDescription:
"""Description of available SKU."""
resource_type: str # Resource type
name: str # SKU name
tier: str # SKU tier
locations: List[str] # Available locations
capabilities: List[Dict[str, str]] # SKU capabilities
from enum import Enum
class State(str, Enum):
"""Cluster state values."""
CREATING = "Creating"
UNAVAILABLE = "Unavailable"
RUNNING = "Running"
DELETING = "Deleting"
DELETED = "Deleted"
STOPPING = "Stopping"
STOPPED = "Stopped"
STARTING = "Starting"
UPDATING = "Updating"
MIGRATED = "Migrated"
class ProvisioningState(str, Enum):
"""Provisioning state values."""
RUNNING = "Running"
CREATING = "Creating"
DELETING = "Deleting"
SUCCEEDED = "Succeeded"
FAILED = "Failed"
MOVING = "Moving"
CANCELED = "Canceled"
class DataConnectionKind(str, Enum):
"""Data connection type values."""
EVENT_HUB = "EventHub"
IOT_HUB = "IotHub"
EVENT_GRID = "EventGrid"
COSMOS_DB = "CosmosDb"
class Kind(str, Enum):
"""Database kind values."""
READ_WRITE = "ReadWrite"
READ_ONLY_FOLLOWING = "ReadOnlyFollowing"
class AzureSkuName(str, Enum):
"""SKU name values."""
STANDARD_D13_V2 = "Standard_D13_v2"
STANDARD_D14_V2 = "Standard_D14_v2"
STANDARD_L4_S = "Standard_L4s"
STANDARD_L8_S = "Standard_L8s"
STANDARD_L16_S = "Standard_L16s"
class AzureSkuTier(str, Enum):
"""SKU tier values."""
BASIC = "Basic"
STANDARD = "Standard"
class IdentityType(str, Enum):
"""Identity type values."""
NONE = "None"
SYSTEM_ASSIGNED = "SystemAssigned"
USER_ASSIGNED = "UserAssigned"
SYSTEM_ASSIGNED_USER_ASSIGNED = "SystemAssigned, UserAssigned"
from typing import Union, Dict, List, Any
from datetime import datetime
from azure.core.polling import LROPoller
from azure.core.paging import ItemPaged
from azure.core.credentials import TokenCredential
# Type aliases for better readability
Iterable = ItemPagedAll operations can raise Azure SDK exceptions. Common error handling pattern:
from azure.core.exceptions import HttpResponseError
try:
cluster = client.clusters.get(
resource_group_name="my-rg",
cluster_name="my-cluster"
)
except HttpResponseError as e:
print(f"Request failed: {e.status_code} - {e.message}")Long-running operations return LROPoller objects that can be used to track progress and handle failures:
try:
poller = client.clusters.begin_create_or_update(
resource_group_name="my-rg",
cluster_name="my-cluster",
parameters=cluster_params
)
result = poller.result() # Blocks until completion
except HttpResponseError as e:
print(f"Operation failed: {e}")