Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.
---
PyMilvus provides search capabilities spanning single-vector ANN search, multi-vector hybrid search with reranking, scalar filtering, and result processing, along with iterators for large result sets.
from pymilvus import MilvusClient

# Connects to http://localhost:19530 by default
client = MilvusClient()
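For non-default deployments, the client also accepts an explicit uri, and a token for authenticated clusters; the endpoint and credentials below are placeholders, not values from this document:

# Placeholder endpoint and credentials - substitute your deployment's values
client = MilvusClient(
    uri="http://localhost:19530",
    token="username:password"
)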
def search(
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    search_params: Optional[Dict] = None,
    limit: int = 10,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    round_decimal: int = -1,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[List[Dict[str, Any]]]

Parameters:
- data: Query vectors as a list of float lists, or a list of dicts containing the vector field
- anns_field: Name of the vector field to search against
- search_params: Algorithm-specific parameters (e.g., {"nprobe": 16})
- limit: Maximum results per query vector
- expr: Boolean filter expression
- output_fields: Fields to include in results
- partition_names: Target partitions for the search
- round_decimal: Precision for distance values (-1 for no rounding)
- consistency_level: "Strong", "Eventually", "Bounded", or "Session"

Examples:
# Basic similarity search
query_vector = [0.1, 0.2, 0.3] * 256  # 768-dimensional example vector
results = client.search(
    collection_name="documents",
    data=[query_vector],
    limit=5,
    output_fields=["id", "title", "content"]
)
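Per the signature above, a search can also pin a consistency level and round distances; a sketch with illustrative parameter values:

# Search with explicit consistency and rounded distances
results = client.search(
    collection_name="documents",
    data=[query_vector],
    limit=5,
    round_decimal=4,              # round distance values to 4 decimal places
    consistency_level="Bounded",  # tolerate slightly stale reads for speed
    output_fields=["id", "title"]
)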
# Search with filtering
results = client.search(
    collection_name="products",
    data=[embedding],
    expr="category == 'electronics' and price < 1000",
    limit=10,
    output_fields=["id", "name", "price", "description"]
)
# Multi-query search
query_vectors = [[0.1] * 768, [0.2] * 768, [0.3] * 768]
results = client.search(
    collection_name="embeddings",
    data=query_vectors,
    search_params={"nprobe": 32, "ef": 64},  # pass only the params your index type uses
    limit=20
)
# Search specific partitions
results = client.search(
    collection_name="time_series",
    data=[query_vector],
    partition_names=["2024_q1", "2024_q2"],
    expr="status == 'active'",
    limit=15
)

# FLAT index (exact search)
flat_params = {}  # No additional parameters needed

# IVF_FLAT index
ivf_flat_params = {
    "nprobe": 16  # Number of clusters to search (1 to nlist)
}

# IVF_PQ index
ivf_pq_params = {
    "nprobe": 32  # Number of clusters to search
}

# HNSW index
hnsw_params = {
    "ef": 64  # Search scope (ef >= limit; higher = more accurate but slower)
}
# Example usage
results = client.search(
    "hnsw_collection",
    data=[query_vector],
    search_params=hnsw_params,
    limit=10
)

from pymilvus import AnnSearchRequest
def __init__(
    self,
    data: List,
    anns_field: str,
    param: Dict,
    limit: int,
    expr: Optional[str] = None,
    partition_names: Optional[List[str]] = None,
    ignore_growing: bool = False
)

Parameters:
- data: Query vectors for this field
- anns_field: Vector field name to search
- param: Search parameters including metric_type and algorithm params
- limit: Maximum results for this search request
- expr: Filter expression for this search
- partition_names: Target partitions
- ignore_growing: Skip growing segments for consistency

def hybrid_search(
    collection_name: str,
    reqs: List[AnnSearchRequest],
    ranker: Union[RRFRanker, WeightedRanker],
    limit: int = 10,
    partition_names: Optional[List[str]] = None,
    output_fields: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    round_decimal: int = -1,
    **kwargs
) -> List[List[Dict[str, Any]]]

Parameters:
- reqs: List of AnnSearchRequest objects for different vector fields
- ranker: Ranking algorithm to combine results
- limit: Final number of results after reranking

from pymilvus import RRFRanker
def __init__(self, k: int = 60)

Parameters:
- k: RRF parameter controlling rank fusion (default: 60)

RRF Formula: score = Σ(1 / (k + rank_i)) for each search result
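To make the formula concrete, here is a plain-Python sketch of reciprocal rank fusion over two ranked ID lists; it illustrates the math only and is not the library's internal implementation:

def rrf_fuse(rankings, k=60):
    """Fuse ranked ID lists: score(id) = sum over lists of 1 / (k + rank)."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# 'a' tops both lists; 'c' appears in both, so it outranks 'b' and 'd'
print(rrf_fuse([["a", "b", "c"], ["a", "c", "d"]]))  # ['a', 'c', 'b', 'd']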
from pymilvus import WeightedRanker
def __init__(self, *nums, norm_score: bool = True)

Parameters:
- *nums: Weight values for each search request (must match the number of requests)
- norm_score: Whether to normalize scores before weighting

from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker
# Dense + Sparse hybrid search
dense_req = AnnSearchRequest(
    data=dense_vectors,  # [[0.1, 0.2, ...]]
    anns_field="dense_embedding",
    param={
        "metric_type": "L2",
        "params": {"nprobe": 16}
    },
    limit=100,
    expr="status == 'published'"
)
sparse_req = AnnSearchRequest(
    data=sparse_vectors,  # Sparse vectors from BM25/TF-IDF
    anns_field="sparse_embedding",
    param={
        "metric_type": "IP",  # Inner Product for sparse
        "params": {"drop_ratio_search": 0.2}  # search-time param (drop_ratio_build is index-time)
    },
    limit=100,
    expr="status == 'published'"
)
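For reference, PyMilvus accepts sparse vectors as dicts mapping dimension index to weight; the indices and weights below are made-up placeholders:

# Hypothetical sparse query vector, e.g. produced by a BM25 encoder:
# {dimension_index: weight}, with all omitted dimensions implicitly zero
sparse_vectors = [{17: 0.84, 102: 0.31, 5040: 0.12}]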
# RRF hybrid search - good for combining different vector types
rrf_results = client.hybrid_search(
    collection_name="hybrid_documents",
    reqs=[dense_req, sparse_req],
    ranker=RRFRanker(k=60),
    limit=10,
    output_fields=["id", "title", "content", "score"]
)

# Weighted hybrid search - control the contribution of each vector type
weighted_results = client.hybrid_search(
    collection_name="hybrid_documents",
    reqs=[dense_req, sparse_req],
    ranker=WeightedRanker(0.7, 0.3, norm_score=True),  # 70% dense, 30% sparse
    limit=10,
    output_fields=["id", "title", "content"]
)
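Conceptually, weighted ranking scales each request's (optionally normalized) scores by its weight before merging. A plain-Python sketch of the idea, using min-max normalization for illustration (the library's exact normalization may differ):

def weighted_fuse(score_lists, weights):
    """Merge per-request {id: score} dicts after min-max normalizing each list."""
    fused = {}
    for scores, weight in zip(score_lists, weights):
        lo, hi = min(scores.values()), max(scores.values())
        for doc_id, s in scores.items():
            norm = (s - lo) / (hi - lo) if hi > lo else 1.0
            fused[doc_id] = fused.get(doc_id, 0.0) + weight * norm
    return sorted(fused, key=fused.get, reverse=True)

# Dense scores weighted 0.7, sparse scores weighted 0.3
print(weighted_fuse([{"a": 0.9, "b": 0.5}, {"b": 12.0, "c": 3.0}], [0.7, 0.3]))  # ['a', 'b', 'c']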
# Multi-modal search (text + image + audio)
text_req = AnnSearchRequest(
    data=text_embeddings,
    anns_field="text_vector",
    param={"metric_type": "COSINE", "params": {"nprobe": 20}},
    limit=50
)
image_req = AnnSearchRequest(
    data=image_embeddings,
    anns_field="image_vector",
    param={"metric_type": "L2", "params": {"ef": 100}},
    limit=50
)
audio_req = AnnSearchRequest(
    data=audio_embeddings,
    anns_field="audio_vector",
    param={"metric_type": "IP", "params": {"nprobe": 10}},
    limit=50
)
multimodal_results = client.hybrid_search(
    collection_name="multimodal_content",
    reqs=[text_req, image_req, audio_req],
    ranker=WeightedRanker(0.5, 0.3, 0.2),  # Text dominant
    limit=15,
    output_fields=["id", "title", "type", "metadata"]
)

# Query-time vector generation with different strategies per field
def multi_strategy_search(query_text: str, query_image_path: str):
    # Generate embeddings for different modalities (text_encoder, bm25_encoder,
    # and image_encoder are application-provided models, not part of PyMilvus)
    text_dense = text_encoder.encode(query_text)
    text_sparse = bm25_encoder.encode(query_text)
    image_vector = image_encoder.encode(query_image_path)

    # Different search strategies
    requests = [
        # Semantic text search
        AnnSearchRequest(
            data=[text_dense],
            anns_field="text_dense_vector",
            param={"metric_type": "COSINE", "params": {"ef": 200}},
            limit=200,
            expr="content_type in ['article', 'blog']"
        ),
        # Lexical text search
        AnnSearchRequest(
            data=[text_sparse],
            anns_field="text_sparse_vector",
            param={"metric_type": "IP"},
            limit=200,
            expr="content_type in ['article', 'blog']"
        ),
        # Visual similarity
        AnnSearchRequest(
            data=[image_vector],
            anns_field="image_vector",
            param={"metric_type": "L2", "params": {"nprobe": 50}},
            limit=100,
            expr="content_type in ['image', 'video']"
        )
    ]

    # Combine with RRF for balanced results
    return client.hybrid_search(
        "multimedia_collection",
        reqs=requests,
        ranker=RRFRanker(k=100),
        limit=20,
        output_fields=["id", "title", "content_type", "url", "metadata"]
    )

# SearchResult contains results for all query vectors
from pymilvus.client.search_result import SearchResult, Hits, Hit

class SearchResult:
    hits: List[Hits]              # One Hits object per query vector
    distances: List[List[float]]  # Nested distances [query][result]
    ids: List[List]               # Nested primary keys [query][result]
    def __len__(self) -> int                   # Number of queries
    def __getitem__(self, index: int) -> Hits  # Access one query's results

class Hits:
    ids: List               # Primary key values for this query
    distances: List[float]  # Distance/similarity scores
    def __len__(self) -> int                  # Number of results
    def __getitem__(self, index: int) -> Hit  # Access an individual result
    def __iter__(self) -> Iterator[Hit]       # Iterate over results

class Hit:
    id: Any                 # Primary key value
    distance: float         # Distance/similarity score
    score: float            # Alias for distance
    entity: Dict[str, Any]  # Returned field values
    def get(self, field: str, default=None) -> Any  # Get a field with a default
    def to_dict(self) -> Dict[str, Any]             # Convert to a dictionary

# Process search results
results = client.search(
    "documents",
    data=[query_vector],
    limit=5,
    output_fields=["id", "title", "content", "score"]
)

# Access the first query's results (single query)
first_query_hits = results[0]
print(f"Found {len(first_query_hits)} results")

# Process individual hits
for hit in first_query_hits:
    print(f"Document ID: {hit.id}")
    print(f"Similarity Score: {hit.score:.4f}")
    print(f"Title: {hit.entity.get('title', 'No title')}")
    print(f"Content: {hit.entity.get('content', '')[:100]}...")
    print("---")
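Because Hit exposes id, score, and entity, results flatten easily into plain dicts for downstream use; a sketch using only the attributes documented above:

# Flatten hits into a list of plain dicts
rows = [
    {**hit.entity, "id": hit.id, "score": hit.score}
    for hit in first_query_hits
]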
# Multi-query result processing
multi_results = client.search(
    "products",
    data=[vector1, vector2, vector3],
    limit=10,
    output_fields=["id", "name", "category", "price"]
)
for query_idx, hits in enumerate(multi_results):
    print(f"Query {query_idx + 1} results:")
    for rank, hit in enumerate(hits):
        product_name = hit.entity.get('name', 'Unknown')
        price = hit.entity.get('price', 0)
        print(f"  {rank + 1}. {product_name} - ${price:.2f} (score: {hit.score:.3f})")

def analyze_search_results(results: SearchResult) -> Dict[str, Any]:
"""Analyze search result quality and distribution"""
analysis = {
"total_queries": len(results),
"query_stats": []
}
for query_idx, hits in enumerate(results):
if len(hits) == 0:
continue
scores = [hit.score for hit in hits]
query_analysis = {
"query_index": query_idx,
"result_count": len(hits),
"score_stats": {
"min": min(scores),
"max": max(scores),
"avg": sum(scores) / len(scores),
"range": max(scores) - min(scores)
},
"categories": {}
}
# Analyze result categories
for hit in hits:
category = hit.entity.get('category', 'unknown')
query_analysis["categories"][category] = query_analysis["categories"].get(category, 0) + 1
analysis["query_stats"].append(query_analysis)
return analysis
# Use analysis
search_results = client.search("products", [query_vector], limit=20, output_fields=["category"])
stats = analyze_search_results(search_results)
print(f"Search returned results across {len(stats['query_stats'][0]['categories'])} categories")def search_iterator(
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    batch_size: int = 1000,
    limit: Optional[int] = None,
    search_params: Optional[Dict] = None,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    **kwargs
) -> SearchIterator

Parameters:
- batch_size: Results per iteration batch
- limit: Total maximum results across all batches

# Large-scale similarity search with pagination
iterator = client.search_iterator(
    collection_name="large_embeddings",
    data=[query_vector],
    anns_field="embedding",
    batch_size=1000,
    limit=10000,  # Process up to 10K results
    output_fields=["id", "metadata", "score"],
    expr="status == 'active'"
)
# Process results in batches (SearchIterator is consumed via next()/close())
total_processed = 0
while True:
    batch = iterator.next()  # A list of Hit objects; empty when exhausted
    if not batch:
        iterator.close()
        break
    print(f"Processing batch of {len(batch)} results")
    # Process each result in the batch
    for hit in batch:
        # Custom processing logic; whether higher is better depends on the metric
        if hit.score > 0.8:  # High-similarity threshold (e.g., for IP/COSINE)
            process_high_similarity(hit)
        total_processed += 1
    # Optional: stop early based on conditions
    if total_processed >= 5000:
        iterator.close()
        break
print(f"Total processed: {total_processed} results")

def query(
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    limit: int = 16384,
    offset: int = 0,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[Dict[str, Any]]
def query_iterator(
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    batch_size: int = 1000,
    limit: Optional[int] = None,
    **kwargs
) -> QueryIterator

# Comparison operators
"age > 25"
"price <= 100.0"
"category == 'electronics'"
"status != 'inactive'"
# Logical operators
"age > 18 and age < 65"
"category == 'books' or category == 'ebooks'"
"not (status == 'deleted')"
# List operations
"category in ['electronics', 'computers', 'mobile']"
"tag_id not in [1, 2, 3]"
# JSON field queries
"metadata['color'] == 'red'"
"metadata['specs']['weight'] < 1.5"
"json_contains(metadata['tags'], 'premium')"
# Array field queries
"array_contains(tags, 'new')"
"array_contains_all(categories, ['tech', 'gadget'])"
"array_contains_any(features, ['bluetooth', 'wifi'])"
"array_length(tags) > 2"
# String operations
"title like 'Python%'" # Starts with 'Python'
"description like '%machine learning%'" # Contains 'machine learning'
# Complex expressions
"(category == 'books' and price < 50) or (category == 'ebooks' and price < 20)"
"json_contains(metadata['tags'], 'bestseller') and rating >= 4.5"
"array_contains(features, 'wireless') and price between 50 and 200"# Basic filtering
products = client.query(
    "products",
    filter="category == 'electronics' and price < 500",
    output_fields=["id", "name", "price", "rating"],
    limit=50
)

# Complex JSON queries
documents = client.query(
    "documents",
    filter="metadata['author'] == 'Smith' and metadata['year'] >= 2020",
    output_fields=["id", "title", "metadata"],
    offset=100,
    limit=25
)

# Array field filtering
articles = client.query(
    "articles",
    filter="array_contains_all(tags, ['AI', 'machine-learning']) and status == 'published'",
    output_fields=["id", "title", "tags", "publish_date"]
)
# Paginated query processing
iterator = client.query_iterator(
    "large_dataset",
    filter="created_at > 1640995200",  # After 2022-01-01 (Unix timestamp)
    output_fields=["id", "data", "timestamp"],
    batch_size=2000
)
while True:
    batch = iterator.next()  # QueryIterator is consumed via next()/close()
    if not batch:
        iterator.close()
        break
    # Process each batch
    process_data_batch(batch)
# HNSW index optimization
import time

def optimize_hnsw_search(collection_name: str, query_vectors: List[List[float]], target_recall: float = 0.95):
    """Optimize HNSW search parameters for a target recall"""
    # Start from a conservative ef (ef must be >= limit; larger values trade speed for recall)
    base_ef = max(50, len(query_vectors[0]))
    # Test different ef values
    ef_values = [base_ef, base_ef * 2, base_ef * 4]
    best_params = None
    best_latency = float('inf')
    for ef in ef_values:
        start_time = time.time()
        results = client.search(
            collection_name,
            data=query_vectors,
            search_params={"ef": ef},
            limit=10
        )
        latency = time.time() - start_time
        # In practice, measure recall against ground truth
        recall = measure_recall(results, ground_truth)  # User-supplied function and labels
        if recall >= target_recall and latency < best_latency:
            best_params = {"ef": ef}
            best_latency = latency
    return best_params
# IVF index optimization
def optimize_ivf_search(collection_name: str, nlist: int):
    """Find an optimal nprobe for an IVF index"""
    # Rule of thumb: nprobe between sqrt(nlist) and nlist/8
    nprobe_candidates = [
        max(1, int(nlist ** 0.5)),  # sqrt(nlist)
        max(1, nlist // 32),
        max(1, nlist // 16),
        max(1, nlist // 8)
    ]
    best_nprobe = nprobe_candidates[0]
    for nprobe in nprobe_candidates:
        # Test search performance (test_vector and target_latency are user-supplied)
        start_time = time.time()
        results = client.search(
            collection_name,
            data=[test_vector],
            search_params={"nprobe": nprobe},
            limit=100
        )
        latency = time.time() - start_time
        print(f"nprobe={nprobe}, latency={latency:.4f}s")
        # Choose based on the latency/accuracy tradeoff
        if latency < target_latency:
            best_nprobe = nprobe
            break
    return {"nprobe": best_nprobe}

def batch_search_optimize(collection_name: str, query_vectors: List[List[float]], batch_size: int = 100):
"""Optimize batch search for large query sets"""
all_results = []
# Process queries in batches to optimize memory usage
for i in range(0, len(query_vectors), batch_size):
batch_vectors = query_vectors[i:i + batch_size]
batch_results = client.search(
collection_name,
data=batch_vectors,
search_params={"nprobe": 16}, # Adjust based on index
limit=10,
round_decimal=4 # Reduce precision for network efficiency
)
all_results.extend(batch_results)
# Optional: progress tracking
print(f"Processed {min(i + batch_size, len(query_vectors))}/{len(query_vectors)} queries")
return all_resultsPyMilvus search operations provide powerful capabilities for similarity search, hybrid retrieval, and scalar filtering, enabling sophisticated search applications with fine-tuned performance optimization.