tessl/pypi-pymilvus

Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/pymilvus@2.6.x (PyPI)

To install, run

npx @tessl/cli install tessl/pypi-pymilvus@2.6.0


PyMilvus - Python SDK for Milvus Vector Database

PyMilvus is the official Python SDK for Milvus, a cloud-native vector database designed for scalable similarity search and AI applications. It provides comprehensive capabilities for vector and scalar data storage, similarity search, collection management, indexing, and user authentication.

Package Information

Installation:

pip install pymilvus

Import:

import pymilvus
from pymilvus import MilvusClient, Collection, DataType

Version: Available via pymilvus.__version__
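A quick check of the installed version:

import pymilvus

print(pymilvus.__version__)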

Core Imports

Primary Client Interface

from pymilvus import MilvusClient, AsyncMilvusClient

# Synchronous client for common operations
client = MilvusClient(uri="http://localhost:19530")

# Asynchronous client for high-concurrency applications  
async_client = AsyncMilvusClient(uri="http://localhost:19530")

ORM Classes for Advanced Usage

from pymilvus import Collection, CollectionSchema, FieldSchema, DataType
from pymilvus import Index, Partition, Role
from pymilvus import Connections, connections

# Schema definition
schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=128),
    FieldSchema("metadata", DataType.JSON)
])

# Collection with ORM interface
collection = Collection("my_collection", schema)

Search and Results

from pymilvus import SearchResult, Hit, Hits
from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker

# Hybrid search with reranking
requests = [AnnSearchRequest(data=vectors1, anns_field="vector1", param={"metric_type": "L2"}, limit=100)]
results = client.hybrid_search("collection", requests, RRFRanker(), limit=10)

Utility Functions

from pymilvus import utility
from pymilvus import create_user, delete_user, list_collections
from pymilvus import mkts_from_datetime, hybridts_to_datetime
from datetime import datetime

# Direct utility access
utility.has_collection("my_collection")
mkts_from_datetime(datetime.now())

Basic Usage

Simple Collection Creation and Search

from pymilvus import MilvusClient

# Initialize client
client = MilvusClient(uri="http://localhost:19530")

# Create collection with simple parameters
client.create_collection(
    collection_name="quick_setup", 
    dimension=128,
    metric_type="COSINE"
)

# Insert data
data = [
    {"id": i, "vector": [0.1] * 128, "text": f"Document {i}"} 
    for i in range(1000)
]
client.insert("quick_setup", data)

# Search
results = client.search(
    collection_name="quick_setup",
    data=[[0.1] * 128],  # Query vector
    limit=5,
    output_fields=["text"]
)
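The search returns one list of hits per query vector. A minimal sketch of reading them, assuming the client's usual dict-shaped hit records (keys "id", "distance", and "entity" holding the requested output fields):

# Inspect hits for the first (and only) query vector
for hit in results[0]:
    print(hit["id"], hit["distance"], hit["entity"]["text"])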

Advanced Schema with Functions

from pymilvus import Collection, CollectionSchema, FieldSchema, DataType, Function, FunctionType

# Define schema with BM25 function
fields = [
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("text", DataType.VARCHAR, max_length=1000),
    FieldSchema("dense_vector", DataType.FLOAT_VECTOR, dim=128),
    FieldSchema("sparse_vector", DataType.SPARSE_FLOAT_VECTOR),  # BM25 output
]

functions = [
    Function("bm25_function", FunctionType.BM25, 
            input_field_names=["text"], 
            output_field_names=["sparse_vector"])
]

schema = CollectionSchema(fields, functions=functions, description="Hybrid search collection")
collection = Collection("hybrid_collection", schema)

Architecture

PyMilvus provides two complementary API approaches:

1. MilvusClient - Simplified Interface

  • Purpose: Streamlined operations for common use cases
  • Best for: Quick prototyping, simple applications, beginners
  • Key features: Auto-generated schemas, simplified method signatures, built-in defaults
# Automatic schema creation
client.create_collection("simple", dimension=128)

# Direct operations
client.insert("simple", [{"id": 1, "vector": [0.1] * 128}])
results = client.search("simple", [[0.1] * 128], limit=5)

2. ORM Classes - Advanced Interface

  • Purpose: Full control over collection lifecycle and configuration
  • Best for: Production applications, complex schemas, fine-tuned operations
  • Key features: Explicit schema definition, advanced indexing, partition management
# Explicit schema control
schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=128),
], enable_dynamic_field=True)

collection = Collection("advanced", schema)
collection.create_index("vector", {"index_type": "IVF_FLAT", "nlist": 1024})

Both interfaces can be used together and share the same underlying connection management.
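A minimal sketch of mixing the two, assuming a local server at localhost:19530 and an existing collection named shared_collection:

from pymilvus import MilvusClient, Collection, connections

# ORM side: establish a named connection, then attach to the collection
connections.connect(alias="default", uri="http://localhost:19530")
collection = Collection("shared_collection")

# Client side: point MilvusClient at the same server
client = MilvusClient(uri="http://localhost:19530")

# Inspect the same collection from both interfaces
print(collection.num_entities)
print(client.describe_collection("shared_collection"))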

Capabilities

Vector Operations

Comprehensive vector database operations with multiple data types and search capabilities.

# Multi-vector hybrid search
from pymilvus import MilvusClient, AnnSearchRequest, RRFRanker

client = MilvusClient()

# Define multiple search requests
req1 = AnnSearchRequest(data=dense_vectors, anns_field="dense_vec", 
                       param={"metric_type": "L2"}, limit=100)
req2 = AnnSearchRequest(data=sparse_vectors, anns_field="sparse_vec",
                       param={"metric_type": "IP"}, limit=100)

# Hybrid search with RRF reranking
results = client.hybrid_search(
    collection_name="multi_vector_collection",
    reqs=[req1, req2],
    ranker=RRFRanker(k=60),
    limit=10,
    output_fields=["title", "content"]
)

→ See Search Operations for complete search capabilities

Data Management

Efficient data insertion, updates, and deletion with batch operations and iterators.

# Batch operations with upsert
from pymilvus import MilvusClient

client = MilvusClient()

# Upsert data (insert or update)
data = [
    {"id": 1, "vector": [0.1] * 128, "metadata": {"category": "A"}},
    {"id": 2, "vector": [0.2] * 128, "metadata": {"category": "B"}},
]
result = client.upsert("my_collection", data)

# Paginated query with iterator
iterator = client.query_iterator(
    collection_name="my_collection",
    filter="metadata['category'] == 'A'",
    output_fields=["id", "metadata"],
    batch_size=1000
)

for batch in iterator:
    process_batch(batch)
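Deletion uses the same client interface, by primary key or by filter expression (a short sketch against the same collection):

# Delete by primary keys or by a filter expression
client.delete("my_collection", ids=[1, 2])
client.delete("my_collection", filter="metadata['category'] == 'B'")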

→ See Data Management for complete CRUD operations

Schema and Collections

Flexible schema definition with support for dynamic fields, functions, and partitioning.

# Advanced schema with clustering and partitioning
from pymilvus import CollectionSchema, FieldSchema, DataType, Function, FunctionType

schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("category", DataType.VARCHAR, max_length=100, is_partition_key=True),
    FieldSchema("timestamp", DataType.INT64, is_clustering_key=True),
    FieldSchema("content", DataType.VARCHAR, max_length=2000),
    FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("sparse_embedding", DataType.SPARSE_FLOAT_VECTOR),
], enable_dynamic_field=True, description="Production collection with advanced features")

# Add text embedding function
functions = [
    Function("text_embed", FunctionType.TEXTEMBEDDING,
            input_field_names=["content"],
            output_field_names=["embedding"],
            params={"model_name": "sentence-transformers/all-MiniLM-L6-v2"})
]

for func in functions:
    schema.add_function(func)

→ See ORM Collection for complete schema management

Index Management

Advanced indexing strategies for optimal search performance across different vector types.

# Multi-index creation with performance tuning
from pymilvus import Collection

collection = Collection("optimized_collection")

# Vector index with custom parameters
collection.create_index(
    field_name="dense_vector",
    index_params={
        "index_type": "IVF_PQ",
        "metric_type": "L2", 
        "params": {
            "nlist": 2048,
            "m": 16,
            "nbits": 8
        }
    }
)

# Scalar index for filtering
collection.create_index(
    field_name="category",
    index_params={"index_type": "TRIE"}
)

# Load collection with custom replica and resource group
collection.load(replica_number=2, _resource_groups=["rg1", "rg2"])

→ See Index Management for complete indexing strategies

User Management

Comprehensive authentication, authorization, and resource management.

# Role-based access control
from pymilvus import MilvusClient

client = MilvusClient()

# Create role with specific privileges
client.create_role("data_analyst")
client.grant_privilege(
    role_name="data_analyst",
    object_type="Collection",
    privilege="Search",
    object_name="public_data"
)

# Create user and assign role  
client.create_user("analyst1", "secure_password")
client.grant_role("analyst1", "data_analyst")

# Privilege group management
client.create_privilege_group("read_only_group")
client.add_privileges_to_group("read_only_group", ["Query", "Search"])

→ See User Management for complete access control

Utility Functions

Helper functions for timestamps, progress monitoring, and maintenance operations.

# Timestamp utilities and progress monitoring
from pymilvus import utility, mkts_from_datetime, hybridts_to_datetime
from datetime import datetime

# Create travel timestamp for point-in-time queries
travel_time = mkts_from_datetime(datetime(2024, 1, 1, 12, 0, 0))

# Monitor operations
progress = utility.loading_progress("my_collection")
print(f"Loading progress: {progress['progress']}%")

# Wait for operations to complete
utility.wait_for_loading_complete("my_collection", timeout=300)

# Resource group management
utility.create_resource_group("gpu_group", config={"requests": {"node_num": 2}})
utility.transfer_node("cpu_group", "gpu_group", 1)

→ See Utility Functions for complete utility reference

Async Operations

Non-blocking operations for high-concurrency applications with full async/await support.

# Concurrent operations with AsyncMilvusClient
from pymilvus import AsyncMilvusClient
import asyncio

async def concurrent_searches():
    client = AsyncMilvusClient()
    
    # Concurrent search operations
    tasks = []
    for i in range(10):
        task = client.search(
            collection_name="large_collection",
            data=[[0.1] * 128],
            limit=100,
            output_fields=["metadata"]
        )
        tasks.append(task)
    
    # Wait for all searches to complete
    results = await asyncio.gather(*tasks)
    await client.close()
    return results

# Run concurrent operations
results = asyncio.run(concurrent_searches())

→ See MilvusClient for complete async capabilities

Types and Enums

Comprehensive type system with enums for data types, index types, and configuration options.

# Type system and enums
from pymilvus import DataType, IndexType, FunctionType, ConsistencyLevel

# Vector data types
vector_types = [
    DataType.FLOAT_VECTOR,      # Standard dense vectors
    DataType.BINARY_VECTOR,     # Binary vectors for efficiency  
    DataType.FLOAT16_VECTOR,    # Half-precision vectors
    DataType.BFLOAT16_VECTOR,   # BFloat16 vectors
    DataType.SPARSE_FLOAT_VECTOR # Sparse vectors for text search
]

# Index algorithms
index_types = [
    IndexType.FLAT,       # Exact search
    IndexType.IVF_FLAT,   # Inverted file
    IndexType.HNSW,       # Hierarchical navigable small world
    IndexType.IVF_PQ      # Product quantization
]

# Consistency levels
levels = [
    ConsistencyLevel.Strong,      # Strong consistency
    ConsistencyLevel.Eventually,  # Eventual consistency  
    ConsistencyLevel.Bounded,     # Bounded staleness
    ConsistencyLevel.Session      # Session consistency
]
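Consistency levels are passed wherever collection creation or reads accept them; a brief sketch assuming MilvusClient's string form:

from pymilvus import MilvusClient

# Sketch: request bounded-staleness reads for a new collection
client = MilvusClient(uri="http://localhost:19530")
client.create_collection(
    collection_name="typed_collection",
    dimension=128,
    consistency_level="Bounded"
)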

→ See Types and Enums for complete type reference

Sub-Documentation

This documentation covers all 136+ public API components in PyMilvus, enabling comprehensive vector database operations without accessing source code.