Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.
```
npx @tessl/cli install tessl/pypi-pymilvus@2.6.0
```

PyMilvus is the official Python SDK for Milvus, a cloud-native vector database designed for scalable similarity search and AI applications. It provides comprehensive capabilities for vector and scalar data storage, similarity search, collection management, indexing, and user authentication.

Installation:

```
pip install pymilvus
```

Import:

```python
import pymilvus
from pymilvus import MilvusClient, Collection, DataType
```

Version: Available via `pymilvus.__version__`
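As a quick post-install check (a minimal sketch; the version shown matches the release referenced above):

```python
import pymilvus

# Confirm the installed release
print(pymilvus.__version__)  # e.g. "2.6.0"
```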
```python
from pymilvus import MilvusClient, AsyncMilvusClient

# Synchronous client for common operations
client = MilvusClient(uri="http://localhost:19530")

# Asynchronous client for high-concurrency applications
async_client = AsyncMilvusClient(uri="http://localhost:19530")
```

```python
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType
from pymilvus import Index, Partition, Role
from pymilvus import Connections, connections

# Schema definition
schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=128),
    FieldSchema("metadata", DataType.JSON),
])

# Collection with ORM interface
collection = Collection("my_collection", schema)
```

```python
from pymilvus import SearchResult, Hit, Hits
from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker

# Hybrid search with reranking (vectors1: query vectors prepared by the application)
requests = [AnnSearchRequest(data=vectors1, anns_field="vector1",
                             param={"metric_type": "L2"}, limit=100)]
results = client.hybrid_search("collection", requests, RRFRanker(), limit=10)
```

```python
from pymilvus import utility
from pymilvus import create_user, delete_user, list_collections
from pymilvus import mkts_from_datetime, hybridts_to_datetime
from datetime import datetime

# Direct utility access
utility.has_collection("my_collection")
mkts_from_datetime(datetime.now())
```

```python
from pymilvus import MilvusClient
# Initialize client
client = MilvusClient(uri="http://localhost:19530")

# Create collection with simple parameters
client.create_collection(
    collection_name="quick_setup",
    dimension=128,
    metric_type="COSINE"
)

# Insert data
data = [
    {"id": i, "vector": [0.1] * 128, "text": f"Document {i}"}
    for i in range(1000)
]
client.insert("quick_setup", data)

# Search
results = client.search(
    collection_name="quick_setup",
    data=[[0.1] * 128],  # Query vector
    limit=5,
    output_fields=["text"]
)
```

```python
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType, Function, FunctionType
# Define schema with BM25 function
fields = [
    FieldSchema("id", DataType.INT64, is_primary=True),
    # enable_analyzer lets the BM25 function tokenize this field
    FieldSchema("text", DataType.VARCHAR, max_length=1000, enable_analyzer=True),
    FieldSchema("dense_vector", DataType.FLOAT_VECTOR, dim=128),
    FieldSchema("sparse_vector", DataType.SPARSE_FLOAT_VECTOR),  # BM25 output
]
functions = [
    Function("bm25_function", FunctionType.BM25,
             input_field_names=["text"],
             output_field_names=["sparse_vector"])
]
schema = CollectionSchema(fields, functions=functions, description="Hybrid search collection")
collection = Collection("hybrid_collection", schema)
```

PyMilvus provides two complementary API approaches:
```python
# Automatic schema creation
client.create_collection("simple", dimension=128)

# Direct operations
client.insert("simple", [{"id": 1, "vector": [0.1] * 128}])
results = client.search("simple", [[0.1] * 128], limit=5)
```

```python
# Explicit schema control
schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=128),
], enable_dynamic_field=True)
collection = Collection("advanced", schema)
collection.create_index("vector", {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 1024},
})
```

Both interfaces can be used together and share the same underlying connection management.
Comprehensive vector database operations with multiple data types and search capabilities.
```python
# Multi-vector hybrid search
from pymilvus import MilvusClient, AnnSearchRequest, RRFRanker

client = MilvusClient()

# Define multiple search requests (dense_vectors / sparse_vectors are
# query vectors prepared by the application)
req1 = AnnSearchRequest(data=dense_vectors, anns_field="dense_vec",
                        param={"metric_type": "L2"}, limit=100)
req2 = AnnSearchRequest(data=sparse_vectors, anns_field="sparse_vec",
                        param={"metric_type": "IP"}, limit=100)

# Hybrid search with RRF reranking
results = client.hybrid_search(
    collection_name="multi_vector_collection",
    reqs=[req1, req2],
    ranker=RRFRanker(k=60),
    limit=10,
    output_fields=["title", "content"]
)
```

→ See Search Operations for complete search capabilities
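A minimal sketch of consuming the hybrid search results above, assuming the dict-like hit format returned by MilvusClient (`id`, `distance`, and an `entity` dict carrying the requested output fields):

```python
# One group of hits per query vector, ranked by fused score
for hits in results:
    for hit in hits:
        print(hit["id"], hit["distance"], hit["entity"].get("title"))
```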
Efficient data insertion, updates, and deletion with batch operations and iterators.
```python
# Batch operations with upsert
from pymilvus import MilvusClient

client = MilvusClient()

# Upsert data (insert or update)
data = [
    {"id": 1, "vector": [0.1] * 128, "metadata": {"category": "A"}},
    {"id": 2, "vector": [0.2] * 128, "metadata": {"category": "B"}},
]
result = client.upsert("my_collection", data)

# Paginated query with iterator
iterator = client.query_iterator(
    collection_name="my_collection",
    filter="metadata['category'] == 'A'",
    output_fields=["id", "metadata"],
    batch_size=1000
)
while True:
    batch = iterator.next()
    if not batch:
        iterator.close()
        break
    process_batch(batch)  # application-defined handler for each page of results
```

→ See Data Management for complete CRUD operations
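Deletion, mentioned above, follows the same client pattern; a brief sketch against the same collection, removing rows by primary key or by filter expression:

```python
# Delete by primary key
client.delete("my_collection", ids=[1, 2])

# Delete by filter expression
client.delete("my_collection", filter="metadata['category'] == 'B'")
```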
Flexible schema definition with support for dynamic fields, functions, and partitioning.
```python
# Advanced schema with clustering and partitioning
from pymilvus import CollectionSchema, FieldSchema, DataType, Function, FunctionType

schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("category", DataType.VARCHAR, max_length=100, is_partition_key=True),
    FieldSchema("timestamp", DataType.INT64, is_clustering_key=True),
    FieldSchema("content", DataType.VARCHAR, max_length=2000),
    FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("sparse_embedding", DataType.SPARSE_FLOAT_VECTOR),
], enable_dynamic_field=True, description="Production collection with advanced features")

# Add text embedding function
schema.add_function(
    Function("text_embed", FunctionType.TEXTEMBEDDING,
             input_field_names=["content"],
             output_field_names=["embedding"],
             params={"model_name": "sentence-transformers/all-MiniLM-L6-v2"})
)
```

→ See ORM Collection for complete schema management
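Because `enable_dynamic_field=True` is set, undeclared fields can still be stored, filtered, and returned; a brief sketch, assuming a collection (hypothetical name) created from this schema, populated, and loaded, where `source` exists only as a dynamic field:

```python
from pymilvus import Collection

collection = Collection("production_collection")

# Dynamic fields are filtered and returned by name even though
# the schema does not declare them
rows = collection.query(
    expr='source == "rss" and category == "news"',
    output_fields=["id", "content", "source"],
)
```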
Advanced indexing strategies for optimal search performance across different vector types.
```python
# Multi-index creation with performance tuning
from pymilvus import Collection

collection = Collection("optimized_collection")

# Vector index with custom parameters
collection.create_index(
    field_name="dense_vector",
    index_params={
        "index_type": "IVF_PQ",
        "metric_type": "L2",
        "params": {
            "nlist": 2048,
            "m": 16,
            "nbits": 8
        }
    }
)

# Scalar index for filtering
collection.create_index(
    field_name="category",
    index_params={"index_type": "TRIE"}
)

# Load collection with custom replica and resource group
collection.load(replica_number=2, _resource_groups=["rg1", "rg2"])
```

→ See Index Management for complete indexing strategies
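Index construction runs in the background, so it is common to wait for the build to finish before loading; a brief sketch using the utility helpers:

```python
from pymilvus import utility

# Block until the background index build finishes, then inspect progress
utility.wait_for_index_building_complete("optimized_collection")
print(utility.index_building_progress("optimized_collection"))
```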
Comprehensive authentication, authorization, and resource management.
```python
# Role-based access control
from pymilvus import MilvusClient

client = MilvusClient()

# Create role with specific privileges
client.create_role("data_analyst")
client.grant_privilege(
    role_name="data_analyst",
    object_type="Collection",
    privilege="Search",
    object_name="public_data"
)

# Create user and assign role
client.create_user("analyst1", "secure_password")
client.grant_role("analyst1", "data_analyst")

# Privilege group management
client.create_privilege_group("read_only_group")
client.add_privileges_to_group("read_only_group", ["Query", "Search"])
```

→ See User Management for complete access control
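To verify what was granted, the client exposes matching inspection calls; a brief sketch assuming the user and role created above:

```python
# Audit users, roles, and the privileges attached to a role
print(client.list_users())
print(client.list_roles())
print(client.describe_role("data_analyst"))
```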
Helper functions for timestamps, progress monitoring, and maintenance operations.
```python
# Timestamp utilities and progress monitoring
from pymilvus import utility, mkts_from_datetime, hybridts_to_datetime
from datetime import datetime

# Create travel timestamp for point-in-time queries
travel_time = mkts_from_datetime(datetime(2024, 1, 1, 12, 0, 0))

# Monitor operations
progress = utility.loading_progress("my_collection")
print(f"Loading progress: {progress['loading_progress']}")

# Wait for operations to complete
utility.wait_for_loading_complete("my_collection", timeout=300)

# Resource group management
utility.create_resource_group("gpu_group", config={"requests": {"node_num": 2}})
utility.transfer_node("cpu_group", "gpu_group", 1)
```

→ See Utility Functions for complete utility reference
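The inverse conversion imported above closes the loop; a brief sketch converting the hybrid timestamp back to wall-clock time:

```python
# Round-trip the hybrid timestamp back to a datetime
print(hybridts_to_datetime(travel_time))
```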
Non-blocking operations for high-concurrency applications with full async/await support.
```python
# Concurrent operations with AsyncMilvusClient
from pymilvus import AsyncMilvusClient
import asyncio

async def concurrent_searches():
    client = AsyncMilvusClient()

    # Concurrent search operations
    tasks = []
    for i in range(10):
        task = client.search(
            collection_name="large_collection",
            data=[[0.1] * 128],
            limit=100,
            output_fields=["metadata"]
        )
        tasks.append(task)

    # Wait for all searches to complete
    results = await asyncio.gather(*tasks)
    await client.close()
    return results

# Run concurrent operations
results = asyncio.run(concurrent_searches())
```

→ See MilvusClient for complete async capabilities
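Collection management and writes are awaitable as well; a brief sketch (hypothetical collection name) assuming the same local server:

```python
import asyncio
from pymilvus import AsyncMilvusClient

async def async_setup():
    client = AsyncMilvusClient()

    # DDL and DML calls are coroutines too
    await client.create_collection("async_setup", dimension=128)
    await client.insert("async_setup", [{"id": 1, "vector": [0.1] * 128}])
    await client.close()

asyncio.run(async_setup())
```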
Comprehensive type system with enums for data types, index types, and configuration options.
```python
# Type system and enums
from pymilvus import DataType, IndexType, FunctionType, ConsistencyLevel

# Vector data types
vector_types = [
    DataType.FLOAT_VECTOR,         # Standard dense vectors
    DataType.BINARY_VECTOR,        # Binary vectors for efficiency
    DataType.FLOAT16_VECTOR,       # Half-precision vectors
    DataType.BFLOAT16_VECTOR,      # BFloat16 vectors
    DataType.SPARSE_FLOAT_VECTOR,  # Sparse vectors for text search
]

# Index algorithms
index_types = [
    IndexType.FLAT,      # Exact search
    IndexType.IVF_FLAT,  # Inverted file
    IndexType.HNSW,      # Hierarchical navigable small world
    IndexType.IVF_PQ,    # Product quantization
]

# Consistency levels
levels = [
    ConsistencyLevel.Strong,      # Strong consistency
    ConsistencyLevel.Eventually,  # Eventual consistency
    ConsistencyLevel.Bounded,     # Bounded staleness
    ConsistencyLevel.Session,     # Session consistency
]
```

→ See Types and Enums for complete type reference
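These enums are used wherever fields and collections are declared; a brief sketch reusing the patterns shown earlier (names are illustrative):

```python
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType

schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("embedding", DataType.FLOAT16_VECTOR, dim=256),
])

# Consistency level can be set per collection at creation time
collection = Collection("typed_collection", schema, consistency_level="Strong")
```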
This documentation covers all 136+ public API components in PyMilvus, enabling comprehensive vector database operations without accessing source code.