Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.
—
PyMilvus provides comprehensive index management for optimizing vector and scalar search performance. This covers vector index types, scalar indexes, index parameters, performance tuning, and maintenance operations.
# FLAT index provides exact search results with 100% recall
# (brute-force comparison against every vector: no build parameters to tune)
flat_index_params = {
"index_type": "FLAT",
"metric_type": "L2", # or "IP", "COSINE"
"params": {} # No additional parameters needed
}
client.create_index("collection", "vector_field", flat_index_params)

Characteristics:
# IVF_FLAT partitions vectors into clusters for faster search
# (approximate: queries visit only nprobe of the nlist clusters)
ivf_flat_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {
"nlist": 1024 # Number of clusters (16-16384, default: 1024)
}
}
client.create_index("large_collection", "embedding", ivf_flat_params)

Parameters:
nlist: Number of clusters (more clusters = faster search but more memory)

Search Parameters:
# Search parameters for IVF_FLAT
search_params = {
"nprobe": 16 # Number of clusters to search (1 to nlist)
}
# Higher nprobe improves recall at the cost of slower queries
results = client.search(
"large_collection",
data=[query_vector],
search_params=search_params,
limit=10
)

Characteristics:
# IVF_PQ combines clustering with product quantization for memory efficiency
# (compressed index: lower memory footprint than IVF_FLAT at some recall cost)
ivf_pq_params = {
"index_type": "IVF_PQ",
"metric_type": "L2",
"params": {
"nlist": 2048, # Number of clusters
"m": 16, # Number of PQ segments (dim must be divisible by m)
"nbits": 8 # Bits per PQ centroid (4-16, default: 8)
}
}
client.create_index("huge_collection", "vector", ivf_pq_params)

Parameters:
nlist: Number of clusters (similar to IVF_FLAT)
m: PQ segments count (vector dimension must be divisible by m)
nbits: Bits per segment (4, 6, 8, 10, 12, 16)

Search Parameters:
search_params = {"nprobe": 32} # Usually need higher nprobe for good recall

Characteristics:
# HNSW builds a multi-layer graph for fast approximate search
# (graph-based index: fast queries, but more memory than IVF variants)
hnsw_params = {
"index_type": "HNSW",
"metric_type": "L2",
"params": {
"M": 16, # Max bidirectional links (4-64, default: 16)
"efConstruction": 200 # Construction time/quality tradeoff (8-512, default: 200)
}
}
client.create_index("fast_search_collection", "embedding", hnsw_params)

Parameters:
M: Maximum bidirectional links per node (higher = better recall, more memory)
efConstruction: Size of dynamic candidate list (higher = better quality, slower build)

Search Parameters:
# HNSW query-time parameter: 'ef' sizes the search candidate list
search_params = {
"ef": 64 # Search scope (ef >= limit, higher = better recall)
}
# ef should be at least equal to the search limit
results = client.search(
"fast_search_collection",
data=[query_vector],
search_params=search_params,
limit=10 # ef should be >= 10
)

Characteristics:
# SPARSE_INVERTED_INDEX for sparse vectors (BM25, TF-IDF)
# (scored with IP — inner product — as noted below)
sparse_index_params = {
"index_type": "SPARSE_INVERTED_INDEX",
"metric_type": "IP", # Inner Product for sparse vectors
"params": {
"drop_ratio_build": 0.2 # Drop tokens with low frequency during build
}
}
client.create_index("text_collection", "sparse_embedding", sparse_index_params)

Search Parameters:
# Query-time analogue of drop_ratio_build above
sparse_search_params = {
"drop_ratio_search": 0.1 # Drop low-weight tokens during search
}

# TRIE index for VARCHAR fields (prefix matching, equality)
# TRIE accelerates equality and prefix filters on string fields
trie_params = {"index_type": "TRIE"}
client.create_index("products", "category", trie_params)
# Examples of TRIE index benefits
results = client.query("products", "category == 'electronics'") # Fast equality
results = client.query("products", "category like 'elect%'") # Fast prefix matching

# STL_SORT index for numeric fields (range queries, sorting)
# STL_SORT keeps numeric values ordered to speed up range predicates
sort_params = {"index_type": "STL_SORT"}
client.create_index("products", "price", sort_params)
# Examples of STL_SORT benefits
results = client.query("products", "price > 100 and price < 500") # Fast range queries
results = client.query("products", "price between 50 and 200") # Optimized range

# INVERTED index for JSON fields (key-value queries)
# INVERTED accelerates key-value filters on JSON fields
inverted_params = {"index_type": "INVERTED"}
client.create_index("documents", "metadata", inverted_params)
# JSON queries benefit from INVERTED index
results = client.query("documents", "metadata['category'] == 'tech'")
results = client.query("documents", "json_contains(metadata['tags'], 'AI')")

# INVERTED index also works for ARRAY fields
# The same INVERTED type also speeds up array membership filters
client.create_index("articles", "tags", {"index_type": "INVERTED"})
# Array queries with index optimization
results = client.query("articles", "array_contains(tags, 'machine-learning')")
results = client.query("articles", "array_contains_all(tags, ['AI', 'Python'])")

from pymilvus import MilvusClient, Collection
# Using MilvusClient
client = MilvusClient()
def create_index(
collection_name: str,
field_name: str,
index_params: Dict[str, Any],
timeout: Optional[float] = None,
**kwargs
) -> None

# Using Collection (ORM)
collection = Collection("my_collection")
def create_index(
field_name: str,
index_params: Dict[str, Any],
timeout: Optional[float] = None,
**kwargs
) -> None

Examples:
# Create multiple indexes on different fields
# ((field_name, index_params) pairs, applied in order by the loop below)
index_operations = [
("vector_field", {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 32, "efConstruction": 400}
}),
("category", {"index_type": "TRIE"}),
("price", {"index_type": "STL_SORT"}),
("metadata", {"index_type": "INVERTED"})
]
for field_name, params in index_operations:
client.create_index("products", field_name, params)
print(f"Created index on {field_name}")

# List all indexes for a collection
# Inspect, describe, and drop indexes on an existing collection
indexes = client.list_indexes("products")
print(f"Indexes: {indexes}")
# Describe specific index
index_info = client.describe_index("products", "vector_field")
print(f"Index type: {index_info['index_type']}")
print(f"Metric type: {index_info['metric_type']}")
print(f"Parameters: {index_info['params']}")
# Drop index
client.drop_index("products", "old_field")
# Check if index exists (Collection ORM)
collection = Collection("products")
has_vector_index = collection.has_index("vector_field")

from pymilvus import utility
# Monitor index building progress
progress = utility.index_building_progress("products", "vector_field")
print(f"Index progress: {progress['indexed_rows']}/{progress['total_rows']} ({progress['progress']}%)")
# Wait for index building to complete (blocks up to `timeout` seconds)
utility.wait_for_index_building_complete("products", "vector_field", timeout=300)
print("Index building completed")
# Alternative: using Collection
collection = Collection("products")
collection.create_index("new_vector", hnsw_params)
# Monitor with polling
import time
# Poll the build status until the index reports 100% complete.
while True:
    progress = utility.index_building_progress("products", "new_vector")
    if progress['progress'] == 100:
        print("Index building completed")
        break
    print(f"Progress: {progress['progress']}%")
    time.sleep(5)


def optimize_hnsw_parameters(collection_name: str, vector_field: str, dataset_size: int, dimension: int):
    """Pick HNSW build parameters from dataset size and dimension, then create the index.

    Heuristics: larger datasets get more graph links (M); higher-dimensional
    vectors get a larger efConstruction candidate list. Returns the index
    params dict that was used.
    """
    # Graph connectivity: 16 / 32 / 64 links for small / medium / large datasets.
    if dataset_size < 100000:
        m_links = 16
    elif dataset_size < 1000000:
        m_links = 32
    else:
        m_links = 64
    # Build-time candidate list grows with vector dimensionality.
    if dimension < 128:
        ef_construction = 200
    elif dimension < 512:
        ef_construction = 400
    else:
        ef_construction = 800
    params = {
        "index_type": "HNSW",
        "metric_type": "L2",
        "params": {"M": m_links, "efConstruction": ef_construction},
    }
    print(f"Optimized HNSW params for {dataset_size} vectors, {dimension}D: M={m_links}, efConstruction={ef_construction}")
    client.create_index(collection_name, vector_field, params)
    return params
# Usage
hnsw_params = optimize_hnsw_parameters("large_collection", "embedding", 5000000, 768)


def optimize_ivf_parameters(dataset_size: int, memory_constraint: bool = False):
    """Choose an IVF-family index configuration for a dataset of the given size.

    Returns FLAT for tiny datasets, IVF_PQ when memory is constrained on
    datasets above one million vectors, and IVF_FLAT otherwise. The cluster
    count follows the sqrt(N) heuristic, clamped to [16, 16384].
    """
    if dataset_size < 10000:
        # Brute-force search is fast enough below ~10k vectors.
        return {"index_type": "FLAT", "metric_type": "L2", "params": {}}
    # sqrt(N) heuristic for the cluster count, clamped to Milvus limits.
    clusters = min(16384, max(16, int(dataset_size ** 0.5)))
    if memory_constraint and dataset_size > 1000000:
        # Product quantization shrinks the index at some recall cost.
        return {
            "index_type": "IVF_PQ",
            "metric_type": "L2",
            "params": {"nlist": clusters, "m": 16, "nbits": 8},
        }
    # Default: keep exact vectors inside each cluster for better accuracy.
    return {
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": clusters},
    }
# Create optimized index
params = optimize_ivf_parameters(dataset_size=2000000, memory_constraint=True)
client.create_index("vectors", "embedding", params)


def find_optimal_search_params(collection_name: str, vector_field: str, test_vectors: List[List[float]], target_recall: float = 0.95):
    """Sweep search parameters for the field's index and report per-query latency.

    NOTE(review): recall is never actually measured here — the ground-truth
    comparison is a placeholder, so the returned params are chosen by a
    fixed threshold, not by target_recall. Returns {} if no sweep applies.
    """
    index_info = client.describe_index(collection_name, vector_field)
    index_type = index_info['index_type']
    if index_type == "HNSW":
        # Sweep the HNSW candidate-list size.
        for ef in (32, 64, 128, 256, 512):
            started = time.time()
            results = client.search(
                collection_name,
                data=test_vectors,
                search_params={"ef": ef},
                limit=10
            )
            per_query = (time.time() - started) / len(test_vectors)
            # In practice, calculate recall against ground truth:
            # recall = calculate_recall(results, ground_truth)
            print(f"ef={ef}: {per_query:.4f}s per query")
            if ef >= 64:  # Placeholder for a real recall >= target check
                return {"ef": ef}
    elif index_type in ["IVF_FLAT", "IVF_PQ"]:
        # Sweep nprobe as fractions of the cluster count.
        nlist = index_info['params'].get('nlist', 1024)
        for nprobe in (max(1, nlist // 64), max(1, nlist // 32), max(1, nlist // 16), max(1, nlist // 8)):
            started = time.time()
            results = client.search(
                collection_name,
                data=test_vectors,
                search_params={"nprobe": nprobe},
                limit=10
            )
            per_query = (time.time() - started) / len(test_vectors)
            print(f"nprobe={nprobe}: {per_query:.4f}s per query")
            if nprobe >= nlist // 32:
                return {"nprobe": nprobe}
    return {}
# Find optimal parameters
optimal_params = find_optimal_search_params("products", "embedding", test_query_vectors)
print(f"Optimal search params: {optimal_params}")


def create_comprehensive_indexes(collection_name: str):
    """Derive an index plan from the collection schema and create each index.

    Field-type mapping: FloatVector -> HNSW, SparseFloatVector ->
    SPARSE_INVERTED_INDEX, short VarChar -> TRIE, numeric types -> STL_SORT,
    JSON/Array -> INVERTED. Returns the plan that was attempted; failures
    are reported but do not abort the remaining fields.
    """
    schema_info = client.describe_collection(collection_name)
    index_plan = []
    for field in schema_info['schema']['fields']:
        name = field['name']
        ftype = field['type']
        if ftype == 'FloatVector':
            index_plan.append((name, {
                "index_type": "HNSW",
                "metric_type": "L2",
                "params": {"M": 32, "efConstruction": 400}
            }))
        elif ftype == 'SparseFloatVector':
            index_plan.append((name, {
                "index_type": "SPARSE_INVERTED_INDEX",
                "metric_type": "IP",
                "params": {"drop_ratio_build": 0.2}
            }))
        elif ftype == 'VarChar':
            # Skip unbounded or very long text fields.
            max_length = field.get('params', {}).get('max_length', 0)
            if 0 < max_length <= 1000:
                index_plan.append((name, {"index_type": "TRIE"}))
        elif ftype in ('Int64', 'Int32', 'Float', 'Double'):
            index_plan.append((name, {"index_type": "STL_SORT"}))
        elif ftype in ('JSON', 'Array'):
            index_plan.append((name, {"index_type": "INVERTED"}))
    # Execute the plan, reporting success or failure per field.
    for name, cfg in index_plan:
        try:
            client.create_index(collection_name, name, cfg)
            print(f"✓ Created {cfg['index_type']} index on {name}")
        except Exception as e:
            print(f"✗ Failed to create index on {name}: {e}")
    return index_plan
# Create all recommended indexes
plan = create_comprehensive_indexes("comprehensive_collection")


def maintain_indexes(collection_name: str):
    """Inspect every index on the collection and rebuild any whose build failed.

    Errors while checking an individual index are reported and do not stop
    the remaining checks.
    """
    indexes = client.list_indexes(collection_name)
    print(f"Current indexes: {indexes}")
    for field_name in indexes:
        try:
            info = client.describe_index(collection_name, field_name)
            print(f"Index {field_name}: {info['index_type']}")
            progress = utility.index_building_progress(collection_name, field_name)
            if progress['state'] == 'Failed':
                print(f"Index {field_name} build failed, rebuilding...")
                # Drop and recreate the failed index with its original settings.
                client.drop_index(collection_name, field_name)
                client.create_index(collection_name, field_name, {
                    "index_type": info['index_type'],
                    "metric_type": info.get('metric_type', 'L2'),
                    "params": info['params']
                })
        except Exception as e:
            print(f"Error checking index {field_name}: {e}")
# Perform maintenance
maintain_indexes("production_collection")


def benchmark_index_performance(collection_name: str, test_queries: List[List[float]]):
    """Time searches on the 'vector' field across a sweep of search parameters.

    Sweeps ef for HNSW or nprobe for IVF indexes and prints the average
    per-query latency for each setting.
    """
    info = client.describe_index(collection_name, "vector")
    current_type = info['index_type']
    print(f"Benchmarking {current_type} index...")
    if current_type == "HNSW":
        for ef in (32, 64, 128, 256):
            t0 = time.time()
            hits = 0
            for query_vector in test_queries:
                results = client.search(
                    collection_name,
                    data=[query_vector],
                    search_params={"ef": ef},
                    limit=10
                )
                hits += len(results[0])
            avg = (time.time() - t0) / len(test_queries)
            print(f"ef={ef}: {avg:.4f}s per query, {hits} total results")
    elif current_type in ["IVF_FLAT", "IVF_PQ"]:
        nlist = info['params'].get('nlist', 1024)
        for nprobe in (max(1, nlist // 64), max(1, nlist // 32), max(1, nlist // 16)):
            t0 = time.time()
            for query_vector in test_queries:
                results = client.search(
                    collection_name,
                    data=[query_vector],
                    search_params={"nprobe": nprobe},
                    limit=10
                )
            avg = (time.time() - t0) / len(test_queries)
            print(f"nprobe={nprobe}: {avg:.4f}s per query")
# Generate test queries for benchmarking
import numpy as np
test_vectors = [np.random.rand(768).tolist() for _ in range(100)]
benchmark_index_performance("benchmark_collection", test_vectors)


def setup_hybrid_collection_indexes(collection_name: str):
    """Create dense, sparse, and scalar indexes for a hybrid-search collection."""
    # Dense vectors: HNSW graph, COSINE for normalized embeddings.
    client.create_index(
        collection_name,
        "dense_vector",
        {
            "index_type": "HNSW",
            "metric_type": "COSINE",
            "params": {"M": 48, "efConstruction": 500}
        }
    )
    # Sparse vectors: inverted index scored by inner product.
    client.create_index(
        collection_name,
        "sparse_vector",
        {
            "index_type": "SPARSE_INVERTED_INDEX",
            "metric_type": "IP",
            "params": {"drop_ratio_build": 0.3}
        }
    )
    # Scalar indexes to accelerate filtered search.
    client.create_index(collection_name, "category", {"index_type": "TRIE"})
    client.create_index(collection_name, "timestamp", {"index_type": "STL_SORT"})
    client.create_index(collection_name, "metadata", {"index_type": "INVERTED"})
    print("Hybrid collection indexes created successfully")
# Setup indexes for multi-vector search
setup_hybrid_collection_indexes("documents")

Index management in PyMilvus provides powerful capabilities for optimizing search performance across different data types and access patterns. Proper index selection and tuning are crucial for achieving optimal performance in production vector database applications.
Install with Tessl CLI
npx tessl i tessl/pypi-pymilvus