CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-search-documents

Microsoft Azure AI Search Client Library for Python providing comprehensive search, indexing, and AI-powered document processing capabilities.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

async-clients.mddocs/

Async Client Operations

The azure-search-documents library provides full async support through aio submodules. All client classes have async variants that provide the same functionality with async/await support, enabling high-performance applications with non-blocking I/O operations.

Capabilities

Async Search Client

Asynchronous version of SearchClient for document search and management operations.

# Import from aio module
from azure.search.documents.aio import SearchClient

class SearchClient:
    """Async client for search operations."""
    
    def __init__(
        self, 
        endpoint: str, 
        index_name: str, 
        credential: Union[AzureKeyCredential, TokenCredential], 
        **kwargs
    ) -> None:
        """Initialize async SearchClient."""
    
    async def close(self) -> None:
        """Close the async session."""
    
    async def __aenter__(self) -> "SearchClient": ...
    async def __aexit__(self, *args) -> None: ...
    
    # Search operations
    async def search(
        self,
        search_text: Optional[str] = None,
        **kwargs
    ) -> AsyncSearchItemPaged:
        """Execute async search query."""
    
    async def suggest(
        self,
        search_text: str,
        suggester_name: str,
        **kwargs
    ) -> List[Dict]:
        """Get async search suggestions."""
    
    async def autocomplete(
        self,
        search_text: str,
        suggester_name: str,
        **kwargs
    ) -> List[Dict]:
        """Get async autocomplete results."""
    
    # Document operations
    async def get_document(
        self, 
        key: str, 
        selected_fields: Optional[List[str]] = None, 
        **kwargs
    ) -> Dict:
        """Retrieve document asynchronously."""
    
    async def get_document_count(self, **kwargs) -> int:
        """Get document count asynchronously."""
    
    async def upload_documents(
        self, 
        documents: List[Dict], 
        **kwargs
    ) -> List[IndexingResult]:
        """Upload documents asynchronously."""
    
    async def merge_documents(
        self, 
        documents: List[Dict], 
        **kwargs
    ) -> List[IndexingResult]:
        """Merge documents asynchronously."""
    
    async def merge_or_upload_documents(
        self, 
        documents: List[Dict], 
        **kwargs
    ) -> List[IndexingResult]:
        """Merge or upload documents asynchronously."""
    
    async def delete_documents(
        self, 
        documents: List[Dict], 
        **kwargs
    ) -> List[IndexingResult]:
        """Delete documents asynchronously."""
    
    async def index_documents(
        self, 
        batch: IndexDocumentsBatch, 
        **kwargs
    ) -> List[IndexingResult]:
        """Execute document batch asynchronously."""

Async Search Results

Asynchronous iterator for paginated search results.

class AsyncSearchItemPaged:
    """Async iterator for search results."""
    
    def __aiter__(self) -> AsyncIterator[Dict[str, Any]]:
        """Async iterator over individual results."""
    
    async def __anext__(self) -> Dict[str, Any]:
        """Get next result asynchronously."""
    
    def by_page(self) -> AsyncIterator[List[Dict[str, Any]]]:
        """Iterate by pages asynchronously."""
    
    async def get_count(self) -> Optional[int]:
        """Get total count asynchronously."""
    
    async def get_coverage(self) -> Optional[float]:
        """Get coverage percentage asynchronously."""
    
    async def get_facets(self) -> Optional[Dict[str, List[Dict[str, Any]]]]:
        """Get facet results asynchronously."""

Async Buffered Sender

High-throughput async document indexing with automatic batching.

from azure.search.documents.aio import SearchIndexingBufferedSender

class SearchIndexingBufferedSender:
    """Async buffered sender for high-throughput indexing."""
    
    def __init__(
        self,
        endpoint: str,
        index_name: str, 
        credential: Union[AzureKeyCredential, TokenCredential],
        **kwargs
    ) -> None:
        """Initialize async buffered sender."""
    
    async def upload_documents(self, documents: List[Dict], **kwargs) -> None:
        """Queue documents for async upload."""
    
    async def delete_documents(self, documents: List[Dict], **kwargs) -> None:
        """Queue documents for async deletion."""
    
    async def merge_documents(self, documents: List[Dict], **kwargs) -> None:
        """Queue documents for async merge."""
    
    async def merge_or_upload_documents(self, documents: List[Dict], **kwargs) -> None:
        """Queue documents for async merge or upload."""
    
    async def flush(self, timeout: Optional[int] = None, **kwargs) -> bool:
        """Flush all pending operations asynchronously."""
    
    async def close(self, **kwargs) -> None:
        """Close sender and flush operations asynchronously."""
    
    async def __aenter__(self) -> "SearchIndexingBufferedSender": ...
    async def __aexit__(self, *args) -> None: ...

Async Index Management

Asynchronous version of SearchIndexClient for index management operations.

from azure.search.documents.indexes.aio import SearchIndexClient

class SearchIndexClient:
    """Async client for index management."""
    
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, TokenCredential], 
        **kwargs
    ) -> None:
        """Initialize async SearchIndexClient."""
    
    async def close(self) -> None:
        """Close async session."""
    
    async def __aenter__(self) -> "SearchIndexClient": ...
    async def __aexit__(self, *args) -> None: ...
    
    # Index operations
    async def create_index(self, index: SearchIndex, **kwargs) -> SearchIndex:
        """Create index asynchronously."""
    
    async def get_index(self, name: str, **kwargs) -> SearchIndex:
        """Get index asynchronously."""
    
    async def list_indexes(
        self, 
        *, 
        select: Optional[List[str]] = None, 
        **kwargs
    ) -> AsyncItemPaged[SearchIndex]:
        """List indexes asynchronously."""
    
    async def list_index_names(self, **kwargs) -> AsyncItemPaged[str]:
        """List index names asynchronously."""
    
    async def delete_index(
        self,
        index: Union[str, SearchIndex],
        **kwargs
    ) -> None:
        """Delete index asynchronously."""
    
    async def create_or_update_index(
        self,
        index: SearchIndex,
        **kwargs
    ) -> SearchIndex:
        """Create or update index asynchronously."""
    
    async def get_index_statistics(
        self, 
        index_name: str, 
        **kwargs
    ) -> Dict[str, Any]:
        """Get index statistics asynchronously."""
    
    async def analyze_text(
        self,
        index_name: str,
        analyze_request: AnalyzeTextOptions,
        **kwargs
    ) -> AnalyzeResult:
        """Analyze text asynchronously."""
        
    # Synonym map operations
    async def create_synonym_map(
        self, 
        synonym_map: SynonymMap, 
        **kwargs
    ) -> SynonymMap:
        """Create synonym map asynchronously."""
    
    async def get_synonym_map(self, name: str, **kwargs) -> SynonymMap:
        """Get synonym map asynchronously."""
    
    async def get_synonym_maps(
        self, 
        *, 
        select: Optional[List[str]] = None, 
        **kwargs
    ) -> List[SynonymMap]:
        """List synonym maps asynchronously."""
    
    async def delete_synonym_map(
        self,
        synonym_map: Union[str, SynonymMap],
        **kwargs
    ) -> None:
        """Delete synonym map asynchronously."""

Async Indexer Management

Asynchronous version of SearchIndexerClient for data ingestion and AI enrichment.

from azure.search.documents.indexes.aio import SearchIndexerClient

class SearchIndexerClient:
    """Async client for indexer management."""
    
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, TokenCredential], 
        **kwargs
    ) -> None:
        """Initialize async SearchIndexerClient."""
    
    async def close(self) -> None:
        """Close async session."""
    
    async def __aenter__(self) -> "SearchIndexerClient": ...
    async def __aexit__(self, *args) -> None: ...
    
    # Indexer operations
    async def create_indexer(
        self, 
        indexer: SearchIndexer, 
        **kwargs
    ) -> SearchIndexer:
        """Create indexer asynchronously."""
    
    async def get_indexer(self, name: str, **kwargs) -> SearchIndexer:
        """Get indexer asynchronously."""
    
    async def get_indexers(
        self, 
        *, 
        select: Optional[List[str]] = None, 
        **kwargs
    ) -> Sequence[SearchIndexer]:
        """List indexers asynchronously."""
    
    async def delete_indexer(
        self,
        indexer: Union[str, SearchIndexer],
        **kwargs
    ) -> None:
        """Delete indexer asynchronously."""
    
    async def run_indexer(self, name: str, **kwargs) -> None:
        """Run indexer asynchronously."""
    
    async def reset_indexer(self, name: str, **kwargs) -> None:
        """Reset indexer asynchronously."""
    
    async def get_indexer_status(
        self, 
        name: str, 
        **kwargs
    ) -> SearchIndexerStatus:
        """Get indexer status asynchronously."""
    
    # Data source operations
    async def create_data_source_connection(
        self,
        data_source: SearchIndexerDataSourceConnection,
        **kwargs
    ) -> SearchIndexerDataSourceConnection:
        """Create data source asynchronously."""
    
    async def get_data_source_connection(
        self, 
        name: str, 
        **kwargs
    ) -> SearchIndexerDataSourceConnection:
        """Get data source asynchronously."""
    
    async def get_data_source_connections(
        self,
        **kwargs
    ) -> Sequence[SearchIndexerDataSourceConnection]:
        """List data sources asynchronously."""
    
    async def delete_data_source_connection(
        self,
        data_source: Union[str, SearchIndexerDataSourceConnection],
        **kwargs
    ) -> None:
        """Delete data source asynchronously."""
    
    # Skillset operations
    async def create_skillset(
        self, 
        skillset: SearchIndexerSkillset, 
        **kwargs
    ) -> SearchIndexerSkillset:
        """Create skillset asynchronously."""
    
    async def get_skillset(self, name: str, **kwargs) -> SearchIndexerSkillset:
        """Get skillset asynchronously."""
    
    async def get_skillsets(
        self, 
        *, 
        select: Optional[List[str]] = None, 
        **kwargs
    ) -> Sequence[SearchIndexerSkillset]:
        """List skillsets asynchronously."""
    
    async def delete_skillset(
        self,
        skillset: Union[str, SearchIndexerSkillset],
        **kwargs
    ) -> None:
        """Delete skillset asynchronously."""

Usage Examples

Async Search Operations

import asyncio
from azure.search.documents.aio import SearchClient
from azure.core.credentials import AzureKeyCredential

async def search_example():
    """Example of async search operations."""
    
    # Initialize async client
    async with SearchClient(
        endpoint="https://service.search.windows.net",
        index_name="hotels",
        credential=AzureKeyCredential("admin-key")
    ) as client:
        
        # Async search
        results = await client.search("luxury hotel", top=5)
        
        # Iterate over results asynchronously
        async for result in results:
            print(f"{result['name']}: {result['@search.score']}")
        
        # Get document asynchronously
        document = await client.get_document("hotel_1")
        print(f"Retrieved: {document['name']}")
        
        # Upload documents asynchronously
        documents = [
            {"id": "1", "name": "Hotel A", "rating": 4.5},
            {"id": "2", "name": "Hotel B", "rating": 4.0}
        ]
        results = await client.upload_documents(documents)
        
        for result in results:
            if result.succeeded():
                print(f"Uploaded: {result.key}")

# Run async function
asyncio.run(search_example())

Async Batch Processing

import asyncio
from azure.search.documents.aio import SearchIndexingBufferedSender

async def batch_processing_example():
    """Example of async high-throughput indexing."""
    
    async with SearchIndexingBufferedSender(
        endpoint="https://service.search.windows.net",
        index_name="documents",
        credential=AzureKeyCredential("admin-key"),
        auto_flush_interval=30  # Auto-flush every 30 seconds
    ) as sender:
        
        # Queue large batches - they will be automatically batched and sent
        large_document_batch = [
            {"id": str(i), "content": f"Document {i}", "category": f"cat_{i % 10}"}
            for i in range(10000)
        ]
        
        # These operations are non-blocking and batched automatically
        await sender.upload_documents(large_document_batch[:5000])
        await sender.merge_documents(large_document_batch[5000:])
        
        # Explicit flush to ensure all operations complete
        success = await sender.flush(timeout=60)
        print(f"All operations completed: {success}")

asyncio.run(batch_processing_example())

Async Index Management

import asyncio
from azure.search.documents.indexes.aio import SearchIndexClient
from azure.search.documents.indexes.models import SearchIndex, SearchField, SearchFieldDataType

async def index_management_example():
    """Example of async index management."""
    
    async with SearchIndexClient(
        endpoint="https://service.search.windows.net",
        credential=AzureKeyCredential("admin-key")
    ) as client:
        
        # Create index asynchronously
        fields = [
            SearchField("id", SearchFieldDataType.String, key=True),
            SearchField("title", SearchFieldDataType.String, searchable=True),
            SearchField("content", SearchFieldDataType.String, searchable=True)
        ]
        
        index = SearchIndex(name="async-index", fields=fields)
        created_index = await client.create_index(index)
        print(f"Created index: {created_index.name}")
        
        # List indexes asynchronously
        indexes = client.list_indexes()
        async for index in indexes:
            print(f"Index: {index.name}")
        
        # Get index statistics asynchronously
        stats = await client.get_index_statistics("async-index")
        print(f"Document count: {stats.get('documentCount', 0)}")

asyncio.run(index_management_example())

Async Indexer Operations

import asyncio
from azure.search.documents.indexes.aio import SearchIndexerClient
from azure.search.documents.indexes.models import SearchIndexer

async def indexer_management_example():
    """Example of async indexer management."""
    
    async with SearchIndexerClient(
        endpoint="https://service.search.windows.net",
        credential=AzureKeyCredential("admin-key")
    ) as client:
        
        # Run indexer asynchronously
        await client.run_indexer("my-indexer")
        print("Indexer started")
        
        # Monitor indexer status
        while True:
            status = await client.get_indexer_status("my-indexer")
            print(f"Indexer status: {status.status}")
            
            if status.last_result and status.last_result.status in ["success", "failure"]:
                break
            
            # Wait before checking again
            await asyncio.sleep(30)
        
        # Get final results
        if status.last_result:
            print(f"Final status: {status.last_result.status}")
            print(f"Items processed: {status.last_result.item_count}")

asyncio.run(indexer_management_example())

Concurrent Operations

import asyncio
from azure.search.documents.aio import SearchClient

async def concurrent_searches():
    """Example of running multiple searches concurrently."""
    
    client = SearchClient(
        endpoint="https://service.search.windows.net",
        index_name="hotels",
        credential=AzureKeyCredential("admin-key")
    )
    
    try:
        # Define multiple search queries
        search_queries = [
            "luxury hotel",
            "budget accommodation", 
            "beach resort",
            "mountain lodge",
            "city center hotel"
        ]
        
        # Create coroutines for concurrent execution
        search_coroutines = [
            client.search(query, top=3) 
            for query in search_queries
        ]
        
        # Run searches concurrently
        search_results = await asyncio.gather(*search_coroutines)
        
        # Process results
        for i, results in enumerate(search_results):
            print(f"\nResults for '{search_queries[i]}':")
            async for result in results:
                print(f"  - {result.get('name', 'N/A')}")
    
    finally:
        await client.close()

asyncio.run(concurrent_searches())

Error Handling in Async Operations

import asyncio
from azure.search.documents.aio import SearchClient
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError

async def async_error_handling():
    """Example of error handling in async operations."""
    
    async with SearchClient(
        endpoint="https://service.search.windows.net",
        index_name="hotels",
        credential=AzureKeyCredential("admin-key")
    ) as client:
        
        try:
            # This might fail if document doesn't exist
            document = await client.get_document("nonexistent-key")
            print(f"Found document: {document}")
            
        except ResourceNotFoundError:
            print("Document not found")
        
        except HttpResponseError as e:
            print(f"HTTP error: {e.status_code} - {e.message}")
        
        try:
            # Batch operation with error handling
            documents = [
                {"id": "1", "name": "Valid Document"},
                {"invalid": "Missing required id field"}  # This will fail
            ]
            
            results = await client.upload_documents(documents)
            
            for result in results:
                if result.succeeded():
                    print(f"Success: {result.key}")
                else:
                    print(f"Failed: {result.key} - {result.error_message}")
                    
        except Exception as e:
            print(f"Unexpected error: {e}")

asyncio.run(async_error_handling())

Common Async Patterns

Context Managers

All async clients support async context managers for automatic resource cleanup:

# Automatic resource cleanup
async with SearchClient(endpoint, index_name, credential) as client:
    results = await client.search("query")
    # Client is automatically closed when exiting context

Async Iterators

Search results support async iteration:

# Async iteration over results
results = await client.search("query")
async for result in results:
    process_result(result)

# Async pagination
async for page in results.by_page():
    for result in page:
        process_result(result)

Concurrent Operations

Use asyncio.gather() for concurrent operations:

# Run multiple operations concurrently
tasks = [
    client.search("query1"),
    client.search("query2"), 
    client.get_document("doc1")
]

results = await asyncio.gather(*tasks)

Async Generators

For processing large result sets efficiently:

async def process_all_documents(client):
    """Process all documents in an index."""
    results = await client.search("*", top=1000)
    
    async for result in results:
        # Process each document
        await process_document(result)
        yield result

Install with Tessl CLI

npx tessl i tessl/pypi-azure-search-documents

docs

async-clients.md

index-management.md

index.md

indexer-management.md

models.md

search-client.md

tile.json