CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pymilvus

Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.

Pending
Overview
Eval results
Files

docs/orm-collection.md

ORM Collection, Schema, and Field Classes

The ORM (Object-Relational Mapping) classes provide advanced control over collection lifecycle, schema definition, and field configuration. These classes are ideal for production applications requiring fine-grained control over collection properties, indexing strategies, and data validation.

Collection

The Collection class is the primary interface for advanced collection operations with explicit schema control.

Constructor

from pymilvus import Collection

def __init__(
    self,
    name: str,
    schema: Optional[CollectionSchema] = None,
    using: str = "default",
    shards_num: int = 1,
    consistency_level: str = "Bounded",
    properties: Optional[Dict[str, str]] = None,
    **kwargs
) -> None

Parameters:

  • name: Collection name
  • schema: CollectionSchema object defining structure
  • using: Connection alias (default: "default")
  • shards_num: Number of shards for data distribution
  • consistency_level: "Strong", "Bounded", "Eventually", or "Session"
  • properties: Custom collection properties
  • **kwargs: Additional configuration options

Examples:

# Create collection with existing schema
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType

schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("metadata", DataType.JSON)
])

collection = Collection(
    name="documents",
    schema=schema,
    shards_num=2,
    consistency_level="Strong"
)

# Load existing collection
existing = Collection("existing_collection")

Class Method: construct_from_dataframe

@classmethod
def construct_from_dataframe(
    cls,
    name: str,
    dataframe: pd.DataFrame,
    primary_field: str = "id",
    auto_id: bool = False,
    **kwargs
) -> Collection

Parameters:

  • dataframe: pandas DataFrame with data
  • primary_field: Column name for primary key
  • auto_id: Enable auto-generated IDs

Example:

import pandas as pd

# Create collection from DataFrame
df = pd.DataFrame({
    "id": [1, 2, 3],
    "vector": [[0.1]*128, [0.2]*128, [0.3]*128], 
    "text": ["doc1", "doc2", "doc3"]
})

collection = Collection.construct_from_dataframe(
    "dataframe_collection",
    df,
    primary_field="id"
)

Properties

# Schema information
collection.schema: CollectionSchema          # Collection schema
collection.name: str                         # Collection name  
collection.description: str                  # Collection description

# Data statistics
collection.is_empty: bool                    # True if collection has no data
collection.num_entities: int                 # Total entity count
collection.num_shards: int                   # Number of shards

# Field access
collection.primary_field: FieldSchema        # Primary key field schema
collection.aliases: List[str]                # List of collection aliases

# Related objects
collection.partitions: List[Partition]       # List of partition objects
collection.indexes: List[Index]              # List of index objects

Memory Management

def load(
    self,
    partition_names: Optional[List[str]] = None,
    replica_number: int = 1,
    timeout: Optional[float] = None,
    **kwargs
) -> None

Parameters:

  • partition_names: Specific partitions to load (default: all)
  • replica_number: Number of replicas for high availability
  • **kwargs: Additional loading options like _resource_groups

def release(
    self,
    timeout: Optional[float] = None
) -> None

Examples:

# Load entire collection with multiple replicas
collection.load(replica_number=2)

# Load specific partitions
collection.load(partition_names=["2024", "2023"])

# Load with resource group assignment
collection.load(replica_number=2, _resource_groups=["gpu_group"])

# Release from memory
collection.release()

Data Operations

def insert(
    self,
    data: Union[List[List], List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> MutationResult
def upsert(
    self,
    data: Union[List[List], List[Dict], pd.DataFrame], 
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> MutationResult
def delete(
    self,
    expr: str,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> MutationResult

Examples:

# Insert data as list of dictionaries
data = [
    {"id": 1, "embedding": [0.1] * 768, "category": "A"},
    {"id": 2, "embedding": [0.2] * 768, "category": "B"}
]
result = collection.insert(data)

# Insert into specific partition
collection.insert(data, partition_name="recent")

# Delete by expression
collection.delete("category == 'obsolete'")

# Upsert (insert or update)
collection.upsert(updated_data)

Query and Search Operations

def query(
    self,
    expr: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    limit: int = 16384,
    offset: int = 0,
    timeout: Optional[float] = None,
    **kwargs
) -> List[Dict[str, Any]]
def search(
    self,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str,
    param: Dict[str, Any],
    limit: int = 10,
    expr: Optional[str] = None,
    partition_names: Optional[List[str]] = None,
    output_fields: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    round_decimal: int = -1,
    **kwargs
) -> SearchResult
def hybrid_search(
    self,
    reqs: List[AnnSearchRequest],
    rerank: Union[RRFRanker, WeightedRanker],
    limit: int = 10,
    partition_names: Optional[List[str]] = None,
    output_fields: Optional[List[str]] = None, 
    timeout: Optional[float] = None,
    **kwargs
) -> SearchResult

Examples:

# Query with filtering
results = collection.query(
    expr="category in ['A', 'B'] and score > 0.5",
    output_fields=["id", "category", "metadata"],
    limit=100
)

# Vector search
search_results = collection.search(
    data=[[0.1] * 768],
    anns_field="embedding", 
    param={"metric_type": "L2", "params": {"nprobe": 16}},
    limit=10,
    expr="category == 'active'",
    output_fields=["id", "title"]
)

# Hybrid search with multiple vector fields
from pymilvus import AnnSearchRequest, RRFRanker

req1 = AnnSearchRequest(
    data=dense_vectors,
    anns_field="dense_embedding", 
    param={"metric_type": "L2"},
    limit=100
)

req2 = AnnSearchRequest(
    data=sparse_vectors,
    anns_field="sparse_embedding",
    param={"metric_type": "IP"},
    limit=100
)

hybrid_results = collection.hybrid_search(
    reqs=[req1, req2],
    rerank=RRFRanker(k=60),
    limit=10
)

Iterator Operations

def query_iterator(
    self,
    batch_size: int = 1000,
    limit: Optional[int] = None,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> QueryIterator
def search_iterator(
    self,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str,
    param: Dict[str, Any], 
    batch_size: int = 1000,
    limit: Optional[int] = None,
    expr: Optional[str] = None,
    partition_names: Optional[List[str]] = None,
    output_fields: Optional[List[str]] = None,
    **kwargs
) -> SearchIterator

Partition Management

def create_partition(
    self,
    partition_name: str,
    description: str = "",
    timeout: Optional[float] = None
) -> Partition
def drop_partition(
    self,
    partition_name: str,
    timeout: Optional[float] = None
) -> None
def has_partition(
    self,
    partition_name: str,
    timeout: Optional[float] = None
) -> bool
def partition(
    self,
    partition_name: str
) -> Partition

Examples:

# Create partition
partition = collection.create_partition("2024_q1", "Q1 2024 data")

# Access existing partition
existing_partition = collection.partition("2024_q1")

# Check partition existence
if collection.has_partition("old_data"):
    collection.drop_partition("old_data")

# List all partitions
for partition in collection.partitions:
    print(f"Partition: {partition.name}, Entities: {partition.num_entities}")

Index Management

def create_index(
    self,
    field_name: str,
    index_params: Dict[str, Any],
    timeout: Optional[float] = None,
    **kwargs
) -> None
def drop_index(
    self,
    field_name: Optional[str] = None,
    index_name: Optional[str] = None,
    timeout: Optional[float] = None
) -> None
def has_index(
    self,
    field_name: Optional[str] = None,
    index_name: Optional[str] = None,
    timeout: Optional[float] = None
) -> bool
def index(
    self,
    field_name: Optional[str] = None,
    index_name: Optional[str] = None
) -> Index

Examples:

# Create vector index
collection.create_index(
    "embedding",
    {
        "index_type": "IVF_PQ",
        "metric_type": "L2",
        "params": {
            "nlist": 2048,
            "m": 16, 
            "nbits": 8
        }
    }
)

# Create scalar index
collection.create_index("category", {"index_type": "TRIE"})

# Access index information
if collection.has_index("embedding"):
    idx = collection.index("embedding")
    print(f"Index type: {idx.index_type}")

Collection Management

def flush(
    self,
    timeout: Optional[float] = None,
    **kwargs
) -> None
def drop(
    self,
    timeout: Optional[float] = None
) -> None
def compact(
    self,
    timeout: Optional[float] = None,
    **kwargs
) -> int
def describe(
    self,
    timeout: Optional[float] = None
) -> Dict[str, Any]

CollectionSchema

Defines the structure and configuration of a collection including fields, functions, and properties.

Constructor

from pymilvus import CollectionSchema

def __init__(
    self,
    fields: List[FieldSchema],
    description: str = "",
    functions: Optional[List[Function]] = None,
    **kwargs
) -> None

Parameters:

  • fields: List of FieldSchema objects defining collection structure
  • description: Human-readable description
  • functions: List of Function objects for computed fields
  • **kwargs: Schema configuration options

Key Kwargs:

  • auto_id: Enable auto-generated primary keys (bool)
  • enable_dynamic_field: Allow dynamic fields not in schema (bool)
  • primary_field: Primary key field name (str)
  • partition_key_field: Partition key field name (str)
  • clustering_key_field_name: Clustering key field name (str)

Examples:

from pymilvus import CollectionSchema, FieldSchema, DataType, Function, FunctionType

# Basic schema
basic_schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("text", DataType.VARCHAR, max_length=1000)
], description="Simple document collection")

# Advanced schema with all features
advanced_fields = [
    FieldSchema("doc_id", DataType.VARCHAR, max_length=100, is_primary=True),
    FieldSchema("category", DataType.VARCHAR, max_length=50, is_partition_key=True),
    FieldSchema("timestamp", DataType.INT64, is_clustering_key=True),
    FieldSchema("content", DataType.VARCHAR, max_length=5000),
    FieldSchema("dense_vector", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("sparse_vector", DataType.SPARSE_FLOAT_VECTOR),
    FieldSchema("metadata", DataType.JSON)
]

# BM25 function for sparse vectors
bm25_function = Function(
    name="bm25_sparse",
    function_type=FunctionType.BM25,
    input_field_names=["content"],
    output_field_names=["sparse_vector"],
    params={"language": "en"}
)

advanced_schema = CollectionSchema(
    fields=advanced_fields,
    functions=[bm25_function],
    description="Production document collection",
    enable_dynamic_field=True,
    partition_key_field="category",
    clustering_key_field_name="timestamp"
)

Properties

schema.fields: List[FieldSchema]              # All field definitions
schema.description: str                       # Schema description
schema.functions: Optional[List[Function]]    # Computed field functions

# Special field access
schema.primary_field: Optional[FieldSchema]   # Primary key field
schema.partition_key_field: Optional[FieldSchema]  # Partition key field  
schema.clustering_key_field: Optional[FieldSchema] # Clustering key field

# Configuration flags
schema.enable_dynamic_field: bool             # Dynamic fields enabled
schema.auto_id: bool                         # Auto ID generation enabled

Methods

def add_field(
    self,
    field_name: str,
    datatype: DataType,
    **kwargs
) -> None
def to_dict(self) -> Dict[str, Any]

Example:

# Add field to existing schema
schema.add_field("score", DataType.DOUBLE, default_value=0.0)

# Convert to dictionary for inspection
schema_dict = schema.to_dict()
print(f"Fields: {len(schema_dict['fields'])}")

FieldSchema

Defines individual field properties including data type, constraints, and metadata.

Constructor

from pymilvus import FieldSchema, DataType

def __init__(
    self,
    name: str,
    dtype: DataType,
    description: str = "",
    **kwargs
) -> None

Parameters:

  • name: Field name (must be unique within schema)
  • dtype: DataType enum value
  • description: Human-readable field description
  • **kwargs: Field-specific configuration options

Key Kwargs:

  • is_primary: Mark as primary key field (bool)
  • auto_id: Enable auto-generated values for primary key (bool)
  • max_length: Maximum length for VARCHAR fields (int)
  • dim: Dimension for vector fields (int)
  • max_capacity: Maximum capacity for ARRAY fields (int)
  • element_type: Element data type for ARRAY fields (DataType)
  • is_partition_key: Mark as partition key (bool)
  • is_clustering_key: Mark as clustering key (bool)
  • nullable: Allow null values (bool)
  • default_value: Default field value
  • mmap_enabled: Enable memory mapping for large fields (bool)
  • is_function_output: Mark as function output field (bool)

Field Type Examples

# Primary key fields
id_field = FieldSchema("id", DataType.INT64, is_primary=True, auto_id=True)
uuid_field = FieldSchema("uuid", DataType.VARCHAR, max_length=36, is_primary=True)

# Vector fields
dense_vector = FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=768)
binary_vector = FieldSchema("hash", DataType.BINARY_VECTOR, dim=128)
sparse_vector = FieldSchema("sparse", DataType.SPARSE_FLOAT_VECTOR)

# Half-precision vectors for memory efficiency
fp16_vector = FieldSchema("fp16_embed", DataType.FLOAT16_VECTOR, dim=512)
bf16_vector = FieldSchema("bf16_embed", DataType.BFLOAT16_VECTOR, dim=512)

# Scalar fields
text_field = FieldSchema("title", DataType.VARCHAR, max_length=200)
json_field = FieldSchema("metadata", DataType.JSON)
bool_field = FieldSchema("active", DataType.BOOL, default_value=True)
int_field = FieldSchema("count", DataType.INT32, default_value=0)
float_field = FieldSchema("score", DataType.DOUBLE, nullable=True)

# Array fields
tag_array = FieldSchema(
    "tags", 
    DataType.ARRAY,
    max_capacity=10,
    element_type=DataType.VARCHAR
)

# Special purpose fields
partition_key = FieldSchema(
    "category", 
    DataType.VARCHAR, 
    max_length=50, 
    is_partition_key=True
)

clustering_key = FieldSchema(
    "timestamp", 
    DataType.INT64, 
    is_clustering_key=True
)

# Memory-mapped field for large data
large_field = FieldSchema(
    "large_data", 
    DataType.VARCHAR, 
    max_length=10000,
    mmap_enabled=True
)

Properties

field.name: str                    # Field name
field.dtype: DataType             # Data type
field.description: str            # Field description

# Special properties
field.is_primary: bool            # Primary key flag
field.is_dynamic: bool            # Dynamic field flag  
field.auto_id: bool              # Auto ID generation flag
field.nullable: bool             # Nullable flag
field.is_partition_key: bool     # Partition key flag
field.is_clustering_key: bool    # Clustering key flag
field.is_function_output: bool   # Function output flag

# Type-specific properties
field.max_length: Optional[int]   # VARCHAR max length
field.dim: Optional[int]         # Vector dimension
field.max_capacity: Optional[int] # ARRAY max capacity
field.element_type: Optional[DataType] # ARRAY element type
field.default_value: Any        # Default field value
field.mmap_enabled: Optional[bool] # Memory mapping enabled

Function

Defines computed fields that are automatically generated from input fields using built-in functions.

Constructor

from pymilvus import Function, FunctionType

def __init__(
    self,
    name: str,
    function_type: FunctionType,
    input_field_names: Union[str, List[str]],
    output_field_names: Optional[Union[str, List[str]]] = None,
    description: str = "",
    params: Optional[Dict] = None
)

Parameters:

  • name: Function name (must be unique)
  • function_type: FunctionType enum (BM25, TEXTEMBEDDING, RERANK)
  • input_field_names: Source field name(s)
  • output_field_names: Target field name(s)
  • description: Function description
  • params: Function-specific parameters

Function Types

# BM25 sparse vector generation
bm25_func = Function(
    name="content_bm25",
    function_type=FunctionType.BM25,
    input_field_names=["content"],
    output_field_names=["bm25_sparse"],
    params={"language": "en"}
)

# Text embedding generation
embed_func = Function(
    name="title_embedding", 
    function_type=FunctionType.TEXTEMBEDDING,
    input_field_names=["title", "description"],
    output_field_names=["text_embedding"],
    params={
        "model_name": "sentence-transformers/all-MiniLM-L6-v2",
        "model_config": {"device": "gpu"}
    }
)

# Reranking function
rerank_func = Function(
    name="relevance_rerank",
    function_type=FunctionType.RERANK,
    input_field_names=["query", "document"],
    output_field_names=["relevance_score"],
    params={"model_name": "cross-encoder/ms-marco-MiniLM-L-6-v2"}
)

Properties

func.name: str                           # Function name
func.function_type: FunctionType        # Function type enum
func.input_field_names: List[str]       # Input field names
func.output_field_names: List[str]      # Output field names  
func.description: str                   # Function description
func.params: Optional[Dict]             # Function parameters

Advanced Schema Patterns

Multi-Vector Collection with Functions

# Collection is imported here because the last step of this example constructs one.
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType, Function, FunctionType

# Define fields including function outputs
fields = [
    # Primary key
    FieldSchema("doc_id", DataType.VARCHAR, max_length=100, is_primary=True),

    # Partitioning and clustering
    FieldSchema("category", DataType.VARCHAR, max_length=50, is_partition_key=True),
    FieldSchema("created_at", DataType.INT64, is_clustering_key=True),

    # Input text fields
    FieldSchema("title", DataType.VARCHAR, max_length=500),
    FieldSchema("content", DataType.VARCHAR, max_length=10000),

    # Vector fields (function outputs)
    FieldSchema("title_embedding", DataType.FLOAT_VECTOR, dim=384, is_function_output=True),
    FieldSchema("content_sparse", DataType.SPARSE_FLOAT_VECTOR, is_function_output=True),

    # Metadata
    FieldSchema("metadata", DataType.JSON),
    FieldSchema("tags", DataType.ARRAY, max_capacity=20, element_type=DataType.VARCHAR)
]

# Define functions; each output_field_names entry matches a field declared
# above with is_function_output=True
functions = [
    Function(
        "title_embed",
        FunctionType.TEXTEMBEDDING,
        input_field_names=["title"],
        output_field_names=["title_embedding"],
        params={"model_name": "sentence-transformers/all-MiniLM-L6-v2"}
    ),
    Function(
        "content_bm25",
        FunctionType.BM25,
        input_field_names=["content"],
        output_field_names=["content_sparse"],
        params={"language": "en"}
    )
]

# Create comprehensive schema
schema = CollectionSchema(
    fields=fields,
    functions=functions,
    description="Multi-vector document collection with automatic embeddings",
    enable_dynamic_field=True,
    partition_key_field="category",
    clustering_key_field_name="created_at"
)

# Create collection
collection = Collection("documents", schema)

Schema Validation and Best Practices

# Validate schema before collection creation
def validate_schema(schema: CollectionSchema) -> bool:
    """Check a schema for common configuration mistakes.

    Intended to be called before constructing a Collection so that problems
    surface as a ValueError with a descriptive message.

    Args:
        schema: The CollectionSchema to inspect.

    Returns:
        True when every check passes.

    Raises:
        ValueError: If the schema has no primary key field, if a vector
            field that takes a dimension has none (or a non-positive one),
            or if a function names an input/output field that is not part
            of the schema.
    """
    # Check for primary key
    if not schema.primary_field:
        raise ValueError("Schema must have a primary key field")

    # Vector types declared with dim=...; SPARSE_FLOAT_VECTOR is declared
    # without a dimension and is therefore deliberately absent here.
    dim_required_types = (
        DataType.FLOAT_VECTOR,
        DataType.BINARY_VECTOR,
        DataType.FLOAT16_VECTOR,
        DataType.BFLOAT16_VECTOR,
    )

    # Validate vector dimensions
    for field in schema.fields:
        if field.dtype not in dim_required_types:
            continue
        # dim is Optional[int]: test for None explicitly, because comparing
        # None with 0 would raise TypeError instead of the intended ValueError.
        dim = getattr(field, "dim", None)
        if dim is None or dim <= 0:
            raise ValueError(f"Vector field {field.name} must have valid dimension")

    # Validate function input/output fields exist
    if schema.functions:
        field_names = {f.name for f in schema.fields}
        for func in schema.functions:
            referenced = (
                ("input", func.input_field_names),
                ("output", func.output_field_names),
            )
            for role, names in referenced:
                # `or ()` tolerates a function whose name list is None.
                for field_name in names or ():
                    if field_name not in field_names:
                        raise ValueError(
                            f"Function {func.name} {role} field {field_name} not found"
                        )

    return True

# Use validation
try:
    validate_schema(schema)
    collection = Collection("validated_collection", schema)
except ValueError as e:
    print(f"Schema validation failed: {e}")

The ORM classes provide comprehensive control over collection structure and behavior, enabling sophisticated data modeling patterns while maintaining type safety and validation.

Install with Tessl CLI

npx tessl i tessl/pypi-pymilvus

docs

data-management.md

index-management.md

index.md

milvus-client.md

orm-collection.md

search-operations.md

types-enums.md

user-management.md

utility-functions.md

tile.json