High-level Python library for Elasticsearch providing an idiomatic way to write and manipulate queries.
—
Index creation, configuration, and management operations for Elasticsearch indices. Provides comprehensive control over index settings, mappings, aliases, and lifecycle with support for both synchronous and asynchronous operations.
Main class for managing Elasticsearch indices with settings and mapping configuration.
class Index:
"""
Index management class for creating and managing Elasticsearch indices.
"""
def __init__(self, name, using=None):
"""
Initialize index management.
Args:
name (str): Index name
using (str, optional): Connection alias to use
"""
def settings(self, **kwargs):
"""
Configure index settings.
Args:
**kwargs: Index settings
Parameters:
number_of_shards (int): Number of primary shards
number_of_replicas (int): Number of replica shards
refresh_interval (str): Refresh interval
max_result_window (int): Maximum result window size
max_rescore_window (int): Maximum rescore window size
analysis (dict): Analysis configuration (analyzers, tokenizers, etc.)
mapping (dict): Mapping configuration
Returns:
Index: Current Index instance with settings applied
"""
def mapping(self, mapping=None, **kwargs):
"""
Configure index mapping.
Args:
mapping (dict, optional): Complete mapping configuration
**kwargs: Mapping properties
Returns:
Index: Current Index instance with mapping applied
"""
def aliases(self, **kwargs):
"""
Configure index aliases.
Args:
**kwargs: Alias configurations
Returns:
Index: Current Index instance with aliases applied
"""
def create(self, ignore=400, **kwargs):
"""
Create the index.
Args:
ignore (int or list): HTTP status codes to ignore
**kwargs: Additional create parameters
Returns:
dict: Create index response
Raises:
RequestError: If index already exists and ignore=400 not set
"""
def delete(self, ignore=404, **kwargs):
"""
Delete the index.
Args:
ignore (int or list): HTTP status codes to ignore
**kwargs: Additional delete parameters
Returns:
dict: Delete index response
"""
def exists(self, **kwargs):
"""
Check if index exists.
Args:
**kwargs: Additional parameters
Returns:
bool: True if index exists, False otherwise
"""
def close(self, **kwargs):
"""
Close the index.
Args:
**kwargs: Additional close parameters
Returns:
dict: Close index response
"""
def open(self, **kwargs):
"""
Open the index.
Args:
**kwargs: Additional open parameters
Returns:
dict: Open index response
"""
def refresh(self, **kwargs):
"""
Refresh the index.
Args:
**kwargs: Additional refresh parameters
Returns:
dict: Refresh response
"""
def flush(self, **kwargs):
"""
Flush the index.
Args:
**kwargs: Additional flush parameters
Returns:
dict: Flush response
"""
def get_settings(self, **kwargs):
"""
Get index settings.
Args:
**kwargs: Additional parameters
Returns:
dict: Index settings
"""
def get_mapping(self, **kwargs):
"""
Get index mapping.
Args:
**kwargs: Additional parameters
Returns:
dict: Index mapping
"""
def put_settings(self, body, **kwargs):
"""
Update index settings.
Args:
body (dict): Settings to update
**kwargs: Additional parameters
Returns:
dict: Update settings response
"""
def put_mapping(self, body, **kwargs):
"""
Update index mapping.
Args:
body (dict): Mapping to update
**kwargs: Additional parameters
Returns:
dict: Update mapping response
"""
class AsyncIndex:
"""
Async version of Index class for async/await operations.
"""
def __init__(self, name, using=None):
"""Initialize async index management."""
def settings(self, **kwargs):
"""Configure index settings (same as Index)."""
def mapping(self, mapping=None, **kwargs):
"""Configure index mapping (same as Index)."""
def aliases(self, **kwargs):
"""Configure index aliases (same as Index)."""
async def create(self, ignore=400, **kwargs):
"""Async create the index."""
async def delete(self, ignore=404, **kwargs):
"""Async delete the index."""
async def exists(self, **kwargs):
"""Async check if index exists."""
async def close(self, **kwargs):
"""Async close the index."""
async def open(self, **kwargs):
"""Async open the index."""
async def refresh(self, **kwargs):
"""Async refresh the index."""
async def flush(self, **kwargs):
"""Async flush the index."""
async def get_settings(self, **kwargs):
"""Async get index settings."""
async def get_mapping(self, **kwargs):
"""Async get index mapping."""
async def put_settings(self, body, **kwargs):
"""Async update index settings."""
async def put_mapping(self, body, **kwargs):
"""Async update index mapping."""

Manage index templates for automatic index configuration.
class IndexTemplate:
"""
Index template management class.
"""
def __init__(self, name, using=None):
"""
Initialize index template management.
Args:
name (str): Template name
using (str, optional): Connection alias to use
"""
def body(self, **kwargs):
"""
Configure template body.
Args:
**kwargs: Template configuration
Parameters:
index_patterns (list): Index patterns to match
template (dict): Template configuration with settings and mappings
priority (int): Template priority
version (int): Template version
composed_of (list): Component templates to compose from
data_stream (dict): Data stream configuration
_meta (dict): Template metadata
Returns:
IndexTemplate: Current instance with body configured
"""
def create(self, **kwargs):
"""
Create the index template.
Args:
**kwargs: Additional create parameters
Returns:
dict: Create template response
"""
def delete(self, **kwargs):
"""
Delete the index template.
Args:
**kwargs: Additional delete parameters
Returns:
dict: Delete template response
"""
def exists(self, **kwargs):
"""
Check if template exists.
Args:
**kwargs: Additional parameters
Returns:
bool: True if template exists, False otherwise
"""
def get(self, **kwargs):
"""
Get the index template.
Args:
**kwargs: Additional parameters
Returns:
dict: Template configuration
"""
class AsyncIndexTemplate:
"""
Async version of IndexTemplate class.
"""
def __init__(self, name, using=None):
"""Initialize async index template management."""
def body(self, **kwargs):
"""Configure template body (same as IndexTemplate)."""
async def create(self, **kwargs):
"""Async create the index template."""
async def delete(self, **kwargs):
"""Async delete the index template."""
async def exists(self, **kwargs):
"""Async check if template exists."""
async def get(self, **kwargs):
"""Async get the index template."""

Manage index aliases for flexible index operations.
def get_alias(name=None, index=None, using=None, **kwargs):
"""
Get index aliases.
Args:
name (str, optional): Alias name to get
index (str or list, optional): Index name(s) to get aliases for
using (str, optional): Connection alias to use
**kwargs: Additional parameters
Returns:
dict: Alias information
"""
def put_alias(index, name, body=None, using=None, **kwargs):
"""
Create or update index alias.
Args:
index (str or list): Index name(s) to alias
name (str): Alias name
body (dict, optional): Alias configuration (filter, routing, etc.)
using (str, optional): Connection alias to use
**kwargs: Additional parameters
Returns:
dict: Put alias response
"""
def delete_alias(index, name, using=None, **kwargs):
"""
Delete index alias.
Args:
index (str or list): Index name(s) to remove alias from
name (str): Alias name to delete
using (str, optional): Connection alias to use
**kwargs: Additional parameters
Returns:
dict: Delete alias response
"""
def update_aliases(body, using=None, **kwargs):
"""
Perform multiple alias operations atomically.
Args:
body (dict): Alias operations to perform
using (str, optional): Connection alias to use
**kwargs: Additional parameters
Returns:
dict: Update aliases response
Body format:
{
"actions": [
{"add": {"index": "index1", "alias": "alias1"}},
{"remove": {"index": "index2", "alias": "alias2"}},
{"remove_index": {"index": "old_index"}}
]
}
"""

Additional utility functions for index operations.
def get_mapping(index=None, using=None, **kwargs):
"""
Get mapping for index(es).
Args:
index (str or list, optional): Index name(s)
using (str, optional): Connection alias to use
**kwargs: Additional parameters
Returns:
dict: Index mappings
"""
def put_mapping(index, body, using=None, **kwargs):
"""
Update mapping for index(es).
Args:
index (str or list): Index name(s) to update
body (dict): Mapping configuration
using (str, optional): Connection alias to use
**kwargs: Additional parameters
Returns:
dict: Put mapping response
"""
def get_settings(index=None, using=None, **kwargs):
"""
Get settings for index(es).
Args:
index (str or list, optional): Index name(s)
using (str, optional): Connection alias to use
**kwargs: Additional parameters
Returns:
dict: Index settings
"""
def put_settings(index, body, using=None, **kwargs):
"""
Update settings for index(es).
Args:
index (str or list): Index name(s) to update
body (dict): Settings configuration
using (str, optional): Connection alias to use
**kwargs: Additional parameters
Returns:
dict: Put settings response
"""

from elasticsearch_dsl import Index, connections
# Configure connection
connections.create_connection(hosts=['localhost:9200'])
# Create index with settings and mapping
index = Index('blog')
index.settings(
number_of_shards=2,
number_of_replicas=1,
refresh_interval='30s',
analysis={
'analyzer': {
'blog_analyzer': {
'type': 'custom',
'tokenizer': 'standard',
'filter': ['lowercase', 'stop']
}
}
}
)
# Create the index
response = index.create()
print(f"Index created: {response}")
# Check if index exists
if index.exists():
print("Index exists")
# Get index settings and mapping
settings = index.get_settings()
mapping = index.get_mapping()
print(f"Settings: {settings}")
print(f"Mapping: {mapping}")

from elasticsearch_dsl import IndexTemplate
# Create index template for log indices
template = IndexTemplate('logs-template')
template.body(
index_patterns=['logs-*'],
template={
'settings': {
'number_of_shards': 1,
'number_of_replicas': 1,
'refresh_interval': '10s'
},
'mappings': {
'properties': {
'timestamp': {'type': 'date'},
'level': {'type': 'keyword'},
'message': {
'type': 'text',
'analyzer': 'standard'
},
'host': {'type': 'keyword'},
'service': {'type': 'keyword'}
}
}
},
priority=100,
version=1
)
# Create the template
template.create()
# Any new index matching 'logs-*' will use this template
log_index = Index('logs-2023-10')
log_index.create() # Will automatically apply template settings

from elasticsearch_dsl import put_alias, get_alias, update_aliases
# Create alias for current month's logs
put_alias(
index='logs-2023-10',
name='logs-current',
body={
'filter': {
'range': {
'timestamp': {
'gte': '2023-10-01'
}
}
}
}
)
# Get alias information
aliases = get_alias(name='logs-current')
print(f"Current logs alias: {aliases}")
# Atomic alias operations - switch to new month
update_aliases({
'actions': [
{'remove': {'index': 'logs-2023-10', 'alias': 'logs-current'}},
{'add': {'index': 'logs-2023-11', 'alias': 'logs-current'}}
]
})

from elasticsearch_dsl import Index, Document, Text, Keyword, Date
# Define document first
class LogEntry(Document):
timestamp = Date()
level = Keyword()
message = Text(analyzer='log_analyzer')
host = Keyword()
service = Keyword()
class Index:
name = 'application-logs'
settings = {
'number_of_shards': 3,
'number_of_replicas': 2,
'refresh_interval': '5s',
'max_result_window': 50000,
'analysis': {
'analyzer': {
'log_analyzer': {
'type': 'custom',
'tokenizer': 'standard',
'filter': [
'lowercase',
'stop',
'log_patterns'
]
}
},
'filter': {
'log_patterns': {
'type': 'pattern_replace',
'pattern': r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}',
'replacement': 'TIMESTAMP'
}
}
}
}
# Initialize index from document
LogEntry.init()
# Or create index manually with same configuration
index = Index('application-logs')
index.settings(**LogEntry._index._settings)
index.mapping(LogEntry._doc_type.mapping.to_dict())
index.create()

from elasticsearch_dsl import Index, connections
from datetime import datetime, timedelta
def rotate_logs():
    """Rotate log indices daily.

    Creates today's ``logs-<date>`` index when it does not exist yet, then
    issues a single ``update_aliases`` call that adds the ``logs-current``
    alias to today's index and, if yesterday's index exists, removes the
    alias from it.
    """
    # Read the clock once so both dates derive from the same instant; two
    # separate now() calls could straddle midnight and make "yesterday"
    # resolve to the same date as "today".
    now = datetime.now()
    today = now.strftime('%Y-%m-%d')
    yesterday = (now - timedelta(days=1)).strftime('%Y-%m-%d')

    # Create today's index
    today_index = Index(f'logs-{today}')
    today_index.settings(
        number_of_shards=1,
        number_of_replicas=1,
        refresh_interval='30s'
    )
    if not today_index.exists():
        today_index.create()
        print(f"Created index: logs-{today}")

    # Update alias to point to today's index
    from elasticsearch_dsl import update_aliases
    actions = [
        {'add': {'index': f'logs-{today}', 'alias': 'logs-current'}}
    ]

    # Remove yesterday's alias if exists
    yesterday_index = Index(f'logs-{yesterday}')
    if yesterday_index.exists():
        actions.append({
            'remove': {'index': f'logs-{yesterday}', 'alias': 'logs-current'}
        })

    # Both actions go in one request so the alias switch is applied together.
    update_aliases({'actions': actions})
    print(f"Updated logs-current alias to point to logs-{today}")
# Run daily rotation
rotate_logs()

import asyncio
from elasticsearch_dsl import AsyncIndex, async_connections
async def async_index_operations():
    """Walk through the basic AsyncIndex lifecycle against a local cluster.

    Registers an async connection, creates the ``async-test`` index if it is
    missing, prints its settings, refreshes it, then closes and reopens it.
    """
    # Setup async connection.
    # create_connection() is a regular (non-coroutine) call that registers
    # and returns the client, so it must not be awaited.
    async_connections.create_connection(hosts=['localhost:9200'])

    # Create async index
    index = AsyncIndex('async-test')
    index.settings(
        number_of_shards=1,
        number_of_replicas=0
    )

    # Async operations
    if not await index.exists():
        create_response = await index.create()
        print(f"Created index: {create_response}")

    # Get settings asynchronously
    settings = await index.get_settings()
    print(f"Index settings: {settings}")

    # Refresh index
    await index.refresh()

    # Close and reopen
    await index.close()
    await index.open()
# Run async operations
asyncio.run(async_index_operations())

Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch-dsl