# Azure Data Explorer (ADX)

Comprehensive Azure Data Explorer (Kusto) integration for executing KQL queries and managing connections to Azure Data Explorer clusters. Azure Data Explorer is a fast, fully managed data analytics service for real-time analysis on large volumes of data.

## Capabilities

### Azure Data Explorer Hook

Core hook for connecting to and interacting with Azure Data Explorer clusters using KQL (Kusto Query Language).

```python { .api }
class AzureDataExplorerHook(BaseHook):
    """
    Hook for Azure Data Explorer (Kusto) operations.

    Provides authentication and connection management for executing KQL queries
    against Azure Data Explorer clusters.
    """

    def get_conn(self) -> KustoClient: ...
    def run_query(self, query: str, database: str, options: dict | None = None) -> KustoResponseDataSet: ...
```
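
As a rough sketch of calling the hook's `run_query` outside an operator, the helper below accepts any object that exposes `run_query(query, database, options)` and returns the first table in `primary_results`. The name `first_result_table` is hypothetical and not part of the provider. `_StubHook` and `_StubResponse` are stand-ins that exist only so the snippet runs without Airflow or a live cluster.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


def first_result_table(hook: Any, query: str, database: str, options: Optional[dict] = None) -> Any:
    """Run a KQL query through `hook` and return the first primary result table.

    Hypothetical helper: `hook` is assumed to expose the `run_query` method
    listed above and to return an object with a `primary_results` list.
    """
    response = hook.run_query(query, database, options)
    if not response.primary_results:
        raise ValueError("query returned no primary result tables")
    return response.primary_results[0]


# --- Stand-ins so the sketch runs without Airflow or a live cluster ---
@dataclass
class _StubResponse:
    primary_results: list = field(default_factory=list)


class _StubHook:
    """Mimics only the `run_query` signature and records the calls it receives."""

    def __init__(self) -> None:
        self.calls = []

    def run_query(self, query: str, database: str, options: Optional[dict] = None) -> _StubResponse:
        self.calls.append((query, database, options))
        # Illustrative placeholder data, not real query output
        return _StubResponse(primary_results=[[["TEXAS", 42]]])


stub = _StubHook()
table = first_result_table(stub, "StormEvents | take 1", "Samples")
```

In a real task, the stub would be replaced by an `AzureDataExplorerHook` instance. Because the helper depends only on the `run_query` signature, it can be unit-tested without network access.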

### Query Execution Operations

Operator for executing KQL queries against Azure Data Explorer databases, with support for templated queries and query options.

```python { .api }
class AzureDataExplorerQueryOperator(BaseOperator):
    """
    Operator for querying Azure Data Explorer (Kusto).

    Parameters:
    - query: KQL query to run (templated)
    - database: Database to run the query on (templated)
    - options: Optional query options for ClientRequestProperties
    - azure_data_explorer_conn_id: Connection ID for Azure Data Explorer
    """

    def __init__(
        self,
        *,
        query: str,
        database: str,
        options: dict | None = None,
        azure_data_explorer_conn_id: str = "azure_data_explorer_default",
        **kwargs,
    ): ...

    def execute(self, context: Context) -> KustoResultTable | str: ...
```

## Usage Examples

### Basic KQL Query Execution

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
from datetime import datetime, timedelta

dag = DAG(
    'adx_query_example',
    default_args={'owner': 'data-team'},
    description='Execute KQL query in Azure Data Explorer',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Execute a KQL query
query_task = AzureDataExplorerQueryOperator(
    task_id='run_kql_query',
    query="""
    StormEvents
    | where State == "TEXAS"
    | summarize count() by EventType
    | order by count_ desc
    """,
    database='Samples',
    azure_data_explorer_conn_id='adx_connection',
    dag=dag
)
```

### Advanced Query with Options

```python
# Query with custom client request properties
# (reuses `dag` and the operator import from the basic example)
advanced_query = AzureDataExplorerQueryOperator(
    task_id='advanced_kql_query',
    query="""
    let start_time = datetime({{ ds }});
    let end_time = start_time + 1d;
    MyTable
    | where Timestamp between (start_time .. end_time)
    | summarize avg(Value) by bin(Timestamp, 1h)
    """,
    database='Production',
    options={
        'servertimeout': '00:10:00',  # server-side request timeout: 10 minutes
        'max_memory_consumption_per_query_per_node': 68719476736,  # 64 GiB, in bytes
        'truncationmaxrecords': 1000000  # cap on the number of returned records
    },
    azure_data_explorer_conn_id='adx_production',
    dag=dag
)
```
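
Option values such as the timeout string and the per-node memory limit are easy to mistype by hand. The sketch below derives them from a `timedelta` and a GiB count instead. `to_kusto_timespan` and `GIB` are hypothetical names introduced for illustration, and the `[d.]hh:mm:ss` layout is assumed to be the timespan form the service accepts.

```python
from datetime import timedelta

GIB = 1024 ** 3  # bytes in one gibibyte


def to_kusto_timespan(delta: timedelta) -> str:
    """Format a non-negative timedelta as a [d.]hh:mm:ss timespan string.

    Fractional seconds are truncated. Hypothetical helper, not a provider API.
    """
    total_seconds = int(delta.total_seconds())
    if total_seconds < 0:
        raise ValueError("timespan must be non-negative")
    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    clock = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
    # Only prefix the day count when it is non-zero
    return f"{days}.{clock}" if days else clock


timeout_value = to_kusto_timespan(timedelta(minutes=10))
memory_limit_bytes = 64 * GIB
```

The resulting values can be dropped into the `options` dict in place of the hard-coded literals, which keeps the units visible at the call site.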

## Authentication and Connection

Azure Data Explorer supports multiple authentication methods through Airflow connections:

- **Service Principal**: Using client ID, client secret, and tenant ID
- **Managed Identity**: For Azure-hosted Airflow instances
- **DefaultAzureCredential**: Azure SDK default credential chain
- **Device Code**: Interactive authentication for development

Connection configuration requires the cluster URI and database name in the connection extras.

## Types

```python { .api }
# Query response data structure
class KustoResponseDataSet:
    """Response object containing query results and metadata."""
    primary_results: list[KustoResultTable]

class KustoResultTable:
    """Table containing query result data."""
    rows: list[list[Any]]
    columns: list[dict[str, str]]

# Client request properties for query options
class ClientRequestProperties:
    """Configuration options for KQL query execution."""
    def set_option(self, option_name: str, option_value: Any) -> None: ...
```
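
To illustrate how the `rows` and `columns` fields listed above fit together, here is a minimal sketch that zips each row with the column names to produce one dict per row. `ResultTableStandIn` and `table_to_records` are hypothetical, and the `"ColumnName"` key is an assumption, since the listing does not say which key of each column dict carries the name. The sample values are invented for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ResultTableStandIn:
    """Stand-in with the `rows` / `columns` shape listed under Types."""
    rows: list
    columns: list


def table_to_records(table: Any, name_key: str = "ColumnName") -> list:
    """Pair each row with the column names to get one dict per row.

    `name_key` is an assumption: adjust it to whichever key holds the
    column name in the objects you actually receive.
    """
    names = [column[name_key] for column in table.columns]
    return [dict(zip(names, row)) for row in table.rows]


table = ResultTableStandIn(
    rows=[["Hail", 12], ["Flood", 7]],  # invented illustrative values
    columns=[
        {"ColumnName": "EventType", "ColumnType": "string"},
        {"ColumnName": "count_", "ColumnType": "long"},
    ],
)
records = table_to_records(table)
```

Records in this form are convenient to pass to downstream tasks or to load into a dataframe.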