
tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow


Azure Data Explorer (ADX)

Integration with Azure Data Explorer (Kusto) for executing KQL queries and managing connections to Azure Data Explorer clusters. Azure Data Explorer is a fully managed data analytics service for near-real-time analysis of large volumes of data.

Capabilities

Azure Data Explorer Hook

Core hook for connecting to and interacting with Azure Data Explorer clusters using KQL (Kusto Query Language).

class AzureDataExplorerHook(BaseHook):
    """
    Hook for Azure Data Explorer (Kusto) operations.
    
    Provides authentication and connection management for executing KQL queries
    against Azure Data Explorer clusters.
    """
    
    def get_conn(self) -> KustoClient: ...
    def run_query(self, query: str, database: str, options: dict | None = None) -> KustoResponseDataSet: ...
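As a rough sketch of how the hook might be called from task code (for example inside a Python callable), the helper below accepts any object exposing `run_query` and reads the first table in `primary_results`. The names `make_hook` and `count_by_event_type` are hypothetical. The `hooks.adx` module path and the `azure_data_explorer_conn_id` constructor argument are assumptions based on the provider's layout, and row lookup by column name is assumed to behave as it does for result rows in `azure-kusto-data`.

```python
from typing import Any


def make_hook(conn_id: str = "azure_data_explorer_default") -> Any:
    """Build the provider hook lazily so this module imports without Airflow."""
    # Assumed module path; check it against the installed provider version.
    from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook

    return AzureDataExplorerHook(azure_data_explorer_conn_id=conn_id)


def count_by_event_type(hook: Any, database: str, state: str) -> dict:
    """Run a summarize query and return {EventType: count} from the first table."""
    # `state` is interpolated directly into the KQL text: pass trusted values only.
    query = (
        "StormEvents\n"
        f'| where State == "{state}"\n'
        "| summarize count() by EventType"
    )
    response = hook.run_query(query=query, database=database)
    table = response.primary_results[0]
    # Rows are assumed to support lookup by column name.
    return {row["EventType"]: row["count_"] for row in table.rows}
```

Passing the hook in as an argument keeps the query logic testable with a stand-in object; in a real task it would be called as `count_by_event_type(make_hook("adx_connection"), "Samples", "TEXAS")`.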

Query Execution Operations

Operator for executing KQL queries against Azure Data Explorer databases, with support for templated queries and query options.

class AzureDataExplorerQueryOperator(BaseOperator):
    """
    Operator for querying Azure Data Explorer (Kusto).
    
    Parameters:
    - query: KQL query to run (templated)
    - database: Database to run the query on (templated)  
    - options: Optional query options for ClientRequestProperties
    - azure_data_explorer_conn_id: Connection ID for Azure Data Explorer
    """
    
    def __init__(
        self,
        *,
        query: str,
        database: str,
        options: dict | None = None,
        azure_data_explorer_conn_id: str = "azure_data_explorer_default",
        **kwargs,
    ): ...
    
    def execute(self, context: Context) -> KustoResultTable | str: ...

Usage Examples

Basic KQL Query Execution

from airflow import DAG
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
from datetime import datetime, timedelta

dag = DAG(
    'adx_query_example',
    default_args={'owner': 'data-team'},
    description='Execute KQL query in Azure Data Explorer',
    schedule=timedelta(days=1),  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Execute a KQL query
query_task = AzureDataExplorerQueryOperator(
    task_id='run_kql_query',
    query="""
    StormEvents
    | where State == "TEXAS" 
    | summarize count() by EventType
    | order by count_ desc
    """,
    database='Samples',
    azure_data_explorer_conn_id='adx_connection',
    dag=dag
)

Advanced Query with Options

# Query with custom client request properties
advanced_query = AzureDataExplorerQueryOperator(
    task_id='advanced_kql_query',
    query="""
    let start_time = datetime({{ ds }});
    let end_time = start_time + 1d;
    MyTable
    | where Timestamp between (start_time .. end_time)
    | summarize avg(Value) by bin(Timestamp, 1h)
    """,
    database='Production',
    options={
        'servertimeout': '00:10:00',  # server-side timeout of 10 minutes (hh:mm:ss)
        'max_memory_consumption_per_query_per_node': 68719476736,  # 64 GiB, in bytes
        'truncationmaxrecords': 1000000  # cap on the number of returned records
    },
    azure_data_explorer_conn_id='adx_production',
    dag=dag
)
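The literal values in the options dictionary above are easy to mistype, so a small sketch of two helpers that compute them may be useful. Both `format_timespan` and `gib` are hypothetical names, not part of the provider, and the `[d.]hh:mm:ss` layout is the usual Kusto timespan format assumed here.

```python
from datetime import timedelta


def format_timespan(delta: timedelta) -> str:
    """Format a non-negative timedelta as a timespan string, [d.]hh:mm:ss."""
    total_seconds = int(delta.total_seconds())
    if total_seconds < 0:
        raise ValueError("timespan must be non-negative")
    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    clock = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
    # Prefix the day count only when the span is a day or longer.
    return f"{days}.{clock}" if days else clock


def gib(count: int) -> int:
    """Convert a size in GiB to bytes."""
    return count * 1024**3
```

With these, `format_timespan(timedelta(minutes=10))` gives the ten-minute timeout string and `gib(64)` gives the 64 GiB memory limit used in the example.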

Authentication and Connection

Azure Data Explorer supports multiple authentication methods through Airflow connections:

  • Service Principal: Using client ID, client secret, and tenant ID
  • Managed Identity: For Azure-hosted Airflow instances
  • DefaultAzureCredential: Azure SDK default credential chain
  • Device Code: Interactive authentication for development

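The connection's host field holds the cluster URI, and the connection extras carry the authentication method and, where needed, the tenant ID. The database is not part of the connection: it is passed with each query through the database argument.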
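One way to define such a connection is through an `AIRFLOW_CONN_<CONN_ID>` environment variable holding a JSON document. The sketch below builds that JSON for a service principal. Field placement is an assumption to verify against the installed provider's connection documentation: it assumes the cluster URI goes in `host`, the client ID and secret in `login` and `password`, and `auth_method` and `tenant` in the extras, with `AAD_APP` as the service-principal method name. The helper names are hypothetical.

```python
import json


def adx_connection_json(
    cluster_uri: str, client_id: str, client_secret: str, tenant_id: str
) -> str:
    """Serialize a service-principal connection in Airflow's JSON connection format."""
    return json.dumps(
        {
            "conn_type": "azure_data_explorer",
            "host": cluster_uri,
            "login": client_id,
            "password": client_secret,
            # Assumed extras keys; older provider versions may use prefixed names.
            "extra": {"auth_method": "AAD_APP", "tenant": tenant_id},
        }
    )


def connection_env_var(conn_id: str) -> str:
    """Return the environment variable name Airflow reads for a connection ID."""
    return "AIRFLOW_CONN_" + conn_id.upper()
```

Exporting the result of `adx_connection_json(...)` under `connection_env_var("adx_connection")` would make the connection available as `adx_connection` without touching the metadata database. In practice the secret should come from a secrets backend rather than a literal.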

Types

# Query response data structure
class KustoResponseDataSet:
    """Response object containing query results and metadata."""
    primary_results: list[KustoResultTable]

class KustoResultTable:
    """Table containing query result data."""
    rows: list[list[Any]]
    columns: list[dict[str, str]]
    
# Client request properties for query options
class ClientRequestProperties:
    """Configuration options for KQL query execution."""
    def set_option(self, option_name: str, option_value: Any) -> None: ...
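To illustrate how the `columns` and `rows` shapes above fit together, here is a minimal sketch that zips them into one dictionary per row. `table_to_records` is a hypothetical helper, and the `ColumnName` key is an assumption about how column metadata is keyed. The actual SDK objects may expose richer column and row types, so treat this as a model of the simplified stub rather than of the library.

```python
from typing import Any


def table_to_records(
    columns: list[dict[str, str]], rows: list[list[Any]]
) -> list[dict[str, Any]]:
    """Pair each row's values with the column names, in column order."""
    # Assumed key for the column name in each column descriptor.
    names = [column["ColumnName"] for column in columns]
    return [dict(zip(names, row)) for row in rows]
```

Records in this form serialize cleanly to JSON, which can help when a result needs to be passed between tasks.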

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure
