CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pymilvus

Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.

Pending
Overview
Eval results
Files

user-management.mddocs/

User Management

PyMilvus provides comprehensive authentication, authorization, and access control capabilities including user management, role-based access control (RBAC), privilege groups, and resource management. This enables secure multi-tenant deployments with fine-grained permissions.

User Operations

Create User

from pymilvus import MilvusClient

client = MilvusClient()

def create_user(
    user_name: str,
    password: str,
    timeout: Optional[float] = None
) -> None
# Create new user
client.create_user("data_analyst", "secure_password_123")

# Create user with special characters in password
client.create_user("ml_engineer", "P@ssw0rd!2024")

# Multiple user creation
users_to_create = [
    ("reader_user", "read_only_pass"),
    ("writer_user", "write_access_pass"),
    ("admin_user", "admin_secure_pass")
]

for username, password in users_to_create:
    try:
        client.create_user(username, password)
        print(f"✓ Created user: {username}")
    except Exception as e:
        print(f"✗ Failed to create user {username}: {e}")

User Information

def list_users(
    timeout: Optional[float] = None
) -> List[str]

def describe_user(
    user_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
# List all users
users = client.list_users()
print(f"System users: {users}")

# Get detailed user information
user_info = client.describe_user("data_analyst")
print(f"User info: {user_info}")

# Check user roles and privileges
for user in users:
    try:
        info = client.describe_user(user)
        roles = info.get('roles', [])
        print(f"User {user}: roles = {roles}")
    except Exception as e:
        print(f"Error getting info for {user}: {e}")

Password Management

def update_password(
    user_name: str,
    old_password: str,
    new_password: str,
    timeout: Optional[float] = None
) -> None

def reset_password(
    user_name: str,
    new_password: str,
    timeout: Optional[float] = None
) -> None
# Update password (requires current password)
client.update_password("data_analyst", "old_password", "new_secure_password")

# Reset password (admin operation, doesn't require old password)
client.reset_password("data_analyst", "admin_reset_password")

# Password update with error handling
def safe_password_update(username: str, old_pass: str, new_pass: str):
    """Safely update password with validation"""
    
    # Basic password validation
    if len(new_pass) < 8:
        raise ValueError("Password must be at least 8 characters")
    
    if not any(c.isupper() for c in new_pass):
        raise ValueError("Password must contain uppercase letter")
    
    if not any(c.isdigit() for c in new_pass):
        raise ValueError("Password must contain digit")
    
    try:
        client.update_password(username, old_pass, new_pass)
        print(f"Password updated for user: {username}")
        return True
    except Exception as e:
        print(f"Password update failed: {e}")
        return False

# Usage
success = safe_password_update("data_analyst", "old_pass", "NewSecure123!")

Delete User

def drop_user(
    user_name: str,
    timeout: Optional[float] = None
) -> None
# Delete single user
client.drop_user("obsolete_user")

# Bulk user deletion with confirmation
def bulk_delete_users(usernames: List[str], confirm: bool = False):
    """Safely delete multiple users"""
    
    if not confirm:
        print("This will delete the following users:")
        for username in usernames:
            print(f"  - {username}")
        print("Set confirm=True to proceed")
        return
    
    deleted_count = 0
    for username in usernames:
        try:
            # Check if user exists first
            users = client.list_users()
            if username in users:
                client.drop_user(username)
                deleted_count += 1
                print(f"✓ Deleted user: {username}")
            else:
                print(f"- User {username} doesn't exist")
        except Exception as e:
            print(f"✗ Failed to delete {username}: {e}")
    
    print(f"Deleted {deleted_count}/{len(usernames)} users")

# Usage
bulk_delete_users(["temp_user1", "test_user2"], confirm=True)

Role Management

Create and Manage Roles

def create_role(
    role_name: str,
    timeout: Optional[float] = None
) -> None

def drop_role(
    role_name: str,
    timeout: Optional[float] = None
) -> None

def list_roles(
    timeout: Optional[float] = None
) -> List[str]

def describe_role(
    role_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
# Create roles for different access levels
roles_to_create = [
    "data_reader",      # Read-only access
    "data_writer",      # Read + write access
    "collection_admin", # Collection management
    "system_admin"      # Full system access
]

for role in roles_to_create:
    try:
        client.create_role(role)
        print(f"✓ Created role: {role}")
    except Exception as e:
        print(f"✗ Role creation failed for {role}: {e}")

# List and describe roles
roles = client.list_roles()
print(f"Available roles: {roles}")

for role in roles:
    role_info = client.describe_role(role)
    print(f"Role {role}: {role_info}")

Role Assignment

def grant_role(
    user_name: str,
    role_name: str,
    timeout: Optional[float] = None
) -> None

def revoke_role(
    user_name: str,
    role_name: str,
    timeout: Optional[float] = None
) -> None
# Assign roles to users
role_assignments = {
    "analyst1": ["data_reader"],
    "analyst2": ["data_reader", "data_writer"],
    "admin1": ["collection_admin", "system_admin"],
    "service_user": ["data_writer"]
}

for username, roles in role_assignments.items():
    for role in roles:
        try:
            client.grant_role(username, role)
            print(f"✓ Granted {role} to {username}")
        except Exception as e:
            print(f"✗ Failed to grant {role} to {username}: {e}")

# Revoke role from user
client.revoke_role("analyst1", "data_writer")
print("Revoked data_writer role from analyst1")

Privilege Management

Grant and Revoke Privileges

def grant_privilege(
    role_name: str,
    object_type: str,
    privilege: str,
    object_name: str,
    db_name: str = "",
    timeout: Optional[float] = None
) -> None

def revoke_privilege(
    role_name: str,
    object_type: str,
    privilege: str,
    object_name: str,
    db_name: str = "",
    timeout: Optional[float] = None
) -> None

Object Types:

  • "Global": System-level privileges
  • "Collection": Collection-specific privileges
  • "User": User management privileges

Common Privileges:

  • "Insert", "Delete", "Upsert": Data modification
  • "Search", "Query": Data access
  • "IndexUser": Index management
  • "CollectionAdmin": Collection administration
  • "DatabaseAdmin": Database administration
# Grant collection-level privileges
collection_privileges = [
    ("data_reader", "Collection", "Search", "public_data"),
    ("data_reader", "Collection", "Query", "public_data"),
    ("data_writer", "Collection", "Insert", "user_content"),
    ("data_writer", "Collection", "Delete", "user_content"),
    ("data_writer", "Collection", "Upsert", "user_content"),
]

for role, obj_type, privilege, obj_name in collection_privileges:
    try:
        client.grant_privilege(role, obj_type, privilege, obj_name)
        print(f"✓ Granted {privilege} on {obj_name} to {role}")
    except Exception as e:
        print(f"✗ Failed to grant {privilege}: {e}")

# Grant global privileges
client.grant_privilege("collection_admin", "Global", "CreateCollection", "*")
client.grant_privilege("collection_admin", "Global", "DropCollection", "*")

# Grant user management privileges
client.grant_privilege("system_admin", "User", "UpdateUser", "*")
client.grant_privilege("system_admin", "User", "SelectUser", "*")

Enhanced Privilege Management (V2)

def grant_privilege_v2(
    role_name: str,
    privilege: str,
    collection_name: str = "*",
    db_name: str = "",
    timeout: Optional[float] = None
) -> None

def revoke_privilege_v2(
    role_name: str,
    privilege: str,
    collection_name: str = "*", 
    db_name: str = "",
    timeout: Optional[float] = None
) -> None
# V2 privilege management with simplified syntax
v2_privileges = [
    ("analyst_role", "Query", "analytics_data"),
    ("analyst_role", "Search", "analytics_data"),
    ("writer_role", "Insert", "content_collection"),
    ("writer_role", "Delete", "content_collection"),
    ("admin_role", "DropCollection", "*"),  # Global privilege
]

for role, privilege, collection in v2_privileges:
    try:
        client.grant_privilege_v2(role, privilege, collection)
        print(f"✓ Granted {privilege} on {collection} to {role}")
    except Exception as e:
        print(f"✗ V2 privilege grant failed: {e}")

Privilege Groups

Privilege Group Management

def create_privilege_group(
    group_name: str,
    timeout: Optional[float] = None
) -> None

def drop_privilege_group(
    group_name: str,
    timeout: Optional[float] = None
) -> None

def list_privilege_groups(
    timeout: Optional[float] = None
) -> List[str]
# Create privilege groups for common access patterns
privilege_groups = [
    "read_only_group",
    "content_editor_group", 
    "analytics_group",
    "admin_group"
]

for group in privilege_groups:
    try:
        client.create_privilege_group(group)
        print(f"✓ Created privilege group: {group}")
    except Exception as e:
        print(f"✗ Failed to create group {group}: {e}")

# List privilege groups
groups = client.list_privilege_groups()
print(f"Privilege groups: {groups}")

Add and Remove Privileges from Groups

def add_privileges_to_group(
    group_name: str,
    privileges: List[str],
    timeout: Optional[float] = None
) -> None

def remove_privileges_from_group(
    group_name: str,
    privileges: List[str],
    timeout: Optional[float] = None
) -> None
# Define privilege sets for different groups
group_privileges = {
    "read_only_group": ["Query", "Search"],
    "content_editor_group": ["Query", "Search", "Insert", "Delete", "Upsert"],
    "analytics_group": ["Query", "Search", "IndexUser"],
    "admin_group": ["Query", "Search", "Insert", "Delete", "Upsert", "CreateCollection", "DropCollection"]
}

# Add privileges to groups
for group_name, privileges in group_privileges.items():
    try:
        client.add_privileges_to_group(group_name, privileges)
        print(f"✓ Added {len(privileges)} privileges to {group_name}")
    except Exception as e:
        print(f"✗ Failed to add privileges to {group_name}: {e}")

# Remove specific privileges from group
client.remove_privileges_from_group("analytics_group", ["IndexUser"])
print("Removed IndexUser privilege from analytics_group")

Database Operations

Database Management

def create_database(
    db_name: str,
    timeout: Optional[float] = None
) -> None

def drop_database(
    db_name: str,
    timeout: Optional[float] = None
) -> None

def list_databases(
    timeout: Optional[float] = None
) -> List[str]

def describe_database(
    db_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
# Create databases for different environments
environments = ["development", "staging", "production", "analytics"]

for env in environments:
    try:
        client.create_database(env)
        print(f"✓ Created database: {env}")
    except Exception as e:
        print(f"✗ Failed to create database {env}: {e}")

# List and describe databases
databases = client.list_databases()
print(f"Available databases: {databases}")

for db in databases:
    db_info = client.describe_database(db)
    print(f"Database {db}: {db_info}")

Database Context Management

def use_database(
    db_name: str,
    timeout: Optional[float] = None
) -> None

def using_database(
    db_name: str,
    timeout: Optional[float] = None
) -> ContextManager
# Switch to specific database
client.use_database("production")
print("Switched to production database")

# Use context manager for temporary database switch
with client.using_database("analytics"):
    # Operations in this block use the analytics database
    collections = client.list_collections()
    print(f"Analytics collections: {collections}")

# Back to previous database context
collections = client.list_collections()
print(f"Production collections: {collections}")

Database Properties

def alter_database_properties(
    db_name: str,
    properties: Dict[str, str],
    timeout: Optional[float] = None
) -> None

def drop_database_properties(
    db_name: str,
    property_keys: List[str],
    timeout: Optional[float] = None
) -> None
# Set database properties
db_properties = {
    "description": "Production database for AI applications",
    "environment": "production",
    "owner": "ml_team"
}

client.alter_database_properties("production", db_properties)

# Remove specific properties
client.drop_database_properties("production", ["owner"])

Alias Management

Collection Aliases

def create_alias(
    collection_name: str,
    alias: str,
    timeout: Optional[float] = None
) -> None

def drop_alias(
    alias: str,
    timeout: Optional[float] = None
) -> None

def alter_alias(
    collection_name: str,
    alias: str,
    timeout: Optional[float] = None
) -> None

def describe_alias(
    alias: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]

def list_aliases(
    collection_name: str,
    timeout: Optional[float] = None
) -> List[str]
# Create aliases for collections
alias_mappings = {
    "products_v2": "products_latest",
    "user_embeddings_2024": "current_embeddings", 
    "search_logs_december": "recent_logs"
}

for collection, alias in alias_mappings.items():
    try:
        client.create_alias(collection, alias)
        print(f"✓ Created alias {alias} -> {collection}")
    except Exception as e:
        print(f"✗ Failed to create alias {alias}: {e}")

# Switch alias to new collection (blue-green deployment)
client.alter_alias("products_v3", "products_latest")
print("Switched products_latest alias to products_v3")

# Get alias information
alias_info = client.describe_alias("products_latest")
print(f"Alias info: {alias_info}")

# List aliases for collection
aliases = client.list_aliases("products_v3")
print(f"Aliases for products_v3: {aliases}")

Access Control Patterns

Role-Based Access Control Setup

def setup_rbac_system():
    """Setup comprehensive RBAC system"""
    
    # 1. Create roles
    roles = {
        "viewer": "Read-only access to data",
        "editor": "Read and write access to specific collections", 
        "analyst": "Read access + query capabilities",
        "admin": "Full administrative access"
    }
    
    for role_name, description in roles.items():
        try:
            client.create_role(role_name)
            print(f"✓ Created role: {role_name} - {description}")
        except Exception as e:
            print(f"Role {role_name} might already exist: {e}")
    
    # 2. Create privilege groups
    groups = ["basic_read", "data_modification", "collection_management", "system_administration"]
    
    for group in groups:
        try:
            client.create_privilege_group(group)
        except Exception as e:
            print(f"Group {group} might already exist: {e}")
    
    # 3. Define privilege sets
    privilege_sets = {
        "basic_read": ["Query", "Search"],
        "data_modification": ["Insert", "Delete", "Upsert"],
        "collection_management": ["CreateCollection", "DropCollection", "IndexUser"],
        "system_administration": ["CreateUser", "UpdateUser", "SelectUser", "CreateRole"]
    }
    
    # 4. Add privileges to groups
    for group, privileges in privilege_sets.items():
        try:
            client.add_privileges_to_group(group, privileges)
            print(f"✓ Added {len(privileges)} privileges to {group}")
        except Exception as e:
            print(f"Error adding privileges to {group}: {e}")
    
    # 5. Grant privileges to roles
    role_privilege_mapping = {
        "viewer": [("Collection", "Search", "public_*"), ("Collection", "Query", "public_*")],
        "editor": [
            ("Collection", "Search", "*"), ("Collection", "Query", "*"),
            ("Collection", "Insert", "user_*"), ("Collection", "Delete", "user_*")
        ],
        "analyst": [
            ("Collection", "Search", "*"), ("Collection", "Query", "*"),
            ("Collection", "IndexUser", "*")
        ],
        "admin": [("Global", "CreateCollection", "*"), ("Global", "DropCollection", "*")]
    }
    
    for role, privileges in role_privilege_mapping.items():
        for obj_type, privilege, obj_name in privileges:
            try:
                client.grant_privilege(role, obj_type, privilege, obj_name)
                print(f"✓ Granted {privilege} on {obj_name} to {role}")
            except Exception as e:
                print(f"Error granting privilege: {e}")

# Setup RBAC system
setup_rbac_system()

Multi-Tenant Access Control

def setup_tenant_isolation(tenant_name: str):
    """Setup isolated access for a tenant"""
    
    # Create tenant-specific role
    tenant_role = f"tenant_{tenant_name}"
    tenant_user = f"user_{tenant_name}"
    tenant_collections = f"{tenant_name}_*"
    
    try:
        # Create role for tenant
        client.create_role(tenant_role)
        
        # Grant tenant-specific privileges
        tenant_privileges = [
            ("Collection", "Search", tenant_collections),
            ("Collection", "Query", tenant_collections),
            ("Collection", "Insert", tenant_collections),
            ("Collection", "Delete", tenant_collections),
            ("Collection", "Upsert", tenant_collections),
        ]
        
        for obj_type, privilege, obj_name in tenant_privileges:
            client.grant_privilege(tenant_role, obj_type, privilege, obj_name)
        
        print(f"✓ Setup access control for tenant: {tenant_name}")
        
        return {
            "role": tenant_role,
            "collections_pattern": tenant_collections,
            "privileges": [p[1] for p in tenant_privileges]
        }
        
    except Exception as e:
        print(f"✗ Failed to setup tenant {tenant_name}: {e}")
        return None

# Setup isolation for multiple tenants
tenants = ["acme_corp", "tech_startup", "retail_chain"]
tenant_configs = {}

for tenant in tenants:
    config = setup_tenant_isolation(tenant)
    if config:
        tenant_configs[tenant] = config

print(f"Configured {len(tenant_configs)} tenant access controls")

Security Audit Functions

def audit_user_permissions():
    """Audit all user permissions and roles"""
    
    users = client.list_users()
    roles = client.list_roles()
    
    audit_report = {
        "users": {},
        "roles": {},
        "summary": {}
    }
    
    # Audit users
    for user in users:
        try:
            user_info = client.describe_user(user)
            audit_report["users"][user] = user_info
        except Exception as e:
            audit_report["users"][user] = {"error": str(e)}
    
    # Audit roles
    for role in roles:
        try:
            role_info = client.describe_role(role)
            audit_report["roles"][role] = role_info
        except Exception as e:
            audit_report["roles"][role] = {"error": str(e)}
    
    # Generate summary
    audit_report["summary"] = {
        "total_users": len(users),
        "total_roles": len(roles),
        "users_with_roles": len([u for u in audit_report["users"].values() if u.get("roles")]),
        "roles_with_privileges": len([r for r in audit_report["roles"].values() if r.get("privileges")])
    }
    
    return audit_report

# Run security audit
audit = audit_user_permissions()
print(f"Security Audit Summary:")
print(f"  Total Users: {audit['summary']['total_users']}")
print(f"  Total Roles: {audit['summary']['total_roles']}")
print(f"  Users with Roles: {audit['summary']['users_with_roles']}")
print(f"  Roles with Privileges: {audit['summary']['roles_with_privileges']}")

Using ORM Role Class

from pymilvus import Role

# Create Role object
role = Role("analytics_team")

# Role operations
def create(self, timeout: Optional[float] = None) -> None
def drop(self, timeout: Optional[float] = None) -> None
def describe(self, timeout: Optional[float] = None) -> Dict[str, Any]
def add_user(self, username: str, timeout: Optional[float] = None) -> None
def remove_user(self, username: str, timeout: Optional[float] = None) -> None
def grant(self, object_type: str, privilege: str, object_name: str, timeout: Optional[float] = None) -> None
def revoke(self, object_type: str, privilege: str, object_name: str, timeout: Optional[float] = None) -> None
def list_grants(self, timeout: Optional[float] = None) -> List[Dict[str, Any]]
def list_users(self, timeout: Optional[float] = None) -> List[str]
# Using Role ORM class
analytics_role = Role("analytics_team")

# Create the role
analytics_role.create()

# Grant privileges using ORM
analytics_role.grant("Collection", "Search", "analytics_data")
analytics_role.grant("Collection", "Query", "analytics_data")

# Add users to role
analytics_role.add_user("analyst1")
analytics_role.add_user("analyst2")

# List role information
grants = analytics_role.list_grants()
users = analytics_role.list_users()

print(f"Role grants: {grants}")
print(f"Role users: {users}")

# Get detailed role information
role_info = analytics_role.describe()
print(f"Role description: {role_info}")

PyMilvus user management provides enterprise-grade authentication and authorization capabilities, enabling secure multi-tenant vector database deployments with fine-grained access control and comprehensive auditing features.

Install with Tessl CLI

npx tessl i tessl/pypi-pymilvus

docs

data-management.md

index-management.md

index.md

milvus-client.md

orm-collection.md

search-operations.md

types-enums.md

user-management.md

utility-functions.md

tile.json