CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-mgmt-kusto

Microsoft Azure Kusto Management Client Library for Python

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

data-connections.mddocs/

Data Connections

Management of data ingestion connections from various Azure services including Event Hub, IoT Hub, Event Grid, and Cosmos DB for streaming data into Kusto databases. Data connections enable automated data ingestion pipelines from external sources.

Capabilities

Data Connection CRUD Operations

Core lifecycle operations for managing data connections that ingest data into Kusto databases.

def get(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    data_connection_name: str,
    **kwargs
) -> DataConnection:
    """
    Get a data connection in a database.
    
    Parameters:
    - resource_group_name: Name of the resource group
    - cluster_name: Name of the Kusto cluster
    - database_name: Name of the database
    - data_connection_name: Name of the data connection
    
    Returns:
    DataConnection object (EventHubDataConnection, IotHubDataConnection, etc.)
    """

def begin_create_or_update(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    data_connection_name: str,
    parameters: DataConnection,
    **kwargs
) -> LROPoller[DataConnection]:
    """
    Create or update a data connection in a database.
    
    Parameters:
    - resource_group_name: Name of the resource group
    - cluster_name: Name of the Kusto cluster
    - database_name: Name of the database
    - data_connection_name: Name of the data connection
    - parameters: DataConnection object with configuration
    
    Returns:
    LROPoller for the long-running operation returning DataConnection
    """

def begin_update(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    data_connection_name: str,
    parameters: DataConnection,
    **kwargs
) -> LROPoller[DataConnection]:
    """
    Update a data connection in a database.
    
    Parameters:
    - resource_group_name: Name of the resource group
    - cluster_name: Name of the Kusto cluster
    - database_name: Name of the database
    - data_connection_name: Name of the data connection
    - parameters: DataConnection object with updates
    
    Returns:
    LROPoller for the long-running operation returning updated DataConnection
    """

def begin_delete(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    data_connection_name: str,
    **kwargs
) -> LROPoller[None]:
    """
    Delete a data connection from a database.
    
    Parameters:
    - resource_group_name: Name of the resource group
    - cluster_name: Name of the Kusto cluster
    - database_name: Name of the database
    - data_connection_name: Name of the data connection
    
    Returns:
    LROPoller for the long-running delete operation
    """

Data Connection Listing Operations

Operations to discover and list data connections within databases.

def list_by_database(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    **kwargs
) -> Iterable[DataConnection]:
    """
    List data connections in a database.
    
    Parameters:
    - resource_group_name: Name of the resource group
    - cluster_name: Name of the Kusto cluster
    - database_name: Name of the database
    
    Returns:
    Iterable of DataConnection objects
    """

Data Connection Validation

Operations to validate data connection configurations before deployment.

def begin_data_connection_validation(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    parameters: DataConnectionValidation,
    **kwargs
) -> LROPoller[DataConnectionValidationListResult]:
    """
    Validate a data connection configuration.
    
    Parameters:
    - resource_group_name: Name of the resource group
    - cluster_name: Name of the Kusto cluster
    - database_name: Name of the database
    - parameters: DataConnectionValidation with configuration to validate
    
    Returns:
    LROPoller returning DataConnectionValidationListResult with validation results
    """

def check_name_availability(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    data_connection_name: DataConnectionCheckNameRequest,
    **kwargs
) -> CheckNameResult:
    """
    Check if a data connection name is available in the database.
    
    Parameters:
    - resource_group_name: Name of the resource group
    - cluster_name: Name of the Kusto cluster
    - database_name: Name of the database
    - data_connection_name: DataConnectionCheckNameRequest with name to check
    
    Returns:
    CheckNameResult indicating availability and any issues
    """

Usage Examples

Creating an Event Hub Data Connection

from azure.mgmt.kusto.models import (
    EventHubDataConnection,
    EventHubDataFormat,
    Compression
)

# Configure Event Hub data connection
event_hub_connection = EventHubDataConnection(
    location="East US",
    event_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.EventHub/namespaces/eventhub-ns/eventhubs/my-eventhub",
    consumer_group="$Default",
    table_name="MyTable",
    mapping_rule_name="MyMapping",
    data_format=EventHubDataFormat.JSON,
    compression=Compression.NONE,
    event_system_properties=["x-opt-enqueued-time", "x-opt-sequence-number"],
    managed_identity_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/my-identity"
)

# Create the data connection
poller = client.data_connections.begin_create_or_update(
    resource_group_name="my-resource-group",
    cluster_name="my-cluster",
    database_name="my-database",
    data_connection_name="my-eventhub-connection",
    parameters=event_hub_connection
)

connection = poller.result()
print(f"Event Hub connection created: {connection.name}")

Creating an IoT Hub Data Connection

from azure.mgmt.kusto.models import (
    IotHubDataConnection,
    IotHubDataFormat
)

# Configure IoT Hub data connection
iot_hub_connection = IotHubDataConnection(
    location="East US",
    iot_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.Devices/IotHubs/my-iothub",
    consumer_group="$Default",
    table_name="IoTData",
    mapping_rule_name="IoTMapping",
    data_format=IotHubDataFormat.JSON,
    event_system_properties=["iothub-connection-device-id", "iothub-enqueuedtime"],
    shared_access_policy_name="iothubowner"
)

# Create the data connection
poller = client.data_connections.begin_create_or_update(
    resource_group_name="my-resource-group",
    cluster_name="my-cluster",
    database_name="my-database",
    data_connection_name="my-iothub-connection",
    parameters=iot_hub_connection
)

connection = poller.result()
print(f"IoT Hub connection created: {connection.name}")

Creating an Event Grid Data Connection

from azure.mgmt.kusto.models import (
    EventGridDataConnection,
    EventGridDataFormat,
    BlobStorageEventType
)

# Configure Event Grid data connection
event_grid_connection = EventGridDataConnection(
    location="East US",
    storage_account_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.Storage/storageAccounts/mystorageaccount",
    event_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.EventHub/namespaces/eventhub-ns/eventhubs/eventgrid-hub",
    consumer_group="$Default",
    table_name="BlobEvents",
    mapping_rule_name="BlobMapping",
    data_format=EventGridDataFormat.JSON,
    ignore_first_record=False,
    blob_storage_event_type=BlobStorageEventType.MICROSOFT_STORAGE_BLOB_CREATED,
    managed_identity_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/my-identity"
)

# Create the data connection
poller = client.data_connections.begin_create_or_update(
    resource_group_name="my-resource-group",
    cluster_name="my-cluster",
    database_name="my-database",
    data_connection_name="my-eventgrid-connection",
    parameters=event_grid_connection
)

connection = poller.result()
print(f"Event Grid connection created: {connection.name}")

Validating a Data Connection

from azure.mgmt.kusto.models import DataConnectionValidation

# Create validation request
validation_request = DataConnectionValidation(
    data_connection_name="my-test-connection",
    properties=event_hub_connection  # Use the connection object from above
)

# Validate the configuration
poller = client.data_connections.begin_data_connection_validation(
    resource_group_name="my-resource-group",
    cluster_name="my-cluster",
    database_name="my-database",
    parameters=validation_request
)

validation_result = poller.result()
for result in validation_result.value:
    if result.error_message:
        print(f"Validation error: {result.error_message}")
    else:
        print("Validation passed")

Key Types

class DataConnection:
    """Base class for data connection resources."""
    # Read-only properties
    id: str  # Resource ID
    name: str  # Data connection name
    type: str  # Resource type
    
    # Common properties
    location: str  # Azure region
    kind: DataConnectionKind  # Connection type

class EventHubDataConnection(DataConnection):
    """Event Hub data connection for streaming ingestion."""
    kind: DataConnectionKind = DataConnectionKind.EVENT_HUB
    
    # Event Hub configuration
    event_hub_resource_id: str  # Event Hub resource ID
    consumer_group: str  # Consumer group name
    table_name: str  # Target table name
    mapping_rule_name: str  # Data mapping rule name
    data_format: EventHubDataFormat  # Data format
    event_system_properties: List[str]  # System properties to include
    compression: Compression  # Data compression type
    provisioning_state: ProvisioningState  # Provisioning state
    managed_identity_resource_id: str  # Managed identity for authentication
    database_routing: DatabaseRouting  # Database routing mode

class IotHubDataConnection(DataConnection):
    """IoT Hub data connection for device telemetry ingestion."""
    kind: DataConnectionKind = DataConnectionKind.IOT_HUB
    
    # IoT Hub configuration
    iot_hub_resource_id: str  # IoT Hub resource ID
    consumer_group: str  # Consumer group name
    table_name: str  # Target table name
    mapping_rule_name: str  # Data mapping rule name
    data_format: IotHubDataFormat  # Data format
    event_system_properties: List[str]  # System properties to include
    shared_access_policy_name: str  # Shared access policy name
    provisioning_state: ProvisioningState  # Provisioning state
    database_routing: DatabaseRouting  # Database routing mode

class EventGridDataConnection(DataConnection):
    """Event Grid data connection for blob storage events."""
    kind: DataConnectionKind = DataConnectionKind.EVENT_GRID
    
    # Event Grid configuration
    storage_account_resource_id: str  # Storage account resource ID
    event_hub_resource_id: str  # Event Hub resource ID for events
    consumer_group: str  # Consumer group name
    table_name: str  # Target table name
    mapping_rule_name: str  # Data mapping rule name
    data_format: EventGridDataFormat  # Data format
    ignore_first_record: bool  # Skip first record (headers)
    blob_storage_event_type: BlobStorageEventType  # Event type to process
    managed_identity_resource_id: str  # Managed identity for authentication
    provisioning_state: ProvisioningState  # Provisioning state
    database_routing: DatabaseRouting  # Database routing mode

class CosmosDbDataConnection(DataConnection):
    """Cosmos DB data connection for change feed ingestion."""
    kind: DataConnectionKind = DataConnectionKind.COSMOS_DB
    
    # Cosmos DB configuration
    cosmos_db_account_resource_id: str  # Cosmos DB account resource ID
    cosmos_db_database: str  # Cosmos DB database name
    cosmos_db_container: str  # Cosmos DB container name
    table_name: str  # Target table name
    mapping_rule_name: str  # Data mapping rule name
    managed_identity_resource_id: str  # Managed identity for authentication
    provisioning_state: ProvisioningState  # Provisioning state
    retrieval_start_date: datetime  # Start date for data retrieval

class DataConnectionValidation:
    """Request to validate a data connection configuration."""
    data_connection_name: str  # Name of the data connection
    properties: DataConnection  # Data connection configuration to validate

class DataConnectionValidationResult:
    """Result of data connection validation."""
    error_message: str  # Error message if validation failed

class DataConnectionValidationListResult:
    """List of data connection validation results."""
    value: List[DataConnectionValidationResult]  # Validation results

class DataConnectionCheckNameRequest:
    """Request to check data connection name availability."""
    name: str  # Name to check
    type: str  # Resource type

from enum import Enum

class DataConnectionKind(str, Enum):
    """Data connection type values."""
    EVENT_HUB = "EventHub"
    IOT_HUB = "IotHub"
    EVENT_GRID = "EventGrid"
    COSMOS_DB = "CosmosDb"

class EventHubDataFormat(str, Enum):
    """Event Hub data format values."""
    MULTIJSON = "MULTIJSON"
    JSON = "JSON"
    CSV = "CSV"
    TSV = "TSV"
    SCSV = "SCSV"
    SOHSV = "SOHSV"
    PSV = "PSV"
    TXT = "TXT"
    RAW = "RAW"
    SINGLEJSON = "SINGLEJSON"
    AVRO = "AVRO"
    TSVE = "TSVE"
    PARQUET = "PARQUET"
    ORC = "ORC"
    APACHEAVRO = "APACHEAVRO"
    W3CLOGFILE = "W3CLOGFILE"

class IotHubDataFormat(str, Enum):
    """IoT Hub data format values."""
    MULTIJSON = "MULTIJSON"
    JSON = "JSON"
    CSV = "CSV"
    TSV = "TSV"
    SCSV = "SCSV"
    SOHSV = "SOHSV"
    PSV = "PSV"
    TXT = "TXT"
    RAW = "RAW"
    SINGLEJSON = "SINGLEJSON"
    AVRO = "AVRO"
    TSVE = "TSVE"
    PARQUET = "PARQUET"
    ORC = "ORC"
    APACHEAVRO = "APACHEAVRO"
    W3CLOGFILE = "W3CLOGFILE"

class EventGridDataFormat(str, Enum):
    """Event Grid data format values."""
    MULTIJSON = "MULTIJSON"
    JSON = "JSON"
    CSV = "CSV"
    TSV = "TSV"
    SCSV = "SCSV"
    SOHSV = "SOHSV"
    PSV = "PSV"
    TXT = "TXT"
    RAW = "RAW"
    SINGLEJSON = "SINGLEJSON"
    AVRO = "AVRO"
    TSVE = "TSVE"
    PARQUET = "PARQUET"
    ORC = "ORC"
    APACHEAVRO = "APACHEAVRO"
    W3CLOGFILE = "W3CLOGFILE"

class Compression(str, Enum):
    """Data compression type values."""
    NONE = "None"
    GZIP = "GZip"

class BlobStorageEventType(str, Enum):
    """Blob storage event type values."""
    MICROSOFT_STORAGE_BLOB_CREATED = "Microsoft.Storage.BlobCreated"

class DatabaseRouting(str, Enum):
    """Database routing mode values."""
    SINGLE = "Single"
    MULTI = "Multi"

Install with Tessl CLI

npx tessl i tessl/pypi-azure-mgmt-kusto

docs

cluster-management.md

data-connections.md

database-management.md

index.md

monitoring-operations.md

scripts-extensions.md

security-management.md

tile.json