Microsoft Azure AI Search Client Library for Python providing comprehensive search, indexing, and AI-powered document processing capabilities.
—
Quality: Pending — best-practices review not yet completed.
Impact: Pending — no eval scenarios have been run.
The azure-search-documents library provides full async support through aio submodules. All client classes have async variants that provide the same functionality with async/await support, enabling high-performance applications with non-blocking I/O operations.
Asynchronous version of SearchClient for document search and management operations.
# Import from aio module
from azure.search.documents.aio import SearchClient
class SearchClient:
"""Async client for search operations."""
def __init__(
self,
endpoint: str,
index_name: str,
credential: Union[AzureKeyCredential, TokenCredential],
**kwargs
) -> None:
"""Initialize async SearchClient."""
async def close(self) -> None:
"""Close the async session."""
async def __aenter__(self) -> "SearchClient": ...
async def __aexit__(self, *args) -> None: ...
# Search operations
async def search(
self,
search_text: Optional[str] = None,
**kwargs
) -> AsyncSearchItemPaged:
"""Execute async search query."""
async def suggest(
self,
search_text: str,
suggester_name: str,
**kwargs
) -> List[Dict]:
"""Get async search suggestions."""
async def autocomplete(
self,
search_text: str,
suggester_name: str,
**kwargs
) -> List[Dict]:
"""Get async autocomplete results."""
# Document operations
async def get_document(
self,
key: str,
selected_fields: Optional[List[str]] = None,
**kwargs
) -> Dict:
"""Retrieve document asynchronously."""
async def get_document_count(self, **kwargs) -> int:
"""Get document count asynchronously."""
async def upload_documents(
self,
documents: List[Dict],
**kwargs
) -> List[IndexingResult]:
"""Upload documents asynchronously."""
async def merge_documents(
self,
documents: List[Dict],
**kwargs
) -> List[IndexingResult]:
"""Merge documents asynchronously."""
async def merge_or_upload_documents(
self,
documents: List[Dict],
**kwargs
) -> List[IndexingResult]:
"""Merge or upload documents asynchronously."""
async def delete_documents(
self,
documents: List[Dict],
**kwargs
) -> List[IndexingResult]:
"""Delete documents asynchronously."""
async def index_documents(
self,
batch: IndexDocumentsBatch,
**kwargs
) -> List[IndexingResult]:
"""Execute document batch asynchronously."""Asynchronous iterator for paginated search results.
class AsyncSearchItemPaged:
"""Async iterator for search results."""
def __aiter__(self) -> AsyncIterator[Dict[str, Any]]:
"""Async iterator over individual results."""
async def __anext__(self) -> Dict[str, Any]:
"""Get next result asynchronously."""
def by_page(self) -> AsyncIterator[List[Dict[str, Any]]]:
"""Iterate by pages asynchronously."""
async def get_count(self) -> Optional[int]:
"""Get total count asynchronously."""
async def get_coverage(self) -> Optional[float]:
"""Get coverage percentage asynchronously."""
async def get_facets(self) -> Optional[Dict[str, List[Dict[str, Any]]]]:
"""Get facet results asynchronously."""High-throughput async document indexing with automatic batching.
from azure.search.documents.aio import SearchIndexingBufferedSender
class SearchIndexingBufferedSender:
"""Async buffered sender for high-throughput indexing."""
def __init__(
self,
endpoint: str,
index_name: str,
credential: Union[AzureKeyCredential, TokenCredential],
**kwargs
) -> None:
"""Initialize async buffered sender."""
async def upload_documents(self, documents: List[Dict], **kwargs) -> None:
"""Queue documents for async upload."""
async def delete_documents(self, documents: List[Dict], **kwargs) -> None:
"""Queue documents for async deletion."""
async def merge_documents(self, documents: List[Dict], **kwargs) -> None:
"""Queue documents for async merge."""
async def merge_or_upload_documents(self, documents: List[Dict], **kwargs) -> None:
"""Queue documents for async merge or upload."""
async def flush(self, timeout: Optional[int] = None, **kwargs) -> bool:
"""Flush all pending operations asynchronously."""
async def close(self, **kwargs) -> None:
"""Close sender and flush operations asynchronously."""
async def __aenter__(self) -> "SearchIndexingBufferedSender": ...
async def __aexit__(self, *args) -> None: ...Asynchronous version of SearchIndexClient for index management operations.
from azure.search.documents.indexes.aio import SearchIndexClient
class SearchIndexClient:
"""Async client for index management."""
def __init__(
self,
endpoint: str,
credential: Union[AzureKeyCredential, TokenCredential],
**kwargs
) -> None:
"""Initialize async SearchIndexClient."""
async def close(self) -> None:
"""Close async session."""
async def __aenter__(self) -> "SearchIndexClient": ...
async def __aexit__(self, *args) -> None: ...
# Index operations
async def create_index(self, index: SearchIndex, **kwargs) -> SearchIndex:
"""Create index asynchronously."""
async def get_index(self, name: str, **kwargs) -> SearchIndex:
"""Get index asynchronously."""
async def list_indexes(
self,
*,
select: Optional[List[str]] = None,
**kwargs
) -> AsyncItemPaged[SearchIndex]:
"""List indexes asynchronously."""
async def list_index_names(self, **kwargs) -> AsyncItemPaged[str]:
"""List index names asynchronously."""
async def delete_index(
self,
index: Union[str, SearchIndex],
**kwargs
) -> None:
"""Delete index asynchronously."""
async def create_or_update_index(
self,
index: SearchIndex,
**kwargs
) -> SearchIndex:
"""Create or update index asynchronously."""
async def get_index_statistics(
self,
index_name: str,
**kwargs
) -> Dict[str, Any]:
"""Get index statistics asynchronously."""
async def analyze_text(
self,
index_name: str,
analyze_request: AnalyzeTextOptions,
**kwargs
) -> AnalyzeResult:
"""Analyze text asynchronously."""
# Synonym map operations
async def create_synonym_map(
self,
synonym_map: SynonymMap,
**kwargs
) -> SynonymMap:
"""Create synonym map asynchronously."""
async def get_synonym_map(self, name: str, **kwargs) -> SynonymMap:
"""Get synonym map asynchronously."""
async def get_synonym_maps(
self,
*,
select: Optional[List[str]] = None,
**kwargs
) -> List[SynonymMap]:
"""List synonym maps asynchronously."""
async def delete_synonym_map(
self,
synonym_map: Union[str, SynonymMap],
**kwargs
) -> None:
"""Delete synonym map asynchronously."""Asynchronous version of SearchIndexerClient for data ingestion and AI enrichment.
from azure.search.documents.indexes.aio import SearchIndexerClient

class SearchIndexerClient:
    """Async client for indexer management."""

    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, TokenCredential],
        **kwargs
    ) -> None:
        """Initialize async SearchIndexerClient."""

    async def close(self) -> None:
        """Close async session."""

    async def __aenter__(self) -> "SearchIndexerClient": ...

    async def __aexit__(self, *args) -> None: ...

    # Indexer operations

    async def create_indexer(
        self,
        indexer: SearchIndexer,
        **kwargs
    ) -> SearchIndexer:
        """Create indexer asynchronously."""

    async def get_indexer(self, name: str, **kwargs) -> SearchIndexer:
        """Get indexer asynchronously."""

    async def get_indexers(
        self,
        *,
        select: Optional[List[str]] = None,
        **kwargs
    ) -> Sequence[SearchIndexer]:
        """List indexers asynchronously."""

    async def delete_indexer(
        self,
        indexer: Union[str, SearchIndexer],
        **kwargs
    ) -> None:
        """Delete indexer asynchronously."""

    async def run_indexer(self, name: str, **kwargs) -> None:
        """Run indexer asynchronously."""

    async def reset_indexer(self, name: str, **kwargs) -> None:
        """Reset indexer asynchronously."""

    async def get_indexer_status(
        self,
        name: str,
        **kwargs
    ) -> SearchIndexerStatus:
        """Get indexer status asynchronously."""

    # Data source operations

    async def create_data_source_connection(
        self,
        data_source: SearchIndexerDataSourceConnection,
        **kwargs
    ) -> SearchIndexerDataSourceConnection:
        """Create data source asynchronously."""

    async def get_data_source_connection(
        self,
        name: str,
        **kwargs
    ) -> SearchIndexerDataSourceConnection:
        """Get data source asynchronously."""

    async def get_data_source_connections(
        self,
        **kwargs
    ) -> Sequence[SearchIndexerDataSourceConnection]:
        """List data sources asynchronously."""

    async def delete_data_source_connection(
        self,
        data_source: Union[str, SearchIndexerDataSourceConnection],
        **kwargs
    ) -> None:
        """Delete data source asynchronously."""

    # Skillset operations

    async def create_skillset(
        self,
        skillset: SearchIndexerSkillset,
        **kwargs
    ) -> SearchIndexerSkillset:
        """Create skillset asynchronously."""

    async def get_skillset(self, name: str, **kwargs) -> SearchIndexerSkillset:
        """Get skillset asynchronously."""

    async def get_skillsets(
        self,
        *,
        select: Optional[List[str]] = None,
        **kwargs
    ) -> Sequence[SearchIndexerSkillset]:
        """List skillsets asynchronously."""

    async def delete_skillset(
        self,
        skillset: Union[str, SearchIndexerSkillset],
        **kwargs
    ) -> None:
        """Delete skillset asynchronously."""

import asyncio
from azure.search.documents.aio import SearchClient
from azure.core.credentials import AzureKeyCredential

async def search_example():
    """Example of async search operations."""
    # Initialize async client; the context manager closes the session on exit.
    async with SearchClient(
        endpoint="https://service.search.windows.net",
        index_name="hotels",
        credential=AzureKeyCredential("admin-key")
    ) as client:
        # Async search
        results = await client.search("luxury hotel", top=5)

        # Iterate over results asynchronously
        async for result in results:
            print(f"{result['name']}: {result['@search.score']}")

        # Get document asynchronously
        document = await client.get_document("hotel_1")
        print(f"Retrieved: {document['name']}")

        # Upload documents asynchronously
        documents = [
            {"id": "1", "name": "Hotel A", "rating": 4.5},
            {"id": "2", "name": "Hotel B", "rating": 4.0}
        ]
        results = await client.upload_documents(documents)
        for result in results:
            # IndexingResult.succeeded is a bool attribute, not a method
            if result.succeeded:
                print(f"Uploaded: {result.key}")

# Run async function
asyncio.run(search_example())
import asyncio
from azure.search.documents.aio import SearchIndexingBufferedSender
from azure.core.credentials import AzureKeyCredential

async def batch_processing_example():
    """Example of async high-throughput indexing."""
    async with SearchIndexingBufferedSender(
        endpoint="https://service.search.windows.net",
        index_name="documents",
        credential=AzureKeyCredential("admin-key"),
        auto_flush_interval=30  # Auto-flush every 30 seconds
    ) as sender:
        # Queue large batches - they will be automatically batched and sent
        large_document_batch = [
            {"id": str(i), "content": f"Document {i}", "category": f"cat_{i % 10}"}
            for i in range(10000)
        ]

        # These operations are non-blocking and batched automatically
        await sender.upload_documents(large_document_batch[:5000])
        await sender.merge_documents(large_document_batch[5000:])

        # Explicit flush to ensure all operations complete
        success = await sender.flush(timeout=60)
        print(f"All operations completed: {success}")

asyncio.run(batch_processing_example())
import asyncio
from azure.search.documents.indexes.aio import SearchIndexClient
from azure.search.documents.indexes.models import SearchIndex, SearchField, SearchFieldDataType
from azure.core.credentials import AzureKeyCredential

async def index_management_example():
    """Example of async index management."""
    async with SearchIndexClient(
        endpoint="https://service.search.windows.net",
        credential=AzureKeyCredential("admin-key")
    ) as client:
        # Create index asynchronously.
        # SearchField takes keyword-only constructor arguments (name=, type=).
        fields = [
            SearchField(name="id", type=SearchFieldDataType.String, key=True),
            SearchField(name="title", type=SearchFieldDataType.String, searchable=True),
            SearchField(name="content", type=SearchFieldDataType.String, searchable=True)
        ]
        index = SearchIndex(name="async-index", fields=fields)
        created_index = await client.create_index(index)
        print(f"Created index: {created_index.name}")

        # List indexes asynchronously: list_indexes() returns an AsyncItemPaged
        # (no await) that is consumed with ``async for``.
        indexes = client.list_indexes()
        async for index in indexes:
            print(f"Index: {index.name}")

        # Get index statistics asynchronously
        stats = await client.get_index_statistics("async-index")
        print(f"Document count: {stats.get('documentCount', 0)}")

asyncio.run(index_management_example())
import asyncio
from azure.search.documents.indexes.aio import SearchIndexerClient
from azure.search.documents.indexes.models import SearchIndexer
from azure.core.credentials import AzureKeyCredential

async def indexer_management_example():
    """Example of async indexer management."""
    async with SearchIndexerClient(
        endpoint="https://service.search.windows.net",
        credential=AzureKeyCredential("admin-key")
    ) as client:
        # Run indexer asynchronously
        await client.run_indexer("my-indexer")
        print("Indexer started")

        # Poll indexer status until the last run finishes
        while True:
            status = await client.get_indexer_status("my-indexer")
            print(f"Indexer status: {status.status}")
            if status.last_result and status.last_result.status in ["success", "failure"]:
                break
            # Wait before checking again
            await asyncio.sleep(30)

        # Get final results
        if status.last_result:
            print(f"Final status: {status.last_result.status}")
            print(f"Items processed: {status.last_result.item_count}")

asyncio.run(indexer_management_example())
import asyncio
from azure.search.documents.aio import SearchClient
from azure.core.credentials import AzureKeyCredential

async def concurrent_searches():
    """Example of running multiple searches concurrently."""
    client = SearchClient(
        endpoint="https://service.search.windows.net",
        index_name="hotels",
        credential=AzureKeyCredential("admin-key")
    )
    try:
        # Define multiple search queries
        search_queries = [
            "luxury hotel",
            "budget accommodation",
            "beach resort",
            "mountain lodge",
            "city center hotel"
        ]

        # Create coroutines for concurrent execution
        search_coroutines = [
            client.search(query, top=3)
            for query in search_queries
        ]

        # Run searches concurrently
        search_results = await asyncio.gather(*search_coroutines)

        # Process results
        for i, results in enumerate(search_results):
            print(f"\nResults for '{search_queries[i]}':")
            async for result in results:
                print(f"  - {result.get('name', 'N/A')}")
    finally:
        # Always release the underlying session, even on error
        await client.close()

asyncio.run(concurrent_searches())
import asyncio
from azure.search.documents.aio import SearchClient
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
async def async_error_handling():
"""Example of error handling in async operations."""
async with SearchClient(
endpoint="https://service.search.windows.net",
index_name="hotels",
credential=AzureKeyCredential("admin-key")
) as client:
try:
# This might fail if document doesn't exist
document = await client.get_document("nonexistent-key")
print(f"Found document: {document}")
except ResourceNotFoundError:
print("Document not found")
except HttpResponseError as e:
print(f"HTTP error: {e.status_code} - {e.message}")
try:
# Batch operation with error handling
documents = [
{"id": "1", "name": "Valid Document"},
{"invalid": "Missing required id field"} # This will fail
]
results = await client.upload_documents(documents)
for result in results:
if result.succeeded():
print(f"Success: {result.key}")
else:
print(f"Failed: {result.key} - {result.error_message}")
except Exception as e:
print(f"Unexpected error: {e}")
asyncio.run(async_error_handling())All async clients support async context managers for automatic resource cleanup:
# Automatic resource cleanup
async with SearchClient(endpoint, index_name, credential) as client:
results = await client.search("query")
# Client is automatically closed when exiting contextSearch results support async iteration:
# Async iteration over results
results = await client.search("query")
async for result in results:
process_result(result)
# Async pagination
async for page in results.by_page():
for result in page:
process_result(result)Use asyncio.gather() for concurrent operations:
# Run multiple operations concurrently
tasks = [
client.search("query1"),
client.search("query2"),
client.get_document("doc1")
]
results = await asyncio.gather(*tasks)For processing large result sets efficiently:
async def process_all_documents(client):
"""Process all documents in an index."""
results = await client.search("*", top=1000)
async for result in results:
# Process each document
await process_document(result)
yield resultInstall with Tessl CLI
npx tessl i tessl/pypi-azure-search-documents