Data Management

PyMilvus provides comprehensive data management capabilities, including insertion, updates via upsert, deletion, and retrieval. This page covers batch operations, data validation, and efficient iteration over large datasets.

Data Insertion

Basic Insert Operations

from pymilvus import MilvusClient

client = MilvusClient()

def insert(
    collection_name: str,
    data: Union[Dict, List[Dict]],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

Parameters:

  • data: Entities as a single dict or a list of dicts (convert a pandas DataFrame with df.to_dict("records"))
  • partition_name: Target partition (optional)
  • timeout: Operation timeout in seconds
  • **kwargs: Additional insertion parameters

Returns: Dictionary with insert_count and ids (the inserted primary keys, including auto-generated ones)

Insert Examples

# Insert list of dictionaries
data = [
    {
        "id": 1,
        "vector": [0.1, 0.2, 0.3] * 256,
        "title": "First Document",
        "metadata": {"category": "tech", "year": 2024}
    },
    {
        "id": 2,
        "vector": [0.4, 0.5, 0.6] * 256,
        "title": "Second Document", 
        "metadata": {"category": "science", "year": 2024}
    }
]

result = client.insert("documents", data)
print(f"Inserted {result['insert_count']} entities")
if 'ids' in result:
    print(f"Primary keys: {result['ids']}")

# Insert pandas DataFrame
import pandas as pd
import numpy as np

df = pd.DataFrame({
    "id": range(1000),
    "vector": [np.random.rand(768).tolist() for _ in range(1000)],
    "category": np.random.choice(["A", "B", "C"], 1000),
    "score": np.random.rand(1000),
    "active": np.random.choice([True, False], 1000)
})

result = client.insert("products", df)
print(f"Inserted {result['insert_count']} products from DataFrame")

# Insert into specific partition
seasonal_data = [
    {"id": i, "vector": [0.1] * 128, "season": "winter"}
    for i in range(100, 200)
]

client.insert(
    "seasonal_collection", 
    seasonal_data, 
    partition_name="winter_2024"
)

Auto-ID Collections

# For collections with auto_id=True, omit primary key field
auto_id_data = [
    {
        "vector": [0.1, 0.2] * 384,
        "content": "Auto-generated ID document",
        "tags": ["auto", "generated"]
    },
    {
        "vector": [0.3, 0.4] * 384,
        "content": "Another auto-ID document",
        "tags": ["automatic", "id"]
    }
]

result = client.insert("auto_id_collection", auto_id_data)
print(f"Generated primary keys: {result['primary_keys']}")

Large Batch Insertion

def batch_insert_large_dataset(collection_name: str, data_generator, batch_size: int = 1000):
    """Insert large dataset in batches to manage memory"""
    
    total_inserted = 0
    batch = []
    
    for record in data_generator():
        batch.append(record)
        
        if len(batch) >= batch_size:
            result = client.insert(collection_name, batch)
            total_inserted += result['insert_count']
            print(f"Inserted batch: {result['insert_count']}, Total: {total_inserted}")
            
            batch = []  # Clear batch
    
    # Insert remaining records
    if batch:
        result = client.insert(collection_name, batch)
        total_inserted += result['insert_count']
        print(f"Final batch: {result['insert_count']}, Total: {total_inserted}")
    
    return total_inserted

# Example generator for large dataset
import random
import time

def generate_documents(count: int):
    """Generator for memory-efficient data creation"""
    for i in range(count):
        yield {
            "id": i,
            "vector": np.random.rand(768).tolist(),
            "title": f"Document {i}",
            "content": f"Content for document {i}" * 50,  # Larger text content
            "metadata": {
                "created_at": int(time.time()) - random.randint(0, 86400),
                "category": random.choice(["tech", "science", "arts", "sports"]),
                "priority": random.randint(1, 10)
            }
        }

# Use batch insertion
total = batch_insert_large_dataset("large_collection", lambda: generate_documents(100000), batch_size=2000)
print(f"Successfully inserted {total} documents")

Data Updates (Upsert)

Upsert Operations

def upsert(
    collection_name: str,
    data: Union[Dict, List[Dict]],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

Upsert performs insert-or-update based on primary key matching. If primary key exists, the entity is updated; otherwise, it's inserted.

# Initial data
initial_data = [
    {"id": 1, "vector": [0.1] * 768, "title": "Original Title", "status": "draft"},
    {"id": 2, "vector": [0.2] * 768, "title": "Another Document", "status": "draft"}
]
client.insert("documents", initial_data)

# Upsert: update existing and insert new
upsert_data = [
    {"id": 1, "vector": [0.15] * 768, "title": "Updated Title", "status": "published"},  # Update
    {"id": 3, "vector": [0.3] * 768, "title": "New Document", "status": "draft"}        # Insert
]

result = client.upsert("documents", upsert_data)
print(f"Upsert count: {result.get('upsert_count', 0)}")

# Verify changes
updated = client.query("documents", "id in [1, 3]", output_fields=["id", "title", "status"])
for doc in updated:
    print(f"ID {doc['id']}: {doc['title']} - {doc['status']}")

Conditional Upserts

def conditional_upsert(collection_name: str, updates: List[Dict], condition_field: str):
    """Upsert only if condition is met"""
    
    # Get existing entities
    primary_keys = [update['id'] for update in updates]
    existing = client.get(
        collection_name, 
        ids=primary_keys,
        output_fields=[condition_field, "id"]
    )
    
    # Create mapping of existing entities
    existing_map = {entity['id']: entity for entity in existing}
    
    # Filter updates based on conditions
    valid_updates = []
    for update in updates:
        entity_id = update['id']
        
        if entity_id in existing_map:
            # Apply update condition: only update when the condition field
            # (e.g. a timestamp) is newer than the stored value
            existing_value = existing_map[entity_id].get(condition_field, 0)
            new_value = update.get(condition_field, 0)
            
            if new_value > existing_value:
                valid_updates.append(update)
        else:
            # New entity, always include
            valid_updates.append(update)
    
    if valid_updates:
        return client.upsert(collection_name, valid_updates)
    
    return {"upsert_count": 0}

Data Deletion

Delete by Primary Key

def delete(
    collection_name: str,
    pks: Optional[Union[List, str, int]] = None,
    filter: Optional[str] = None,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

Parameters:

  • pks: Primary key values (mutually exclusive with filter)
  • filter: Boolean expression (mutually exclusive with pks)
  • partition_name: Target partition
  • timeout: Operation timeout

# Delete by primary keys
result = client.delete("documents", pks=[1, 2, 3])
print(f"Deleted {result.get('delete_count', 0)} entities")

# Delete single entity
client.delete("products", pks=12345)

# Delete by string primary keys
client.delete("users", pks=["user_001", "user_002", "user_003"])

# Delete from specific partition
client.delete("logs", pks=[100, 101, 102], partition_name="old_logs")

Delete by Expression

# Delete by filter conditions
result = client.delete("products", filter="category == 'discontinued'")
print(f"Deleted {result['delete_count']} discontinued products")

# Delete old records
client.delete("events", filter="timestamp < 1640995200")  # Before 2022-01-01

# Delete with complex conditions
client.delete(
    "documents", 
    filter="status == 'draft' and created_at < 1577836800 and author == 'system'"
)

# Delete from specific partitions with conditions
client.delete(
    "user_activity",
    filter="action_type == 'login' and success == false",
    partition_name="failed_attempts"
)
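
Because deletes are applied asynchronously, a follow-up count is a quick way to confirm the result (assuming Milvus 2.3+, which supports count(*) in query):

remaining = client.query(
    "products",
    filter="category == 'discontinued'",
    output_fields=["count(*)"]
)
print(f"Matching entities remaining: {remaining[0]['count(*)']}")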

Batch Deletion Patterns

def safe_batch_delete(collection_name: str, delete_condition: str, batch_size: int = 1000):
    """Safely delete large numbers of entities in batches"""
    
    total_deleted = 0
    
    while True:
        # Query entities matching delete condition
        to_delete = client.query(
            collection_name,
            filter=delete_condition,
            output_fields=["id"],  # Only need primary keys
            limit=batch_size
        )
        
        if not to_delete:
            break  # No more entities to delete
        
        # Extract primary keys
        pks = [entity['id'] for entity in to_delete]
        
        # Delete batch
        result = client.delete(collection_name, pks=pks)
        deleted_count = result.get('delete_count', 0)
        total_deleted += deleted_count
        
        print(f"Deleted batch of {deleted_count} entities, total: {total_deleted}")
        
        # If we deleted fewer than batch_size, we're done
        if deleted_count < batch_size:
            break
    
    return total_deleted

# Example: Delete old inactive users
deleted_count = safe_batch_delete(
    "users", 
    "last_login < 1609459200 and status == 'inactive'",  # Before 2021-01-01
    batch_size=500
)
print(f"Total deleted: {deleted_count} inactive users")

Data Retrieval

Get by Primary Key

def get(
    collection_name: str,
    ids: Union[List, str, int],
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    timeout: Optional[float] = None
) -> List[Dict[str, Any]]

# Get single entity
entity = client.get("documents", ids=1, output_fields=["id", "title", "content"])
if entity:
    print(f"Document: {entity[0]['title']}")

# Get multiple entities
entities = client.get(
    "products",
    ids=[100, 101, 102],
    output_fields=["id", "name", "price", "category"]
)

for product in entities:
    print(f"{product['name']}: ${product['price']}")

# Get with string primary keys
user_profiles = client.get(
    "users",
    ids=["user_001", "user_002"],
    output_fields=["user_id", "name", "email", "profile"]
)

# Get from specific partitions
recent_data = client.get(
    "time_series",
    ids=list(range(1000, 1100)),
    partition_names=["2024_q4"],
    output_fields=["id", "timestamp", "value"]
)

Error Handling for Retrieval

def safe_get_entities(collection_name: str, ids: List, output_fields: List[str]) -> List[Dict]:
    """Safely retrieve entities with error handling"""
    
    try:
        entities = client.get(
            collection_name,
            ids=ids,
            output_fields=output_fields
        )
        
        # Check if all requested entities were found
        found_ids = {entity['id'] for entity in entities}
        missing_ids = set(ids) - found_ids
        
        if missing_ids:
            print(f"Warning: {len(missing_ids)} entities not found: {list(missing_ids)[:5]}...")
        
        return entities
        
    except Exception as e:
        print(f"Error retrieving entities: {e}")
        return []

# Usage
products = safe_get_entities("products", [1, 2, 999999], ["id", "name", "price"])

Data Iteration

Query Iterator

def query_iterator(
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    batch_size: int = 1000,
    limit: Optional[int] = None,
    partition_names: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> QueryIterator

# Process large result sets efficiently
iterator = client.query_iterator(
    "large_collection",
    filter="status == 'active'",
    output_fields=["id", "data", "timestamp"],
    batch_size=2000
)

# QueryIterator is consumed with next()/close() rather than a for-loop
processed_count = 0
while True:
    batch = iterator.next()
    if not batch:
        iterator.close()
        break
    
    print(f"Processing batch of {len(batch)} records")
    
    # Process each record in the batch
    for record in batch:
        # Custom processing logic
        process_record(record)
        processed_count += 1
    
    # Optional: limit processing
    if processed_count >= 50000:
        print("Reached processing limit")
        iterator.close()
        break

print(f"Total processed: {processed_count} records")

Data Export Patterns

def export_collection_to_csv(collection_name: str, output_file: str, filter_expr: str = "", batch_size: int = 5000):
    """Export collection data to CSV file"""
    
    import csv
    import json
    
    from pymilvus import DataType
    
    # Get collection schema to determine fields
    collection_info = client.describe_collection(collection_name)
    
    # Skip vector fields for CSV export (too large); field types are DataType enums
    vector_types = {DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR}
    scalar_fields = [
        field['name'] for field in collection_info['fields']
        if field['type'] not in vector_types
    ]
    
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=scalar_fields)
        writer.writeheader()
        
        # Use iterator for memory-efficient export
        iterator = client.query_iterator(
            collection_name,
            filter=filter_expr or "id >= 0",  # Get all if no filter
            output_fields=scalar_fields,
            batch_size=batch_size
        )
        
        total_exported = 0
        while True:
            batch = iterator.next()
            if not batch:
                iterator.close()
                break
            
            # Convert batch to CSV rows
            for record in batch:
                # Handle JSON fields by converting to string
                csv_row = {}
                for field in scalar_fields:
                    value = record.get(field)
                    if isinstance(value, (dict, list)):
                        csv_row[field] = json.dumps(value)
                    else:
                        csv_row[field] = value
                
                writer.writerow(csv_row)
                total_exported += 1
            
            print(f"Exported {total_exported} records...")
    
    print(f"Export completed: {total_exported} records saved to {output_file}")

# Export active products to CSV
export_collection_to_csv(
    "products", 
    "active_products.csv", 
    filter_expr="status == 'active' and price > 0"
)

Data Transformation Pipeline

def data_migration_pipeline(source_collection: str, target_collection: str, transform_func):
    """Migrate data between collections with transformation"""
    
    # Process in batches
    iterator = client.query_iterator(
        source_collection,
        filter="id >= 0",  # All records
        output_fields=["*"],  # Return all fields, not just the primary key
        batch_size=1000
    )
    
    migrated_count = 0
    errors = []
    
    while True:
        batch = iterator.next()
        if not batch:
            iterator.close()
            break
        
        try:
            # Transform batch data
            transformed_batch = []
            for record in batch:
                transformed = transform_func(record)
                if transformed:  # Skip None results
                    transformed_batch.append(transformed)
            
            # Insert into target collection
            if transformed_batch:
                result = client.insert(target_collection, transformed_batch)
                migrated_count += result['insert_count']
                print(f"Migrated {result['insert_count']} records, total: {migrated_count}")
        
        except Exception as e:
            error_msg = f"Error processing batch: {e}"
            errors.append(error_msg)
            print(error_msg)
    
    return {"migrated": migrated_count, "errors": errors}

# Example transformation function
import time

def modernize_document(old_record):
    """Transform old document format to new format"""
    return {
        "id": old_record["id"],
        "title": old_record["title"],
        "content": old_record["content"],
        "vector": old_record["embedding"],  # Rename field
        "metadata": {
            "category": old_record.get("category", "general"),
            "created_at": old_record.get("timestamp", 0),
            "migrated_at": int(time.time())
        },
        "status": "migrated"
    }

# Run migration
result = data_migration_pipeline("old_documents", "new_documents", modernize_document)
print(f"Migration completed: {result}")

Data Validation

Insert Validation

def validate_and_insert(collection_name: str, data: List[Dict], schema_info: Dict) -> Dict:
    """Validate data against collection schema before insertion"""
    
    from pymilvus import DataType
    
    # Extract field information from the describe_collection() result;
    # field types come back as DataType enum values
    required_fields = set()
    field_types = {}
    vector_dims = {}
    
    for field in schema_info['fields']:
        field_name = field['name']
        field_type = field['type']
        
        if not field.get('auto_id', False):
            required_fields.add(field_name)
        
        field_types[field_name] = field_type
        
        if field_type in (DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR):
            vector_dims[field_name] = field.get('params', {}).get('dim', 0)
    
    validated_data = []
    errors = []
    
    for i, record in enumerate(data):
        record_errors = []
        
        # Check required fields
        missing_fields = required_fields - set(record.keys())
        if missing_fields:
            record_errors.append(f"Missing fields: {missing_fields}")
        
        # Validate vector dimensions
        for field_name, expected_dim in vector_dims.items():
            if field_name in record:
                vector = record[field_name]
                if isinstance(vector, list) and len(vector) != expected_dim:
                    record_errors.append(f"{field_name} dimension mismatch: expected {expected_dim}, got {len(vector)}")
        
        # Validate data types (basic validation)
        for field_name, value in record.items():
            if field_name in field_types:
                field_type = field_types[field_name]
                if field_type == DataType.VARCHAR and not isinstance(value, str):
                    record_errors.append(f"{field_name} must be string, got {type(value)}")
                elif field_type in (DataType.INT64, DataType.INT32) and not isinstance(value, int):
                    record_errors.append(f"{field_name} must be integer, got {type(value)}")
        
        if record_errors:
            errors.append(f"Record {i}: {'; '.join(record_errors)}")
        else:
            validated_data.append(record)
    
    # Insert valid data
    result = {"insert_count": 0, "errors": errors}
    
    if validated_data:
        insert_result = client.insert(collection_name, validated_data)
        result["insert_count"] = insert_result["insert_count"]
    
    return result

# Usage
collection_schema = client.describe_collection("products")
data_to_insert = [
    {"id": 1, "name": "Product 1", "vector": [0.1] * 128},
    {"id": 2, "vector": [0.2] * 64},  # Wrong dimension - will be flagged
]

validation_result = validate_and_insert("products", data_to_insert, collection_schema)
print(f"Inserted: {validation_result['insert_count']}")
if validation_result['errors']:
    print("Validation errors:", validation_result['errors'])

PyMilvus data management operations provide robust capabilities for handling large-scale vector and scalar data with efficient batch processing, validation, and error handling mechanisms.
