pymilvus is the Python SDK for the Milvus vector database, providing comprehensive functionality for connecting to servers, managing collections, and performing vector operations.
—
The MilvusClient provides a simplified, high-level interface for common Milvus operations, while AsyncMilvusClient offers the same functionality with async/await support for non-blocking operations in high-concurrency applications.
from pymilvus import MilvusClient
def __init__(
self,
uri: str = "http://localhost:19530",
user: str = "",
password: str = "",
db_name: str = "",
token: str = "",
timeout: Optional[float] = None,
**kwargs
) -> None

Parameters:
- uri: Milvus server URI (default: "http://localhost:19530")
- user: Username for authentication
- password: Password for authentication
- db_name: Database name to connect to
- token: Authentication token (alternative to user/password)
- timeout: Connection timeout in seconds
- **kwargs: Additional connection parameters

Example:
# Local connection
client = MilvusClient()
# Remote connection with authentication
client = MilvusClient(
uri="https://milvus.example.com:19530",
user="admin",
password="password123",
db_name="production"
)

def create_collection(
self,
collection_name: str,
dimension: Optional[int] = None,
primary_field_name: str = "id",
id_type: str = "int",
vector_field_name: str = "vector",
metric_type: str = "COSINE",
auto_id: bool = False,
timeout: Optional[float] = None,
schema: Optional[CollectionSchema] = None,
index_params: Optional[IndexParams] = None,
**kwargs
) -> None

Parameters:
- collection_name: Name of the collection to create
- dimension: Vector dimension (required if schema not provided)
- primary_field_name: Name of primary key field (default: "id")
- id_type: Primary key type - "int" or "string" (default: "int")
- vector_field_name: Name of vector field (default: "vector")
- metric_type: Distance metric - "L2", "IP", "COSINE" (default: "COSINE")
- auto_id: Enable auto-generated IDs (default: False)
- timeout: Operation timeout in seconds
- schema: Pre-built CollectionSchema object
- index_params: Index parameters for automatic index creation
- **kwargs: Additional collection properties

Examples:
# Simple collection creation
client.create_collection(
collection_name="documents",
dimension=768,
metric_type="COSINE"
)
# Collection with string IDs
client.create_collection(
collection_name="products",
dimension=512,
id_type="string",
primary_field_name="product_id",
vector_field_name="embedding"
)
# With pre-built schema
from pymilvus import CollectionSchema, FieldSchema, DataType
schema = CollectionSchema([
FieldSchema("id", DataType.INT64, is_primary=True),
FieldSchema("title", DataType.VARCHAR, max_length=200),
FieldSchema("vector", DataType.FLOAT_VECTOR, dim=768),
FieldSchema("metadata", DataType.JSON)
], description="Document collection")
client.create_collection("advanced_docs", schema=schema)

def drop_collection(
self,
collection_name: str,
timeout: Optional[float] = None
) -> None

def describe_collection(
self,
collection_name: str,
timeout: Optional[float] = None
) -> Dict[str, Any]

Returns: Dictionary containing collection metadata including schema, indexes, and properties.
def has_collection(
self,
collection_name: str,
timeout: Optional[float] = None
) -> bool

def list_collections(
self,
timeout: Optional[float] = None
) -> List[str]

Returns: List of collection names in the database.
def rename_collection(
self,
old_name: str,
new_name: str,
timeout: Optional[float] = None
) -> None

def get_collection_stats(
self,
collection_name: str,
timeout: Optional[float] = None
) -> Dict[str, Any]

Returns: Statistics including row count, data size, and index information.
@classmethod
def create_schema(
cls,
auto_id: bool = False,
enable_dynamic_field: bool = False,
partition_key_field: Optional[str] = None,
clustering_key_field: Optional[str] = None,
**kwargs
) -> CollectionSchema

@classmethod
def create_field_schema(
cls,
field_name: str,
datatype: DataType,
is_primary: bool = False,
**kwargs
) -> FieldSchema

@classmethod
def prepare_index_params(cls) -> IndexParams

Returns: Empty IndexParams object for building index configurations.
def insert(
self,
collection_name: str,
data: Union[List[Dict], pd.DataFrame],
partition_name: Optional[str] = None,
timeout: Optional[float] = None,
**kwargs
) -> Dict[str, Any]

Parameters:
- collection_name: Target collection name
- data: Data to insert as list of dictionaries or pandas DataFrame
- partition_name: Target partition (optional)
- timeout: Operation timeout
- **kwargs: Additional insertion parameters

Returns: Dictionary with insert_count and primary_keys (if not auto_id).
Examples:
# Insert list of dictionaries
data = [
{"id": 1, "vector": [0.1] * 768, "title": "Document 1"},
{"id": 2, "vector": [0.2] * 768, "title": "Document 2"}
]
result = client.insert("documents", data)
# Insert pandas DataFrame
import pandas as pd
df = pd.DataFrame({
"id": [1, 2, 3],
"vector": [[0.1]*768, [0.2]*768, [0.3]*768],
"category": ["A", "B", "A"]
})
result = client.insert("products", df)

def upsert(
self,
collection_name: str,
data: Union[List[Dict], pd.DataFrame],
partition_name: Optional[str] = None,
timeout: Optional[float] = None,
**kwargs
) -> Dict[str, Any]

Note: Upsert will insert new entities or update existing ones based on primary key.
def delete(
self,
collection_name: str,
pks: Optional[Union[List, str, int]] = None,
filter: Optional[str] = None,
partition_name: Optional[str] = None,
timeout: Optional[float] = None,
**kwargs
) -> Dict[str, Any]

Parameters:
- pks: Primary key values to delete (mutually exclusive with filter)
- filter: Boolean expression for filtering entities to delete
- partition_name: Target partition
- timeout: Operation timeout

Examples:
# Delete by primary keys
client.delete("documents", pks=[1, 2, 3])
# Delete by filter expression
client.delete("products", filter="category == 'discontinued'")
# Delete from specific partition
client.delete("logs", filter="timestamp < 1640995200", partition_name="old_data")

def get(
self,
collection_name: str,
ids: Union[List, str, int],
output_fields: Optional[List[str]] = None,
partition_names: Optional[List[str]] = None,
timeout: Optional[float] = None
) -> List[Dict[str, Any]]

Parameters:
- ids: Primary key values to retrieve
- output_fields: Fields to return (default: all fields)
- partition_names: Partitions to search in
- timeout: Operation timeout

Returns: List of entity dictionaries.
def query(
self,
collection_name: str,
filter: str,
output_fields: Optional[List[str]] = None,
partition_names: Optional[List[str]] = None,
limit: int = 16384,
offset: int = 0,
timeout: Optional[float] = None,
consistency_level: Optional[str] = None,
**kwargs
) -> List[Dict[str, Any]]

Parameters:
- filter: Boolean expression for filtering
- output_fields: Fields to return
- partition_names: Target partitions
- limit: Maximum number of results
- offset: Number of results to skip
- consistency_level: "Strong", "Eventually", "Bounded", or "Session"

Examples:
# Basic query
results = client.query(
"products",
filter="price > 100 and category == 'electronics'",
output_fields=["id", "name", "price"],
limit=50
)
# Query with pagination
results = client.query(
"documents",
filter="status == 'published'",
output_fields=["id", "title", "content"],
offset=100,
limit=20
)

def query_iterator(
self,
collection_name: str,
filter: str,
output_fields: Optional[List[str]] = None,
partition_names: Optional[List[str]] = None,
batch_size: int = 1000,
limit: Optional[int] = None,
timeout: Optional[float] = None,
**kwargs
) -> QueryIterator

Parameters:
- batch_size: Number of results per batch
- limit: Total maximum results across all batches

Returns: Iterator that yields batches of results.
Example:
# Process large result set in batches
iterator = client.query_iterator(
"large_collection",
filter="category == 'active'",
output_fields=["id", "data"],
batch_size=1000
)
for batch in iterator:
process_batch(batch)
print(f"Processed {len(batch)} records")

def search(
self,
collection_name: str,
data: Union[List[List[float]], List[Dict]],
anns_field: str = "vector",
search_params: Optional[Dict] = None,
limit: int = 10,
expr: Optional[str] = None,
output_fields: Optional[List[str]] = None,
partition_names: Optional[List[str]] = None,
round_decimal: int = -1,
timeout: Optional[float] = None,
consistency_level: Optional[str] = None,
**kwargs
) -> List[List[Dict[str, Any]]]

Parameters:
- data: Query vectors as list of lists or list of dictionaries with vector field
- anns_field: Name of vector field to search
- search_params: Search algorithm parameters (e.g., {"nprobe": 10})
- limit: Maximum results per query
- expr: Filter expression
- output_fields: Fields to return in results
- round_decimal: Decimal precision for distances (-1 for no rounding)

Returns: List of result lists (one per query vector).
Examples:
# Single vector search
query_vector = [0.1] * 768
results = client.search(
"documents",
data=[query_vector],
limit=5,
output_fields=["id", "title", "content"],
expr="category == 'news'"
)
# Multiple vector search
query_vectors = [[0.1] * 768, [0.2] * 768]
results = client.search(
"embeddings",
data=query_vectors,
search_params={"nprobe": 16},
limit=10,
round_decimal=4
)

def search_iterator(
self,
collection_name: str,
data: Union[List[List[float]], List[Dict]],
anns_field: str = "vector",
batch_size: int = 1000,
limit: Optional[int] = None,
search_params: Optional[Dict] = None,
expr: Optional[str] = None,
output_fields: Optional[List[str]] = None,
**kwargs
) -> SearchIterator

def hybrid_search(
self,
collection_name: str,
reqs: List[AnnSearchRequest],
ranker: Union[RRFRanker, WeightedRanker],
limit: int = 10,
partition_names: Optional[List[str]] = None,
output_fields: Optional[List[str]] = None,
timeout: Optional[float] = None,
round_decimal: int = -1,
**kwargs
) -> List[List[Dict[str, Any]]]

Parameters:
- reqs: List of AnnSearchRequest objects for different vector fields
- ranker: Ranking algorithm (RRFRanker or WeightedRanker)
- limit: Final result count after reranking

Example:
from pymilvus import AnnSearchRequest, RRFRanker
# Multiple vector search requests
req1 = AnnSearchRequest(
data=dense_vectors,
anns_field="dense_vector",
param={"metric_type": "L2", "params": {"nprobe": 16}},
limit=100
)
req2 = AnnSearchRequest(
data=sparse_vectors,
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=100
)
# Hybrid search with RRF ranking
results = client.hybrid_search(
"multi_vector_collection",
reqs=[req1, req2],
ranker=RRFRanker(k=60),
limit=10,
output_fields=["id", "title", "content"]
)

def create_index(
self,
collection_name: str,
field_name: str,
index_params: Dict[str, Any],
timeout: Optional[float] = None,
**kwargs
) -> None

Parameters:
- field_name: Field to create index on
- index_params: Index configuration dictionary

Examples:
# Vector index
client.create_index(
"documents",
"vector",
{
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 1024}
}
)
# Scalar index
client.create_index(
"products",
"category",
{"index_type": "TRIE"}
)

def drop_index(
self,
collection_name: str,
field_name: str,
timeout: Optional[float] = None
) -> None

def list_indexes(
self,
collection_name: str,
field_name: Optional[str] = None,
timeout: Optional[float] = None
) -> List[str]

def describe_index(
self,
collection_name: str,
field_name: str,
timeout: Optional[float] = None
) -> Dict[str, Any]

def load_collection(
self,
collection_name: str,
timeout: Optional[float] = None,
replica_number: int = 1,
resource_groups: Optional[List[str]] = None,
**kwargs
) -> None

def release_collection(
self,
collection_name: str,
timeout: Optional[float] = None
) -> None

def get_load_state(
self,
collection_name: str,
partition_name: Optional[str] = None,
timeout: Optional[float] = None
) -> Dict[str, Any]

Returns: Dictionary with state ("NotExist", "NotLoad", "Loading", "Loaded") and progress information.
def refresh_load(
self,
collection_name: str,
timeout: Optional[float] = None
) -> None

The AsyncMilvusClient provides identical functionality to MilvusClient but with async/await support for non-blocking operations.
from pymilvus import AsyncMilvusClient
import asyncio
async def async_operations():
# Initialize async client
client = AsyncMilvusClient(uri="http://localhost:19530")
try:
# All methods are async and must be awaited
await client.create_collection("async_collection", dimension=768)
# Concurrent operations
tasks = [
client.insert("async_collection", batch1),
client.insert("async_collection", batch2),
client.insert("async_collection", batch3)
]
results = await asyncio.gather(*tasks)
# Search operations
search_results = await client.search(
"async_collection",
data=[[0.1] * 768],
limit=10
)
finally:
# Always close the client
await client.close()
# Run async operations
asyncio.run(async_operations())

Key points: use asyncio.gather() for parallel operations, and always call await client.close() when finished.

# Async data operations
await client.insert(collection_name, data)
await client.upsert(collection_name, data)
await client.delete(collection_name, pks=[1, 2, 3])
# Async search operations
results = await client.search(collection_name, query_vectors, limit=10)
results = await client.query(collection_name, filter="category == 'A'")
# Async collection management
await client.create_collection(name, dimension=768)
collections = await client.list_collections()
await client.load_collection(name)

def close(self) -> None

Closes the client connection and cleans up resources. For AsyncMilvusClient, this method is async:

async def close(self) -> None  # AsyncMilvusClient version

Best Practice:
# Synchronous client
try:
client = MilvusClient()
# ... operations ...
finally:
client.close()
# Asynchronous client
try:
client = AsyncMilvusClient()
# ... operations ...
finally:
await client.close()
# Or use context manager (if supported)
async with AsyncMilvusClient() as client:
await client.search(...)

Both MilvusClient and AsyncMilvusClient raise the same exception types. Common exceptions include:
from pymilvus import MilvusException, MilvusUnavailableException
try:
client = MilvusClient(uri="invalid://host:port")
client.search("nonexistent", [[0.1] * 768])
except MilvusUnavailableException:
print("Milvus server unavailable")
except MilvusException as e:
print(f"Milvus error: {e.code} - {e.message}")
except Exception as e:
print(f"General error: {e}")

The MilvusClient interface provides a streamlined way to interact with Milvus, abstracting away many of the complexities while still providing access to advanced features when needed.
Install with Tessl CLI
npx tessl i tessl/pypi-pymilvus