Python SDK for the Milvus vector database, providing comprehensive functionality for connecting to servers, managing collections, and performing vector operations.
The ORM (Object-Relational Mapping) classes provide advanced control over collection lifecycle, schema definition, and field configuration. These classes are ideal for production applications requiring fine-grained control over collection properties, indexing strategies, and data validation.
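Collection operations run over a named connection, so a connection must be established before any ORM call. A minimal sketch, assuming a local Milvus server on the default port:
# Connect before using any ORM class; host/port assume a local deployment
from pymilvus import connections
connections.connect(alias="default", host="localhost", port="19530")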
The Collection class is the primary interface for advanced collection operations with explicit schema control.
from pymilvus import Collection
def __init__(
self,
name: str,
schema: Optional[CollectionSchema] = None,
using: str = "default",
shards_num: int = 1,
consistency_level: str = "Bounded",
properties: Optional[Dict[str, str]] = None,
**kwargs
) -> None

Parameters:
name: Collection name
schema: CollectionSchema object defining structure
using: Connection alias (default: "default")
shards_num: Number of shards for data distribution
consistency_level: "Strong", "Bounded", "Eventually", or "Session"
properties: Custom collection properties
**kwargs: Additional configuration options

Examples:
# Create collection with existing schema
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType
schema = CollectionSchema([
FieldSchema("id", DataType.INT64, is_primary=True),
FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=768),
FieldSchema("metadata", DataType.JSON)
])
collection = Collection(
name="documents",
schema=schema,
shards_num=2,
consistency_level="Strong"
)
# Load existing collection
existing = Collection("existing_collection")
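Opening a collection by name fails if it does not exist; a small guard, sketched here with utility.has_collection, avoids that:
# Sketch: verify existence before opening a collection by name
from pymilvus import utility
if utility.has_collection("existing_collection"):
    existing = Collection("existing_collection")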
@classmethod
def construct_from_dataframe(
cls,
name: str,
dataframe: pd.DataFrame,
primary_field: str = "id",
auto_id: bool = False,
**kwargs
) -> Collection

Parameters:
name: Collection name
dataframe: pandas DataFrame with data
primary_field: Column name for primary key
auto_id: Enable auto-generated IDs

Example:
import pandas as pd
# Create collection from DataFrame
df = pd.DataFrame({
"id": [1, 2, 3],
"vector": [[0.1]*128, [0.2]*128, [0.3]*128],
"text": ["doc1", "doc2", "doc3"]
})
collection = Collection.construct_from_dataframe(
"dataframe_collection",
df,
primary_field="id"
)

# Schema information
collection.schema: CollectionSchema # Collection schema
collection.name: str # Collection name
collection.description: str # Collection description
# Data statistics
collection.is_empty: bool # True if collection has no data
collection.num_entities: int # Total entity count
collection.num_shards: int # Number of shards
# Field access
collection.primary_field: FieldSchema # Primary key field schema
collection.aliases: List[str] # List of collection aliases
# Related objects
collection.partitions: List[Partition] # List of partition objects
collection.indexes: List[Index] # List of index objects
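These properties are plain attribute reads; a short sketch of inspecting a collection:
# Sketch: inspect collection metadata through its properties
print(f"{collection.name}: {collection.num_entities} entities")
print("Fields:", [f.name for f in collection.schema.fields])
print("Aliases:", collection.aliases)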
def load(
    self,
partition_names: Optional[List[str]] = None,
replica_number: int = 1,
timeout: Optional[float] = None,
**kwargs
) -> None

Parameters:
partition_names: Specific partitions to load (default: all)
replica_number: Number of replicas for high availability
**kwargs: Additional loading options such as _resource_groups

def release(
self,
timeout: Optional[float] = None
) -> None

Examples:
# Load entire collection with multiple replicas
collection.load(replica_number=2)
# Load specific partitions
collection.load(partition_names=["2024", "2023"])
# Load with resource group assignment
collection.load(replica_number=2, _resource_groups=["gpu_group"])
# Release from memory
collection.release()
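load() returns once the request is accepted, not necessarily once all data is in memory. A sketch using the utility helpers to wait for and report loading progress (the collection name "documents" is assumed from earlier examples):
# Sketch: block until loading finishes, then report progress
from pymilvus import utility
collection.load()
utility.wait_for_loading_complete("documents")
print(utility.loading_progress("documents"))  # e.g. {'loading_progress': '100%'}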
def insert(
    self,
data: Union[List[List], List[Dict], pd.DataFrame],
partition_name: Optional[str] = None,
timeout: Optional[float] = None,
**kwargs
) -> MutationResult

def upsert(
self,
data: Union[List[List], List[Dict], pd.DataFrame],
partition_name: Optional[str] = None,
timeout: Optional[float] = None,
**kwargs
) -> MutationResult

def delete(
self,
expr: str,
partition_name: Optional[str] = None,
timeout: Optional[float] = None,
**kwargs
) -> MutationResult

Examples:
# Insert data as list of dictionaries
data = [
{"id": 1, "embedding": [0.1] * 768, "category": "A"},
{"id": 2, "embedding": [0.2] * 768, "category": "B"}
]
result = collection.insert(data)
# Insert into specific partition
collection.insert(data, partition_name="recent")
# Delete by expression
collection.delete("category == 'obsolete'")
# Upsert (insert or update)
collection.upsert(updated_data)
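All three methods return a MutationResult; a sketch of inspecting it:
# Sketch: MutationResult exposes counts and generated primary keys
result = collection.insert(data)
print(result.insert_count)   # number of rows written
print(result.primary_keys)   # primary keys of the inserted rows
collection.flush()           # seal segments so num_entities reflects the write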
def query(
    self,
expr: str,
output_fields: Optional[List[str]] = None,
partition_names: Optional[List[str]] = None,
limit: int = 16384,
offset: int = 0,
timeout: Optional[float] = None,
**kwargs
) -> List[Dict[str, Any]]

def search(
self,
data: Union[List[List[float]], List[Dict]],
anns_field: str,
param: Dict[str, Any],
limit: int = 10,
expr: Optional[str] = None,
partition_names: Optional[List[str]] = None,
output_fields: Optional[List[str]] = None,
timeout: Optional[float] = None,
round_decimal: int = -1,
**kwargs
) -> SearchResult

def hybrid_search(
self,
reqs: List[AnnSearchRequest],
rerank: Union[RRFRanker, WeightedRanker],
limit: int = 10,
partition_names: Optional[List[str]] = None,
output_fields: Optional[List[str]] = None,
timeout: Optional[float] = None,
**kwargs
) -> SearchResult

Examples:
# Query with filtering
results = collection.query(
expr="category in ['A', 'B'] and score > 0.5",
output_fields=["id", "category", "metadata"],
limit=100
)
# Vector search
search_results = collection.search(
data=[[0.1] * 768],
anns_field="embedding",
param={"metric_type": "L2", "params": {"nprobe": 16}},
limit=10,
expr="category == 'active'",
output_fields=["id", "title"]
)
# Hybrid search with multiple vector fields
from pymilvus import AnnSearchRequest, RRFRanker
req1 = AnnSearchRequest(
data=dense_vectors,
anns_field="dense_embedding",
param={"metric_type": "L2"},
limit=100
)
req2 = AnnSearchRequest(
data=sparse_vectors,
anns_field="sparse_embedding",
param={"metric_type": "IP"},
limit=100
)
hybrid_results = collection.hybrid_search(
reqs=[req1, req2],
rerank=RRFRanker(k=60),
limit=10
)
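SearchResult is iterable: one hit list per query vector, each hit carrying an id, a distance, and any requested output fields. A short sketch using the search example above:
# Sketch: iterate a SearchResult (one hit list per query vector)
for hits in search_results:
    for hit in hits:
        print(hit.id, hit.distance, hit.entity.get("title"))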
def query_iterator(
    self,
batch_size: int = 1000,
limit: Optional[int] = None,
expr: Optional[str] = None,
output_fields: Optional[List[str]] = None,
partition_names: Optional[List[str]] = None,
timeout: Optional[float] = None,
**kwargs
) -> QueryIterator

def search_iterator(
self,
data: Union[List[List[float]], List[Dict]],
anns_field: str,
param: Dict[str, Any],
batch_size: int = 1000,
limit: Optional[int] = None,
expr: Optional[str] = None,
partition_names: Optional[List[str]] = None,
output_fields: Optional[List[str]] = None,
**kwargs
) -> SearchIterator
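The iterators page through large result sets in fixed-size batches instead of one oversized response. A usage sketch (process is a hypothetical per-batch handler):
# Sketch: drain a QueryIterator batch by batch
iterator = collection.query_iterator(
    batch_size=1000,
    expr="score > 0.5",
    output_fields=["id", "category"]
)
while True:
    batch = iterator.next()
    if not batch:
        iterator.close()  # free server-side resources when done
        break
    process(batch)  # hypothetical handler for each page of results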
def create_partition(
    self,
partition_name: str,
description: str = "",
timeout: Optional[float] = None
) -> Partition

def drop_partition(
self,
partition_name: str,
timeout: Optional[float] = None
) -> None

def has_partition(
self,
partition_name: str,
timeout: Optional[float] = None
) -> bool

def partition(
self,
partition_name: str
) -> Partition

Examples:
# Create partition
partition = collection.create_partition("2024_q1", "Q1 2024 data")
# Access existing partition
existing_partition = collection.partition("2024_q1")
# Check partition existence
if collection.has_partition("old_data"):
collection.drop_partition("old_data")
# List all partitions
for partition in collection.partitions:
print(f"Partition: {partition.name}, Entities: {partition.num_entities}")def create_index(
def create_index(
    self,
field_name: str,
index_params: Dict[str, Any],
timeout: Optional[float] = None,
**kwargs
) -> None

def drop_index(
self,
field_name: Optional[str] = None,
index_name: Optional[str] = None,
timeout: Optional[float] = None
) -> None

def has_index(
self,
field_name: Optional[str] = None,
index_name: Optional[str] = None,
timeout: Optional[float] = None
) -> bool

def index(
self,
field_name: Optional[str] = None,
index_name: Optional[str] = None
) -> Index

Examples:
# Create vector index
collection.create_index(
"embedding",
{
"index_type": "IVF_PQ",
"metric_type": "L2",
"params": {
"nlist": 2048,
"m": 16,
"nbits": 8
}
}
)
# Create scalar index
collection.create_index("category", {"index_type": "TRIE"})
# Access index information
if collection.has_index("embedding"):
idx = collection.index("embedding")
print(f"Index type: {idx.index_type}")def flush(
def flush(
    self,
timeout: Optional[float] = None,
**kwargs
) -> None

def drop(
self,
timeout: Optional[float] = None
) -> None

def compact(
self,
timeout: Optional[float] = None,
**kwargs
) -> int

def describe(
self,
timeout: Optional[float] = None
) -> Dict[str, Any]
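A sketch of the maintenance methods in use; per the signatures above, compact() returns a compaction job id:
# Sketch: routine maintenance calls
collection.flush()                    # persist pending inserts into sealed segments
info = collection.describe()          # collection metadata as a dict
compaction_id = collection.compact()  # merge small segments
collection.drop()                     # permanently remove the collection and its data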
Defines the structure and configuration of a collection including fields, functions, and properties.

from pymilvus import CollectionSchema
def __init__(
self,
fields: List[FieldSchema],
description: str = "",
functions: Optional[List[Function]] = None,
**kwargs
) -> None

Parameters:
fields: List of FieldSchema objects defining collection structure
description: Human-readable description
functions: List of Function objects for computed fields
**kwargs: Schema configuration options

Key Kwargs:
auto_id: Enable auto-generated primary keys (bool)
enable_dynamic_field: Allow dynamic fields not in schema (bool)
primary_field: Primary key field name (str)
partition_key_field: Partition key field name (str)
clustering_key_field_name: Clustering key field name (str)

Examples:
from pymilvus import CollectionSchema, FieldSchema, DataType, Function, FunctionType
# Basic schema
basic_schema = CollectionSchema([
FieldSchema("id", DataType.INT64, is_primary=True),
FieldSchema("vector", DataType.FLOAT_VECTOR, dim=768),
FieldSchema("text", DataType.VARCHAR, max_length=1000)
], description="Simple document collection")
# Advanced schema with all features
advanced_fields = [
FieldSchema("doc_id", DataType.VARCHAR, max_length=100, is_primary=True),
FieldSchema("category", DataType.VARCHAR, max_length=50, is_partition_key=True),
FieldSchema("timestamp", DataType.INT64, is_clustering_key=True),
FieldSchema("content", DataType.VARCHAR, max_length=5000),
FieldSchema("dense_vector", DataType.FLOAT_VECTOR, dim=768),
FieldSchema("sparse_vector", DataType.SPARSE_FLOAT_VECTOR),
FieldSchema("metadata", DataType.JSON)
]
# BM25 function for sparse vectors
bm25_function = Function(
name="bm25_sparse",
function_type=FunctionType.BM25,
input_field_names=["content"],
output_field_names=["sparse_vector"],
params={"language": "en"}
)
advanced_schema = CollectionSchema(
fields=advanced_fields,
functions=[bm25_function],
description="Production document collection",
enable_dynamic_field=True,
partition_key_field="category",
clustering_key_field_name="timestamp"
)

schema.fields: List[FieldSchema] # All field definitions
schema.description: str # Schema description
schema.functions: Optional[List[Function]] # Computed field functions
# Special field access
schema.primary_field: Optional[FieldSchema] # Primary key field
schema.partition_key_field: Optional[FieldSchema] # Partition key field
schema.clustering_key_field: Optional[FieldSchema] # Clustering key field
# Configuration flags
schema.enable_dynamic_field: bool # Dynamic fields enabled
schema.auto_id: bool # Auto ID generation enabled

def add_field(
self,
field_name: str,
datatype: DataType,
**kwargs
) -> None

def to_dict(self) -> Dict[str, Any]

Example:
# Add field to existing schema
schema.add_field("score", DataType.DOUBLE, default_value=0.0)
# Convert to dictionary for inspection
schema_dict = schema.to_dict()
print(f"Fields: {len(schema_dict['fields'])}")Defines individual field properties including data type, constraints, and metadata.
from pymilvus import FieldSchema, DataType
def __init__(
self,
name: str,
dtype: DataType,
description: str = "",
**kwargs
) -> None

Parameters:
name: Field name (must be unique within schema)
dtype: DataType enum value
description: Human-readable field description
**kwargs: Field-specific configuration options

Key Kwargs:
is_primary: Mark as primary key field (bool)
auto_id: Enable auto-generated values for primary key (bool)
max_length: Maximum length for VARCHAR fields (int)
dim: Dimension for vector fields (int)
max_capacity: Maximum capacity for ARRAY fields (int)
element_type: Element data type for ARRAY fields (DataType)
is_partition_key: Mark as partition key (bool)
is_clustering_key: Mark as clustering key (bool)
nullable: Allow null values (bool)
default_value: Default field value
mmap_enabled: Enable memory mapping for large fields (bool)
is_function_output: Mark as function output field (bool)

Examples:

# Primary key fields
id_field = FieldSchema("id", DataType.INT64, is_primary=True, auto_id=True)
uuid_field = FieldSchema("uuid", DataType.VARCHAR, max_length=36, is_primary=True)
# Vector fields
dense_vector = FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=768)
binary_vector = FieldSchema("hash", DataType.BINARY_VECTOR, dim=128)
sparse_vector = FieldSchema("sparse", DataType.SPARSE_FLOAT_VECTOR)
# Half-precision vectors for memory efficiency
fp16_vector = FieldSchema("fp16_embed", DataType.FLOAT16_VECTOR, dim=512)
bf16_vector = FieldSchema("bf16_embed", DataType.BFLOAT16_VECTOR, dim=512)
# Scalar fields
text_field = FieldSchema("title", DataType.VARCHAR, max_length=200)
json_field = FieldSchema("metadata", DataType.JSON)
bool_field = FieldSchema("active", DataType.BOOL, default_value=True)
int_field = FieldSchema("count", DataType.INT32, default_value=0)
float_field = FieldSchema("score", DataType.DOUBLE, nullable=True)
# Array fields
tag_array = FieldSchema(
"tags",
DataType.ARRAY,
max_capacity=10,
element_type=DataType.VARCHAR
)
# Special purpose fields
partition_key = FieldSchema(
"category",
DataType.VARCHAR,
max_length=50,
is_partition_key=True
)
clustering_key = FieldSchema(
"timestamp",
DataType.INT64,
is_clustering_key=True
)
# Memory-mapped field for large data
large_field = FieldSchema(
"large_data",
DataType.VARCHAR,
max_length=10000,
mmap_enabled=True
)

field.name: str # Field name
field.dtype: DataType # Data type
field.description: str # Field description
# Special properties
field.is_primary: bool # Primary key flag
field.is_dynamic: bool # Dynamic field flag
field.auto_id: bool # Auto ID generation flag
field.nullable: bool # Nullable flag
field.is_partition_key: bool # Partition key flag
field.is_clustering_key: bool # Clustering key flag
field.is_function_output: bool # Function output flag
# Type-specific properties
field.max_length: Optional[int] # VARCHAR max length
field.dim: Optional[int] # Vector dimension
field.max_capacity: Optional[int] # ARRAY max capacity
field.element_type: Optional[DataType] # ARRAY element type
field.default_value: Any # Default field value
field.mmap_enabled: Optional[bool] # Memory mapping enabled

Defines computed fields that are automatically generated from input fields using built-in functions.
from pymilvus import Function, FunctionType
def __init__(
self,
name: str,
function_type: FunctionType,
input_field_names: Union[str, List[str]],
output_field_names: Optional[Union[str, List[str]]] = None,
description: str = "",
params: Optional[Dict] = None
)

Parameters:
name: Function name (must be unique)
function_type: FunctionType enum (BM25, TEXTEMBEDDING, RERANK)
input_field_names: Source field name(s)
output_field_names: Target field name(s)
description: Function description
params: Function-specific parameters

Examples:

# BM25 sparse vector generation
bm25_func = Function(
name="content_bm25",
function_type=FunctionType.BM25,
input_field_names=["content"],
output_field_names=["bm25_sparse"],
params={"language": "en"}
)
# Text embedding generation
embed_func = Function(
name="title_embedding",
function_type=FunctionType.TEXTEMBEDDING,
input_field_names=["title", "description"],
output_field_names=["text_embedding"],
params={
"model_name": "sentence-transformers/all-MiniLM-L6-v2",
"model_config": {"device": "gpu"}
}
)
# Reranking function
rerank_func = Function(
name="relevance_rerank",
function_type=FunctionType.RERANK,
input_field_names=["query", "document"],
output_field_names=["relevance_score"],
params={"model_name": "cross-encoder/ms-marco-MiniLM-L-6-v2"}
)

func.name: str # Function name
func.function_type: FunctionType # Function type enum
func.input_field_names: List[str] # Input field names
func.output_field_names: List[str] # Output field names
func.description: str # Function description
func.params: Optional[Dict] # Function parameters

A complete example combining fields, functions, and schema-level options:

from pymilvus import CollectionSchema, FieldSchema, DataType, Function, FunctionType
# Define fields including function outputs
fields = [
# Primary key
FieldSchema("doc_id", DataType.VARCHAR, max_length=100, is_primary=True),
# Partitioning and clustering
FieldSchema("category", DataType.VARCHAR, max_length=50, is_partition_key=True),
FieldSchema("created_at", DataType.INT64, is_clustering_key=True),
# Input text fields
FieldSchema("title", DataType.VARCHAR, max_length=500),
FieldSchema("content", DataType.VARCHAR, max_length=10000),
# Vector fields (function outputs)
FieldSchema("title_embedding", DataType.FLOAT_VECTOR, dim=384, is_function_output=True),
FieldSchema("content_sparse", DataType.SPARSE_FLOAT_VECTOR, is_function_output=True),
# Metadata
FieldSchema("metadata", DataType.JSON),
FieldSchema("tags", DataType.ARRAY, max_capacity=20, element_type=DataType.VARCHAR)
]
# Define functions
functions = [
Function(
"title_embed",
FunctionType.TEXTEMBEDDING,
input_field_names=["title"],
output_field_names=["title_embedding"],
params={"model_name": "sentence-transformers/all-MiniLM-L6-v2"}
),
Function(
"content_bm25",
FunctionType.BM25,
input_field_names=["content"],
output_field_names=["content_sparse"],
params={"language": "en"}
)
]
# Create comprehensive schema
schema = CollectionSchema(
fields=fields,
functions=functions,
description="Multi-vector document collection with automatic embeddings",
enable_dynamic_field=True,
partition_key_field="category",
clustering_key_field_name="created_at"
)
# Create collection
collection = Collection("documents", schema)# Validate schema before collection creation
def validate_schema(schema: CollectionSchema) -> bool:
"""Validate schema configuration"""
# Check for primary key
if not schema.primary_field:
raise ValueError("Schema must have a primary key field")
# Validate vector dimensions
for field in schema.fields:
if field.dtype in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]:
dim = getattr(field, "dim", None)
if dim is None or dim <= 0:
raise ValueError(f"Vector field {field.name} must have valid dimension")
# Validate function input/output fields exist
if schema.functions:
field_names = {f.name for f in schema.fields}
for func in schema.functions:
for input_name in func.input_field_names:
if input_name not in field_names:
raise ValueError(f"Function {func.name} input field {input_name} not found")
for output_name in func.output_field_names:
if output_name not in field_names:
raise ValueError(f"Function {func.name} output field {output_name} not found")
return True
# Use validation
try:
validate_schema(schema)
collection = Collection("validated_collection", schema)
except ValueError as e:
print(f"Schema validation failed: {e}")The ORM classes provide comprehensive control over collection structure and behavior, enabling sophisticated data modeling patterns while maintaining type safety and validation.
Install with Tessl CLI
npx tessl i tessl/pypi-pymilvus