Ctrl+K
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pymilvus

Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.

Pending
Overview
Eval results
Files

docs/milvus-client.md

MilvusClient and AsyncMilvusClient

The MilvusClient provides a simplified, high-level interface for common Milvus operations, while AsyncMilvusClient offers the same functionality with async/await support for non-blocking operations in high-concurrency applications.

MilvusClient

Constructor

from pymilvus import MilvusClient

def __init__(
    self,
    uri: str = "http://localhost:19530",
    user: str = "",
    password: str = "", 
    db_name: str = "",
    token: str = "",
    timeout: Optional[float] = None,
    **kwargs
) -> None

Parameters:

  • uri: Milvus server URI (default: "http://localhost:19530")
  • user: Username for authentication
  • password: Password for authentication
  • db_name: Database name to connect to
  • token: Authentication token (alternative to user/password)
  • timeout: Connection timeout in seconds
  • **kwargs: Additional connection parameters

Example:

# Local connection
client = MilvusClient()

# Remote connection with authentication
client = MilvusClient(
    uri="https://milvus.example.com:19530",
    user="admin",
    password="password123",
    db_name="production"
)

Collection Management

create_collection

def create_collection(
    self,
    collection_name: str,
    dimension: Optional[int] = None,
    primary_field_name: str = "id",
    id_type: str = "int",
    vector_field_name: str = "vector",
    metric_type: str = "COSINE", 
    auto_id: bool = False,
    timeout: Optional[float] = None,
    schema: Optional[CollectionSchema] = None,
    index_params: Optional[IndexParams] = None,
    **kwargs
) -> None

Parameters:

  • collection_name: Name of the collection to create
  • dimension: Vector dimension (required if schema not provided)
  • primary_field_name: Name of primary key field (default: "id")
  • id_type: Primary key type - "int" or "string" (default: "int")
  • vector_field_name: Name of vector field (default: "vector")
  • metric_type: Distance metric - "L2", "IP", "COSINE" (default: "COSINE")
  • auto_id: Enable auto-generated IDs (default: False)
  • timeout: Operation timeout in seconds
  • schema: Pre-built CollectionSchema object
  • index_params: Index parameters for automatic index creation
  • **kwargs: Additional collection properties

Examples:

# Simple collection creation
client.create_collection(
    collection_name="documents",
    dimension=768,
    metric_type="COSINE"
)

# Collection with string IDs
client.create_collection(
    collection_name="products", 
    dimension=512,
    id_type="string",
    primary_field_name="product_id",
    vector_field_name="embedding"
)

# With pre-built schema
from pymilvus import CollectionSchema, FieldSchema, DataType

schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("title", DataType.VARCHAR, max_length=200),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("metadata", DataType.JSON)
], description="Document collection")

client.create_collection("advanced_docs", schema=schema)

drop_collection

def drop_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None

describe_collection

def describe_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]

Returns: Dictionary containing collection metadata including schema, indexes, and properties.

has_collection

def has_collection(
    self,
    collection_name: str, 
    timeout: Optional[float] = None
) -> bool

list_collections

def list_collections(
    self,
    timeout: Optional[float] = None
) -> List[str]

Returns: List of collection names in the database.

rename_collection

def rename_collection(
    self,
    old_name: str,
    new_name: str,
    timeout: Optional[float] = None
) -> None

get_collection_stats

def get_collection_stats(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]

Returns: Statistics including row count, data size, and index information.

Schema Creation Helpers

create_schema

@classmethod
def create_schema(
    cls,
    auto_id: bool = False,
    enable_dynamic_field: bool = False,
    partition_key_field: Optional[str] = None,
    clustering_key_field: Optional[str] = None,
    **kwargs
) -> CollectionSchema

create_field_schema

@classmethod
def create_field_schema(
    cls,
    field_name: str,
    datatype: DataType,
    is_primary: bool = False,
    **kwargs
) -> FieldSchema

prepare_index_params

@classmethod 
def prepare_index_params(cls) -> IndexParams

Returns: Empty IndexParams object for building index configurations.

Data Operations

insert

def insert(
    self,
    collection_name: str,
    data: Union[List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

Parameters:

  • collection_name: Target collection name
  • data: Data to insert as list of dictionaries or pandas DataFrame
  • partition_name: Target partition (optional)
  • timeout: Operation timeout
  • **kwargs: Additional insertion parameters

Returns: Dictionary with insert_count and primary_keys (if not auto_id).

Examples:

# Insert list of dictionaries
data = [
    {"id": 1, "vector": [0.1] * 768, "title": "Document 1"},
    {"id": 2, "vector": [0.2] * 768, "title": "Document 2"}
]
result = client.insert("documents", data)

# Insert pandas DataFrame
import pandas as pd
df = pd.DataFrame({
    "id": [1, 2, 3],
    "vector": [[0.1]*768, [0.2]*768, [0.3]*768],
    "category": ["A", "B", "A"]
})
result = client.insert("products", df)

upsert

def upsert(
    self,
    collection_name: str,
    data: Union[List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs  
) -> Dict[str, Any]

Note: Upsert will insert new entities or update existing ones based on primary key.

delete

def delete(
    self,
    collection_name: str,
    pks: Optional[Union[List, str, int]] = None,
    filter: Optional[str] = None,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

Parameters:

  • pks: Primary key values to delete (mutually exclusive with filter)
  • filter: Boolean expression for filtering entities to delete
  • partition_name: Target partition
  • timeout: Operation timeout

Examples:

# Delete by primary keys
client.delete("documents", pks=[1, 2, 3])

# Delete by filter expression
client.delete("products", filter="category == 'discontinued'")

# Delete from specific partition
client.delete("logs", filter="timestamp < 1640995200", partition_name="old_data")

get

def get(
    self,
    collection_name: str,
    ids: Union[List, str, int],
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    timeout: Optional[float] = None
) -> List[Dict[str, Any]]

Parameters:

  • ids: Primary key values to retrieve
  • output_fields: Fields to return (default: all fields)
  • partition_names: Partitions to search in
  • timeout: Operation timeout

Returns: List of entity dictionaries.

Query Operations

query

def query(
    self,
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    limit: int = 16384,
    offset: int = 0,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[Dict[str, Any]]

Parameters:

  • filter: Boolean expression for filtering
  • output_fields: Fields to return
  • partition_names: Target partitions
  • limit: Maximum number of results
  • offset: Number of results to skip
  • consistency_level: "Strong", "Eventually", "Bounded", or "Session"

Examples:

# Basic query
results = client.query(
    "products",
    filter="price > 100 and category == 'electronics'",
    output_fields=["id", "name", "price"],
    limit=50
)

# Query with pagination
results = client.query(
    "documents", 
    filter="status == 'published'",
    output_fields=["id", "title", "content"],
    offset=100,
    limit=20
)

query_iterator

def query_iterator(
    self,
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    batch_size: int = 1000,
    limit: Optional[int] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> QueryIterator

Parameters:

  • batch_size: Number of results per batch
  • limit: Total maximum results across all batches

Returns: Iterator that yields batches of results.

Example:

# Process large result set in batches
iterator = client.query_iterator(
    "large_collection",
    filter="category == 'active'",
    output_fields=["id", "data"],
    batch_size=1000
)

for batch in iterator:
    process_batch(batch)
    print(f"Processed {len(batch)} records")

Search Operations

search

def search(
    self,
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    search_params: Optional[Dict] = None,
    limit: int = 10,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    round_decimal: int = -1,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[List[Dict[str, Any]]]

Parameters:

  • data: Query vectors as list of lists or list of dictionaries with vector field
  • anns_field: Name of vector field to search
  • search_params: Search algorithm parameters (e.g., {"nprobe": 10})
  • limit: Maximum results per query
  • expr: Filter expression
  • output_fields: Fields to return in results
  • round_decimal: Decimal precision for distances (-1 for no rounding)

Returns: List of result lists (one per query vector).

Examples:

# Single vector search
query_vector = [0.1] * 768
results = client.search(
    "documents",
    data=[query_vector],
    limit=5,
    output_fields=["id", "title", "content"],
    expr="category == 'news'"
)

# Multiple vector search
query_vectors = [[0.1] * 768, [0.2] * 768]
results = client.search(
    "embeddings", 
    data=query_vectors,
    search_params={"nprobe": 16},
    limit=10,
    round_decimal=4
)

search_iterator

def search_iterator(
    self,
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    batch_size: int = 1000,
    limit: Optional[int] = None,
    search_params: Optional[Dict] = None,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    **kwargs
) -> SearchIterator

hybrid_search

def hybrid_search(
    self,
    collection_name: str,
    reqs: List[AnnSearchRequest],
    ranker: Union[RRFRanker, WeightedRanker],
    limit: int = 10,
    partition_names: Optional[List[str]] = None,
    output_fields: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    round_decimal: int = -1,
    **kwargs
) -> List[List[Dict[str, Any]]]

Parameters:

  • reqs: List of AnnSearchRequest objects for different vector fields
  • ranker: Ranking algorithm (RRFRanker or WeightedRanker)
  • limit: Final result count after reranking

Example:

from pymilvus import AnnSearchRequest, RRFRanker

# Multiple vector search requests
req1 = AnnSearchRequest(
    data=dense_vectors, 
    anns_field="dense_vector",
    param={"metric_type": "L2", "params": {"nprobe": 16}},
    limit=100
)

req2 = AnnSearchRequest(
    data=sparse_vectors,
    anns_field="sparse_vector", 
    param={"metric_type": "IP"},
    limit=100
)

# Hybrid search with RRF ranking
results = client.hybrid_search(
    "multi_vector_collection",
    reqs=[req1, req2],
    ranker=RRFRanker(k=60),
    limit=10,
    output_fields=["id", "title", "content"]
)

Index Management

create_index

def create_index(
    self,
    collection_name: str,
    field_name: str,
    index_params: Dict[str, Any],
    timeout: Optional[float] = None,
    **kwargs
) -> None

Parameters:

  • field_name: Field to create index on
  • index_params: Index configuration dictionary

Examples:

# Vector index
client.create_index(
    "documents", 
    "vector",
    {
        "index_type": "IVF_FLAT",
        "metric_type": "L2", 
        "params": {"nlist": 1024}
    }
)

# Scalar index
client.create_index(
    "products",
    "category", 
    {"index_type": "TRIE"}
)

drop_index

def drop_index(
    self,
    collection_name: str,
    field_name: str,
    timeout: Optional[float] = None
) -> None

list_indexes

def list_indexes(
    self,
    collection_name: str,
    field_name: Optional[str] = None,
    timeout: Optional[float] = None
) -> List[str]

describe_index

def describe_index(
    self,
    collection_name: str,
    field_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]

Loading and Memory Management

load_collection

def load_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None,
    replica_number: int = 1,
    resource_groups: Optional[List[str]] = None,
    **kwargs
) -> None

release_collection

def release_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None

get_load_state

def get_load_state(
    self,
    collection_name: str,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None
) -> Dict[str, Any]

Returns: Dictionary with state ("NotExist", "NotLoad", "Loading", "Loaded") and progress information.

refresh_load

def refresh_load(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None

AsyncMilvusClient

The AsyncMilvusClient provides identical functionality to MilvusClient but with async/await support for non-blocking operations.

Usage Pattern

from pymilvus import AsyncMilvusClient
import asyncio

async def async_operations():
    # Initialize async client
    client = AsyncMilvusClient(uri="http://localhost:19530")
    
    try:
        # All methods are async and must be awaited
        await client.create_collection("async_collection", dimension=768)
        
        # Concurrent operations
        tasks = [
            client.insert("async_collection", batch1),
            client.insert("async_collection", batch2),
            client.insert("async_collection", batch3)
        ]
        results = await asyncio.gather(*tasks)
        
        # Search operations
        search_results = await client.search(
            "async_collection",
            data=[[0.1] * 768],
            limit=10
        )
        
    finally:
        # Always close the client
        await client.close()

# Run async operations
asyncio.run(async_operations())

Key Differences from MilvusClient

  1. All methods are coroutines - Must be awaited
  2. Concurrent execution - Use asyncio.gather() for parallel operations
  3. Resource management - Always call await client.close()
  4. Same method signatures - Parameters and return types identical to sync version

Async Method Examples

# Async data operations
await client.insert(collection_name, data)
await client.upsert(collection_name, data)  
await client.delete(collection_name, pks=[1, 2, 3])

# Async search operations
results = await client.search(collection_name, query_vectors, limit=10)
results = await client.query(collection_name, filter="category == 'A'")

# Async collection management
await client.create_collection(name, dimension=768)
collections = await client.list_collections()
await client.load_collection(name)

Connection Management

close

def close(self) -> None

Closes the client connection and cleans up resources. For AsyncMilvusClient, this method is async:

async def close(self) -> None  # AsyncMilvusClient version

Best Practice:

# Synchronous client
try:
    client = MilvusClient()
    # ... operations ...
finally:
    client.close()

# Asynchronous client
try:
    client = AsyncMilvusClient()
    # ... operations ...
finally:
    await client.close()

# Or use context manager (if supported)
async with AsyncMilvusClient() as client:
    await client.search(...)

Error Handling

Both MilvusClient and AsyncMilvusClient raise the same exception types. Common exceptions include:

from pymilvus import MilvusException, MilvusUnavailableException

try:
    client = MilvusClient(uri="invalid://host:port")
    client.search("nonexistent", [[0.1] * 768])
except MilvusUnavailableException:
    print("Milvus server unavailable")
except MilvusException as e:
    print(f"Milvus error: {e.code} - {e.message}")
except Exception as e:
    print(f"General error: {e}")

The MilvusClient interface provides a streamlined way to interact with Milvus, abstracting away many of the complexities while still providing access to advanced features when needed.

Install with Tessl CLI

npx tessl i tessl/pypi-pymilvus

docs

data-management.md

index-management.md

index.md

milvus-client.md

orm-collection.md

search-operations.md

types-enums.md

user-management.md

utility-functions.md

tile.json