Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.
PyMilvus provides comprehensive data management capabilities including insertion, upsert, deletion, and retrieval operations. This covers batch operations, data validation, and efficient iteration over large datasets.
from pymilvus import MilvusClient
client = MilvusClient()
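MilvusClient() with no arguments connects to the default local endpoint. A minimal sketch of connecting with an explicit address and credentials instead (the URI and token values below are placeholders for your own deployment):

from pymilvus import MilvusClient

# Placeholder connection values; adjust for your own deployment
client = MilvusClient(
    uri="http://localhost:19530",  # Milvus server address
    token="root:Milvus"            # omit if authentication is disabled
)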
def insert(
    collection_name: str,
    data: Union[List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

Parameters:
data: Data as list of dictionaries or pandas DataFrame
partition_name: Target partition (optional)
timeout: Operation timeout in seconds
**kwargs: Additional insertion parameters

Returns: Dictionary with insert_count and primary_keys (if not auto_id)
# Insert list of dictionaries
data = [
{
"id": 1,
"vector": [0.1, 0.2, 0.3] * 256,
"title": "First Document",
"metadata": {"category": "tech", "year": 2024}
},
{
"id": 2,
"vector": [0.4, 0.5, 0.6] * 256,
"title": "Second Document",
"metadata": {"category": "science", "year": 2024}
}
]
result = client.insert("documents", data)
print(f"Inserted {result['insert_count']} entities")
if 'primary_keys' in result:
print(f"Primary keys: {result['primary_keys']}")
# Insert pandas DataFrame
import pandas as pd
import numpy as np
df = pd.DataFrame({
"id": range(1000),
"vector": [np.random.rand(768).tolist() for _ in range(1000)],
"category": np.random.choice(["A", "B", "C"], 1000),
"score": np.random.rand(1000),
"active": np.random.choice([True, False], 1000)
})
result = client.insert("products", df)
print(f"Inserted {result['insert_count']} products from DataFrame")
# Insert into specific partition
seasonal_data = [
{"id": i, "vector": [0.1] * 128, "season": "winter"}
for i in range(100, 200)
]
client.insert(
"seasonal_collection",
seasonal_data,
partition_name="winter_2024"
)

# For collections with auto_id=True, omit primary key field
auto_id_data = [
{
"vector": [0.1, 0.2] * 384,
"content": "Auto-generated ID document",
"tags": ["auto", "generated"]
},
{
"vector": [0.3, 0.4] * 384,
"content": "Another auto-ID document",
"tags": ["automatic", "id"]
}
]
result = client.insert("auto_id_collection", auto_id_data)
print(f"Generated primary keys: {result['primary_keys']}")def batch_insert_large_dataset(collection_name: str, data_generator, batch_size: int = 1000):
"""Insert large dataset in batches to manage memory"""
total_inserted = 0
batch = []
for record in data_generator():
batch.append(record)
if len(batch) >= batch_size:
result = client.insert(collection_name, batch)
total_inserted += result['insert_count']
print(f"Inserted batch: {result['insert_count']}, Total: {total_inserted}")
batch = [] # Clear batch
# Insert remaining records
if batch:
result = client.insert(collection_name, batch)
total_inserted += result['insert_count']
print(f"Final batch: {result['insert_count']}, Total: {total_inserted}")
return total_inserted
# Example generator for large dataset
import random
import time

def generate_documents(count: int):
"""Generator for memory-efficient data creation"""
for i in range(count):
yield {
"id": i,
"vector": np.random.rand(768).tolist(),
"title": f"Document {i}",
"content": f"Content for document {i}" * 50, # Larger text content
"metadata": {
"created_at": int(time.time()) - random.randint(0, 86400),
"category": random.choice(["tech", "science", "arts", "sports"]),
"priority": random.randint(1, 10)
}
}
# Use batch insertion
total = batch_insert_large_dataset("large_collection", lambda: generate_documents(100000), batch_size=2000)
print(f"Successfully inserted {total} documents")def upsert(
def upsert(
    collection_name: str,
    data: Union[List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

Upsert performs insert-or-update based on primary key matching. If the primary key exists, the entity is updated; otherwise, it is inserted.
# Initial data
initial_data = [
{"id": 1, "vector": [0.1] * 768, "title": "Original Title", "status": "draft"},
{"id": 2, "vector": [0.2] * 768, "title": "Another Document", "status": "draft"}
]
client.insert("documents", initial_data)
# Upsert: update existing and insert new
upsert_data = [
{"id": 1, "vector": [0.15] * 768, "title": "Updated Title", "status": "published"}, # Update
{"id": 3, "vector": [0.3] * 768, "title": "New Document", "status": "draft"} # Insert
]
result = client.upsert("documents", upsert_data)
print(f"Upsert count: {result.get('upsert_count', 0)}")
# Verify changes
updated = client.query("documents", "id in [1, 3]", output_fields=["id", "title", "status"])
for doc in updated:
print(f"ID {doc['id']}: {doc['title']} - {doc['status']}")def conditional_upsert(collection_name: str, updates: List[Dict], condition_field: str):
"""Upsert only if condition is met"""
# Get existing entities
primary_keys = [update['id'] for update in updates]
existing = client.get(
collection_name,
ids=primary_keys,
output_fields=[condition_field, "id"]
)
# Create mapping of existing entities
existing_map = {entity['id']: entity for entity in existing}
# Filter updates based on conditions
valid_updates = []
for update in updates:
entity_id = update['id']
if entity_id in existing_map:
            # Apply update condition: only upsert if the condition field value is newer/greater
            existing_value = existing_map[entity_id].get(condition_field, 0)
            new_value = update.get(condition_field, 0)
            if new_value > existing_value:
valid_updates.append(update)
else:
# New entity, always include
valid_updates.append(update)
if valid_updates:
return client.upsert(collection_name, valid_updates)
return {"upsert_count": 0}def delete(
def delete(
    collection_name: str,
    pks: Optional[Union[List, str, int]] = None,
    filter: Optional[str] = None,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

Parameters:
pks: Primary key values (mutually exclusive with filter)
filter: Boolean expression (mutually exclusive with pks)
partition_name: Target partition
timeout: Operation timeout in seconds

# Delete by primary keys
result = client.delete("documents", pks=[1, 2, 3])
print(f"Deleted {result.get('delete_count', 0)} entities")
# Delete single entity
client.delete("products", pks=12345)
# Delete by string primary keys
client.delete("users", pks=["user_001", "user_002", "user_003"])
# Delete from specific partition
client.delete("logs", pks=[100, 101, 102], partition_name="old_logs")# Delete by filter conditions
result = client.delete("products", filter="category == 'discontinued'")
print(f"Deleted {result['delete_count']} discontinued products")
# Delete old records
client.delete("events", filter="timestamp < 1640995200") # Before 2022-01-01
# Delete with complex conditions
client.delete(
"documents",
filter="status == 'draft' and created_at < 1577836800 and author == 'system'"
)
# Delete from specific partitions with conditions
client.delete(
"user_activity",
filter="action_type == 'login' and success == false",
partition_name="failed_attempts"
)

def safe_batch_delete(collection_name: str, delete_condition: str, batch_size: int = 1000):
"""Safely delete large numbers of entities in batches"""
total_deleted = 0
while True:
# Query entities matching delete condition
to_delete = client.query(
collection_name,
filter=delete_condition,
output_fields=["id"], # Only need primary keys
limit=batch_size
)
if not to_delete:
break # No more entities to delete
# Extract primary keys
pks = [entity['id'] for entity in to_delete]
# Delete batch
result = client.delete(collection_name, pks=pks)
deleted_count = result.get('delete_count', 0)
total_deleted += deleted_count
print(f"Deleted batch of {deleted_count} entities, total: {total_deleted}")
# If we deleted fewer than batch_size, we're done
if deleted_count < batch_size:
break
return total_deleted
# Example: Delete old inactive users
deleted_count = safe_batch_delete(
"users",
"last_login < 1609459200 and status == 'inactive'", # Before 2021-01-01
batch_size=500
)
print(f"Total deleted: {deleted_count} inactive users")def get(
collection_name: str,
ids: Union[List, str, int],
output_fields: Optional[List[str]] = None,
partition_names: Optional[List[str]] = None,
timeout: Optional[float] = None
) -> List[Dict[str, Any]]

# Get single entity
entity = client.get("documents", ids=1, output_fields=["id", "title", "content"])
if entity:
print(f"Document: {entity[0]['title']}")
# Get multiple entities
entities = client.get(
"products",
ids=[100, 101, 102],
output_fields=["id", "name", "price", "category"]
)
for product in entities:
print(f"{product['name']}: ${product['price']}")
# Get with string primary keys
user_profiles = client.get(
"users",
ids=["user_001", "user_002"],
output_fields=["user_id", "name", "email", "profile"]
)
# Get from specific partitions
recent_data = client.get(
"time_series",
    ids=list(range(1000, 1100)),
partition_names=["2024_q4"],
output_fields=["id", "timestamp", "value"]
)

def safe_get_entities(collection_name: str, ids: List, output_fields: List[str]) -> List[Dict]:
"""Safely retrieve entities with error handling"""
try:
entities = client.get(
collection_name,
ids=ids,
output_fields=output_fields
)
# Check if all requested entities were found
found_ids = {entity['id'] for entity in entities}
missing_ids = set(ids) - found_ids
if missing_ids:
print(f"Warning: {len(missing_ids)} entities not found: {list(missing_ids)[:5]}...")
return entities
except Exception as e:
print(f"Error retrieving entities: {e}")
return []
# Usage
products = safe_get_entities("products", [1, 2, 999999], ["id", "name", "price"])

def query_iterator(
collection_name: str,
filter: str,
output_fields: Optional[List[str]] = None,
batch_size: int = 1000,
limit: Optional[int] = None,
partition_names: Optional[List[str]] = None,
timeout: Optional[float] = None,
**kwargs
) -> QueryIterator

# Process large result sets efficiently
iterator = client.query_iterator(
"large_collection",
filter="status == 'active'",
output_fields=["id", "data", "timestamp"],
batch_size=2000
)
processed_count = 0
while True:
    batch = iterator.next()
    if not batch:
        iterator.close()
        break
print(f"Processing batch of {len(batch)} records")
# Process each record in the batch
for record in batch:
        # Custom processing logic (process_record is a user-defined placeholder)
        process_record(record)
processed_count += 1
# Optional: limit processing
if processed_count >= 50000:
print("Reached processing limit")
        iterator.close()
        break
print(f"Total processed: {processed_count} records")def export_collection_to_csv(collection_name: str, output_file: str, filter_expr: str = "", batch_size: int = 5000):
"""Export collection data to CSV file"""
    import csv
    import json
# Get collection schema to determine fields
collection_info = client.describe_collection(collection_name)
field_names = [field['name'] for field in collection_info['schema']['fields']]
    # Remove vector fields from the CSV export (too large); assumes vector fields are named with a '_vector' suffix
    scalar_fields = [name for name in field_names if not name.endswith('_vector')]
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=scalar_fields)
writer.writeheader()
# Use iterator for memory-efficient export
iterator = client.query_iterator(
collection_name,
filter=filter_expr or "id >= 0", # Get all if no filter
output_fields=scalar_fields,
batch_size=batch_size
)
total_exported = 0
        while True:
            batch = iterator.next()
            if not batch:
                iterator.close()
                break
# Convert batch to CSV rows
for record in batch:
# Handle JSON fields by converting to string
csv_row = {}
for field in scalar_fields:
value = record.get(field)
if isinstance(value, (dict, list)):
csv_row[field] = json.dumps(value)
else:
csv_row[field] = value
writer.writerow(csv_row)
total_exported += 1
print(f"Exported {total_exported} records...")
print(f"Export completed: {total_exported} records saved to {output_file}")
# Export active products to CSV
export_collection_to_csv(
"products",
"active_products.csv",
filter_expr="status == 'active' and price > 0"
)

def data_migration_pipeline(source_collection: str, target_collection: str, transform_func):
"""Migrate data between collections with transformation"""
# Process in batches
iterator = client.query_iterator(
source_collection,
filter="id >= 0", # All records
batch_size=1000
)
migrated_count = 0
errors = []
    while True:
        batch = iterator.next()
        if not batch:
            iterator.close()
            break
try:
# Transform batch data
transformed_batch = []
for record in batch:
transformed = transform_func(record)
if transformed: # Skip None results
transformed_batch.append(transformed)
# Insert into target collection
if transformed_batch:
result = client.insert(target_collection, transformed_batch)
migrated_count += result['insert_count']
print(f"Migrated {result['insert_count']} records, total: {migrated_count}")
except Exception as e:
error_msg = f"Error processing batch: {e}"
errors.append(error_msg)
print(error_msg)
return {"migrated": migrated_count, "errors": errors}
# Example transformation function
def modernize_document(old_record):
"""Transform old document format to new format"""
return {
"id": old_record["id"],
"title": old_record["title"],
"content": old_record["content"],
"vector": old_record["embedding"], # Rename field
"metadata": {
"category": old_record.get("category", "general"),
"created_at": old_record.get("timestamp", 0),
"migrated_at": int(time.time())
},
"status": "migrated"
}
# Run migration
result = data_migration_pipeline("old_documents", "new_documents", modernize_document)
print(f"Migration completed: {result}")def validate_and_insert(collection_name: str, data: List[Dict], schema_info: Dict) -> Dict:
"""Validate data against collection schema before insertion"""
# Extract field information from schema
required_fields = set()
field_types = {}
vector_dims = {}
for field in schema_info['schema']['fields']:
field_name = field['name']
field_type = field['type']
if not field.get('autoID', False):
required_fields.add(field_name)
field_types[field_name] = field_type
if field_type in ['FloatVector', 'BinaryVector']:
vector_dims[field_name] = field.get('params', {}).get('dim', 0)
validated_data = []
errors = []
for i, record in enumerate(data):
record_errors = []
# Check required fields
missing_fields = required_fields - set(record.keys())
if missing_fields:
record_errors.append(f"Missing fields: {missing_fields}")
# Validate vector dimensions
for field_name, expected_dim in vector_dims.items():
if field_name in record:
vector = record[field_name]
if isinstance(vector, list) and len(vector) != expected_dim:
record_errors.append(f"{field_name} dimension mismatch: expected {expected_dim}, got {len(vector)}")
# Validate data types (basic validation)
for field_name, value in record.items():
if field_name in field_types:
field_type = field_types[field_name]
if field_type == 'VarChar' and not isinstance(value, str):
record_errors.append(f"{field_name} must be string, got {type(value)}")
elif field_type in ['Int64', 'Int32'] and not isinstance(value, int):
record_errors.append(f"{field_name} must be integer, got {type(value)}")
if record_errors:
errors.append(f"Record {i}: {'; '.join(record_errors)}")
else:
validated_data.append(record)
# Insert valid data
result = {"insert_count": 0, "errors": errors}
if validated_data:
insert_result = client.insert(collection_name, validated_data)
result["insert_count"] = insert_result["insert_count"]
return result
# Usage
collection_schema = client.describe_collection("products")
data_to_insert = [
{"id": 1, "name": "Product 1", "vector": [0.1] * 128},
{"id": 2, "vector": [0.2] * 64}, # Wrong dimension - will be flagged
]
validation_result = validate_and_insert("products", data_to_insert, collection_schema)
print(f"Inserted: {validation_result['insert_count']}")
if validation_result['errors']:
print("Validation errors:", validation_result['errors'])PyMilvus data management operations provide robust capabilities for handling large-scale vector and scalar data with efficient batch processing, validation, and error handling mechanisms.
Install with Tessl CLI
npx tessl i tessl/pypi-pymilvus