Provider package for Microsoft Azure integrations with Apache Airflow.
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive Azure Data Explorer (Kusto) integration for executing KQL queries and managing connections to Azure Data Explorer clusters. Azure Data Explorer is a fast, fully managed data analytics service for real-time analysis on large volumes of data.
Core hook for connecting to and interacting with Azure Data Explorer clusters using KQL (Kusto Query Language).
class AzureDataExplorerHook(BaseHook):
"""
Hook for Azure Data Explorer (Kusto) operations.
Provides authentication and connection management for executing KQL queries
against Azure Data Explorer clusters.
"""
def get_conn(self) -> KustoClient: ...
def run_query(self, query: str, database: str, options: dict | None = None) -> KustoResponseDataSet: ...Operators for executing KQL queries against Azure Data Explorer databases with support for templated queries and query options.
class AzureDataExplorerQueryOperator(BaseOperator):
    """
    Run a KQL statement against an Azure Data Explorer (Kusto) database.

    :param query: KQL query to run (templated).
    :param database: Database the query executes against (templated).
    :param options: Optional query options forwarded via ClientRequestProperties.
    :param azure_data_explorer_conn_id: Airflow connection ID for Azure Data Explorer.
    """

    def __init__(
        self,
        *,
        query: str,
        database: str,
        options: dict | None = None,
        azure_data_explorer_conn_id: str = "azure_data_explorer_default",
        **kwargs,
    ): ...

    def execute(self, context: Context) -> KustoResultTable | str: ...

from airflow import DAG
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
from datetime import datetime, timedelta

# Daily example DAG that executes a KQL query in Azure Data Explorer.
# NOTE: `schedule` replaces `schedule_interval`, which was deprecated in
# Airflow 2.4 and removed in Airflow 3.
dag = DAG(
    'adx_query_example',
    default_args={'owner': 'data-team'},
    description='Execute KQL query in Azure Data Explorer',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
# Run a simple KQL aggregation against the Samples database.
query_task = AzureDataExplorerQueryOperator(
    task_id='run_kql_query',
    query="""
StormEvents
| where State == "TEXAS"
| summarize count() by EventType
| order by count_ desc
""",
    database='Samples',
    azure_data_explorer_conn_id='adx_connection',
    dag=dag,
)

# Query with custom client request properties
advanced_query = AzureDataExplorerQueryOperator(
task_id='advanced_kql_query',
query="""
let start_time = datetime({{ ds }});
let end_time = start_time + 1d;
MyTable
| where Timestamp between (start_time .. end_time)
| summarize avg(Value) by bin(Timestamp, 1h)
""",
database='Production',
options={
'query_timeout': '00:10:00', # 10 minutes timeout
'max_memory_consumption_per_query_per_node': 68719476736, # 64GB
'truncationmaxrecords': 1000000
},
azure_data_explorer_conn_id='adx_production',
dag=dag
)Azure Data Explorer supports multiple authentication methods through Airflow connections:
Connection configuration requires the cluster URI and database name in the connection extras.
# Query response data structure
class KustoResponseDataSet:
"""Response object containing query results and metadata."""
primary_results: list[KustoResultTable]
class KustoResultTable:
"""Table containing query result data."""
rows: list[list[Any]]
columns: list[dict[str, str]]
# Client request properties for query options
class ClientRequestProperties:
"""Configuration options for KQL query execution."""
def set_option(self, option_name: str, option_value: Any) -> None: ...Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure