Python SDK for Milvus vector database with comprehensive functionality for connecting to servers, managing collections, and performing vector operations.
—
PyMilvus provides comprehensive authentication, authorization, and access control capabilities including user management, role-based access control (RBAC), privilege groups, and resource management. This enables secure multi-tenant deployments with fine-grained permissions.
from pymilvus import MilvusClient
client = MilvusClient()
def create_user(
user_name: str,
password: str,
timeout: Optional[float] = None
) -> None

# Create new user
client.create_user("data_analyst", "secure_password_123")
# Create user with special characters in password
client.create_user("ml_engineer", "P@ssw0rd!2024")
# Multiple user creation
users_to_create = [
("reader_user", "read_only_pass"),
("writer_user", "write_access_pass"),
("admin_user", "admin_secure_pass")
]
for username, password in users_to_create:
try:
client.create_user(username, password)
print(f"✓ Created user: {username}")
except Exception as e:
print(f"✗ Failed to create user {username}: {e}")

def list_users(
timeout: Optional[float] = None
) -> List[str]
def describe_user(
user_name: str,
timeout: Optional[float] = None
) -> Dict[str, Any]

# List all users
users = client.list_users()
print(f"System users: {users}")
# Get detailed user information
user_info = client.describe_user("data_analyst")
print(f"User info: {user_info}")
# Check user roles and privileges
for user in users:
try:
info = client.describe_user(user)
roles = info.get('roles', [])
print(f"User {user}: roles = {roles}")
except Exception as e:
print(f"Error getting info for {user}: {e}")

def update_password(
user_name: str,
old_password: str,
new_password: str,
timeout: Optional[float] = None
) -> None
def reset_password(
user_name: str,
new_password: str,
timeout: Optional[float] = None
) -> None

# Update password (requires current password)
client.update_password("data_analyst", "old_password", "new_secure_password")
# Reset password (admin operation, doesn't require old password)
client.reset_password("data_analyst", "admin_reset_password")
# Password update with error handling
def safe_password_update(username: str, old_pass: str, new_pass: str) -> bool:
    """Validate a new password locally, then ask the server to apply it.

    Args:
        username: Account whose password is being changed.
        old_pass: Current password, forwarded to ``client.update_password``.
        new_pass: Replacement password; must pass the checks below.

    Returns:
        True if ``client.update_password`` completed without raising,
        False if it raised (the error is printed, not propagated).

    Raises:
        ValueError: If ``new_pass`` is shorter than 8 characters, contains
            no uppercase letter, or contains no digit. These checks run
            before any server call is made.
    """
    # Basic password validation
    if len(new_pass) < 8:
        raise ValueError("Password must be at least 8 characters")
    if not any(c.isupper() for c in new_pass):
        raise ValueError("Password must contain uppercase letter")
    if not any(c.isdigit() for c in new_pass):
        raise ValueError("Password must contain digit")

    # Only the server call is guarded, so a validation error is never
    # mistaken for a failed update.
    try:
        client.update_password(username, old_pass, new_pass)
    except Exception as e:
        print(f"Password update failed: {e}")
        return False

    print(f"Password updated for user: {username}")
    return True
# Usage
success = safe_password_update("data_analyst", "old_pass", "NewSecure123!")

def drop_user(
user_name: str,
timeout: Optional[float] = None
) -> None

# Delete single user
client.drop_user("obsolete_user")
# Bulk user deletion with confirmation
def bulk_delete_users(usernames: List[str], confirm: bool = False):
    """Delete several users, but only after explicit confirmation.

    Args:
        usernames: Names of the users to delete.
        confirm: When False (the default) nothing is deleted; the function
            only prints the names that would be removed and returns.

    Returns:
        None. Progress and a final ``deleted/total`` count are printed.

    Failures are handled per user: an exception from the server is printed
    and the loop moves on to the next name.
    """
    if not confirm:
        print("This will delete the following users:")
        for username in usernames:
            print(f" - {username}")
        print("Set confirm=True to proceed")
        return

    deleted_count = 0
    # The existing-user list is fetched once instead of once per name.
    # It is loaded lazily inside the per-user ``try`` so that a failing
    # ``list_users`` call is still reported per user and retried on the
    # next iteration, and so an empty ``usernames`` makes no server call.
    existing = None
    for username in usernames:
        try:
            if existing is None:
                existing = set(client.list_users())
            if username in existing:
                client.drop_user(username)
                # Keep the local view in sync so a name listed twice is
                # reported as missing the second time, not dropped again.
                existing.discard(username)
                deleted_count += 1
                print(f"✓ Deleted user: {username}")
            else:
                print(f"- User {username} doesn't exist")
        except Exception as e:
            print(f"✗ Failed to delete {username}: {e}")
    print(f"Deleted {deleted_count}/{len(usernames)} users")
# Usage
bulk_delete_users(["temp_user1", "test_user2"], confirm=True)

def create_role(
role_name: str,
timeout: Optional[float] = None
) -> None
def drop_role(
role_name: str,
timeout: Optional[float] = None
) -> None
def list_roles(
timeout: Optional[float] = None
) -> List[str]
def describe_role(
role_name: str,
timeout: Optional[float] = None
) -> Dict[str, Any]

# Create roles for different access levels
roles_to_create = [
"data_reader", # Read-only access
"data_writer", # Read + write access
"collection_admin", # Collection management
"system_admin" # Full system access
]
for role in roles_to_create:
try:
client.create_role(role)
print(f"✓ Created role: {role}")
except Exception as e:
print(f"✗ Role creation failed for {role}: {e}")
# List and describe roles
roles = client.list_roles()
print(f"Available roles: {roles}")
for role in roles:
role_info = client.describe_role(role)
print(f"Role {role}: {role_info}")

def grant_role(
user_name: str,
role_name: str,
timeout: Optional[float] = None
) -> None
def revoke_role(
user_name: str,
role_name: str,
timeout: Optional[float] = None
) -> None

# Assign roles to users
role_assignments = {
"analyst1": ["data_reader"],
"analyst2": ["data_reader", "data_writer"],
"admin1": ["collection_admin", "system_admin"],
"service_user": ["data_writer"]
}
for username, roles in role_assignments.items():
for role in roles:
try:
client.grant_role(username, role)
print(f"✓ Granted {role} to {username}")
except Exception as e:
print(f"✗ Failed to grant {role} to {username}: {e}")
# Revoke role from user
client.revoke_role("analyst1", "data_writer")
print("Revoked data_writer role from analyst1")

def grant_privilege(
role_name: str,
object_type: str,
privilege: str,
object_name: str,
db_name: str = "",
timeout: Optional[float] = None
) -> None
def revoke_privilege(
role_name: str,
object_type: str,
privilege: str,
object_name: str,
db_name: str = "",
timeout: Optional[float] = None
) -> None

Object Types:
- "Global": System-level privileges
- "Collection": Collection-specific privileges
- "User": User management privileges

Common Privileges:
- "Insert", "Delete", "Upsert": Data modification
- "Search", "Query": Data access
- "IndexUser": Index management
- "CollectionAdmin": Collection administration
- "DatabaseAdmin": Database administration

# Grant collection-level privileges
collection_privileges = [
("data_reader", "Collection", "Search", "public_data"),
("data_reader", "Collection", "Query", "public_data"),
("data_writer", "Collection", "Insert", "user_content"),
("data_writer", "Collection", "Delete", "user_content"),
("data_writer", "Collection", "Upsert", "user_content"),
]
for role, obj_type, privilege, obj_name in collection_privileges:
try:
client.grant_privilege(role, obj_type, privilege, obj_name)
print(f"✓ Granted {privilege} on {obj_name} to {role}")
except Exception as e:
print(f"✗ Failed to grant {privilege}: {e}")
# Grant global privileges
client.grant_privilege("collection_admin", "Global", "CreateCollection", "*")
client.grant_privilege("collection_admin", "Global", "DropCollection", "*")
# Grant user management privileges
client.grant_privilege("system_admin", "User", "UpdateUser", "*")
client.grant_privilege("system_admin", "User", "SelectUser", "*")

def grant_privilege_v2(
role_name: str,
privilege: str,
collection_name: str = "*",
db_name: str = "",
timeout: Optional[float] = None
) -> None
def revoke_privilege_v2(
role_name: str,
privilege: str,
collection_name: str = "*",
db_name: str = "",
timeout: Optional[float] = None
) -> None

# V2 privilege management with simplified syntax
v2_privileges = [
("analyst_role", "Query", "analytics_data"),
("analyst_role", "Search", "analytics_data"),
("writer_role", "Insert", "content_collection"),
("writer_role", "Delete", "content_collection"),
("admin_role", "DropCollection", "*"), # Global privilege
]
for role, privilege, collection in v2_privileges:
try:
client.grant_privilege_v2(role, privilege, collection)
print(f"✓ Granted {privilege} on {collection} to {role}")
except Exception as e:
print(f"✗ V2 privilege grant failed: {e}")

def create_privilege_group(
group_name: str,
timeout: Optional[float] = None
) -> None
def drop_privilege_group(
group_name: str,
timeout: Optional[float] = None
) -> None
def list_privilege_groups(
timeout: Optional[float] = None
) -> List[str]

# Create privilege groups for common access patterns
privilege_groups = [
"read_only_group",
"content_editor_group",
"analytics_group",
"admin_group"
]
for group in privilege_groups:
try:
client.create_privilege_group(group)
print(f"✓ Created privilege group: {group}")
except Exception as e:
print(f"✗ Failed to create group {group}: {e}")
# List privilege groups
groups = client.list_privilege_groups()
print(f"Privilege groups: {groups}")

def add_privileges_to_group(
group_name: str,
privileges: List[str],
timeout: Optional[float] = None
) -> None
def remove_privileges_from_group(
group_name: str,
privileges: List[str],
timeout: Optional[float] = None
) -> None

# Define privilege sets for different groups
group_privileges = {
"read_only_group": ["Query", "Search"],
"content_editor_group": ["Query", "Search", "Insert", "Delete", "Upsert"],
"analytics_group": ["Query", "Search", "IndexUser"],
"admin_group": ["Query", "Search", "Insert", "Delete", "Upsert", "CreateCollection", "DropCollection"]
}
# Add privileges to groups
for group_name, privileges in group_privileges.items():
try:
client.add_privileges_to_group(group_name, privileges)
print(f"✓ Added {len(privileges)} privileges to {group_name}")
except Exception as e:
print(f"✗ Failed to add privileges to {group_name}: {e}")
# Remove specific privileges from group
client.remove_privileges_from_group("analytics_group", ["IndexUser"])
print("Removed IndexUser privilege from analytics_group")

def create_database(
db_name: str,
timeout: Optional[float] = None
) -> None
def drop_database(
db_name: str,
timeout: Optional[float] = None
) -> None
def list_databases(
timeout: Optional[float] = None
) -> List[str]
def describe_database(
db_name: str,
timeout: Optional[float] = None
) -> Dict[str, Any]

# Create databases for different environments
environments = ["development", "staging", "production", "analytics"]
for env in environments:
try:
client.create_database(env)
print(f"✓ Created database: {env}")
except Exception as e:
print(f"✗ Failed to create database {env}: {e}")
# List and describe databases
databases = client.list_databases()
print(f"Available databases: {databases}")
for db in databases:
db_info = client.describe_database(db)
print(f"Database {db}: {db_info}")

def use_database(
db_name: str,
timeout: Optional[float] = None
) -> None
def using_database(
db_name: str,
timeout: Optional[float] = None
) -> ContextManager

# Switch to specific database
client.use_database("production")
print("Switched to production database")
# Use context manager for temporary database switch
with client.using_database("analytics"):
# Operations in this block use the analytics database
collections = client.list_collections()
print(f"Analytics collections: {collections}")
# Back to previous database context
collections = client.list_collections()
print(f"Production collections: {collections}")

def alter_database_properties(
db_name: str,
properties: Dict[str, str],
timeout: Optional[float] = None
) -> None
def drop_database_properties(
db_name: str,
property_keys: List[str],
timeout: Optional[float] = None
) -> None

# Set database properties
db_properties = {
"description": "Production database for AI applications",
"environment": "production",
"owner": "ml_team"
}
client.alter_database_properties("production", db_properties)
# Remove specific properties
client.drop_database_properties("production", ["owner"])

def create_alias(
collection_name: str,
alias: str,
timeout: Optional[float] = None
) -> None
def drop_alias(
alias: str,
timeout: Optional[float] = None
) -> None
def alter_alias(
collection_name: str,
alias: str,
timeout: Optional[float] = None
) -> None
def describe_alias(
alias: str,
timeout: Optional[float] = None
) -> Dict[str, Any]
def list_aliases(
collection_name: str,
timeout: Optional[float] = None
) -> List[str]

# Create aliases for collections
alias_mappings = {
"products_v2": "products_latest",
"user_embeddings_2024": "current_embeddings",
"search_logs_december": "recent_logs"
}
for collection, alias in alias_mappings.items():
try:
client.create_alias(collection, alias)
print(f"✓ Created alias {alias} -> {collection}")
except Exception as e:
print(f"✗ Failed to create alias {alias}: {e}")
# Switch alias to new collection (blue-green deployment)
client.alter_alias("products_v3", "products_latest")
print("Switched products_latest alias to products_v3")
# Get alias information
alias_info = client.describe_alias("products_latest")
print(f"Alias info: {alias_info}")
# List aliases for collection
aliases = client.list_aliases("products_v3")
print(f"Aliases for products_v3: {aliases}")


def setup_rbac_system():
    """Set up a sample RBAC layout: roles, privilege groups, and grants.

    Runs against the module-level ``client``. Every individual server call
    is wrapped in its own ``try``/``except``: a failure is printed and the
    remaining steps still run. Nothing is returned.
    """
    # 1. Create roles. The description is only used in the log line;
    #    ``create_role`` receives the role name alone.
    roles = {
        "viewer": "Read-only access to data",
        "editor": "Read and write access to specific collections",
        "analyst": "Read access + query capabilities",
        "admin": "Full administrative access"
    }
    for role_name, description in roles.items():
        try:
            client.create_role(role_name)
            print(f"✓ Created role: {role_name} - {description}")
        except Exception as e:
            print(f"Role {role_name} might already exist: {e}")

    # 2. Define privilege sets. The keys double as the privilege-group
    #    names, so the groups created below cannot drift from this table.
    privilege_sets = {
        "basic_read": ["Query", "Search"],
        "data_modification": ["Insert", "Delete", "Upsert"],
        "collection_management": ["CreateCollection", "DropCollection", "IndexUser"],
        "system_administration": ["CreateUser", "UpdateUser", "SelectUser", "CreateRole"]
    }

    # 3. Create privilege groups (all of them before any is populated)
    for group in privilege_sets:
        try:
            client.create_privilege_group(group)
        except Exception as e:
            print(f"Group {group} might already exist: {e}")

    # 4. Add privileges to groups
    for group, privileges in privilege_sets.items():
        try:
            client.add_privileges_to_group(group, privileges)
            print(f"✓ Added {len(privileges)} privileges to {group}")
        except Exception as e:
            print(f"Error adding privileges to {group}: {e}")

    # 5. Grant privileges to roles as (object_type, privilege, object_name).
    # NOTE(review): patterns such as "public_*" and "user_*" are passed
    # through as-is; confirm the server accepts prefix wildcards here.
    role_privilege_mapping = {
        "viewer": [("Collection", "Search", "public_*"), ("Collection", "Query", "public_*")],
        "editor": [
            ("Collection", "Search", "*"), ("Collection", "Query", "*"),
            ("Collection", "Insert", "user_*"), ("Collection", "Delete", "user_*")
        ],
        "analyst": [
            ("Collection", "Search", "*"), ("Collection", "Query", "*"),
            ("Collection", "IndexUser", "*")
        ],
        "admin": [("Global", "CreateCollection", "*"), ("Global", "DropCollection", "*")]
    }
    for role, privileges in role_privilege_mapping.items():
        for obj_type, privilege, obj_name in privileges:
            try:
                client.grant_privilege(role, obj_type, privilege, obj_name)
                print(f"✓ Granted {privilege} on {obj_name} to {role}")
            except Exception as e:
                print(f"Error granting privilege: {e}")
# Setup RBAC system
setup_rbac_system()


def setup_tenant_isolation(tenant_name: str):
    """Create a tenant-scoped role and grant it data privileges.

    The role is named ``tenant_<tenant_name>`` and is granted Search,
    Query, Insert, Delete and Upsert on the object name
    ``<tenant_name>_*``.

    Args:
        tenant_name: Identifier used to derive the role name and the
            collection-name pattern.

    Returns:
        On success, a dict with the keys ``"role"``,
        ``"collections_pattern"`` and ``"privileges"``. If any server call
        raises, the error is printed and None is returned.
    """
    tenant_role = f"tenant_{tenant_name}"
    # NOTE(review): the pattern is passed to grant_privilege unchanged;
    # confirm the server treats "<tenant>_*" as a prefix wildcard.
    tenant_collections = f"{tenant_name}_*"

    # Tenant-specific privileges as (object_type, privilege, object_name)
    tenant_privileges = [
        ("Collection", "Search", tenant_collections),
        ("Collection", "Query", tenant_collections),
        ("Collection", "Insert", tenant_collections),
        ("Collection", "Delete", tenant_collections),
        ("Collection", "Upsert", tenant_collections),
    ]

    try:
        # Create role for tenant
        client.create_role(tenant_role)

        # There is no rollback: if a grant fails partway through, the role
        # and the grants made so far remain on the server.
        for obj_type, privilege, obj_name in tenant_privileges:
            client.grant_privilege(tenant_role, obj_type, privilege, obj_name)

        print(f"✓ Setup access control for tenant: {tenant_name}")
        return {
            "role": tenant_role,
            "collections_pattern": tenant_collections,
            "privileges": [p[1] for p in tenant_privileges]
        }
    except Exception as e:
        print(f"✗ Failed to setup tenant {tenant_name}: {e}")
        return None
# Setup isolation for multiple tenants
tenants = ["acme_corp", "tech_startup", "retail_chain"]
tenant_configs = {}
for tenant in tenants:
config = setup_tenant_isolation(tenant)
if config:
tenant_configs[tenant] = config
print(f"Configured {len(tenant_configs)} tenant access controls")


def audit_user_permissions():
    """Collect a report of every user and role known to the server.

    Returns:
        A dict with three keys:

        * ``"users"``: maps each user name to the result of
          ``client.describe_user``, or to ``{"error": <message>}`` if that
          call raised.
        * ``"roles"``: the same for ``client.describe_role``.
        * ``"summary"``: total counts, plus how many user entries have a
          truthy ``"roles"`` value and how many role entries have a truthy
          ``"privileges"`` value.

    An exception from ``client.list_users`` or ``client.list_roles`` is not
    caught here and propagates to the caller.
    """
    users = client.list_users()
    roles = client.list_roles()
    audit_report = {
        "users": {},
        "roles": {},
        "summary": {}
    }

    # Audit users; a failing lookup is recorded in the report, not raised.
    for user in users:
        try:
            audit_report["users"][user] = client.describe_user(user)
        except Exception as e:
            audit_report["users"][user] = {"error": str(e)}

    # Audit roles, with the same error-recording policy.
    for role in roles:
        try:
            audit_report["roles"][role] = client.describe_role(role)
        except Exception as e:
            audit_report["roles"][role] = {"error": str(e)}

    # Generate summary. Entries that hold only an "error" key have no
    # "roles"/"privileges" value and so are not counted.
    audit_report["summary"] = {
        "total_users": len(users),
        "total_roles": len(roles),
        "users_with_roles": sum(
            1 for u in audit_report["users"].values() if u.get("roles")
        ),
        "roles_with_privileges": sum(
            1 for r in audit_report["roles"].values() if r.get("privileges")
        )
    }
    return audit_report
# Run security audit
audit = audit_user_permissions()
print(f"Security Audit Summary:")
print(f" Total Users: {audit['summary']['total_users']}")
print(f" Total Roles: {audit['summary']['total_roles']}")
print(f" Users with Roles: {audit['summary']['users_with_roles']}")
print(f" Roles with Privileges: {audit['summary']['roles_with_privileges']}")

from pymilvus import Role
# Create Role object
role = Role("analytics_team")
# Role operations
def create(self, timeout: Optional[float] = None) -> None
def drop(self, timeout: Optional[float] = None) -> None
def describe(self, timeout: Optional[float] = None) -> Dict[str, Any]
def add_user(self, username: str, timeout: Optional[float] = None) -> None
def remove_user(self, username: str, timeout: Optional[float] = None) -> None
def grant(self, object_type: str, privilege: str, object_name: str, timeout: Optional[float] = None) -> None
def revoke(self, object_type: str, privilege: str, object_name: str, timeout: Optional[float] = None) -> None
def list_grants(self, timeout: Optional[float] = None) -> List[Dict[str, Any]]
def list_users(self, timeout: Optional[float] = None) -> List[str]

# Using Role ORM class
analytics_role = Role("analytics_team")
# Create the role
analytics_role.create()
# Grant privileges using ORM
analytics_role.grant("Collection", "Search", "analytics_data")
analytics_role.grant("Collection", "Query", "analytics_data")
# Add users to role
analytics_role.add_user("analyst1")
analytics_role.add_user("analyst2")
# List role information
grants = analytics_role.list_grants()
users = analytics_role.list_users()
print(f"Role grants: {grants}")
print(f"Role users: {users}")
# Get detailed role information
role_info = analytics_role.describe()
print(f"Role description: {role_info}")

PyMilvus user management provides enterprise-grade authentication and authorization capabilities, enabling secure multi-tenant vector database deployments with fine-grained access control and comprehensive auditing features.
Install with Tessl CLI
npx tessl i tessl/pypi-pymilvus