Python client for Elasticsearch with comprehensive API coverage and both sync and async support
—
User and role management, API key operations, token management, and security configuration for Elasticsearch clusters with security features enabled. These operations provide comprehensive security administration capabilities.
Authenticate users and validate credentials.
def authenticate(
self,
**kwargs
) -> ObjectApiResponse:
"""
Authenticate the current user and return user information.
Returns:
ObjectApiResponse with authenticated user details including username, roles, and metadata
"""
def get_token(
self,
grant_type: str,
username: Optional[str] = None,
password: Optional[str] = None,
refresh_token: Optional[str] = None,
scope: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Get authentication token.
Parameters:
- grant_type: Grant type (password, refresh_token, client_credentials)
- username: Username for password grant
- password: Password for password grant
- refresh_token: Refresh token for token refresh
- scope: Requested access scope
Returns:
ObjectApiResponse with access token and refresh token
"""
def invalidate_token(
self,
token: Optional[str] = None,
refresh_token: Optional[str] = None,
realm_name: Optional[str] = None,
username: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Invalidate authentication tokens.
Parameters:
- token: Access token to invalidate
- refresh_token: Refresh token to invalidate
- realm_name: Realm name to invalidate tokens for
- username: Username to invalidate tokens for
Returns:
ObjectApiResponse with invalidation result
"""Create, update, and manage user accounts.
def create_user(
self,
username: str,
password: Optional[str] = None,
password_hash: Optional[str] = None,
roles: Optional[List[str]] = None,
full_name: Optional[str] = None,
email: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
enabled: Optional[bool] = None,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Create a new user.
Parameters:
- username: Username for the new user
- password: Password in plain text
- password_hash: Pre-hashed password
- roles: List of role names to assign
- full_name: Full display name
- email: Email address
- metadata: Additional user metadata
- enabled: Whether user is enabled
- refresh: Refresh policy for the request
Returns:
ObjectApiResponse with user creation result
"""
def put_user(
self,
username: str,
password: Optional[str] = None,
password_hash: Optional[str] = None,
roles: Optional[List[str]] = None,
full_name: Optional[str] = None,
email: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
enabled: Optional[bool] = None,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Create or update a user.
Parameters: Same as create_user
Returns:
ObjectApiResponse with user creation or update result
"""
def get_user(
self,
username: Optional[Union[str, List[str]]] = None,
**kwargs
) -> ObjectApiResponse:
"""
Get user information.
Parameters:
- username: Username(s) to retrieve, or None for all users
Returns:
ObjectApiResponse with user information
"""
def delete_user(
self,
username: str,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Delete a user.
Parameters:
- username: Username to delete
- refresh: Refresh policy for the request
Returns:
ObjectApiResponse with deletion result
"""
def enable_user(
self,
username: str,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Enable a user account.
Parameters:
- username: Username to enable
- refresh: Refresh policy
Returns:
ObjectApiResponse with enable result
"""
def disable_user(
self,
username: str,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Disable a user account.
Parameters:
- username: Username to disable
- refresh: Refresh policy
Returns:
ObjectApiResponse with disable result
"""
def change_password(
self,
username: str,
password: str,
password_hash: Optional[str] = None,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Change user password.
Parameters:
- username: Username to change password for
- password: New password in plain text
- password_hash: New pre-hashed password
- refresh: Refresh policy
Returns:
ObjectApiResponse with password change result
"""Define and manage user roles and permissions.
def create_role(
self,
name: str,
cluster: Optional[List[str]] = None,
indices: Optional[List[Dict[str, Any]]] = None,
applications: Optional[List[Dict[str, Any]]] = None,
run_as: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
transient_metadata: Optional[Dict[str, Any]] = None,
global_: Optional[Dict[str, Any]] = None,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Create a role.
Parameters:
- name: Role name
- cluster: List of cluster privileges
- indices: List of index privilege specifications
- applications: List of application privilege specifications
- run_as: List of usernames this role can impersonate
- metadata: Role metadata
- transient_metadata: Transient metadata
- global_: Global privileges configuration
- refresh: Refresh policy
Returns:
ObjectApiResponse with role creation result
"""
def put_role(
self,
name: str,
cluster: Optional[List[str]] = None,
indices: Optional[List[Dict[str, Any]]] = None,
applications: Optional[List[Dict[str, Any]]] = None,
run_as: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
transient_metadata: Optional[Dict[str, Any]] = None,
global_: Optional[Dict[str, Any]] = None,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Create or update a role.
Parameters: Same as create_role
Returns:
ObjectApiResponse with role creation or update result
"""
def get_role(
self,
name: Optional[Union[str, List[str]]] = None,
**kwargs
) -> ObjectApiResponse:
"""
Get role information.
Parameters:
- name: Role name(s) to retrieve, or None for all roles
Returns:
ObjectApiResponse with role information
"""
def delete_role(
self,
name: str,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Delete a role.
Parameters:
- name: Role name to delete
- refresh: Refresh policy
Returns:
ObjectApiResponse with deletion result
"""Generate and manage API keys for authentication.
def create_api_key(
self,
name: str,
role_descriptors: Optional[Dict[str, Any]] = None,
expiration: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Create an API key.
Parameters:
- name: API key name
- role_descriptors: Role definitions for the API key
- expiration: Expiration time for the key
- metadata: API key metadata
- refresh: Refresh policy
Returns:
ObjectApiResponse with API key details including the key value
"""
def get_api_key(
self,
id: Optional[str] = None,
name: Optional[str] = None,
realm_name: Optional[str] = None,
username: Optional[str] = None,
owner: Optional[bool] = None,
**kwargs
) -> ObjectApiResponse:
"""
Get API key information.
Parameters:
- id: API key ID
- name: API key name
- realm_name: Realm name
- username: Username
- owner: Whether to return only keys owned by authenticated user
Returns:
ObjectApiResponse with API key information
"""
def invalidate_api_key(
self,
id: Optional[str] = None,
name: Optional[str] = None,
realm_name: Optional[str] = None,
username: Optional[str] = None,
owner: Optional[bool] = None,
**kwargs
) -> ObjectApiResponse:
"""
Invalidate API keys.
Parameters:
- id: API key ID to invalidate
- name: API key name to invalidate
- realm_name: Realm name to invalidate keys for
- username: Username to invalidate keys for
- owner: Whether to invalidate only keys owned by authenticated user
Returns:
ObjectApiResponse with invalidation result
"""
def grant_api_key(
self,
grant_type: str,
api_key: Dict[str, Any],
access_token: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Grant API key on behalf of another user.
Parameters:
- grant_type: Grant type (password, access_token)
- api_key: API key specification
- access_token: Access token for access_token grant
- username: Username for password grant
- password: Password for password grant
Returns:
ObjectApiResponse with granted API key
"""Manage application privileges and check user permissions.
def put_privileges(
self,
privileges: Dict[str, Any],
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Create or update application privileges.
Parameters:
- privileges: Privilege definitions by application
- refresh: Refresh policy
Returns:
ObjectApiResponse with privilege creation result
"""
def get_privileges(
self,
application: Optional[str] = None,
name: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Get application privileges.
Parameters:
- application: Application name
- name: Privilege name
Returns:
ObjectApiResponse with privilege information
"""
def delete_privileges(
self,
application: str,
name: str,
refresh: Optional[str] = None,
**kwargs
) -> ObjectApiResponse:
"""
Delete application privileges.
Parameters:
- application: Application name
- name: Privilege name
- refresh: Refresh policy
Returns:
ObjectApiResponse with deletion result
"""
def has_privileges(
self,
user: Optional[str] = None,
cluster: Optional[List[str]] = None,
index: Optional[List[Dict[str, Any]]] = None,
application: Optional[List[Dict[str, Any]]] = None,
**kwargs
) -> ObjectApiResponse:
"""
Check user privileges.
Parameters:
- user: Username to check privileges for
- cluster: Cluster privileges to check
- index: Index privileges to check
- application: Application privileges to check
Returns:
ObjectApiResponse with privilege check results
"""Manage SSL certificates and certificate authorities.
def get_builtin_privileges(
self,
**kwargs
) -> ObjectApiResponse:
"""
Get built-in cluster and index privileges.
Returns:
ObjectApiResponse with built-in privilege definitions
"""
def clear_cached_realms(
self,
realms: List[str],
**kwargs
) -> ObjectApiResponse:
"""
Clear cached realm information.
Parameters:
- realms: List of realm names to clear
Returns:
ObjectApiResponse with cache clearing result
"""
def clear_cached_roles(
self,
name: List[str],
**kwargs
) -> ObjectApiResponse:
"""
Clear cached role information.
Parameters:
- name: List of role names to clear from cache
Returns:
ObjectApiResponse with cache clearing result
"""
def clear_cached_privileges(
self,
application: List[str],
**kwargs
) -> ObjectApiResponse:
"""
Clear cached application privileges.
Parameters:
- application: List of application names to clear
Returns:
ObjectApiResponse with cache clearing result
"""from elasticsearch import Elasticsearch
client = Elasticsearch(
hosts=['https://localhost:9200'],
http_auth=('admin', 'admin_password'),
verify_certs=True
)
# Create a new user
client.security.create_user(
username="analyst_user",
password="secure_password123",
roles=["data_analyst", "kibana_user"],
full_name="Data Analyst",
email="analyst@company.com",
metadata={
"department": "analytics",
"hire_date": "2024-01-01"
},
enabled=True
)
# Get user information
user_info = client.security.get_user(username="analyst_user")
print(f"User roles: {user_info.body['analyst_user']['roles']}")
# Update user roles
client.security.put_user(
username="analyst_user",
roles=["data_analyst", "kibana_user", "monitoring_user"]
)
# Change user password
client.security.change_password(
username="analyst_user",
password="new_secure_password456"
)
# Disable user temporarily
client.security.disable_user(username="analyst_user")
# Re-enable user
client.security.enable_user(username="analyst_user")# Create a custom role with specific privileges
client.security.create_role(
name="custom_data_analyst",
cluster=["monitor", "manage_index_templates"],
indices=[
{
"names": ["sales-*", "analytics-*"],
"privileges": ["read", "view_index_metadata"],
"field_security": {
"grant": ["*"],
"except": ["sensitive_field"]
},
"query": {
"term": {"department": "sales"}
}
},
{
"names": ["logs-*"],
"privileges": ["read"],
"query": {
"range": {
"@timestamp": {"gte": "now-30d"}
}
}
}
],
applications=[
{
"application": "kibana-.kibana",
"privileges": ["feature_dashboard.read", "feature_visualize.read"],
"resources": ["space:default"]
}
],
metadata={
"created_by": "admin",
"description": "Custom role for data analysts"
}
)
# Get role information
role_info = client.security.get_role(name="custom_data_analyst")
for role_name, role_def in role_info.body.items():
print(f"Role: {role_name}")
print(f"Cluster privileges: {role_def['cluster']}")
print(f"Index privileges: {len(role_def['indices'])} definitions")
# Update role to add run_as privilege
client.security.put_role(
name="custom_data_analyst",
cluster=["monitor", "manage_index_templates"],
indices=[
{
"names": ["sales-*", "analytics-*", "reports-*"],
"privileges": ["read", "view_index_metadata"]
}
],
run_as=["report_user"]
)# Create API key for application access
api_key_response = client.security.create_api_key(
name="analytics_app_key",
role_descriptors={
"analytics_role": {
"cluster": ["monitor"],
"indices": [
{
"names": ["analytics-*"],
"privileges": ["read", "write"]
}
]
}
},
expiration="90d",
metadata={
"application": "analytics_dashboard",
"environment": "production"
}
)
api_key = api_key_response.body['api_key']
api_key_id = api_key_response.body['id']
print(f"API Key created: {api_key_id}")
# Use API key for authentication
api_client = Elasticsearch(
hosts=['https://localhost:9200'],
api_key=(api_key_id, api_key),
verify_certs=True
)
# List API keys
keys = client.security.get_api_key(owner=True)
for key in keys.body['api_keys']:
print(f"Key: {key['name']}, Created: {key['creation']}, Status: {'Valid' if not key['invalidated'] else 'Invalid'}")
# Invalidate API key when no longer needed
client.security.invalidate_api_key(id=api_key_id)# Authenticate current user
auth_info = client.security.authenticate()
current_user = auth_info.body
print(f"Authenticated as: {current_user['username']}")
print(f"Roles: {current_user['roles']}")
print(f"Full name: {current_user['full_name']}")
# Check user privileges
privilege_check = client.security.has_privileges(
cluster=["monitor", "manage_index_templates"],
index=[
{
"names": ["sales-*"],
"privileges": ["read", "write"]
},
{
"names": ["logs-*"],
"privileges": ["read"]
}
]
)
print(f"Has cluster monitor privilege: {privilege_check.body['cluster']['monitor']}")
for index_check in privilege_check.body['index']:
index_name = list(index_check.keys())[0]
privileges = index_check[index_name]
print(f"Index {index_name} privileges: {privileges}")
# Get available built-in privileges
builtin_privileges = client.security.get_builtin_privileges()
print("Available cluster privileges:", builtin_privileges.body['cluster'])
print("Available index privileges:", builtin_privileges.body['index'])# Get authentication token
token_response = client.security.get_token(
grant_type="password",
username="analyst_user",
password="secure_password123"
)
access_token = token_response.body['access_token']
refresh_token = token_response.body['refresh_token']
# Use token for authentication
token_client = Elasticsearch(
hosts=['https://localhost:9200'],
headers={"Authorization": f"Bearer {access_token}"},
verify_certs=True
)
# Refresh token when needed
new_token_response = client.security.get_token(
grant_type="refresh_token",
refresh_token=refresh_token
)
# Invalidate tokens when done
client.security.invalidate_token(
token=access_token,
refresh_token=refresh_token
)# Create application privileges
client.security.put_privileges(
privileges={
"myapp": {
"read": {
"application": "myapp",
"name": "read",
"actions": ["data:read/*", "action:login"],
"metadata": {
"description": "Read access to myapp"
}
},
"write": {
"application": "myapp",
"name": "write",
"actions": ["data:write/*", "data:read/*", "action:login"],
"metadata": {
"description": "Write access to myapp"
}
}
}
}
)
# Create role with application privileges
client.security.create_role(
name="myapp_user",
applications=[
{
"application": "myapp",
"privileges": ["read"],
"resources": ["resource1", "resource2"]
}
]
)
# Clear caches for performance
client.security.clear_cached_roles(name=["myapp_user"])
client.security.clear_cached_privileges(application=["myapp"])Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch