CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pymilvus

Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.

Pending
Overview
Eval results
Files

utility-functions.mddocs/

Utility Functions

PyMilvus provides a comprehensive set of utility functions for database maintenance, monitoring, timestamp management, and administrative operations. These functions are available both through the utility module and as direct imports from the main package.

Import Patterns

# Import utility module
from pymilvus import utility

# Use utility functions
utility.has_collection("my_collection")
utility.loading_progress("my_collection")

# Direct imports (preferred for commonly used functions)
from pymilvus import (
    has_collection, list_collections, drop_collection,
    create_user, delete_user, list_usernames,
    mkts_from_datetime, hybridts_to_datetime
)

# Use functions directly
has_collection("my_collection")
loading_progress("my_collection")

Collection Utilities

Collection Existence and Listing

from pymilvus import utility

def has_collection(
    collection_name: str,
    using: str = "default",
    timeout: Optional[float] = None
) -> bool

def list_collections(
    timeout: Optional[float] = None,
    using: str = "default"
) -> List[str]

def drop_collection(
    collection_name: str,
    timeout: Optional[float] = None,
    using: str = "default"
) -> None
# Check collection existence
if utility.has_collection("documents"):
    print("Documents collection exists")
else:
    print("Documents collection not found")

# List all collections
collections = utility.list_collections()
print(f"Available collections: {collections}")

# Conditional operations based on existence
collection_name = "temp_collection"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)
    print(f"Dropped existing collection: {collection_name}")

# Batch collection operations
collections_to_check = ["users", "products", "orders", "analytics"]
existing_collections = []
missing_collections = []

for collection in collections_to_check:
    if utility.has_collection(collection):
        existing_collections.append(collection)
    else:
        missing_collections.append(collection)

print(f"Existing: {existing_collections}")
print(f"Missing: {missing_collections}")

Collection Renaming

def rename_collection(
    old_name: str,
    new_name: str,
    timeout: Optional[float] = None,
    using: str = "default"
) -> None
# Rename collection
utility.rename_collection("old_products", "products_archive")

# Safe rename with existence check
def safe_rename_collection(old_name: str, new_name: str):
    """Safely rename collection with validation"""
    
    if not utility.has_collection(old_name):
        print(f"Source collection {old_name} does not exist")
        return False
    
    if utility.has_collection(new_name):
        print(f"Target collection {new_name} already exists")
        return False
    
    try:
        utility.rename_collection(old_name, new_name)
        print(f"Successfully renamed {old_name} to {new_name}")
        return True
    except Exception as e:
        print(f"Rename failed: {e}")
        return False

# Usage
success = safe_rename_collection("temp_data", "processed_data")

Partition Utilities

Partition Operations

def has_partition(
    collection_name: str,
    partition_name: str,
    using: str = "default",
    timeout: Optional[float] = None
) -> bool
# Check partition existence
collection_name = "time_series"
partitions_to_check = ["2024_q1", "2024_q2", "2024_q3", "2024_q4"]

for partition in partitions_to_check:
    exists = utility.has_partition(collection_name, partition)
    print(f"Partition {partition}: {'exists' if exists else 'missing'}")

# Conditional partition operations
def ensure_partition_exists(collection_name: str, partition_name: str):
    """Ensure partition exists, create if missing"""
    from pymilvus import Collection
    
    if not utility.has_partition(collection_name, partition_name):
        collection = Collection(collection_name)
        collection.create_partition(partition_name)
        print(f"Created partition: {partition_name}")
    else:
        print(f"Partition {partition_name} already exists")

# Create quarterly partitions
for quarter in ["2024_q1", "2024_q2", "2024_q3", "2024_q4"]:
    ensure_partition_exists("sales_data", quarter)

Loading and Progress Monitoring

Loading Operations

def loading_progress(
    collection_name: str,
    partition_names: Optional[List[str]] = None,
    using: str = "default",
    timeout: Optional[float] = None
) -> Dict[str, Any]

def wait_for_loading_complete(
    collection_name: str,
    partition_names: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    using: str = "default"
) -> None
# Monitor loading progress
progress = utility.loading_progress("large_collection")
print(f"Loading progress: {progress}")

# Example progress structure:
# {
#     'loading_progress': 85.5,
#     'num_loaded_partitions': 3,
#     'not_loaded_partitions': ['partition_4'],
#     'loading_partitions': ['partition_5'],
#     'loaded_partitions': ['partition_1', 'partition_2', 'partition_3']
# }

# Wait for loading to complete
print("Waiting for collection to load...")
utility.wait_for_loading_complete("large_collection", timeout=300)  # 5 minute timeout
print("Collection loading completed")

# Monitor loading with progress updates
def monitor_loading_progress(collection_name: str, check_interval: int = 5):
    """Monitor loading progress with periodic updates"""
    import time
    
    while True:
        progress = utility.loading_progress(collection_name)
        loading_pct = progress.get('loading_progress', 0)
        
        print(f"Loading progress: {loading_pct:.1f}%")
        
        if loading_pct >= 100:
            print("Loading completed!")
            break
        
        time.sleep(check_interval)

# Usage
monitor_loading_progress("huge_dataset", check_interval=10)

Index Building Progress

def index_building_progress(
    collection_name: str,
    index_name: str = "",
    using: str = "default",
    timeout: Optional[float] = None
) -> Dict[str, Any]

def wait_for_index_building_complete(
    collection_name: str,
    index_name: str = "",
    timeout: Optional[float] = None,
    using: str = "default"
) -> None
# Monitor index building
index_progress = utility.index_building_progress("documents", "vector_index")
print(f"Index building progress: {index_progress}")

# Example index progress structure:
# {
#     'total_rows': 1000000,
#     'indexed_rows': 750000,
#     'pending_index_rows': 250000,
#     'index_state': 'InProgress',  # 'Unissued', 'InProgress', 'Finished', 'Failed'
#     'progress': 75.0
# }

# Wait for index building
utility.wait_for_index_building_complete("documents", "vector_index", timeout=600)

# Monitor multiple index builds
def monitor_all_indexes(collection_name: str):
    """Monitor all index building for a collection"""
    from pymilvus import Collection
    
    collection = Collection(collection_name)
    
    # Get all indexes
    indexes = collection.indexes
    
    for index in indexes:
        field_name = index.field_name
        
        print(f"Monitoring index on field: {field_name}")
        
        while True:
            progress = utility.index_building_progress(collection_name, field_name)
            state = progress.get('index_state', 'Unknown')
            pct = progress.get('progress', 0)
            
            print(f"  {field_name}: {state} - {pct:.1f}%")
            
            if state in ['Finished', 'Failed']:
                break
            
            time.sleep(10)
        
        if state == 'Finished':
            print(f"✓ Index on {field_name} completed successfully")
        else:
            print(f"✗ Index on {field_name} failed")

# Monitor all indexes for a collection
monitor_all_indexes("multi_field_collection")

User Management Utilities

User Operations

def create_user(
    user: str,
    password: str,
    using: str = "default",
    timeout: Optional[float] = None
) -> None

def delete_user(
    user: str,
    using: str = "default",
    timeout: Optional[float] = None
) -> None

def list_usernames(
    using: str = "default",
    timeout: Optional[float] = None
) -> List[str]

def update_password(
    user: str,
    old_password: str,
    new_password: str,
    using: str = "default",
    timeout: Optional[float] = None
) -> None

def reset_password(
    user: str,
    new_password: str,
    using: str = "default",
    timeout: Optional[float] = None
) -> None
# User management examples
users_to_create = [
    ("analyst", "analyst_password"),
    ("viewer", "viewer_password"),
    ("admin", "admin_password")
]

# Create users
for username, password in users_to_create:
    try:
        utility.create_user(username, password)
        print(f"✓ Created user: {username}")
    except Exception as e:
        print(f"✗ Failed to create {username}: {e}")

# List all users
users = utility.list_usernames()
print(f"System users: {users}")

# Password management
utility.update_password("analyst", "old_password", "new_secure_password")
utility.reset_password("viewer", "admin_reset_password")

# Cleanup old users
obsolete_users = ["temp_user", "test_user", "old_account"]
for username in obsolete_users:
    if username in utility.list_usernames():
        utility.delete_user(username)
        print(f"Deleted user: {username}")

Timestamp Utilities

Timestamp Creation

def mkts_from_hybridts(
    hybridts: int,
    milliseconds: float = 0.0,
    delta: Optional[timedelta] = None
) -> int

def mkts_from_unixtime(
    epoch: float,
    milliseconds: float = 0.0,
    delta: Optional[timedelta] = None
) -> int

def mkts_from_datetime(
    d_time: datetime,
    milliseconds: float = 0.0,
    delta: Optional[timedelta] = None
) -> int
from datetime import datetime, timedelta
from pymilvus import mkts_from_datetime, mkts_from_unixtime

# Create timestamp from datetime
now = datetime.now()
travel_timestamp = mkts_from_datetime(now)
print(f"Travel timestamp: {travel_timestamp}")

# Create timestamp for specific time
specific_time = datetime(2024, 1, 1, 12, 0, 0)
historical_timestamp = mkts_from_datetime(specific_time)

# Create timestamp with offset
one_hour_ago = mkts_from_datetime(now, delta=timedelta(hours=-1))
one_day_future = mkts_from_datetime(now, delta=timedelta(days=1))

# Create from Unix timestamp
unix_time = 1640995200  # 2022-01-01 00:00:00 UTC
timestamp_from_unix = mkts_from_unixtime(unix_time)

# Use timestamps for time travel queries
from pymilvus import MilvusClient
client = MilvusClient()

# Query data as it existed 1 hour ago
historical_results = client.query(
    "time_series_data",
    expr="id > 0",
    travel_timestamp=one_hour_ago,
    output_fields=["id", "value", "timestamp"]
)
print(f"Historical data (1 hour ago): {len(historical_results)} records")

Timestamp Conversion

def hybridts_to_datetime(
    hybridts: int,
    tz: Optional[timezone] = None
) -> datetime

def hybridts_to_unixtime(
    hybridts: int
) -> float
from pymilvus import hybridts_to_datetime, hybridts_to_unixtime

# Convert Milvus hybrid timestamp to datetime
milvus_timestamp = 434646822236381184  # Example hybrid timestamp
dt = hybridts_to_datetime(milvus_timestamp)
print(f"Datetime: {dt}")

# Convert to Unix timestamp  
unix_time = hybridts_to_unixtime(milvus_timestamp)
print(f"Unix time: {unix_time}")

# Working with search results that include timestamps
results = client.search("timestamped_data", [[0.1] * 128], limit=5)

for hit in results[0]:
    # If your collection has a timestamp field
    ts_field = hit.entity.get('_ts')  # Milvus internal timestamp
    if ts_field:
        readable_time = hybridts_to_datetime(ts_field)
        print(f"Record ID {hit.id}: created at {readable_time}")

Resource Group Management

Resource Group Operations

def create_resource_group(
    name: str,
    config: Optional[Dict] = None,
    using: str = "default",
    timeout: Optional[float] = None
) -> None

def drop_resource_group(
    name: str,
    using: str = "default", 
    timeout: Optional[float] = None
) -> None

def describe_resource_group(
    name: str,
    using: str = "default",
    timeout: Optional[float] = None
) -> Dict[str, Any]

def list_resource_groups(
    using: str = "default",
    timeout: Optional[float] = None
) -> List[str]

def update_resource_groups(
    resource_groups: Dict[str, Dict],
    using: str = "default",
    timeout: Optional[float] = None
) -> None
# Create resource groups for different workloads
resource_groups = {
    "gpu_group": {"requests": {"node_num": 2}, "limits": {"node_num": 4}},
    "cpu_group": {"requests": {"node_num": 4}, "limits": {"node_num": 8}},
    "memory_intensive": {"requests": {"node_num": 1}, "limits": {"node_num": 2}}
}

for group_name, config in resource_groups.items():
    try:
        utility.create_resource_group(group_name, config)
        print(f"✓ Created resource group: {group_name}")
    except Exception as e:
        print(f"✗ Failed to create {group_name}: {e}")

# List and describe resource groups
groups = utility.list_resource_groups()
print(f"Available resource groups: {groups}")

for group in groups:
    group_info = utility.describe_resource_group(group)
    print(f"Group {group}: {group_info}")

# Update resource group configuration
updates = {
    "gpu_group": {"limits": {"node_num": 6}},  # Increase limit
    "cpu_group": {"requests": {"node_num": 6}}  # Increase requests
}

utility.update_resource_groups(updates)
print("Resource group configurations updated")

Node and Replica Transfer

def transfer_node(
    source: str,
    target: str,
    num_nodes: int,
    using: str = "default",
    timeout: Optional[float] = None
) -> None

def transfer_replica(
    source_group: str,
    target_group: str,
    collection_name: str,
    num_replicas: int,
    using: str = "default",
    timeout: Optional[float] = None
) -> None
# Transfer nodes between resource groups
utility.transfer_node("cpu_group", "gpu_group", 2)
print("Transferred 2 nodes from cpu_group to gpu_group")

# Transfer replicas for load balancing
utility.transfer_replica("overloaded_group", "underutilized_group", "large_collection", 1)
print("Transferred 1 replica to balance load")

# Dynamic resource rebalancing
def rebalance_resources():
    """Automatically rebalance resources based on usage"""
    
    groups = utility.list_resource_groups()
    
    for group in groups:
        group_info = utility.describe_resource_group(group)
        
        available_nodes = group_info.get('num_available_node', 0)
        loaded_replicas = group_info.get('num_loaded_replica', 0)
        
        # Simple rebalancing logic
        if available_nodes > loaded_replicas + 2:
            # Group has excess capacity
            print(f"Group {group} has excess capacity: {available_nodes} nodes, {loaded_replicas} replicas")
        elif available_nodes < loaded_replicas:
            # Group is overloaded
            print(f"Group {group} is overloaded: {available_nodes} nodes, {loaded_replicas} replicas")

rebalance_resources()

Server Information

Version and Server Details

def get_server_version(
    using: str = "default",
    timeout: Optional[float] = None
) -> str

def get_server_type(
    using: str = "default"
) -> str
# Get server information
version = utility.get_server_version()
server_type = utility.get_server_type()

print(f"Milvus Version: {version}")
print(f"Server Type: {server_type}")

# Version compatibility check
def check_version_compatibility(required_version: str):
    """Check if server version meets requirements"""
    
    current_version = utility.get_server_version()
    
    # Simple version comparison (you might want more sophisticated logic)
    current_parts = current_version.split('.')
    required_parts = required_version.split('.')
    
    for i, (current, required) in enumerate(zip(current_parts, required_parts)):
        if int(current) > int(required):
            return True
        elif int(current) < int(required):
            return False
    
    return True  # Equal versions

# Check compatibility
if check_version_compatibility("2.3.0"):
    print("Server version is compatible")
else:
    print("Server version is too old")

Maintenance Operations

Bulk Operations

def do_bulk_insert(
    collection_name: str,
    files: List[str],
    partition_name: Optional[str] = None,
    using: str = "default",
    timeout: Optional[float] = None,
    **kwargs
) -> int

def get_bulk_insert_state(
    task_id: int,
    using: str = "default",
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]

def list_bulk_insert_tasks(
    limit: int = 0,
    collection_name: Optional[str] = None,
    using: str = "default",
    timeout: Optional[float] = None,
    **kwargs
) -> List[Dict[str, Any]]
# Bulk insert from files
files_to_insert = [
    "/data/batch1.json",
    "/data/batch2.json", 
    "/data/batch3.json"
]

task_id = utility.do_bulk_insert("large_collection", files_to_insert)
print(f"Bulk insert task started: {task_id}")

# Monitor bulk insert progress
while True:
    state = utility.get_bulk_insert_state(task_id)
    status = state.get('state', 'Unknown')
    progress = state.get('progress', 0)
    
    print(f"Bulk insert progress: {status} - {progress}%")
    
    if status in ['ImportCompleted', 'ImportFailed']:
        break
    
    time.sleep(30)

# List all bulk insert tasks
tasks = utility.list_bulk_insert_tasks(limit=10)
for task in tasks:
    print(f"Task {task['task_id']}: {task['state']} - {task.get('collection_name', 'N/A')}")

Maintenance and Monitoring

def flush_all(
    using: str = "default",
    timeout: Optional[float] = None,
    **kwargs
) -> None

def get_query_segment_info(
    collection_name: str,
    timeout: Optional[float] = None,
    using: str = "default"
) -> List[Dict[str, Any]]

def load_balance(
    src_node_id: int,
    dst_node_ids: Optional[List[int]] = None,
    sealed_segment_ids: Optional[List[int]] = None,
    using: str = "default",
    timeout: Optional[float] = None
) -> None
# Flush all collections to ensure data persistence
utility.flush_all()
print("Flushed all collections")

# Get segment information for query analysis
segment_info = utility.get_query_segment_info("analytics_collection")
print(f"Segment info: {len(segment_info)} segments")

for segment in segment_info[:5]:  # Show first 5 segments
    print(f"  Segment {segment['segment_id']}: {segment['num_rows']} rows, {segment['mem_size']} bytes")

# Load balancing between nodes
utility.load_balance(
    src_node_id=1,
    dst_node_ids=[2, 3],  # Distribute to nodes 2 and 3
    sealed_segment_ids=None  # Balance all segments
)
print("Load balancing completed")

Alias Management Utilities

Alias Operations

def create_alias(
    collection_name: str,
    alias: str,
    timeout: Optional[float] = None,
    using: str = "default"
) -> None

def drop_alias(
    alias: str,
    timeout: Optional[float] = None,
    using: str = "default"
) -> None

def alter_alias(
    collection_name: str,
    alias: str,
    timeout: Optional[float] = None,
    using: str = "default"
) -> None

def list_aliases(
    collection_name: str,
    timeout: Optional[float] = None,
    using: str = "default"
) -> List[str]
# Create aliases for version management
utility.create_alias("products_v2", "products_current")
utility.create_alias("products_v1", "products_stable")

# Blue-green deployment pattern
def deploy_new_version(new_collection: str, alias: str):
    """Deploy new collection version using alias switching"""
    
    # Get current alias target
    try:
        aliases = utility.list_aliases(new_collection)
        print(f"Current aliases for {new_collection}: {aliases}")
    except:
        pass
    
    # Switch alias to new collection
    utility.alter_alias(new_collection, alias)
    print(f"Switched alias {alias} to {new_collection}")

# Deploy new version
deploy_new_version("products_v3", "products_current")

# List all aliases for a collection
aliases = utility.list_aliases("products_v3")
print(f"Aliases for products_v3: {aliases}")

Error Handling and Retry Logic

def retry_utility_operation(operation_func, max_retries: int = 3, delay: float = 1.0):
    """Retry utility operations with exponential backoff"""
    import time
    
    for attempt in range(max_retries):
        try:
            return operation_func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise e
            
            wait_time = delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)

# Usage examples
def safe_has_collection(collection_name: str) -> bool:
    """Safely check collection existence with retry"""
    return retry_utility_operation(
        lambda: utility.has_collection(collection_name),
        max_retries=3
    )

def safe_wait_for_loading(collection_name: str, timeout: int = 300):
    """Safely wait for loading with retry logic"""
    return retry_utility_operation(
        lambda: utility.wait_for_loading_complete(collection_name, timeout=timeout),
        max_retries=2
    )

# Use safe operations
if safe_has_collection("important_collection"):
    safe_wait_for_loading("important_collection")
    print("Collection loaded successfully")

PyMilvus utility functions provide essential database administration, monitoring, and maintenance capabilities, enabling efficient management of large-scale vector database deployments with comprehensive error handling and retry mechanisms.

Install with Tessl CLI

npx tessl i tessl/pypi-pymilvus

docs

data-management.md

index-management.md

index.md

milvus-client.md

orm-collection.md

search-operations.md

types-enums.md

user-management.md

utility-functions.md

tile.json