0
# Data Connections
1
2
Manage the data connections that ingest data into Kusto databases from Azure services such as Event Hub, IoT Hub, Event Grid, and Cosmos DB. Data connections enable automated data ingestion pipelines from these external sources.
3
4
## Capabilities
5
6
### Data Connection CRUD Operations
7
8
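Core lifecycle operations (get, create or update, update, delete) for managing data connections that ingest data into Kusto databases. Operations prefixed with `begin_` are long-running and return an `LROPoller`; call `.result()` on the poller to wait for completion.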
9
10
```python { .api }
11
def get(
12
resource_group_name: str,
13
cluster_name: str,
14
database_name: str,
15
data_connection_name: str,
16
**kwargs
17
) -> DataConnection:
18
"""
19
Get a data connection in a database.
20
21
Parameters:
22
- resource_group_name: Name of the resource group
23
- cluster_name: Name of the Kusto cluster
24
- database_name: Name of the database
25
- data_connection_name: Name of the data connection
26
27
Returns:
28
DataConnection object (EventHubDataConnection, IotHubDataConnection, etc.)
29
"""
30
31
def begin_create_or_update(
32
resource_group_name: str,
33
cluster_name: str,
34
database_name: str,
35
data_connection_name: str,
36
parameters: DataConnection,
37
**kwargs
38
) -> LROPoller[DataConnection]:
39
"""
40
Create or update a data connection in a database.
41
42
Parameters:
43
- resource_group_name: Name of the resource group
44
- cluster_name: Name of the Kusto cluster
45
- database_name: Name of the database
46
- data_connection_name: Name of the data connection
47
- parameters: DataConnection object with configuration
48
49
Returns:
50
LROPoller for the long-running operation returning DataConnection
51
"""
52
53
def begin_update(
54
resource_group_name: str,
55
cluster_name: str,
56
database_name: str,
57
data_connection_name: str,
58
parameters: DataConnection,
59
**kwargs
60
) -> LROPoller[DataConnection]:
61
"""
62
Update a data connection in a database.
63
64
Parameters:
65
- resource_group_name: Name of the resource group
66
- cluster_name: Name of the Kusto cluster
67
- database_name: Name of the database
68
- data_connection_name: Name of the data connection
69
- parameters: DataConnection object with updates
70
71
Returns:
72
LROPoller for the long-running operation returning updated DataConnection
73
"""
74
75
def begin_delete(
76
resource_group_name: str,
77
cluster_name: str,
78
database_name: str,
79
data_connection_name: str,
80
**kwargs
81
) -> LROPoller[None]:
82
"""
83
Delete a data connection from a database.
84
85
Parameters:
86
- resource_group_name: Name of the resource group
87
- cluster_name: Name of the Kusto cluster
88
- database_name: Name of the database
89
- data_connection_name: Name of the data connection
90
91
Returns:
92
LROPoller for the long-running delete operation
93
"""
94
```
95
96
### Data Connection Listing Operations
97
98
Operation to list the data connections within a database.
99
100
```python { .api }
101
def list_by_database(
102
resource_group_name: str,
103
cluster_name: str,
104
database_name: str,
105
**kwargs
106
) -> Iterable[DataConnection]:
107
"""
108
List data connections in a database.
109
110
Parameters:
111
- resource_group_name: Name of the resource group
112
- cluster_name: Name of the Kusto cluster
113
- database_name: Name of the database
114
115
Returns:
116
Iterable of DataConnection objects
117
"""
118
```
119
120
### Data Connection Validation
121
122
Operations to validate a data connection configuration and to check whether a data connection name is available before deployment.
123
124
```python { .api }
125
def begin_data_connection_validation(
126
resource_group_name: str,
127
cluster_name: str,
128
database_name: str,
129
parameters: DataConnectionValidation,
130
**kwargs
131
) -> LROPoller[DataConnectionValidationListResult]:
132
"""
133
Validate a data connection configuration.
134
135
Parameters:
136
- resource_group_name: Name of the resource group
137
- cluster_name: Name of the Kusto cluster
138
- database_name: Name of the database
139
- parameters: DataConnectionValidation with configuration to validate
140
141
Returns:
142
LROPoller returning DataConnectionValidationListResult with validation results
143
"""
144
145
def check_name_availability(
146
resource_group_name: str,
147
cluster_name: str,
148
database_name: str,
149
data_connection_name: DataConnectionCheckNameRequest,
150
**kwargs
151
) -> CheckNameResult:
152
"""
153
Check if a data connection name is available in the database.
154
155
Parameters:
156
- resource_group_name: Name of the resource group
157
- cluster_name: Name of the Kusto cluster
158
- database_name: Name of the database
159
- data_connection_name: DataConnectionCheckNameRequest with name to check
160
161
Returns:
162
CheckNameResult indicating availability and any issues
163
"""
164
```
165
166
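## Usage Examples

The examples below assume that `client` is an already-authenticated `KustoManagementClient` instance (from `azure.mgmt.kusto`) and that the referenced resource group, cluster, and database exist.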
167
168
### Creating an Event Hub Data Connection
169
170
```python
171
from azure.mgmt.kusto.models import (
172
EventHubDataConnection,
173
EventHubDataFormat,
174
Compression
175
)
176
177
# Configure Event Hub data connection
178
event_hub_connection = EventHubDataConnection(
179
location="East US",
180
event_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.EventHub/namespaces/eventhub-ns/eventhubs/my-eventhub",
181
consumer_group="$Default",
182
table_name="MyTable",
183
mapping_rule_name="MyMapping",
184
data_format=EventHubDataFormat.JSON,
185
compression=Compression.NONE,
186
event_system_properties=["x-opt-enqueued-time", "x-opt-sequence-number"],
187
managed_identity_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/my-identity"
188
)
189
190
# Create the data connection
191
poller = client.data_connections.begin_create_or_update(
192
resource_group_name="my-resource-group",
193
cluster_name="my-cluster",
194
database_name="my-database",
195
data_connection_name="my-eventhub-connection",
196
parameters=event_hub_connection
197
)
198
199
connection = poller.result()
200
print(f"Event Hub connection created: {connection.name}")
201
```
202
203
### Creating an IoT Hub Data Connection
204
205
```python
206
from azure.mgmt.kusto.models import (
207
IotHubDataConnection,
208
IotHubDataFormat
209
)
210
211
# Configure IoT Hub data connection
212
iot_hub_connection = IotHubDataConnection(
213
location="East US",
214
iot_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.Devices/IotHubs/my-iothub",
215
consumer_group="$Default",
216
table_name="IoTData",
217
mapping_rule_name="IoTMapping",
218
data_format=IotHubDataFormat.JSON,
219
event_system_properties=["iothub-connection-device-id", "iothub-enqueuedtime"],
220
shared_access_policy_name="iothubowner"
221
)
222
223
# Create the data connection
224
poller = client.data_connections.begin_create_or_update(
225
resource_group_name="my-resource-group",
226
cluster_name="my-cluster",
227
database_name="my-database",
228
data_connection_name="my-iothub-connection",
229
parameters=iot_hub_connection
230
)
231
232
connection = poller.result()
233
print(f"IoT Hub connection created: {connection.name}")
234
```
235
236
### Creating an Event Grid Data Connection
237
238
```python
239
from azure.mgmt.kusto.models import (
240
EventGridDataConnection,
241
EventGridDataFormat,
242
BlobStorageEventType
243
)
244
245
# Configure Event Grid data connection
246
event_grid_connection = EventGridDataConnection(
247
location="East US",
248
storage_account_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.Storage/storageAccounts/mystorageaccount",
249
event_hub_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.EventHub/namespaces/eventhub-ns/eventhubs/eventgrid-hub",
250
consumer_group="$Default",
251
table_name="BlobEvents",
252
mapping_rule_name="BlobMapping",
253
data_format=EventGridDataFormat.JSON,
254
ignore_first_record=False,
255
blob_storage_event_type=BlobStorageEventType.MICROSOFT_STORAGE_BLOB_CREATED,
256
managed_identity_resource_id="/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/my-identity"
257
)
258
259
# Create the data connection
260
poller = client.data_connections.begin_create_or_update(
261
resource_group_name="my-resource-group",
262
cluster_name="my-cluster",
263
database_name="my-database",
264
data_connection_name="my-eventgrid-connection",
265
parameters=event_grid_connection
266
)
267
268
connection = poller.result()
269
print(f"Event Grid connection created: {connection.name}")
270
```
271
272
### Validating a Data Connection
273
274
```python
275
from azure.mgmt.kusto.models import DataConnectionValidation
276
277
# Create validation request
278
validation_request = DataConnectionValidation(
279
data_connection_name="my-test-connection",
280
properties=event_hub_connection  # Reuses the object built in "Creating an Event Hub Data Connection"
281
)
282
283
# Validate the configuration
284
poller = client.data_connections.begin_data_connection_validation(
285
resource_group_name="my-resource-group",
286
cluster_name="my-cluster",
287
database_name="my-database",
288
parameters=validation_request
289
)
290
291
validation_result = poller.result()
292
for result in validation_result.value:
293
if result.error_message:
294
print(f"Validation error: {result.error_message}")
295
else:
296
print("Validation passed")
297
```
298
299
## Key Types
300
301
```python { .api }
302
class DataConnection:
303
"""Base class for data connection resources."""
304
# Read-only properties
305
id: str # Resource ID
306
name: str # Data connection name
307
type: str # Resource type
308
309
# Common properties
310
location: str # Azure region
311
kind: DataConnectionKind # Connection type
312
313
class EventHubDataConnection(DataConnection):
314
"""Event Hub data connection for streaming ingestion."""
315
kind: DataConnectionKind = DataConnectionKind.EVENT_HUB
316
317
# Event Hub configuration
318
event_hub_resource_id: str # Event Hub resource ID
319
consumer_group: str # Consumer group name
320
table_name: str # Target table name
321
mapping_rule_name: str # Data mapping rule name
322
data_format: EventHubDataFormat # Data format
323
event_system_properties: List[str] # System properties to include
324
compression: Compression # Data compression type
325
provisioning_state: ProvisioningState # Provisioning state
326
managed_identity_resource_id: str # Managed identity for authentication
327
database_routing: DatabaseRouting # Database routing mode
328
329
class IotHubDataConnection(DataConnection):
330
"""IoT Hub data connection for device telemetry ingestion."""
331
kind: DataConnectionKind = DataConnectionKind.IOT_HUB
332
333
# IoT Hub configuration
334
iot_hub_resource_id: str # IoT Hub resource ID
335
consumer_group: str # Consumer group name
336
table_name: str # Target table name
337
mapping_rule_name: str # Data mapping rule name
338
data_format: IotHubDataFormat # Data format
339
event_system_properties: List[str] # System properties to include
340
shared_access_policy_name: str # Shared access policy name
341
provisioning_state: ProvisioningState # Provisioning state
342
database_routing: DatabaseRouting # Database routing mode
343
344
class EventGridDataConnection(DataConnection):
345
"""Event Grid data connection for blob storage events."""
346
kind: DataConnectionKind = DataConnectionKind.EVENT_GRID
347
348
# Event Grid configuration
349
storage_account_resource_id: str # Storage account resource ID
350
event_hub_resource_id: str # Event Hub resource ID for events
351
consumer_group: str # Consumer group name
352
table_name: str # Target table name
353
mapping_rule_name: str # Data mapping rule name
354
data_format: EventGridDataFormat # Data format
355
ignore_first_record: bool # Skip first record (headers)
356
blob_storage_event_type: BlobStorageEventType # Event type to process
357
managed_identity_resource_id: str # Managed identity for authentication
358
provisioning_state: ProvisioningState # Provisioning state
359
database_routing: DatabaseRouting # Database routing mode
360
361
class CosmosDbDataConnection(DataConnection):
362
"""Cosmos DB data connection for change feed ingestion."""
363
kind: DataConnectionKind = DataConnectionKind.COSMOS_DB
364
365
# Cosmos DB configuration
366
cosmos_db_account_resource_id: str # Cosmos DB account resource ID
367
cosmos_db_database: str # Cosmos DB database name
368
cosmos_db_container: str # Cosmos DB container name
369
table_name: str # Target table name
370
mapping_rule_name: str # Data mapping rule name
371
managed_identity_resource_id: str # Managed identity for authentication
372
provisioning_state: ProvisioningState # Provisioning state
373
retrieval_start_date: datetime # Start date for data retrieval
374
375
class DataConnectionValidation:
376
"""Request to validate a data connection configuration."""
377
data_connection_name: str # Name of the data connection
378
properties: DataConnection # Data connection configuration to validate
379
380
class DataConnectionValidationResult:
381
"""Result of data connection validation."""
382
error_message: str # Error message if validation failed
383
384
class DataConnectionValidationListResult:
385
"""List of data connection validation results."""
386
value: List[DataConnectionValidationResult] # Validation results
387
388
class DataConnectionCheckNameRequest:
389
"""Request to check data connection name availability."""
390
name: str # Name to check
391
type: str # Resource type
392
393
from enum import Enum
394
395
class DataConnectionKind(str, Enum):
396
"""Data connection type values."""
397
EVENT_HUB = "EventHub"
398
IOT_HUB = "IotHub"
399
EVENT_GRID = "EventGrid"
400
COSMOS_DB = "CosmosDb"
401
402
class EventHubDataFormat(str, Enum):
403
"""Event Hub data format values."""
404
MULTIJSON = "MULTIJSON"
405
JSON = "JSON"
406
CSV = "CSV"
407
TSV = "TSV"
408
SCSV = "SCSV"
409
SOHSV = "SOHSV"
410
PSV = "PSV"
411
TXT = "TXT"
412
RAW = "RAW"
413
SINGLEJSON = "SINGLEJSON"
414
AVRO = "AVRO"
415
TSVE = "TSVE"
416
PARQUET = "PARQUET"
417
ORC = "ORC"
418
APACHEAVRO = "APACHEAVRO"
419
W3CLOGFILE = "W3CLOGFILE"
420
421
class IotHubDataFormat(str, Enum):
422
"""IoT Hub data format values."""
423
MULTIJSON = "MULTIJSON"
424
JSON = "JSON"
425
CSV = "CSV"
426
TSV = "TSV"
427
SCSV = "SCSV"
428
SOHSV = "SOHSV"
429
PSV = "PSV"
430
TXT = "TXT"
431
RAW = "RAW"
432
SINGLEJSON = "SINGLEJSON"
433
AVRO = "AVRO"
434
TSVE = "TSVE"
435
PARQUET = "PARQUET"
436
ORC = "ORC"
437
APACHEAVRO = "APACHEAVRO"
438
W3CLOGFILE = "W3CLOGFILE"
439
440
class EventGridDataFormat(str, Enum):
441
"""Event Grid data format values."""
442
MULTIJSON = "MULTIJSON"
443
JSON = "JSON"
444
CSV = "CSV"
445
TSV = "TSV"
446
SCSV = "SCSV"
447
SOHSV = "SOHSV"
448
PSV = "PSV"
449
TXT = "TXT"
450
RAW = "RAW"
451
SINGLEJSON = "SINGLEJSON"
452
AVRO = "AVRO"
453
TSVE = "TSVE"
454
PARQUET = "PARQUET"
455
ORC = "ORC"
456
APACHEAVRO = "APACHEAVRO"
457
W3CLOGFILE = "W3CLOGFILE"
458
459
class Compression(str, Enum):
460
"""Data compression type values."""
461
NONE = "None"
462
GZIP = "GZip"
463
464
class BlobStorageEventType(str, Enum):
465
"""Blob storage event type values."""
466
MICROSOFT_STORAGE_BLOB_CREATED = "Microsoft.Storage.BlobCreated"
467
468
class DatabaseRouting(str, Enum):
469
"""Database routing mode values."""
470
SINGLE = "Single"
471
MULTI = "Multi"
472
```