CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/pypi-pymilvus

Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.

Pending
Overview
Eval results
Files

search-operations.mddocs/

Search Operations

PyMilvus provides comprehensive search capabilities including vector similarity search, hybrid multi-vector search, scalar filtering, and advanced result handling. This covers single-vector ANN search, multi-vector hybrid search with reranking, and sophisticated result processing.

Basic Vector Search

Single Vector Search

from pymilvus import MilvusClient

client = MilvusClient()

def search(
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    search_params: Optional[Dict] = None,
    limit: int = 10,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    round_decimal: int = -1,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[List[Dict[str, Any]]]

Parameters:

  • data: Query vectors as list of float lists or dict with vector field
  • anns_field: Name of vector field to search against
  • search_params: Algorithm-specific parameters (e.g., {"nprobe": 16})
  • limit: Maximum results per query vector
  • expr: Boolean filter expression
  • output_fields: Fields to include in results
  • partition_names: Target partitions for search
  • round_decimal: Precision for distance values (-1 for no rounding)
  • consistency_level: "Strong", "Eventually", "Bounded", "Session"

Examples:

# Basic similarity search
query_vector = [0.1, 0.2, 0.3] * 256
results = client.search(
    collection_name="documents",
    data=[query_vector],
    limit=5,
    output_fields=["id", "title", "content"]
)

# Search with filtering
results = client.search(
    collection_name="products",
    data=[embedding],
    expr="category == 'electronics' and price < 1000",
    limit=10,
    output_fields=["id", "name", "price", "description"]
)

# Multi-query search
query_vectors = [[0.1] * 768, [0.2] * 768, [0.3] * 768]
results = client.search(
    collection_name="embeddings",
    data=query_vectors,
    search_params={"nprobe": 32, "ef": 64},
    limit=20
)

# Search specific partitions
results = client.search(
    collection_name="time_series", 
    data=[query_vector],
    partition_names=["2024_q1", "2024_q2"],
    expr="status == 'active'",
    limit=15
)

Search Parameters by Index Type

# FLAT index (exact search)
flat_params = {}  # No additional parameters needed

# IVF_FLAT index  
ivf_flat_params = {
    "nprobe": 16  # Number of clusters to search (1 to nlist)
}

# IVF_PQ index
ivf_pq_params = {
    "nprobe": 32,  # Number of clusters to search
}

# HNSW index
hnsw_params = {
    "ef": 64  # Search scope (ef >= limit, higher = more accurate but slower)
}

# Example usage
results = client.search(
    "hnsw_collection",
    data=[query_vector],
    search_params=hnsw_params,
    limit=10
)

Hybrid Multi-Vector Search

AnnSearchRequest

from pymilvus import AnnSearchRequest

def __init__(
    self,
    data: List,
    anns_field: str,
    param: Dict,
    limit: int,
    expr: Optional[str] = None,
    partition_names: Optional[List[str]] = None,
    ignore_growing: bool = False
)

Parameters:

  • data: Query vectors for this field
  • anns_field: Vector field name to search
  • param: Search parameters including metric_type and algorithm params
  • limit: Maximum results for this search request
  • expr: Filter expression for this search
  • partition_names: Target partitions
  • ignore_growing: Skip growing segments for consistency

Hybrid Search with Reranking

def hybrid_search(
    collection_name: str,
    reqs: List[AnnSearchRequest],
    ranker: Union[RRFRanker, WeightedRanker],
    limit: int = 10,
    partition_names: Optional[List[str]] = None,
    output_fields: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    round_decimal: int = -1,
    **kwargs
) -> List[List[Dict[str, Any]]]

Parameters:

  • reqs: List of AnnSearchRequest objects for different vector fields
  • ranker: Ranking algorithm to combine results
  • limit: Final number of results after reranking

RRF (Reciprocal Rank Fusion) Ranker

from pymilvus import RRFRanker

def __init__(self, k: int = 60)

Parameters:

  • k: RRF parameter controlling rank fusion (default: 60)

RRF Formula: score = Σ(1 / (k + rank_i)) for each search result

WeightedRanker

from pymilvus import WeightedRanker

def __init__(self, *nums, norm_score: bool = True)

Parameters:

  • *nums: Weight values for each search request (must match number of requests)
  • norm_score: Whether to normalize scores before weighting

Hybrid Search Examples

from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker

# Dense + Sparse hybrid search
dense_req = AnnSearchRequest(
    data=dense_vectors,  # [[0.1, 0.2, ...]]
    anns_field="dense_embedding",
    param={
        "metric_type": "L2",
        "params": {"nprobe": 16}
    },
    limit=100,
    expr="status == 'published'"
)

sparse_req = AnnSearchRequest(
    data=sparse_vectors,  # Sparse vectors from BM25/TF-IDF
    anns_field="sparse_embedding", 
    param={
        "metric_type": "IP",  # Inner Product for sparse
        "params": {"drop_ratio_build": 0.2}
    },
    limit=100,
    expr="status == 'published'"
)

# RRF hybrid search - good for combining different vector types
rrf_results = client.hybrid_search(
    collection_name="hybrid_documents",
    reqs=[dense_req, sparse_req],
    ranker=RRFRanker(k=60),
    limit=10,
    output_fields=["id", "title", "content", "score"]
)

# Weighted hybrid search - control contribution of each vector type
weighted_results = client.hybrid_search(
    collection_name="hybrid_documents", 
    reqs=[dense_req, sparse_req],
    ranker=WeightedRanker(0.7, 0.3, norm_score=True),  # 70% dense, 30% sparse
    limit=10,
    output_fields=["id", "title", "content"]
)

# Multi-modal search (text + image + audio)
text_req = AnnSearchRequest(
    data=text_embeddings,
    anns_field="text_vector",
    param={"metric_type": "COSINE", "params": {"nprobe": 20}},
    limit=50
)

image_req = AnnSearchRequest(
    data=image_embeddings,
    anns_field="image_vector", 
    param={"metric_type": "L2", "params": {"ef": 100}},
    limit=50
)

audio_req = AnnSearchRequest(
    data=audio_embeddings,
    anns_field="audio_vector",
    param={"metric_type": "IP", "params": {"nprobe": 10}},
    limit=50
)

multimodal_results = client.hybrid_search(
    collection_name="multimodal_content",
    reqs=[text_req, image_req, audio_req],
    ranker=WeightedRanker(0.5, 0.3, 0.2),  # Text dominant
    limit=15,
    output_fields=["id", "title", "type", "metadata"]
)

Advanced Hybrid Search Patterns

# Query-time vector generation with different strategies per field
def multi_strategy_search(query_text: str, query_image_path: str):
    """Run a three-way hybrid search — semantic text, lexical text, and
    visual similarity — fused with Reciprocal Rank Fusion."""
    # Encode the query once per modality.
    dense_embedding = text_encoder.encode(query_text)
    sparse_embedding = bm25_encoder.encode(query_text)
    visual_embedding = image_encoder.encode(query_image_path)

    # Both text strategies target the same textual content types.
    text_filter = "content_type in ['article', 'blog']"

    # Semantic text search
    semantic_req = AnnSearchRequest(
        data=[dense_embedding],
        anns_field="text_dense_vector",
        param={"metric_type": "COSINE", "params": {"ef": 200}},
        limit=200,
        expr=text_filter
    )

    # Lexical text search
    lexical_req = AnnSearchRequest(
        data=[sparse_embedding],
        anns_field="text_sparse_vector",
        param={"metric_type": "IP"},
        limit=200,
        expr=text_filter
    )

    # Visual similarity
    visual_req = AnnSearchRequest(
        data=[visual_embedding],
        anns_field="image_vector",
        param={"metric_type": "L2", "params": {"nprobe": 50}},
        limit=100,
        expr="content_type in ['image', 'video']"
    )

    # Combine with RRF for balanced results
    return client.hybrid_search(
        "multimedia_collection",
        reqs=[semantic_req, lexical_req, visual_req],
        ranker=RRFRanker(k=100),
        limit=20,
        output_fields=["id", "title", "content_type", "url", "metadata"]
    )

Search Result Handling

SearchResult Structure

# SearchResult contains results for all query vectors
from pymilvus.client.search_result import SearchResult, Hits, Hit

class SearchResult:
    hits: List[Hits]           # One Hits object per query vector
    distances: List[List[float]]  # Nested distances [query][result]
    ids: List[List]            # Nested primary keys [query][result]
    
    def __len__(self) -> int   # Number of queries
    def __getitem__(self, index: int) -> Hits  # Access query results

Hits Object

class Hits:
    ids: List                  # Primary key values for this query
    distances: List[float]     # Distance/similarity scores
    
    def __len__(self) -> int   # Number of results
    def __getitem__(self, index: int) -> Hit  # Access individual result
    def __iter__(self) -> Iterator[Hit]  # Iterate over results

Hit Object

class Hit:
    id: Any                    # Primary key value
    distance: float           # Distance/similarity score
    score: float              # Alias for distance
    entity: Dict[str, Any]    # Returned field values
    
    def get(self, field: str, default=None) -> Any  # Get field with default
    def to_dict(self) -> Dict[str, Any]  # Convert to dictionary

Result Processing Examples

# Process search results
results = client.search(
    "documents",
    data=[query_vector],
    limit=5,
    output_fields=["id", "title", "content", "score"]
)

# Access first query results (single query)
first_query_hits = results[0]
print(f"Found {len(first_query_hits)} results")

# Process individual hits
for hit in first_query_hits:
    print(f"Document ID: {hit.id}")
    print(f"Similarity Score: {hit.score:.4f}")
    print(f"Title: {hit.entity.get('title', 'No title')}")
    print(f"Content: {hit.entity.get('content', '')[:100]}...")
    print("---")

# Multi-query result processing
multi_results = client.search(
    "products", 
    data=[vector1, vector2, vector3],
    limit=10,
    output_fields=["id", "name", "category", "price"]
)

for query_idx, hits in enumerate(multi_results):
    print(f"Query {query_idx + 1} results:")
    for rank, hit in enumerate(hits):
        product_name = hit.entity.get('name', 'Unknown')
        price = hit.entity.get('price', 0)
        print(f"  {rank + 1}. {product_name} - ${price:.2f} (score: {hit.score:.3f})")

Advanced Result Analysis

def analyze_search_results(results: "SearchResult") -> "Dict[str, Any]":
    """Analyze search result quality and distribution.

    Args:
        results: A SearchResult-like object — a sized iterable of per-query
            hit lists, where each hit exposes ``score`` and an ``entity``
            mapping (``hit.entity.get(...)``).

    Returns:
        Dict with ``total_queries`` (number of query vectors) and
        ``query_stats``: one entry per non-empty query holding its index,
        result count, score statistics (min/max/avg/range), and a histogram
        of result categories.
    """
    analysis = {
        "total_queries": len(results),
        "query_stats": []
    }

    for query_idx, hits in enumerate(results):
        # Queries with no matches contribute nothing to the stats.
        if len(hits) == 0:
            continue

        scores = [hit.score for hit in hits]
        # Hoist min/max so each is computed only once.
        lowest, highest = min(scores), max(scores)
        query_analysis = {
            "query_index": query_idx,
            "result_count": len(hits),
            "score_stats": {
                "min": lowest,
                "max": highest,
                "avg": sum(scores) / len(scores),
                "range": highest - lowest
            },
            "categories": {}
        }

        # Histogram of result categories; hits without a 'category' field
        # are counted under 'unknown'.
        for hit in hits:
            category = hit.entity.get('category', 'unknown')
            query_analysis["categories"][category] = query_analysis["categories"].get(category, 0) + 1

        analysis["query_stats"].append(query_analysis)

    return analysis

# Use analysis
search_results = client.search("products", [query_vector], limit=20, output_fields=["category"])
stats = analyze_search_results(search_results)
print(f"Search returned results across {len(stats['query_stats'][0]['categories'])} categories")

Paginated Search with Iterators

search_iterator

def search_iterator(
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    batch_size: int = 1000,
    limit: Optional[int] = None,
    search_params: Optional[Dict] = None,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    **kwargs
) -> SearchIterator

Parameters:

  • batch_size: Results per iteration batch
  • limit: Total maximum results across all batches

Example:

# Large-scale similarity search with pagination
iterator = client.search_iterator(
    collection_name="large_embeddings",
    data=[query_vector],
    anns_field="embedding",
    batch_size=1000,
    limit=10000,  # Process up to 10K results
    output_fields=["id", "metadata", "score"],
    expr="status == 'active'"
)

# Process results in batches
total_processed = 0
for batch in iterator:
    # batch is a list of Hit objects
    print(f"Processing batch of {len(batch)} results")
    
    # Process each result in batch
    for hit in batch:
        # Custom processing logic
        if hit.score > 0.8:  # High similarity threshold
            process_high_similarity(hit)
        
        total_processed += 1
    
    # Optional: stop early based on conditions
    if total_processed >= 5000:
        break

print(f"Total processed: {total_processed} results")

Query Operations (Scalar Search)

Basic Query

def query(
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    limit: int = 16384,
    offset: int = 0,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[Dict[str, Any]]

Query Iterator

def query_iterator(
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    batch_size: int = 1000,
    limit: Optional[int] = None,
    **kwargs
) -> QueryIterator

Query Expression Syntax

# Comparison operators
"age > 25"
"price <= 100.0"
"category == 'electronics'"
"status != 'inactive'"

# Logical operators
"age > 18 and age < 65"
"category == 'books' or category == 'ebooks'"
"not (status == 'deleted')"

# List operations
"category in ['electronics', 'computers', 'mobile']"
"tag_id not in [1, 2, 3]"

# JSON field queries
"metadata['color'] == 'red'"
"metadata['specs']['weight'] < 1.5"
"json_contains(metadata['tags'], 'premium')"

# Array field queries  
"array_contains(tags, 'new')"
"array_contains_all(categories, ['tech', 'gadget'])"
"array_contains_any(features, ['bluetooth', 'wifi'])"
"array_length(tags) > 2"

# String operations
"title like 'Python%'"  # Starts with 'Python'
"description like '%machine learning%'"  # Contains 'machine learning'

# Complex expressions
"(category == 'books' and price < 50) or (category == 'ebooks' and price < 20)"
"json_contains(metadata['tags'], 'bestseller') and rating >= 4.5"
"array_contains(features, 'wireless') and price >= 50 and price <= 200"

Query Examples

# Basic filtering
products = client.query(
    "products",
    filter="category == 'electronics' and price < 500",
    output_fields=["id", "name", "price", "rating"],
    limit=50
)

# Complex JSON queries
documents = client.query(
    "documents", 
    filter="metadata['author'] == 'Smith' and metadata['year'] >= 2020",
    output_fields=["id", "title", "metadata"],
    offset=100,
    limit=25
)

# Array field filtering
articles = client.query(
    "articles",
    filter="array_contains_all(tags, ['AI', 'machine-learning']) and status == 'published'",
    output_fields=["id", "title", "tags", "publish_date"]
)

# Paginated query processing
iterator = client.query_iterator(
    "large_dataset",
    filter="created_at > 1640995200",  # After 2022-01-01
    output_fields=["id", "data", "timestamp"],
    batch_size=2000
)

for batch in iterator:
    # Process each batch
    process_data_batch(batch)

Performance Optimization

Search Parameter Tuning

# HNSW index optimization
def optimize_hnsw_search(collection_name: str, query_vectors: List[List[float]], target_recall: float = 0.95):
    """Optimize HNSW search parameters for target recall.

    Sweeps a small set of candidate ``ef`` values and returns the search
    params ({"ef": value}) of the fastest run that met ``target_recall``,
    or None if no candidate reached it.

    NOTE(review): this example relies on ``client``, ``time``,
    ``measure_recall`` and ``ground_truth`` being defined in the enclosing
    scope — confirm before running it as-is.
    """
    
    # Start with conservative parameters.
    # NOTE(review): the usual guidance is ef >= limit (see the HNSW section
    # above); flooring at the vector dimension here is just a heuristic
    # starting point — confirm it suits your index.
    base_ef = max(50, len(query_vectors[0]))  # ef >= dimension recommended
    
    # Test different ef values (doubling each step: broader search scope,
    # higher recall, higher latency).
    ef_values = [base_ef, base_ef * 2, base_ef * 4]
    
    best_params = None
    best_latency = float('inf')
    
    for ef in ef_values:
        # Wall-clock the whole search call as the latency measure.
        start_time = time.time()
        results = client.search(
            collection_name,
            data=query_vectors,
            search_params={"ef": ef},
            limit=10
        )
        latency = time.time() - start_time
        
        # In practice, measure recall against ground truth
        recall = measure_recall(results, ground_truth)  # Custom function
        
        # Keep the fastest configuration that still meets the recall target.
        if recall >= target_recall and latency < best_latency:
            best_params = {"ef": ef}
            best_latency = latency
    
    return best_params

# IVF index optimization  
def optimize_ivf_search(collection_name: str, nlist: int):
    """Find optimal nprobe for IVF index.

    Times a probe search at several candidate ``nprobe`` values and returns
    {"nprobe": value} for the first candidate whose latency beats the
    target, falling back to the smallest candidate otherwise.

    NOTE(review): this example relies on ``client``, ``time``,
    ``test_vector`` and ``target_latency`` being defined in the enclosing
    scope — confirm before running it as-is.
    """
    
    # Rule of thumb: nprobe = sqrt(nlist) to nlist/8
    # (candidates are ordered roughly cheapest-first; max(1, ...) guards
    # against nlist small enough that the integer division yields 0)
    nprobe_candidates = [
        max(1, int(nlist ** 0.5)),  # sqrt(nlist)
        max(1, nlist // 32),
        max(1, nlist // 16), 
        max(1, nlist // 8)
    ]
    
    # Default to the cheapest candidate in case none beats target_latency.
    best_nprobe = nprobe_candidates[0]
    
    for nprobe in nprobe_candidates:
        # Test search performance (wall-clock the whole call)
        start_time = time.time()
        results = client.search(
            collection_name,
            data=[test_vector],
            search_params={"nprobe": nprobe},
            limit=100
        )
        latency = time.time() - start_time
        
        print(f"nprobe={nprobe}, latency={latency:.4f}s")
        
        # Choose based on latency/accuracy tradeoff: stop at the first
        # (cheapest) nprobe that is fast enough.
        if latency < target_latency:
            best_nprobe = nprobe
            break
    
    return {"nprobe": best_nprobe}

Batch Search Optimization

def batch_search_optimize(collection_name: str, query_vectors: List[List[float]], batch_size: int = 100):
    """Optimize batch search for large query sets"""

    total = len(query_vectors)
    collected = []

    # Slice the query set into fixed-size windows to bound per-call memory.
    for start in range(0, total, batch_size):
        window = query_vectors[start:start + batch_size]

        collected.extend(
            client.search(
                collection_name,
                data=window,
                search_params={"nprobe": 16},  # Adjust based on index
                limit=10,
                round_decimal=4  # Reduce precision for network efficiency
            )
        )

        # Optional: progress tracking
        print(f"Processed {min(start + batch_size, total)}/{total} queries")

    return collected

PyMilvus search operations provide powerful capabilities for similarity search, hybrid retrieval, and scalar filtering, enabling sophisticated search applications with fine-tuned performance optimization.

Install with Tessl CLI

npx tessl i tessl/pypi-pymilvus

docs

data-management.md

index-management.md

index.md

milvus-client.md

orm-collection.md

search-operations.md

types-enums.md

user-management.md

utility-functions.md

tile.json