Microsoft Azure Kusto Management Client Library for Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Management of data ingestion connections from various Azure services including Event Hub, IoT Hub, Event Grid, and Cosmos DB for streaming data into Kusto databases. Data connections enable automated data ingestion pipelines from external sources.
Core lifecycle operations for managing data connections that ingest data into Kusto databases.
def get(
resource_group_name: str,
cluster_name: str,
database_name: str,
data_connection_name: str,
**kwargs
) -> DataConnection:
"""
Get a data connection in a database.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- database_name: Name of the database
- data_connection_name: Name of the data connection
Returns:
DataConnection object (EventHubDataConnection, IotHubDataConnection, etc.)
"""
def begin_create_or_update(
resource_group_name: str,
cluster_name: str,
database_name: str,
data_connection_name: str,
parameters: DataConnection,
**kwargs
) -> LROPoller[DataConnection]:
"""
Create or update a data connection in a database.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- database_name: Name of the database
- data_connection_name: Name of the data connection
- parameters: DataConnection object with configuration
Returns:
LROPoller for the long-running operation returning DataConnection
"""
def begin_update(
resource_group_name: str,
cluster_name: str,
database_name: str,
data_connection_name: str,
parameters: DataConnection,
**kwargs
) -> LROPoller[DataConnection]:
"""
Update a data connection in a database.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- database_name: Name of the database
- data_connection_name: Name of the data connection
- parameters: DataConnection object with updates
Returns:
LROPoller for the long-running operation returning updated DataConnection
"""
def begin_delete(
resource_group_name: str,
cluster_name: str,
database_name: str,
data_connection_name: str,
**kwargs
) -> LROPoller[None]:
"""
Delete a data connection from a database.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- database_name: Name of the database
- data_connection_name: Name of the data connection
Returns:
LROPoller for the long-running delete operation
"""Operations to discover and list data connections within databases.
def list_by_database(
resource_group_name: str,
cluster_name: str,
database_name: str,
**kwargs
) -> Iterable[DataConnection]:
"""
List data connections in a database.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- database_name: Name of the database
Returns:
Iterable of DataConnection objects
"""Operations to validate data connection configurations before deployment.
def begin_data_connection_validation(
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: DataConnectionValidation,
**kwargs
) -> LROPoller[DataConnectionValidationListResult]:
"""
Validate a data connection configuration.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- database_name: Name of the database
- parameters: DataConnectionValidation with configuration to validate
Returns:
LROPoller returning DataConnectionValidationListResult with validation results
"""
def check_name_availability(
resource_group_name: str,
cluster_name: str,
database_name: str,
data_connection_name: DataConnectionCheckNameRequest,
**kwargs
) -> CheckNameResult:
"""
Check if a data connection name is available in the database.
Parameters:
- resource_group_name: Name of the resource group
- cluster_name: Name of the Kusto cluster
- database_name: Name of the database
- data_connection_name: DataConnectionCheckNameRequest with name to check
Returns:
CheckNameResult indicating availability and any issues
"""from azure.mgmt.kusto.models import (
EventHubDataConnection,
EventHubDataFormat,
Compression
)
# Configure Event Hub data connection
event_hub_connection = EventHubDataConnection(
location="East US",
event_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.EventHub/namespaces/eventhub-ns/eventhubs/my-eventhub",
consumer_group="$Default",
table_name="MyTable",
mapping_rule_name="MyMapping",
data_format=EventHubDataFormat.JSON,
compression=Compression.NONE,
event_system_properties=["x-opt-enqueued-time", "x-opt-sequence-number"],
managed_identity_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/my-identity"
)
# Create the data connection
poller = client.data_connections.begin_create_or_update(
resource_group_name="my-resource-group",
cluster_name="my-cluster",
database_name="my-database",
data_connection_name="my-eventhub-connection",
parameters=event_hub_connection
)
connection = poller.result()
print(f"Event Hub connection created: {connection.name}")from azure.mgmt.kusto.models import (
IotHubDataConnection,
IotHubDataFormat
)
# Configure IoT Hub data connection
iot_hub_connection = IotHubDataConnection(
location="East US",
iot_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.Devices/IotHubs/my-iothub",
consumer_group="$Default",
table_name="IoTData",
mapping_rule_name="IoTMapping",
data_format=IotHubDataFormat.JSON,
event_system_properties=["iothub-connection-device-id", "iothub-enqueuedtime"],
shared_access_policy_name="iothubowner"
)
# Create the data connection
poller = client.data_connections.begin_create_or_update(
resource_group_name="my-resource-group",
cluster_name="my-cluster",
database_name="my-database",
data_connection_name="my-iothub-connection",
parameters=iot_hub_connection
)
connection = poller.result()
print(f"IoT Hub connection created: {connection.name}")from azure.mgmt.kusto.models import (
EventGridDataConnection,
EventGridDataFormat,
BlobStorageEventType
)
# Configure Event Grid data connection
event_grid_connection = EventGridDataConnection(
location="East US",
storage_account_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.Storage/storageAccounts/mystorageaccount",
event_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.EventHub/namespaces/eventhub-ns/eventhubs/eventgrid-hub",
consumer_group="$Default",
table_name="BlobEvents",
mapping_rule_name="BlobMapping",
data_format=EventGridDataFormat.JSON,
ignore_first_record=False,
blob_storage_event_type=BlobStorageEventType.MICROSOFT_STORAGE_BLOB_CREATED,
managed_identity_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/my-identity"
)
# Create the data connection
poller = client.data_connections.begin_create_or_update(
resource_group_name="my-resource-group",
cluster_name="my-cluster",
database_name="my-database",
data_connection_name="my-eventgrid-connection",
parameters=event_grid_connection
)
connection = poller.result()
print(f"Event Grid connection created: {connection.name}")from azure.mgmt.kusto.models import DataConnectionValidation
# Create validation request
validation_request = DataConnectionValidation(
data_connection_name="my-test-connection",
properties=event_hub_connection # Use the connection object from above
)
# Validate the configuration
poller = client.data_connections.begin_data_connection_validation(
resource_group_name="my-resource-group",
cluster_name="my-cluster",
database_name="my-database",
parameters=validation_request
)
validation_result = poller.result()
for result in validation_result.value:
if result.error_message:
print(f"Validation error: {result.error_message}")
else:
print("Validation passed")class DataConnection:
"""Base class for data connection resources."""
# Read-only properties
id: str # Resource ID
name: str # Data connection name
type: str # Resource type
# Common properties
location: str # Azure region
kind: DataConnectionKind # Connection type
class EventHubDataConnection(DataConnection):
"""Event Hub data connection for streaming ingestion."""
kind: DataConnectionKind = DataConnectionKind.EVENT_HUB
# Event Hub configuration
event_hub_resource_id: str # Event Hub resource ID
consumer_group: str # Consumer group name
table_name: str # Target table name
mapping_rule_name: str # Data mapping rule name
data_format: EventHubDataFormat # Data format
event_system_properties: List[str] # System properties to include
compression: Compression # Data compression type
provisioning_state: ProvisioningState # Provisioning state
managed_identity_resource_id: str # Managed identity for authentication
database_routing: DatabaseRouting # Database routing mode
class IotHubDataConnection(DataConnection):
"""IoT Hub data connection for device telemetry ingestion."""
kind: DataConnectionKind = DataConnectionKind.IOT_HUB
# IoT Hub configuration
iot_hub_resource_id: str # IoT Hub resource ID
consumer_group: str # Consumer group name
table_name: str # Target table name
mapping_rule_name: str # Data mapping rule name
data_format: IotHubDataFormat # Data format
event_system_properties: List[str] # System properties to include
shared_access_policy_name: str # Shared access policy name
provisioning_state: ProvisioningState # Provisioning state
database_routing: DatabaseRouting # Database routing mode
class EventGridDataConnection(DataConnection):
"""Event Grid data connection for blob storage events."""
kind: DataConnectionKind = DataConnectionKind.EVENT_GRID
# Event Grid configuration
storage_account_resource_id: str # Storage account resource ID
event_hub_resource_id: str # Event Hub resource ID for events
consumer_group: str # Consumer group name
table_name: str # Target table name
mapping_rule_name: str # Data mapping rule name
data_format: EventGridDataFormat # Data format
ignore_first_record: bool # Skip first record (headers)
blob_storage_event_type: BlobStorageEventType # Event type to process
managed_identity_resource_id: str # Managed identity for authentication
provisioning_state: ProvisioningState # Provisioning state
database_routing: DatabaseRouting # Database routing mode
class CosmosDbDataConnection(DataConnection):
"""Cosmos DB data connection for change feed ingestion."""
kind: DataConnectionKind = DataConnectionKind.COSMOS_DB
# Cosmos DB configuration
cosmos_db_account_resource_id: str # Cosmos DB account resource ID
cosmos_db_database: str # Cosmos DB database name
cosmos_db_container: str # Cosmos DB container name
table_name: str # Target table name
mapping_rule_name: str # Data mapping rule name
managed_identity_resource_id: str # Managed identity for authentication
provisioning_state: ProvisioningState # Provisioning state
retrieval_start_date: datetime # Start date for data retrieval
class DataConnectionValidation:
"""Request to validate a data connection configuration."""
data_connection_name: str # Name of the data connection
properties: DataConnection # Data connection configuration to validate
class DataConnectionValidationResult:
"""Result of data connection validation."""
error_message: str # Error message if validation failed
class DataConnectionValidationListResult:
"""List of data connection validation results."""
value: List[DataConnectionValidationResult] # Validation results
class DataConnectionCheckNameRequest:
"""Request to check data connection name availability."""
name: str # Name to check
type: str # Resource type
from enum import Enum
class DataConnectionKind(str, Enum):
"""Data connection type values."""
EVENT_HUB = "EventHub"
IOT_HUB = "IotHub"
EVENT_GRID = "EventGrid"
COSMOS_DB = "CosmosDb"
class EventHubDataFormat(str, Enum):
"""Event Hub data format values."""
MULTIJSON = "MULTIJSON"
JSON = "JSON"
CSV = "CSV"
TSV = "TSV"
SCSV = "SCSV"
SOHSV = "SOHSV"
PSV = "PSV"
TXT = "TXT"
RAW = "RAW"
SINGLEJSON = "SINGLEJSON"
AVRO = "AVRO"
TSVE = "TSVE"
PARQUET = "PARQUET"
ORC = "ORC"
APACHEAVRO = "APACHEAVRO"
W3CLOGFILE = "W3CLOGFILE"
class IotHubDataFormat(str, Enum):
"""IoT Hub data format values."""
MULTIJSON = "MULTIJSON"
JSON = "JSON"
CSV = "CSV"
TSV = "TSV"
SCSV = "SCSV"
SOHSV = "SOHSV"
PSV = "PSV"
TXT = "TXT"
RAW = "RAW"
SINGLEJSON = "SINGLEJSON"
AVRO = "AVRO"
TSVE = "TSVE"
PARQUET = "PARQUET"
ORC = "ORC"
APACHEAVRO = "APACHEAVRO"
W3CLOGFILE = "W3CLOGFILE"
class EventGridDataFormat(str, Enum):
"""Event Grid data format values."""
MULTIJSON = "MULTIJSON"
JSON = "JSON"
CSV = "CSV"
TSV = "TSV"
SCSV = "SCSV"
SOHSV = "SOHSV"
PSV = "PSV"
TXT = "TXT"
RAW = "RAW"
SINGLEJSON = "SINGLEJSON"
AVRO = "AVRO"
TSVE = "TSVE"
PARQUET = "PARQUET"
ORC = "ORC"
APACHEAVRO = "APACHEAVRO"
W3CLOGFILE = "W3CLOGFILE"
class Compression(str, Enum):
"""Data compression type values."""
NONE = "None"
GZIP = "GZip"
class BlobStorageEventType(str, Enum):
"""Blob storage event type values."""
MICROSOFT_STORAGE_BLOB_CREATED = "Microsoft.Storage.BlobCreated"
class DatabaseRouting(str, Enum):
"""Database routing mode values."""
SINGLE = "Single"
MULTI = "Multi"Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-kusto