tessl/pypi-azure-mgmt-kusto

Microsoft Azure Kusto Management Client Library for Python

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/azure-mgmt-kusto@3.4.x

To install, run

npx @tessl/cli install tessl/pypi-azure-mgmt-kusto@3.4.0


Azure Kusto Management Client

The Microsoft Azure Kusto Management Client Library for Python manages Azure Data Explorer (Kusto) resources through the Azure Resource Manager API. It lets developers programmatically create, configure, and manage Kusto clusters, databases, and related resources, including database principal assignments, private endpoints, managed private endpoints, and data connections.

Package Information

  • Package Name: azure-mgmt-kusto
  • Language: Python
  • Installation: pip install azure-mgmt-kusto azure-identity
  • Python Requirements: >=3.8
  • API Version: 2024-04-13

Core Imports

from azure.mgmt.kusto import KustoManagementClient
from azure.identity import DefaultAzureCredential

For async operations:

from azure.mgmt.kusto.aio import KustoManagementClient
from azure.identity.aio import DefaultAzureCredential

Basic Usage

import os
from azure.identity import DefaultAzureCredential
from azure.mgmt.kusto import KustoManagementClient

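# DefaultAzureCredential can authenticate from the environment variables
# AZURE_CLIENT_ID, AZURE_TENANT_ID and AZURE_CLIENT_SECRET.
credential = DefaultAzureCredential()
# AZURE_SUBSCRIPTION_ID is read by this script, not by the credential;
# os.environ raises KeyError early if it is missing.
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]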

# Create the management client
client = KustoManagementClient(
    credential=credential,
    subscription_id=subscription_id
)

# List all Kusto clusters in the subscription
clusters = list(client.clusters.list())
print(f"Found {len(clusters)} Kusto clusters")

# Get a specific cluster
cluster = client.clusters.get(
    resource_group_name="my-resource-group",
    cluster_name="my-kusto-cluster"
)
print(f"Cluster URI: {cluster.uri}")

# List databases in a cluster
databases = list(client.databases.list_by_cluster(
    resource_group_name="my-resource-group",
    cluster_name="my-kusto-cluster"
))
print(f"Found {len(databases)} databases")

# Close the client when done
client.close()
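The subscription-wide `clusters.list()` call returns clusters from every resource group, while `databases.list_by_cluster()` needs the resource group name. A minimal sketch of bridging the two, assuming resource IDs follow the usual ARM layout (`/subscriptions/.../resourceGroups/<name>/providers/...`); `resource_group_of` and `databases_per_cluster` are hypothetical helper names, not part of the library:

```python
import re

# Assumption: ARM resource IDs contain a "/resourceGroups/<name>/" segment.
_RG_PATTERN = re.compile(r"/resourceGroups/([^/]+)/", re.IGNORECASE)


def resource_group_of(resource_id: str) -> str:
    """Extract the resource group segment from an ARM resource ID."""
    match = _RG_PATTERN.search(resource_id)
    if match is None:
        raise ValueError(f"No resource group segment in {resource_id!r}")
    return match.group(1)


def databases_per_cluster(client) -> dict:
    """Map each cluster name to the number of databases it contains."""
    counts = {}
    for cluster in client.clusters.list():
        resource_group = resource_group_of(cluster.id)
        counts[cluster.name] = sum(
            1
            for _ in client.databases.list_by_cluster(
                resource_group_name=resource_group,
                cluster_name=cluster.name,
            )
        )
    return counts
```

Passing the `client` created above, `databases_per_cluster(client)` would return a dictionary keyed by cluster name.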

Architecture

The azure-mgmt-kusto library follows Azure SDK design patterns with a layered architecture:

  • KustoManagementClient: Main entry point providing access to all operations
  • Operations Classes: 16 specialized operation groups handling different resource types
  • Model Classes: 93+ data models representing Azure resources and request/response objects
  • Enumeration Classes: 41+ enums defining valid values for various properties
  • Authentication: Integration with Azure Identity for credential management
  • Long-Running Operations: Built-in polling for long-running Azure operations via LROPoller (AsyncLROPoller in the async client)

Both synchronous and asynchronous versions are available, with the async client providing identical functionality using async/await patterns.
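As a rough illustration of the async variant, the sketch below iterates the paged cluster listing with `async for` and closes both the credential and the client through `async with`. The helper names `cluster_names` and `main` are illustrative, and the SDK imports are deferred into `main` so the helper can be exercised without the packages installed:

```python
import asyncio


async def cluster_names(client) -> list:
    """Collect cluster names from an async client's paged listing."""
    names = []
    # The async client's list() returns an async iterator; it is not awaited.
    async for cluster in client.clusters.list():
        names.append(cluster.name)
    return names


async def main(subscription_id: str) -> None:
    # Imported lazily so cluster_names() stays usable without the SDK installed.
    from azure.identity.aio import DefaultAzureCredential
    from azure.mgmt.kusto.aio import KustoManagementClient

    # Both objects hold network sessions, so both are closed on exit.
    async with DefaultAzureCredential() as credential:
        async with KustoManagementClient(credential, subscription_id) as client:
            print(await cluster_names(client))


# Run with: asyncio.run(main("<subscription-id>"))
```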

Capabilities

Cluster Management

Complete lifecycle management of Kusto clusters including creation, configuration, scaling, start/stop operations, migration, network diagnostics, and SKU management.

class ClustersOperations:
    def get(self, resource_group_name: str, cluster_name: str, **kwargs) -> Cluster: ...
    def begin_create_or_update(self, resource_group_name: str, cluster_name: str, parameters: Cluster, **kwargs) -> LROPoller[Cluster]: ...
    def begin_delete(self, resource_group_name: str, cluster_name: str, **kwargs) -> LROPoller[None]: ...
    def list(self, **kwargs) -> Iterable[Cluster]: ...
    def begin_start(self, resource_group_name: str, cluster_name: str, **kwargs) -> LROPoller[None]: ...
    def begin_stop(self, resource_group_name: str, cluster_name: str, **kwargs) -> LROPoller[None]: ...

Database Management

Database lifecycle management within Kusto clusters including creation, configuration, principal management, and database sharing through attached database configurations.

class DatabasesOperations:
    def get(self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs) -> Database: ...
    def begin_create_or_update(self, resource_group_name: str, cluster_name: str, database_name: str, parameters: Database, **kwargs) -> LROPoller[Database]: ...
    def list_by_cluster(self, resource_group_name: str, cluster_name: str, **kwargs) -> Iterable[Database]: ...
    def add_principals(self, resource_group_name: str, cluster_name: str, database_name: str, database_principals_to_add: DatabasePrincipalListRequest, **kwargs) -> DatabasePrincipalListResult: ...
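A small sketch of consuming `list_by_cluster`, grouping database names by their `kind`. It assumes, without confirmation from this page, that the service may report database names in the `cluster/database` form, so only the last path segment is kept; `short_database_name` and `databases_by_kind` are hypothetical helpers:

```python
def short_database_name(database) -> str:
    """Return the database name without a leading 'cluster/' prefix, if any."""
    return database.name.rsplit("/", 1)[-1]


def databases_by_kind(client, resource_group_name: str, cluster_name: str) -> dict:
    """Group database names in a cluster by kind (e.g. 'ReadWrite')."""
    grouped = {}
    for database in client.databases.list_by_cluster(
        resource_group_name=resource_group_name, cluster_name=cluster_name
    ):
        # kind may be an enum member or a plain string; normalize to the string.
        kind = getattr(database.kind, "value", database.kind)
        grouped.setdefault(kind, []).append(short_database_name(database))
    return grouped
```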

Data Connections

Management of data ingestion connections from various Azure services including Event Hub, IoT Hub, Event Grid, and Cosmos DB for streaming data into Kusto databases.

class DataConnectionsOperations:
    def get(self, resource_group_name: str, cluster_name: str, database_name: str, data_connection_name: str, **kwargs) -> DataConnection: ...
    def begin_create_or_update(self, resource_group_name: str, cluster_name: str, database_name: str, data_connection_name: str, parameters: DataConnection, **kwargs) -> LROPoller[DataConnection]: ...
    def list_by_database(self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs) -> Iterable[DataConnection]: ...
    def begin_data_connection_validation(self, resource_group_name: str, cluster_name: str, database_name: str, parameters: DataConnectionValidation, **kwargs) -> LROPoller[DataConnectionValidationListResult]: ...
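Data connections are scoped to a database, so listing every connection in a cluster means walking databases first. A minimal sketch of that traversal, with `iter_data_connections` as an illustrative name and the same unconfirmed assumption that database names may carry a `cluster/` prefix:

```python
def iter_data_connections(client, resource_group_name: str, cluster_name: str):
    """Yield (database_name, data_connection) pairs for a whole cluster."""
    for database in client.databases.list_by_cluster(
        resource_group_name=resource_group_name, cluster_name=cluster_name
    ):
        # Assumption: names may arrive as "cluster/database"; keep the last segment.
        database_name = database.name.rsplit("/", 1)[-1]
        for connection in client.data_connections.list_by_database(
            resource_group_name=resource_group_name,
            cluster_name=cluster_name,
            database_name=database_name,
        ):
            yield database_name, connection
```

Because this is a generator, connections are fetched lazily, one database at a time.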

Security Management

Comprehensive security management including cluster and database-level principal assignments, private endpoints, managed private endpoints, and private link resources.

class ClusterPrincipalAssignmentsOperations:
    def get(self, resource_group_name: str, cluster_name: str, principal_assignment_name: str, **kwargs) -> ClusterPrincipalAssignment: ...
    def begin_create_or_update(self, resource_group_name: str, cluster_name: str, principal_assignment_name: str, parameters: ClusterPrincipalAssignment, **kwargs) -> LROPoller[ClusterPrincipalAssignment]: ...
    def list(self, resource_group_name: str, cluster_name: str, **kwargs) -> Iterable[ClusterPrincipalAssignment]: ...

class DatabasePrincipalAssignmentsOperations:
    def get(self, resource_group_name: str, cluster_name: str, database_name: str, principal_assignment_name: str, **kwargs) -> DatabasePrincipalAssignment: ...
    def begin_create_or_update(self, resource_group_name: str, cluster_name: str, database_name: str, principal_assignment_name: str, parameters: DatabasePrincipalAssignment, **kwargs) -> LROPoller[DatabasePrincipalAssignment]: ...
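Before creating a principal assignment it can be useful to check whether an equivalent one already exists. The sketch below scans the cluster-level listing; `has_cluster_role` is a hypothetical helper, and it assumes each assignment exposes `principal_id` and `role` attributes:

```python
def has_cluster_role(
    client,
    resource_group_name: str,
    cluster_name: str,
    principal_id: str,
    role: str,
) -> bool:
    """Return True if the principal already holds the given cluster role."""
    for assignment in client.cluster_principal_assignments.list(
        resource_group_name=resource_group_name, cluster_name=cluster_name
    ):
        # role may be an enum member or a plain string; compare by string value.
        assigned_role = getattr(assignment.role, "value", assignment.role)
        if assignment.principal_id == principal_id and assigned_role == role:
            return True
    return False
```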

Scripts and Extensions

Management of KQL scripts for database automation and language extensions for custom query capabilities including Python, R, and other supported languages.

class ScriptsOperations:
    def get(self, resource_group_name: str, cluster_name: str, database_name: str, script_name: str, **kwargs) -> Script: ...
    def begin_create_or_update(self, resource_group_name: str, cluster_name: str, database_name: str, script_name: str, parameters: Script, **kwargs) -> LROPoller[Script]: ...
    def list_by_database(self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs) -> Iterable[Script]: ...

Monitoring and Operations

Monitoring capabilities including operation results tracking, SKU information retrieval, and sandbox custom image management for specialized compute scenarios.

class Operations:
    def list(self, **kwargs) -> Iterable[Operation]: ...

class OperationsResultsOperations:
    def get(self, location: str, operation_id: str, **kwargs) -> OperationResult: ...

class SkusOperations:
    def list(self, **kwargs) -> Iterable[SkuDescription]: ...
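As an illustration of `SkusOperations.list`, the sketch below filters the returned SKU descriptions down to those offered in one region. The helper name `sku_names_in` is hypothetical, and the location normalization (dropping spaces, lowercasing) is an assumption about how region names might vary between display and canonical forms:

```python
def _normalize(location: str) -> str:
    """Make 'West US 2' and 'westus2' compare equal."""
    return location.replace(" ", "").lower()


def sku_names_in(client, location: str) -> list:
    """Return the sorted, de-duplicated SKU names available in a location."""
    wanted = _normalize(location)
    return sorted(
        {
            sku.name
            for sku in client.skus.list()
            if any(_normalize(loc) == wanted for loc in (sku.locations or []))
        }
    )
```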

Core Types

class KustoManagementClient:
    """Main management client for Azure Kusto services."""
    def __init__(
        self,
        credential: TokenCredential,
        subscription_id: str,
        base_url: str = "https://management.azure.com",
        **kwargs
    ): ...
    def close(self) -> None: ...

class Cluster:
    """Represents a Kusto cluster resource."""
    id: str  # Read-only resource ID
    name: str  # Read-only cluster name
    type: str  # Read-only resource type
    location: str  # Resource location
    tags: Dict[str, str]  # Resource tags
    sku: AzureSku  # Cluster SKU configuration
    zones: List[str]  # Availability zones
    identity: Identity  # Managed identity
    state: State  # Cluster state
    provisioning_state: ProvisioningState  # Provisioning state
    uri: str  # Cluster URI
    data_ingestion_uri: str  # Data ingestion URI

class Database:
    """Base class for database resources."""
    id: str  # Read-only resource ID
    name: str  # Read-only database name
    type: str  # Read-only resource type
    location: str  # Resource location
    kind: Kind  # Database kind (ReadWrite or ReadOnlyFollowing)

class DataConnection:
    """Base class for data connection resources."""
    id: str  # Read-only resource ID
    name: str  # Read-only data connection name  
    type: str  # Read-only resource type
    location: str  # Resource location
    kind: DataConnectionKind  # Data connection type

# Additional supporting types
class CheckNameRequest:
    """Request to check resource name availability."""
    name: str  # Name to check
    type: str  # Resource type

class CheckNameResult:
    """Result of name availability check."""
    name_available: bool  # Whether name is available
    reason: str  # Reason if not available
    message: str  # Additional message

class Identity:
    """Managed identity configuration."""
    type: IdentityType  # Identity type
    principal_id: str  # Principal ID (read-only)
    tenant_id: str  # Tenant ID (read-only)
    user_assigned_identities: Dict[str, Any]  # User assigned identities

class AzureSku:
    """Azure SKU configuration."""
    name: AzureSkuName  # SKU name
    tier: AzureSkuTier  # SKU tier
    capacity: int  # Instance capacity

class VirtualNetworkConfiguration:
    """Virtual network configuration."""
    subnet_id: str  # Subnet resource ID
    engine_public_ip_id: str  # Engine public IP resource ID
    data_management_public_ip_id: str  # Data management public IP resource ID

class KeyVaultProperties:
    """Key vault configuration for encryption."""
    key_name: str  # Key name
    key_version: str  # Key version
    key_vault_uri: str  # Key vault URI
    user_identity: str  # User identity for key access

class OptimizedAutoscale:
    """Auto-scaling configuration."""
    version: int  # Configuration version
    is_enabled: bool  # Whether auto-scaling is enabled
    minimum: int  # Minimum instance count
    maximum: int  # Maximum instance count

class AcceptedAudiences:
    """Accepted audience for authentication."""
    value: str  # Audience value (GUID or URL)

class ErrorDetail:
    """Error detail information."""
    code: str  # Error code
    message: str  # Error message
    target: str  # Error target
    details: List['ErrorDetail']  # Additional error details

class OperationResult:
    """Result of a long-running operation."""
    id: str  # Operation ID
    name: str  # Operation name
    status: str  # Operation status
    start_time: datetime  # Start time
    end_time: datetime  # End time
    percent_complete: float  # Completion percentage
    error: ErrorDetail  # Error details if failed

class SkuDescription:
    """Description of available SKU."""
    resource_type: str  # Resource type
    name: str  # SKU name
    tier: str  # SKU tier
    locations: List[str]  # Available locations
    capabilities: List[Dict[str, str]]  # SKU capabilities

from enum import Enum

class State(str, Enum):
    """Cluster state values."""
    CREATING = "Creating"
    UNAVAILABLE = "Unavailable"
    RUNNING = "Running"
    DELETING = "Deleting"
    DELETED = "Deleted"
    STOPPING = "Stopping"
    STOPPED = "Stopped"
    STARTING = "Starting"
    UPDATING = "Updating"
    MIGRATED = "Migrated"

class ProvisioningState(str, Enum):
    """Provisioning state values."""
    RUNNING = "Running"
    CREATING = "Creating"
    DELETING = "Deleting"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    MOVING = "Moving"
    CANCELED = "Canceled"

class DataConnectionKind(str, Enum):
    """Data connection type values."""
    EVENT_HUB = "EventHub"
    IOT_HUB = "IotHub"
    EVENT_GRID = "EventGrid"
    COSMOS_DB = "CosmosDb"

class Kind(str, Enum):
    """Database kind values."""
    READ_WRITE = "ReadWrite"
    READ_ONLY_FOLLOWING = "ReadOnlyFollowing"

class AzureSkuName(str, Enum):
    """SKU name values."""
    STANDARD_D13_V2 = "Standard_D13_v2"
    STANDARD_D14_V2 = "Standard_D14_v2"
    STANDARD_L4_S = "Standard_L4s"
    STANDARD_L8_S = "Standard_L8s"
    STANDARD_L16_S = "Standard_L16s"

class AzureSkuTier(str, Enum):
    """SKU tier values."""
    BASIC = "Basic"
    STANDARD = "Standard"

class IdentityType(str, Enum):
    """Identity type values."""
    NONE = "None"
    SYSTEM_ASSIGNED = "SystemAssigned"
    USER_ASSIGNED = "UserAssigned"
    SYSTEM_ASSIGNED_USER_ASSIGNED = "SystemAssigned, UserAssigned"
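Because the enums above derive from both `str` and `Enum`, their members compare equal to the underlying string values. The following self-contained sketch mirrors two `State` members locally (it does not import the SDK) to show why checks such as `cluster.state == "Running"` work whether the attribute holds an enum member or a plain string:

```python
from enum import Enum


class State(str, Enum):
    """Local mirror of two cluster states, for illustration only."""
    RUNNING = "Running"
    STOPPED = "Stopped"


def is_running(state) -> bool:
    """Accept either a State member or its plain string value."""
    return state == State.RUNNING


# Both forms compare equal because State members are also str instances.
assert is_running("Running")
assert is_running(State.RUNNING)
assert not is_running("Stopped")
```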

from typing import Union, Dict, List, Any
from datetime import datetime
from azure.core.polling import LROPoller  
from azure.core.paging import ItemPaged
from azure.core.credentials import TokenCredential

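# In the signatures above, Iterable[...] denotes azure.core.paging.ItemPaged,
# a lazily evaluated, paged iterator over the returned resources.
Iterable = ItemPaged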

Error Handling

All operations can raise Azure SDK exceptions. A common error-handling pattern:

from azure.core.exceptions import HttpResponseError

try:
    cluster = client.clusters.get(
        resource_group_name="my-rg",
        cluster_name="my-cluster"
    )
except HttpResponseError as e:
    print(f"Request failed: {e.status_code} - {e.message}")
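A frequent refinement is treating "not found" as an ordinary outcome rather than an error. The sketch below does this by inspecting the exception's `status_code`, which keeps it free of SDK imports; with `azure-core` installed, catching `azure.core.exceptions.ResourceNotFoundError` directly is the more idiomatic choice. `get_cluster_or_none` is a hypothetical helper name:

```python
def get_cluster_or_none(client, resource_group_name: str, cluster_name: str):
    """Return the cluster, or None if the service answers 404."""
    try:
        return client.clusters.get(
            resource_group_name=resource_group_name, cluster_name=cluster_name
        )
    except Exception as exc:
        # HttpResponseError carries the HTTP status in `status_code`.
        if getattr(exc, "status_code", None) == 404:
            return None
        raise  # Anything else (403, 5xx, network errors) is propagated
```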

Long-running operations return LROPoller objects that can be used to track progress and handle failures:

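from azure.mgmt.kusto.models import AzureSku, Cluster

# Illustrative cluster definition; adjust the location and SKU to your needs
cluster_params = Cluster(
    location="westus2",
    sku=AzureSku(name="Standard_D13_v2", tier="Standard", capacity=2),
)

try:
    poller = client.clusters.begin_create_or_update(
        resource_group_name="my-rg",
        cluster_name="my-cluster",
        parameters=cluster_params
    )
    result = poller.result()  # Blocks until completion
except HttpResponseError as e:
    print(f"Operation failed: {e}")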
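Cluster operations can run for many minutes, so blocking indefinitely on `result()` is not always desirable. A minimal sketch of waiting with an overall deadline, using the poller's `done()`, `wait()`, `status()` and `result()` methods; `wait_for` and its parameters are illustrative names:

```python
import time


def wait_for(poller, timeout_seconds: float, step_seconds: float = 5.0):
    """Wait for a long-running operation, raising TimeoutError at the deadline."""
    deadline = time.monotonic() + timeout_seconds
    while not poller.done():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"Operation still {poller.status()} after {timeout_seconds}s"
            )
        # Wait in short steps so the deadline is checked regularly.
        poller.wait(min(step_seconds, remaining))
    return poller.result()
```

Note that a timeout here only stops the local wait; the operation itself keeps running on the service side.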