PyMilvus is the Python SDK for the Milvus vector database, providing comprehensive functionality for connecting to servers, managing collections, and performing vector operations.

PyMilvus provides a comprehensive type system including data types, enums for configuration options, status codes, and complex type definitions. This reference covers all enumeration values, type constants, and data structures used throughout the API.
from pymilvus import DataType
class DataType(IntEnum):
    """Field data types supported by Milvus.

    Scalar types use values 0-29; vector types start at 100;
    UNKNOWN (999) is a sentinel for unrecognized types.
    """

    NONE = 0
    BOOL = 1
    INT8 = 2
    INT16 = 3
    INT32 = 4
    INT64 = 5
    FLOAT = 10                 # 32-bit floating point
    DOUBLE = 11                # 64-bit floating point
    STRING = 20                # Deprecated, use VARCHAR
    VARCHAR = 21
    ARRAY = 22
    JSON = 23
    GEOMETRY = 24              # For geospatial data

    BINARY_VECTOR = 100
    FLOAT_VECTOR = 101
    FLOAT16_VECTOR = 102       # Half precision vectors
    BFLOAT16_VECTOR = 103      # Brain float 16 vectors
    SPARSE_FLOAT_VECTOR = 104  # Sparse vectors for text search
    INT8_VECTOR = 105          # Quantized vectors
    UNKNOWN = 999              # Sentinel for unrecognized types
from pymilvus import FieldSchema, DataType

# Scalar data types
bool_field = FieldSchema("active", DataType.BOOL)
int_field = FieldSchema("count", DataType.INT64)
float_field = FieldSchema("score", DataType.DOUBLE)
text_field = FieldSchema("content", DataType.VARCHAR, max_length=1000)

# Vector data types
dense_vector = FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=768)
binary_vector = FieldSchema("hash", DataType.BINARY_VECTOR, dim=128)
sparse_vector = FieldSchema("sparse_embed", DataType.SPARSE_FLOAT_VECTOR)

# Half-precision vectors for memory efficiency
fp16_vector = FieldSchema("fp16_embed", DataType.FLOAT16_VECTOR, dim=512)
bf16_vector = FieldSchema("bf16_embed", DataType.BFLOAT16_VECTOR, dim=512)

# Quantized vectors for storage efficiency
int8_vector = FieldSchema("quantized", DataType.INT8_VECTOR, dim=256)

# Complex data types
json_field = FieldSchema("metadata", DataType.JSON)
# VARCHAR array elements require max_length in addition to max_capacity
array_field = FieldSchema("tags", DataType.ARRAY,
                          max_capacity=20, element_type=DataType.VARCHAR,
                          max_length=100)

# Geospatial data (experimental)
geo_field = FieldSchema("location", DataType.GEOMETRY)
# Vector type memory usage comparison (per vector)
vector_memory_usage = {
    DataType.FLOAT_VECTOR: "dimension * 4 bytes (32-bit floats)",
    DataType.FLOAT16_VECTOR: "dimension * 2 bytes (16-bit floats)",
    DataType.BFLOAT16_VECTOR: "dimension * 2 bytes (bfloat16)",
    DataType.INT8_VECTOR: "dimension * 1 byte (quantized)",
    DataType.BINARY_VECTOR: "dimension / 8 bytes (packed bits)",
    DataType.SPARSE_FLOAT_VECTOR: "variable (only non-zero values stored)"
}

# Example: 768-dimension vectors
dimension = 768
for vector_type, formula in vector_memory_usage.items():
    if "dimension *" in formula:
        # Parse the per-component byte multiplier out of the formula text,
        # e.g. "dimension * 4 bytes (...)" -> 4
        multiplier = float(formula.split(" * ")[1].split(" ")[0])
        memory_bytes = int(dimension * multiplier)  # whole bytes, not "3072.0"
        print(f"{vector_type.name}: {memory_bytes} bytes per vector")
    else:
        # BINARY_VECTOR and SPARSE_FLOAT_VECTOR don't fit the simple formula
        print(f"{vector_type.name}: {formula}")

from pymilvus import IndexType
class IndexType(IntEnum):
    """Index types supported by Milvus.

    Duplicate values define enum aliases so both legacy spellings
    (IVFLAT, IVF_SQ8H) and canonical names (IVF_FLAT, IVF_SQ8_H) work.
    """

    INVALID = 0
    FLAT = 1                    # Exact search, 100% recall
    IVFLAT = 2                  # Legacy spelling
    IVF_FLAT = 2                # Canonical alias of IVFLAT
    IVF_SQ8 = 3                 # IVF with scalar quantization
    RNSG = 4                    # Random Navigable Small Graph (deprecated)
    IVF_SQ8H = 5                # Legacy spelling
    IVF_SQ8_H = 5               # Canonical alias of IVF_SQ8H
    IVF_PQ = 6                  # IVF with product quantization
    HNSW = 11                   # Hierarchical Navigable Small World
    ANNOY = 12                  # Approximate Nearest Neighbors Oh Yeah
    AUTOINDEX = 13              # Automatic index selection
    SPARSE_INVERTED_INDEX = 14  # For sparse vectors
    SPARSE_WAND = 15            # Weak AND for sparse vectors
    TRIE = 16                   # For string prefix matching
    STL_SORT = 17               # For numeric range queries
    INVERTED = 18               # For JSON and array fields
    GPU_IVF_FLAT = 19           # GPU-accelerated IVF_FLAT
    GPU_IVF_PQ = 20             # GPU-accelerated IVF_PQ
    GPU_BRUTE_FORCE = 21        # GPU exact search
# Vector field indexes
vector_indexes = {
    DataType.FLOAT_VECTOR: [
        IndexType.FLAT,       # Exact search
        IndexType.IVF_FLAT,   # Good accuracy/speed balance
        IndexType.IVF_PQ,     # Memory efficient
        IndexType.HNSW,       # Fast search
        IndexType.ANNOY,      # Memory efficient
        IndexType.AUTOINDEX   # Automatic selection
    ],
    DataType.BINARY_VECTOR: [
        IndexType.FLAT,       # Exact Hamming distance
        IndexType.IVF_FLAT    # Approximate Hamming search
    ],
    DataType.SPARSE_FLOAT_VECTOR: [
        IndexType.SPARSE_INVERTED_INDEX,  # Standard for sparse vectors
        IndexType.SPARSE_WAND             # Optimized sparse search
    ]
}

# Scalar field indexes
scalar_indexes = {
    DataType.VARCHAR: [IndexType.TRIE],     # String prefix/equality
    DataType.INT64: [IndexType.STL_SORT],   # Numeric range queries
    DataType.DOUBLE: [IndexType.STL_SORT],  # Numeric range queries
    DataType.JSON: [IndexType.INVERTED],    # Key-value queries
    DataType.ARRAY: [IndexType.INVERTED]    # Array containment queries
}

# GPU-accelerated indexes (requires GPU-enabled Milvus)
gpu_indexes = {
    DataType.FLOAT_VECTOR: [
        IndexType.GPU_IVF_FLAT,
        IndexType.GPU_IVF_PQ,
        IndexType.GPU_BRUTE_FORCE
    ]
}

from pymilvus import FunctionType
class FunctionType(IntEnum):
    """Server-side function types that derive field values from other fields."""

    UNKNOWN = 0
    BM25 = 1           # Sparse vector generation from text
    TEXTEMBEDDING = 2  # Dense vector generation from text
    RERANK = 3         # Relevance scoring for reranking
from pymilvus import Function, FunctionType

# BM25 function for sparse text vectors
bm25_function = Function(
    name="text_bm25",
    function_type=FunctionType.BM25,
    input_field_names=["content"],
    output_field_names=["bm25_sparse"],
    params={
        "language": "en",
        "k1": 1.2,  # BM25 parameter (term-frequency saturation)
        "b": 0.75   # BM25 parameter (length normalization)
    }
)

# Text embedding function for dense vectors
embedding_function = Function(
    name="text_embedding",
    function_type=FunctionType.TEXTEMBEDDING,
    input_field_names=["title", "description"],
    output_field_names=["text_vector"],
    params={
        "model_name": "sentence-transformers/all-MiniLM-L6-v2",
        "model_config": {
            "device": "cuda:0",
            "normalize_embeddings": True
        }
    }
)

# Reranking function for relevance scoring
rerank_function = Function(
    name="cross_encoder_rerank",
    function_type=FunctionType.RERANK,
    input_field_names=["query", "document"],
    output_field_names=["relevance_score"],
    params={
        "model_name": "cross-encoder/ms-marco-MiniLM-L-6-v2",
        "top_k": 100
    }
)
# Distance metrics for vector similarity
class MetricType:
    """String constants naming the distance/similarity metrics Milvus accepts."""

    L2 = "L2"                          # Euclidean distance
    IP = "IP"                          # Inner Product (cosine for normalized vectors)
    COSINE = "COSINE"                  # Cosine similarity (auto-normalized)
    HAMMING = "HAMMING"                # Hamming distance (for binary vectors)
    JACCARD = "JACCARD"                # Jaccard similarity (for binary vectors)
    TANIMOTO = "TANIMOTO"              # Tanimoto coefficient
    SUBSTRUCTURE = "SUBSTRUCTURE"      # Chemical substructure matching
    SUPERSTRUCTURE = "SUPERSTRUCTURE"  # Chemical superstructure matching
# Metric selection by use case
metric_guidelines = {
    "general_embeddings": "L2",            # Most common for embeddings
    "normalized_embeddings": "COSINE",     # For unit vectors
    "dot_product_similarity": "IP",        # When vectors aren't normalized
    "binary_hashes": "HAMMING",            # For binary vectors
    "molecular_fingerprints": "TANIMOTO",  # Chemical similarity
    "sparse_vectors": "IP"                 # For BM25/TF-IDF vectors
}

# Example index creation with different metrics
from pymilvus import MilvusClient

client = MilvusClient()

# L2 distance for general embeddings
client.create_index("documents", "embedding", {
    "index_type": "HNSW",
    "metric_type": "L2",
    "params": {"M": 32, "efConstruction": 400}
})

# Cosine similarity for normalized text embeddings
client.create_index("articles", "text_vector", {
    "index_type": "IVF_FLAT",
    "metric_type": "COSINE",
    "params": {"nlist": 1024}
})

# Inner product for sparse vectors
client.create_index("bm25_collection", "sparse_vector", {
    "index_type": "SPARSE_INVERTED_INDEX",
    "metric_type": "IP",
    "params": {"drop_ratio_build": 0.2}
})
class LoadState:
    """Collection load states reported by get_load_state()."""

    NotExist = "NotExist"  # Collection doesn't exist
    NotLoad = "NotLoad"    # Collection not loaded into memory
    Loading = "Loading"    # Currently loading
    Loaded = "Loaded"      # Fully loaded and ready
class IndexState:
    """Index build states reported by index progress queries."""

    IndexStateNone = "IndexStateNone"  # No index information
    Unissued = "Unissued"              # Index build not started
    InProgress = "InProgress"          # Index building in progress
    Finished = "Finished"              # Index build completed
    Failed = "Failed"                  # Index build failed
    Deleted = "Deleted"                # Index was deleted
class ConsistencyLevel:
    """Read consistency levels accepted by search/query operations."""

    Strong = "Strong"          # Read your own writes immediately
    Bounded = "Bounded"        # Bounded staleness (default)
    Eventually = "Eventually"  # Eventual consistency
    Session = "Session"        # Session consistency
    Customized = "Customized"  # Custom consistency level
from pymilvus import MilvusClient, utility

client = MilvusClient()

# Check loading state
load_state = client.get_load_state("my_collection")
state = load_state.get("state", "Unknown")
if state == "NotLoad":
    print("Collection not loaded, loading now...")
    client.load_collection("my_collection")
elif state == "Loading":
    print("Collection is currently loading...")
    utility.wait_for_loading_complete("my_collection")
elif state == "Loaded":
    print("Collection ready for search")

# Check index building state
index_progress = utility.index_building_progress("my_collection", "vector_field")
index_state = index_progress.get("index_state", "Unknown")
state_messages = {
    "Unissued": "Index build queued",
    "InProgress": f"Building index: {index_progress.get('progress', 0)}%",
    "Finished": "Index build completed successfully",
    "Failed": "Index build failed - check logs"
}
print(state_messages.get(index_state, f"Unknown state: {index_state}"))

# Use different consistency levels
search_results = client.search(
    "my_collection",
    data=[[0.1] * 768],
    limit=10,
    consistency_level="Strong"  # Ensure latest data
)

from pymilvus.client.types import Status
class Status:
    """Operation status returned by legacy Milvus APIs.

    Wraps a numeric status code plus an optional human-readable message.
    """

    SUCCESS = 0
    UNEXPECTED_ERROR = 1
    CONNECT_FAILED = 2
    PERMISSION_DENIED = 3
    COLLECTION_NOT_EXISTS = 4
    ILLEGAL_ARGUMENT = 5
    ILLEGAL_DIMENSION = 7
    ILLEGAL_INDEX_TYPE = 8
    ILLEGAL_COLLECTION_NAME = 9
    ILLEGAL_TOPK = 10
    ILLEGAL_ROWRECORD = 11
    ILLEGAL_VECTOR_ID = 12
    ILLEGAL_SEARCH_RESULT = 13
    FILE_NOT_FOUND = 14
    META_FAILED = 15
    CACHE_FAILED = 16
    CANNOT_CREATE_FOLDER = 17
    CANNOT_CREATE_FILE = 18
    CANNOT_DELETE_FOLDER = 19
    CANNOT_DELETE_FILE = 20
    BUILD_INDEX_ERROR = 21
    ILLEGAL_NLIST = 22
    ILLEGAL_METRIC_TYPE = 23
    OUT_OF_MEMORY = 24

    def __init__(self, code: int = SUCCESS, message: str = ""):
        # code: one of the class-level status constants
        # message: optional human-readable detail for failures
        self.code = code
        self.message = message

    def OK(self) -> bool:
        """Return True if status indicates success"""
        return self.code == Status.SUCCESS
class BulkInsertState:
    """Progress snapshot of a bulk-insert (import) task."""

    def __init__(self):
        self.task_id: int = 0             # server-assigned task identifier
        self.state: str = ""              # "pending", "importing", "completed", "failed"
        self.row_count: int = 0           # rows imported so far
        self.id_list: List[int] = []      # primary keys of imported rows
        self.infos: Dict[str, Any] = {}   # extra task metadata from the server
        self.create_time: int = 0         # task creation timestamp
        self.progress: float = 0.0        # completion fraction/percentage
class Replica:
    """Description of one in-memory replica of a loaded collection."""

    def __init__(self):
        self.id: int = 0
        self.collection_id: int = 0
        self.partition_ids: List[int] = []
        self.shard_replicas: List["Shard"] = []  # one entry per shard (see Shard below)
        self.node_ids: List[int] = []
        self.resource_group: str = ""

class Shard:
    """Single shard (DML channel) served within a replica."""

    def __init__(self):
        self.channel_name: str = ""
        self.shard_leader: int = 0       # node id of the shard leader
        self.shard_nodes: List[int] = []
        self.dm_channel_name: str = ""
class ResourceGroupInfo:
    """Description of a resource group and the query nodes it owns."""

    def __init__(self):
        self.name: str = ""
        self.capacity: int = 0                        # requested node count
        self.num_available_node: int = 0              # nodes currently available
        self.num_loaded_replica: Dict[str, int] = {}  # collection -> loaded replica count
        self.num_outgoing_node: Dict[str, int] = {}   # collection -> nodes lent out
        self.num_incoming_node: Dict[str, int] = {}   # collection -> nodes borrowed
        self.config: Dict[str, Any] = {}
        self.nodes: List[int] = []                    # node ids in this group
def validate_data_type_compatibility(field_type: DataType, value: Any) -> bool:
    """Validate if a value is compatible with a field type.

    Returns False for field types that have no registered validator.
    """
    type_validators = {
        DataType.BOOL: lambda x: isinstance(x, bool),
        # bool is a subclass of int in Python, so numeric validators must
        # reject it explicitly or True/False would validate as numbers.
        DataType.INT8: lambda x: isinstance(x, int) and not isinstance(x, bool) and -128 <= x <= 127,
        DataType.INT16: lambda x: isinstance(x, int) and not isinstance(x, bool) and -32768 <= x <= 32767,
        DataType.INT32: lambda x: isinstance(x, int) and not isinstance(x, bool) and -2147483648 <= x <= 2147483647,
        DataType.INT64: lambda x: isinstance(x, int) and not isinstance(x, bool),
        DataType.FLOAT: lambda x: isinstance(x, (int, float)) and not isinstance(x, bool),
        DataType.DOUBLE: lambda x: isinstance(x, (int, float)) and not isinstance(x, bool),
        DataType.VARCHAR: lambda x: isinstance(x, str),
        DataType.JSON: lambda x: isinstance(x, (dict, list, str, int, float, bool, type(None))),
        DataType.ARRAY: lambda x: isinstance(x, list),
        DataType.FLOAT_VECTOR: lambda x: isinstance(x, list) and all(isinstance(v, (int, float)) for v in x),
        DataType.BINARY_VECTOR: lambda x: isinstance(x, (list, bytes)),
        DataType.SPARSE_FLOAT_VECTOR: lambda x: isinstance(x, (dict, list))
    }
    validator = type_validators.get(field_type)
    return validator(value) if validator else False

# Usage examples
test_values = [
    (DataType.INT32, 12345, True),
    (DataType.INT32, 3000000000, False),          # Too large for INT32
    (DataType.FLOAT_VECTOR, [0.1, 0.2, 0.3], True),
    (DataType.FLOAT_VECTOR, [1, 2, "3"], False),  # Invalid vector element
    (DataType.VARCHAR, "hello", True),
    (DataType.JSON, {"key": "value"}, True)
]
for field_type, value, expected in test_values:
    result = validate_data_type_compatibility(field_type, value)
    status = "✓" if result == expected else "✗"
    print(f"{status} {field_type.name}: {value} -> {result}")
def get_compatible_index_types(field_type: DataType) -> List[IndexType]:
    """Get compatible index types for a field type.

    Returns an empty list for field types with no index support.
    """
    compatibility_map = {
        DataType.FLOAT_VECTOR: [
            IndexType.FLAT, IndexType.IVF_FLAT, IndexType.IVF_PQ,
            IndexType.HNSW, IndexType.ANNOY, IndexType.AUTOINDEX
        ],
        DataType.BINARY_VECTOR: [
            IndexType.FLAT, IndexType.IVF_FLAT
        ],
        DataType.SPARSE_FLOAT_VECTOR: [
            IndexType.SPARSE_INVERTED_INDEX, IndexType.SPARSE_WAND
        ],
        DataType.VARCHAR: [IndexType.TRIE],
        DataType.INT64: [IndexType.STL_SORT],
        DataType.INT32: [IndexType.STL_SORT],
        DataType.DOUBLE: [IndexType.STL_SORT],
        DataType.FLOAT: [IndexType.STL_SORT],
        DataType.JSON: [IndexType.INVERTED],
        DataType.ARRAY: [IndexType.INVERTED]
    }
    return compatibility_map.get(field_type, [])

# Check index compatibility
field_types = [DataType.FLOAT_VECTOR, DataType.VARCHAR, DataType.JSON]
for field_type in field_types:
    compatible_indexes = get_compatible_index_types(field_type)
    print(f"{field_type.name} compatible indexes:")
    for index_type in compatible_indexes:
        print(f"  - {index_type.name}")
def get_compatible_metrics(field_type: DataType) -> List[str]:
    """Get compatible metric types for a field type.

    Returns an empty list for non-vector field types.
    """
    metric_compatibility = {
        DataType.FLOAT_VECTOR: ["L2", "IP", "COSINE"],
        DataType.BINARY_VECTOR: ["HAMMING", "JACCARD", "TANIMOTO", "SUBSTRUCTURE", "SUPERSTRUCTURE"],
        DataType.SPARSE_FLOAT_VECTOR: ["IP"],
        DataType.FLOAT16_VECTOR: ["L2", "IP", "COSINE"],
        DataType.BFLOAT16_VECTOR: ["L2", "IP", "COSINE"],
        DataType.INT8_VECTOR: ["L2", "IP", "COSINE"]
    }
    return metric_compatibility.get(field_type, [])

# Validate metric compatibility
vector_fields = [
    (DataType.FLOAT_VECTOR, "L2"),
    (DataType.BINARY_VECTOR, "HAMMING"),
    (DataType.SPARSE_FLOAT_VECTOR, "IP"),
    (DataType.FLOAT_VECTOR, "HAMMING")  # Invalid combination
]
for field_type, metric in vector_fields:
    compatible_metrics = get_compatible_metrics(field_type)
    is_compatible = metric in compatible_metrics
    status = "✓" if is_compatible else "✗"
    print(f"{status} {field_type.name} + {metric}: {is_compatible}")
class DefaultConfig:
    """Default configuration constants used across PyMilvus."""

    # Connection defaults
    DEFAULT_HOST = "localhost"
    DEFAULT_PORT = "19530"
    DEFAULT_URI = f"http://{DEFAULT_HOST}:{DEFAULT_PORT}"
    MILVUS_CONN_ALIAS = "default"
    MILVUS_CONN_TIMEOUT = 10.0

    # Data limits
    MaxVarCharLength = 65535
    MaxArrayCapacity = 4096
    MaxDimension = 32768

    # Encoding
    EncodeProtocol = "utf-8"

    # Index defaults
    DefaultIndexType = IndexType.AUTOINDEX
    DefaultMetricType = "L2"

    # Search defaults
    DefaultSearchLimit = 10
    DefaultQueryLimit = 16384
    DefaultBatchSize = 1000

import os
# Environment-based configuration
def get_milvus_config():
"""Get Milvus configuration from environment variables"""
config = {
"uri": os.getenv("MILVUS_URI", DefaultConfig.DEFAULT_URI),
"user": os.getenv("MILVUS_USER", ""),
"password": os.getenv("MILVUS_PASSWORD", ""),
"db_name": os.getenv("MILVUS_DB_NAME", ""),
"timeout": float(os.getenv("MILVUS_TIMEOUT", DefaultConfig.MILVUS_CONN_TIMEOUT))
}
return config
# Use environment configuration
config = get_milvus_config()
client = MilvusClient(**config)PyMilvus types and enums provide a comprehensive type system for vector database operations, ensuring type safety and providing clear configuration options for all aspects of data management, indexing, and search operations.
Install with Tessl CLI
npx tessl i tessl/pypi-pymilvus