0
# Async Client Operations
1
2
The azure-search-documents library provides full async support through its `aio` submodules (`azure.search.documents.aio` and `azure.search.documents.indexes.aio`). Every client class has an async variant that exposes the same operations through async/await, enabling high-performance applications with non-blocking I/O.
3
4
## Capabilities
5
6
### Async Search Client
7
8
Asynchronous version of SearchClient for document search and management operations.
9
10
```python { .api }
11
# Import from aio module
12
from azure.search.documents.aio import SearchClient
13
14
class SearchClient:
15
"""Async client for search operations."""
16
17
def __init__(
18
self,
19
endpoint: str,
20
index_name: str,
21
credential: Union[AzureKeyCredential, TokenCredential],
22
**kwargs
23
) -> None:
24
"""Initialize async SearchClient."""
25
26
async def close(self) -> None:
27
"""Close the async session."""
28
29
async def __aenter__(self) -> "SearchClient": ...
30
async def __aexit__(self, *args) -> None: ...
31
32
# Search operations
33
async def search(
34
self,
35
search_text: Optional[str] = None,
36
**kwargs
37
) -> AsyncSearchItemPaged:
38
"""Execute async search query."""
39
40
async def suggest(
41
self,
42
search_text: str,
43
suggester_name: str,
44
**kwargs
45
) -> List[Dict]:
46
"""Get async search suggestions."""
47
48
async def autocomplete(
49
self,
50
search_text: str,
51
suggester_name: str,
52
**kwargs
53
) -> List[Dict]:
54
"""Get async autocomplete results."""
55
56
# Document operations
57
async def get_document(
58
self,
59
key: str,
60
selected_fields: Optional[List[str]] = None,
61
**kwargs
62
) -> Dict:
63
"""Retrieve document asynchronously."""
64
65
async def get_document_count(self, **kwargs) -> int:
66
"""Get document count asynchronously."""
67
68
async def upload_documents(
69
self,
70
documents: List[Dict],
71
**kwargs
72
) -> List[IndexingResult]:
73
"""Upload documents asynchronously."""
74
75
async def merge_documents(
76
self,
77
documents: List[Dict],
78
**kwargs
79
) -> List[IndexingResult]:
80
"""Merge documents asynchronously."""
81
82
async def merge_or_upload_documents(
83
self,
84
documents: List[Dict],
85
**kwargs
86
) -> List[IndexingResult]:
87
"""Merge or upload documents asynchronously."""
88
89
async def delete_documents(
90
self,
91
documents: List[Dict],
92
**kwargs
93
) -> List[IndexingResult]:
94
"""Delete documents asynchronously."""
95
96
async def index_documents(
97
self,
98
batch: IndexDocumentsBatch,
99
**kwargs
100
) -> List[IndexingResult]:
101
"""Execute document batch asynchronously."""
102
```
103
104
### Async Search Results
105
106
Asynchronous iterator for paginated search results.
107
108
```python { .api }
109
class AsyncSearchItemPaged:
110
"""Async iterator for search results."""
111
112
def __aiter__(self) -> AsyncIterator[Dict[str, Any]]:
113
"""Async iterator over individual results."""
114
115
async def __anext__(self) -> Dict[str, Any]:
116
"""Get next result asynchronously."""
117
118
def by_page(self) -> AsyncIterator[List[Dict[str, Any]]]:
119
"""Iterate by pages asynchronously."""
120
121
async def get_count(self) -> Optional[int]:
122
"""Get total count asynchronously."""
123
124
async def get_coverage(self) -> Optional[float]:
125
"""Get coverage percentage asynchronously."""
126
127
async def get_facets(self) -> Optional[Dict[str, List[Dict[str, Any]]]]:
128
"""Get facet results asynchronously."""
129
```
130
131
### Async Buffered Sender
132
133
High-throughput async document indexing with automatic batching.
134
135
```python { .api }
136
from azure.search.documents.aio import SearchIndexingBufferedSender
137
138
class SearchIndexingBufferedSender:
139
"""Async buffered sender for high-throughput indexing."""
140
141
def __init__(
142
self,
143
endpoint: str,
144
index_name: str,
145
credential: Union[AzureKeyCredential, TokenCredential],
146
**kwargs
147
) -> None:
148
"""Initialize async buffered sender."""
149
150
async def upload_documents(self, documents: List[Dict], **kwargs) -> None:
151
"""Queue documents for async upload."""
152
153
async def delete_documents(self, documents: List[Dict], **kwargs) -> None:
154
"""Queue documents for async deletion."""
155
156
async def merge_documents(self, documents: List[Dict], **kwargs) -> None:
157
"""Queue documents for async merge."""
158
159
async def merge_or_upload_documents(self, documents: List[Dict], **kwargs) -> None:
160
"""Queue documents for async merge or upload."""
161
162
async def flush(self, timeout: Optional[int] = None, **kwargs) -> bool:
163
"""Flush all pending operations asynchronously."""
164
165
async def close(self, **kwargs) -> None:
166
"""Close sender and flush operations asynchronously."""
167
168
async def __aenter__(self) -> "SearchIndexingBufferedSender": ...
169
async def __aexit__(self, *args) -> None: ...
170
```
171
172
### Async Index Management
173
174
Asynchronous version of SearchIndexClient for index management operations.
175
176
```python { .api }
177
from azure.search.documents.indexes.aio import SearchIndexClient
178
179
class SearchIndexClient:
180
"""Async client for index management."""
181
182
def __init__(
183
self,
184
endpoint: str,
185
credential: Union[AzureKeyCredential, TokenCredential],
186
**kwargs
187
) -> None:
188
"""Initialize async SearchIndexClient."""
189
190
async def close(self) -> None:
191
"""Close async session."""
192
193
async def __aenter__(self) -> "SearchIndexClient": ...
194
async def __aexit__(self, *args) -> None: ...
195
196
# Index operations
197
async def create_index(self, index: SearchIndex, **kwargs) -> SearchIndex:
198
"""Create index asynchronously."""
199
200
async def get_index(self, name: str, **kwargs) -> SearchIndex:
201
"""Get index asynchronously."""
202
203
def list_indexes(
204
self,
205
*,
206
select: Optional[List[str]] = None,
207
**kwargs
208
) -> AsyncItemPaged[SearchIndex]:
209
"""List indexes asynchronously."""
210
211
def list_index_names(self, **kwargs) -> AsyncItemPaged[str]:
212
"""List index names asynchronously."""
213
214
async def delete_index(
215
self,
216
index: Union[str, SearchIndex],
217
**kwargs
218
) -> None:
219
"""Delete index asynchronously."""
220
221
async def create_or_update_index(
222
self,
223
index: SearchIndex,
224
**kwargs
225
) -> SearchIndex:
226
"""Create or update index asynchronously."""
227
228
async def get_index_statistics(
229
self,
230
index_name: str,
231
**kwargs
232
) -> Dict[str, Any]:
233
"""Get index statistics asynchronously."""
234
235
async def analyze_text(
236
self,
237
index_name: str,
238
analyze_request: AnalyzeTextOptions,
239
**kwargs
240
) -> AnalyzeResult:
241
"""Analyze text asynchronously."""
242
243
# Synonym map operations
244
async def create_synonym_map(
245
self,
246
synonym_map: SynonymMap,
247
**kwargs
248
) -> SynonymMap:
249
"""Create synonym map asynchronously."""
250
251
async def get_synonym_map(self, name: str, **kwargs) -> SynonymMap:
252
"""Get synonym map asynchronously."""
253
254
async def get_synonym_maps(
255
self,
256
*,
257
select: Optional[List[str]] = None,
258
**kwargs
259
) -> List[SynonymMap]:
260
"""List synonym maps asynchronously."""
261
262
async def delete_synonym_map(
263
self,
264
synonym_map: Union[str, SynonymMap],
265
**kwargs
266
) -> None:
267
"""Delete synonym map asynchronously."""
268
```
269
270
### Async Indexer Management
271
272
Asynchronous version of SearchIndexerClient for data ingestion and AI enrichment.
273
274
```python { .api }
275
from azure.search.documents.indexes.aio import SearchIndexerClient
276
277
class SearchIndexerClient:
278
"""Async client for indexer management."""
279
280
def __init__(
281
self,
282
endpoint: str,
283
credential: Union[AzureKeyCredential, TokenCredential],
284
**kwargs
285
) -> None:
286
"""Initialize async SearchIndexerClient."""
287
288
async def close(self) -> None:
289
"""Close async session."""
290
291
async def __aenter__(self) -> "SearchIndexerClient": ...
292
async def __aexit__(self, *args) -> None: ...
293
294
# Indexer operations
295
async def create_indexer(
296
self,
297
indexer: SearchIndexer,
298
**kwargs
299
) -> SearchIndexer:
300
"""Create indexer asynchronously."""
301
302
async def get_indexer(self, name: str, **kwargs) -> SearchIndexer:
303
"""Get indexer asynchronously."""
304
305
async def get_indexers(
306
self,
307
*,
308
select: Optional[List[str]] = None,
309
**kwargs
310
) -> Sequence[SearchIndexer]:
311
"""List indexers asynchronously."""
312
313
async def delete_indexer(
314
self,
315
indexer: Union[str, SearchIndexer],
316
**kwargs
317
) -> None:
318
"""Delete indexer asynchronously."""
319
320
async def run_indexer(self, name: str, **kwargs) -> None:
321
"""Run indexer asynchronously."""
322
323
async def reset_indexer(self, name: str, **kwargs) -> None:
324
"""Reset indexer asynchronously."""
325
326
async def get_indexer_status(
327
self,
328
name: str,
329
**kwargs
330
) -> SearchIndexerStatus:
331
"""Get indexer status asynchronously."""
332
333
# Data source operations
334
async def create_data_source_connection(
335
self,
336
data_source: SearchIndexerDataSourceConnection,
337
**kwargs
338
) -> SearchIndexerDataSourceConnection:
339
"""Create data source asynchronously."""
340
341
async def get_data_source_connection(
342
self,
343
name: str,
344
**kwargs
345
) -> SearchIndexerDataSourceConnection:
346
"""Get data source asynchronously."""
347
348
async def get_data_source_connections(
349
self,
350
**kwargs
351
) -> Sequence[SearchIndexerDataSourceConnection]:
352
"""List data sources asynchronously."""
353
354
async def delete_data_source_connection(
355
self,
356
data_source: Union[str, SearchIndexerDataSourceConnection],
357
**kwargs
358
) -> None:
359
"""Delete data source asynchronously."""
360
361
# Skillset operations
362
async def create_skillset(
363
self,
364
skillset: SearchIndexerSkillset,
365
**kwargs
366
) -> SearchIndexerSkillset:
367
"""Create skillset asynchronously."""
368
369
async def get_skillset(self, name: str, **kwargs) -> SearchIndexerSkillset:
370
"""Get skillset asynchronously."""
371
372
async def get_skillsets(
373
self,
374
*,
375
select: Optional[List[str]] = None,
376
**kwargs
377
) -> Sequence[SearchIndexerSkillset]:
378
"""List skillsets asynchronously."""
379
380
async def delete_skillset(
381
self,
382
skillset: Union[str, SearchIndexerSkillset],
383
**kwargs
384
) -> None:
385
"""Delete skillset asynchronously."""
386
```
387
388
## Usage Examples
389
390
### Async Search Operations
391
392
```python
393
import asyncio
394
from azure.search.documents.aio import SearchClient
395
from azure.core.credentials import AzureKeyCredential
396
397
async def search_example():
398
"""Example of async search operations."""
399
400
# Initialize async client
401
async with SearchClient(
402
endpoint="https://service.search.windows.net",
403
index_name="hotels",
404
credential=AzureKeyCredential("admin-key")
405
) as client:
406
407
# Async search
408
results = await client.search("luxury hotel", top=5)
409
410
# Iterate over results asynchronously
411
async for result in results:
412
print(f"{result['name']}: {result['@search.score']}")
413
414
# Get document asynchronously
415
document = await client.get_document("hotel_1")
416
print(f"Retrieved: {document['name']}")
417
418
# Upload documents asynchronously
419
documents = [
420
{"id": "1", "name": "Hotel A", "rating": 4.5},
421
{"id": "2", "name": "Hotel B", "rating": 4.0}
422
]
423
results = await client.upload_documents(documents)
424
425
for result in results:
426
if result.succeeded:
427
print(f"Uploaded: {result.key}")
428
429
# Run async function
430
asyncio.run(search_example())
431
```
432
433
### Async Batch Processing
434
435
```python
436
import asyncio
437
from azure.search.documents.aio import SearchIndexingBufferedSender
from azure.core.credentials import AzureKeyCredential
438
439
async def batch_processing_example():
440
"""Example of async high-throughput indexing."""
441
442
async with SearchIndexingBufferedSender(
443
endpoint="https://service.search.windows.net",
444
index_name="documents",
445
credential=AzureKeyCredential("admin-key"),
446
auto_flush_interval=30 # Auto-flush every 30 seconds
447
) as sender:
448
449
# Queue large batches - they will be automatically batched and sent
450
large_document_batch = [
451
{"id": str(i), "content": f"Document {i}", "category": f"cat_{i % 10}"}
452
for i in range(10000)
453
]
454
455
# These operations are non-blocking and batched automatically
456
await sender.upload_documents(large_document_batch[:5000])
457
await sender.merge_documents(large_document_batch[5000:])
458
459
# Explicit flush to ensure all operations complete
460
success = await sender.flush(timeout=60)
461
print(f"All operations completed: {success}")
462
463
asyncio.run(batch_processing_example())
464
```
465
466
### Async Index Management
467
468
```python
469
import asyncio
470
from azure.search.documents.indexes.aio import SearchIndexClient
471
from azure.search.documents.indexes.models import SearchIndex, SearchField, SearchFieldDataType
from azure.core.credentials import AzureKeyCredential
472
473
async def index_management_example():
474
"""Example of async index management."""
475
476
async with SearchIndexClient(
477
endpoint="https://service.search.windows.net",
478
credential=AzureKeyCredential("admin-key")
479
) as client:
480
481
# Create index asynchronously
482
fields = [
483
SearchField(name="id", type=SearchFieldDataType.String, key=True),
484
SearchField(name="title", type=SearchFieldDataType.String, searchable=True),
485
SearchField(name="content", type=SearchFieldDataType.String, searchable=True)
486
]
487
488
index = SearchIndex(name="async-index", fields=fields)
489
created_index = await client.create_index(index)
490
print(f"Created index: {created_index.name}")
491
492
# List indexes asynchronously
493
indexes = client.list_indexes()
494
async for index in indexes:
495
print(f"Index: {index.name}")
496
497
# Get index statistics asynchronously
498
stats = await client.get_index_statistics("async-index")
499
print(f"Document count: {stats.get('document_count', 0)}")
500
501
asyncio.run(index_management_example())
502
```
503
504
### Async Indexer Operations
505
506
```python
507
import asyncio
508
from azure.search.documents.indexes.aio import SearchIndexerClient
509
from azure.search.documents.indexes.models import SearchIndexer
from azure.core.credentials import AzureKeyCredential
510
511
async def indexer_management_example():
512
"""Example of async indexer management."""
513
514
async with SearchIndexerClient(
515
endpoint="https://service.search.windows.net",
516
credential=AzureKeyCredential("admin-key")
517
) as client:
518
519
# Run indexer asynchronously
520
await client.run_indexer("my-indexer")
521
print("Indexer started")
522
523
# Monitor indexer status
524
while True:
525
status = await client.get_indexer_status("my-indexer")
526
print(f"Indexer status: {status.status}")
527
528
if status.last_result and status.last_result.status in ["success", "transientFailure"]:
529
break
530
531
# Wait before checking again
532
await asyncio.sleep(30)
533
534
# Get final results
535
if status.last_result:
536
print(f"Final status: {status.last_result.status}")
537
print(f"Items processed: {status.last_result.item_count}")
538
539
asyncio.run(indexer_management_example())
540
```
541
542
### Concurrent Operations
543
544
```python
545
import asyncio
546
from azure.search.documents.aio import SearchClient
from azure.core.credentials import AzureKeyCredential
547
548
async def concurrent_searches():
549
"""Example of running multiple searches concurrently."""
550
551
client = SearchClient(
552
endpoint="https://service.search.windows.net",
553
index_name="hotels",
554
credential=AzureKeyCredential("admin-key")
555
)
556
557
try:
558
# Define multiple search queries
559
search_queries = [
560
"luxury hotel",
561
"budget accommodation",
562
"beach resort",
563
"mountain lodge",
564
"city center hotel"
565
]
566
567
# Create coroutines for concurrent execution
568
search_coroutines = [
569
client.search(query, top=3)
570
for query in search_queries
571
]
572
573
# Run searches concurrently
574
search_results = await asyncio.gather(*search_coroutines)
575
576
# Process results
577
for i, results in enumerate(search_results):
578
print(f"\nResults for '{search_queries[i]}':")
579
async for result in results:
580
print(f" - {result.get('name', 'N/A')}")
581
582
finally:
583
await client.close()
584
585
asyncio.run(concurrent_searches())
586
```
587
588
### Error Handling in Async Operations
589
590
```python
591
import asyncio
592
from azure.search.documents.aio import SearchClient
593
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
594
595
async def async_error_handling():
596
"""Example of error handling in async operations."""
597
598
async with SearchClient(
599
endpoint="https://service.search.windows.net",
600
index_name="hotels",
601
credential=AzureKeyCredential("admin-key")
602
) as client:
603
604
try:
605
# This might fail if document doesn't exist
606
document = await client.get_document("nonexistent-key")
607
print(f"Found document: {document}")
608
609
except ResourceNotFoundError:
610
print("Document not found")
611
612
except HttpResponseError as e:
613
print(f"HTTP error: {e.status_code} - {e.message}")
614
615
try:
616
# Batch operation with error handling
617
documents = [
618
{"id": "1", "name": "Valid Document"},
619
{"invalid": "Missing required id field"} # This will fail
620
]
621
622
results = await client.upload_documents(documents)
623
624
for result in results:
625
if result.succeeded:
626
print(f"Success: {result.key}")
627
else:
628
print(f"Failed: {result.key} - {result.error_message}")
629
630
except Exception as e:
631
print(f"Unexpected error: {e}")
632
633
asyncio.run(async_error_handling())
634
```
635
636
## Common Async Patterns
637
638
### Context Managers
639
640
All async clients support async context managers for automatic resource cleanup:
641
642
```python
643
# Automatic resource cleanup
644
async with SearchClient(endpoint, index_name, credential) as client:
645
results = await client.search("query")
646
# Client is automatically closed when exiting context
647
```
648
649
### Async Iterators
650
651
Search results support async iteration:
652
653
```python
654
# Async iteration over results
655
results = await client.search("query")
656
async for result in results:
657
process_result(result)
658
659
# Async pagination
660
async for page in results.by_page():
661
for result in page:
662
process_result(result)
663
```
664
665
### Concurrent Operations
666
667
Use `asyncio.gather()` for concurrent operations:
668
669
```python
670
# Run multiple operations concurrently
671
tasks = [
672
client.search("query1"),
673
client.search("query2"),
674
client.get_document("doc1")
675
]
676
677
results = await asyncio.gather(*tasks)
678
```
679
680
### Async Generators
681
682
For processing large result sets efficiently:
683
684
```python
685
async def process_all_documents(client):
686
"""Process all documents in an index."""
687
results = await client.search("*", top=1000)
688
689
async for result in results:
690
# Process each document
691
await process_document(result)
692
yield result
693
```