CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-elasticsearch-dsl

High-level Python library for Elasticsearch providing an idiomatic way to write and manipulate queries.

Pending
Overview
Eval results
Files

index-management.mddocs/

Index Management

Index creation, configuration, and management operations for Elasticsearch indices. Provides comprehensive control over index settings, mappings, aliases, and lifecycle with support for both synchronous and asynchronous operations.

Capabilities

Index Management Class

Main class for managing Elasticsearch indices with settings and mapping configuration.

class Index:
    """
    Index management class for creating and managing Elasticsearch indices.
    """
    
    def __init__(self, name, using=None):
        """
        Initialize index management.
        
        Args:
            name (str): Index name
            using (str, optional): Connection alias to use
        """
    
    def settings(self, **kwargs):
        """
        Configure index settings.
        
        Args:
            **kwargs: Index settings
            
        Parameters:
            number_of_shards (int): Number of primary shards
            number_of_replicas (int): Number of replica shards
            refresh_interval (str): Refresh interval
            max_result_window (int): Maximum result window size
            max_rescore_window (int): Maximum rescore window size
            analysis (dict): Analysis configuration (analyzers, tokenizers, etc.)
            mapping (dict): Mapping configuration
            
        Returns:
            Index: Current Index instance with settings applied
        """
    
    def mapping(self, mapping=None, **kwargs):
        """
        Configure index mapping.
        
        Args:
            mapping (dict, optional): Complete mapping configuration
            **kwargs: Mapping properties
            
        Returns:
            Index: Current Index instance with mapping applied
        """
    
    def aliases(self, **kwargs):
        """
        Configure index aliases.
        
        Args:
            **kwargs: Alias configurations
            
        Returns:
            Index: Current Index instance with aliases applied
        """
    
    def create(self, ignore=400, **kwargs):
        """
        Create the index.
        
        Args:
            ignore (int or list): HTTP status codes to ignore
            **kwargs: Additional create parameters
            
        Returns:
            dict: Create index response
            
        Raises:
            RequestError: If index already exists and ignore=400 not set
        """
    
    def delete(self, ignore=404, **kwargs):
        """
        Delete the index.
        
        Args:
            ignore (int or list): HTTP status codes to ignore  
            **kwargs: Additional delete parameters
            
        Returns:
            dict: Delete index response
        """
    
    def exists(self, **kwargs):
        """
        Check if index exists.
        
        Args:
            **kwargs: Additional parameters
            
        Returns:
            bool: True if index exists, False otherwise
        """
    
    def close(self, **kwargs):
        """
        Close the index.
        
        Args:
            **kwargs: Additional close parameters
            
        Returns:
            dict: Close index response
        """
    
    def open(self, **kwargs):
        """
        Open the index.
        
        Args:
            **kwargs: Additional open parameters
            
        Returns:
            dict: Open index response
        """
    
    def refresh(self, **kwargs):
        """
        Refresh the index.
        
        Args:
            **kwargs: Additional refresh parameters
            
        Returns:
            dict: Refresh response
        """
    
    def flush(self, **kwargs):
        """
        Flush the index.
        
        Args:
            **kwargs: Additional flush parameters
            
        Returns:
            dict: Flush response
        """
    
    def get_settings(self, **kwargs):
        """
        Get index settings.
        
        Args:
            **kwargs: Additional parameters
            
        Returns:
            dict: Index settings
        """
    
    def get_mapping(self, **kwargs):
        """
        Get index mapping.
        
        Args:
            **kwargs: Additional parameters
            
        Returns:
            dict: Index mapping
        """
    
    def put_settings(self, body, **kwargs):
        """
        Update index settings.
        
        Args:
            body (dict): Settings to update
            **kwargs: Additional parameters
            
        Returns:
            dict: Update settings response
        """
    
    def put_mapping(self, body, **kwargs):
        """
        Update index mapping.
        
        Args:
            body (dict): Mapping to update
            **kwargs: Additional parameters
            
        Returns:
            dict: Update mapping response
        """

class AsyncIndex:
    """
    Async version of Index class for async/await operations.
    """
    
    def __init__(self, name, using=None):
        """Initialize async index management."""
    
    def settings(self, **kwargs):
        """Configure index settings (same as Index)."""
    
    def mapping(self, mapping=None, **kwargs):
        """Configure index mapping (same as Index)."""
    
    def aliases(self, **kwargs):
        """Configure index aliases (same as Index)."""
    
    async def create(self, ignore=400, **kwargs):
        """Async create the index."""
    
    async def delete(self, ignore=404, **kwargs):
        """Async delete the index."""
    
    async def exists(self, **kwargs):
        """Async check if index exists."""
    
    async def close(self, **kwargs):
        """Async close the index."""
    
    async def open(self, **kwargs):
        """Async open the index."""
    
    async def refresh(self, **kwargs):
        """Async refresh the index."""
    
    async def flush(self, **kwargs):
        """Async flush the index."""
    
    async def get_settings(self, **kwargs):
        """Async get index settings."""
    
    async def get_mapping(self, **kwargs):
        """Async get index mapping."""
    
    async def put_settings(self, body, **kwargs):
        """Async update index settings."""
    
    async def put_mapping(self, body, **kwargs):
        """Async update index mapping."""

Index Template Management

Manage index templates for automatic index configuration.

class IndexTemplate:
    """
    Index template management class.
    """
    
    def __init__(self, name, using=None):
        """
        Initialize index template management.
        
        Args:
            name (str): Template name
            using (str, optional): Connection alias to use
        """
    
    def body(self, **kwargs):
        """
        Configure template body.
        
        Args:
            **kwargs: Template configuration
            
        Parameters:
            index_patterns (list): Index patterns to match
            template (dict): Template configuration with settings and mappings
            priority (int): Template priority
            version (int): Template version
            composed_of (list): Component templates to compose from
            data_stream (dict): Data stream configuration
            _meta (dict): Template metadata
            
        Returns:
            IndexTemplate: Current instance with body configured
        """
    
    def create(self, **kwargs):
        """
        Create the index template.
        
        Args:
            **kwargs: Additional create parameters
            
        Returns:
            dict: Create template response
        """
    
    def delete(self, **kwargs):
        """
        Delete the index template.
        
        Args:
            **kwargs: Additional delete parameters
            
        Returns:
            dict: Delete template response
        """
    
    def exists(self, **kwargs):
        """
        Check if template exists.
        
        Args:
            **kwargs: Additional parameters
            
        Returns:
            bool: True if template exists, False otherwise
        """
    
    def get(self, **kwargs):
        """
        Get the index template.
        
        Args:
            **kwargs: Additional parameters
            
        Returns:
            dict: Template configuration
        """

class AsyncIndexTemplate:
    """
    Async version of IndexTemplate class.
    """
    
    def __init__(self, name, using=None):
        """Initialize async index template management."""
    
    def body(self, **kwargs):
        """Configure template body (same as IndexTemplate)."""
    
    async def create(self, **kwargs):
        """Async create the index template."""
    
    async def delete(self, **kwargs):
        """Async delete the index template."""
    
    async def exists(self, **kwargs):
        """Async check if template exists."""
    
    async def get(self, **kwargs):
        """Async get the index template."""

Index Alias Management

Manage index aliases for flexible index operations.

def get_alias(name=None, index=None, using=None, **kwargs):
    """
    Get index aliases.
    
    Args:
        name (str, optional): Alias name to get
        index (str or list, optional): Index name(s) to get aliases for
        using (str, optional): Connection alias to use
        **kwargs: Additional parameters
        
    Returns:
        dict: Alias information
    """

def put_alias(index, name, body=None, using=None, **kwargs):
    """
    Create or update index alias.
    
    Args:
        index (str or list): Index name(s) to alias
        name (str): Alias name
        body (dict, optional): Alias configuration (filter, routing, etc.)
        using (str, optional): Connection alias to use
        **kwargs: Additional parameters
        
    Returns:
        dict: Put alias response
    """

def delete_alias(index, name, using=None, **kwargs):
    """
    Delete index alias.
    
    Args:
        index (str or list): Index name(s) to remove alias from
        name (str): Alias name to delete
        using (str, optional): Connection alias to use
        **kwargs: Additional parameters
        
    Returns:
        dict: Delete alias response
    """

def update_aliases(body, using=None, **kwargs):
    """
    Perform multiple alias operations atomically.
    
    Args:
        body (dict): Alias operations to perform
        using (str, optional): Connection alias to use
        **kwargs: Additional parameters
        
    Returns:
        dict: Update aliases response
        
    Body format:
        {
            "actions": [
                {"add": {"index": "index1", "alias": "alias1"}},
                {"remove": {"index": "index2", "alias": "alias2"}},
                {"remove_index": {"index": "old_index"}}
            ]
        }
    """

Utility Functions

Additional utility functions for index operations.

def get_mapping(index=None, using=None, **kwargs):
    """
    Get mapping for index(es).
    
    Args:
        index (str or list, optional): Index name(s)
        using (str, optional): Connection alias to use
        **kwargs: Additional parameters
        
    Returns:
        dict: Index mappings
    """

def put_mapping(index, body, using=None, **kwargs):
    """
    Update mapping for index(es).
    
    Args:
        index (str or list): Index name(s) to update
        body (dict): Mapping configuration
        using (str, optional): Connection alias to use
        **kwargs: Additional parameters
        
    Returns:
        dict: Put mapping response
    """

def get_settings(index=None, using=None, **kwargs):
    """
    Get settings for index(es).
    
    Args:
        index (str or list, optional): Index name(s)
        using (str, optional): Connection alias to use
        **kwargs: Additional parameters
        
    Returns:
        dict: Index settings
    """

def put_settings(index, body, using=None, **kwargs):
    """
    Update settings for index(es).
    
    Args:
        index (str or list): Index name(s) to update
        body (dict): Settings configuration
        using (str, optional): Connection alias to use
        **kwargs: Additional parameters
        
    Returns:
        dict: Put settings response
    """

Usage Examples

Basic Index Creation and Management

from elasticsearch_dsl import Index, connections

# Configure connection
connections.create_connection(hosts=['localhost:9200'])

# Create index with settings and mapping
index = Index('blog')
index.settings(
    number_of_shards=2,
    number_of_replicas=1,
    refresh_interval='30s',
    analysis={
        'analyzer': {
            'blog_analyzer': {
                'type': 'custom',
                'tokenizer': 'standard',
                'filter': ['lowercase', 'stop']
            }
        }
    }
)

# Create the index
response = index.create()
print(f"Index created: {response}")

# Check if index exists
if index.exists():
    print("Index exists")

# Get index settings and mapping
settings = index.get_settings()
mapping = index.get_mapping()
print(f"Settings: {settings}")
print(f"Mapping: {mapping}")

Index Templates

from elasticsearch_dsl import IndexTemplate

# Create index template for log indices
template = IndexTemplate('logs-template')
template.body(
    index_patterns=['logs-*'],
    template={
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 1,
            'refresh_interval': '10s'
        },
        'mappings': {
            'properties': {
                'timestamp': {'type': 'date'},
                'level': {'type': 'keyword'},
                'message': {
                    'type': 'text',
                    'analyzer': 'standard'
                },
                'host': {'type': 'keyword'},
                'service': {'type': 'keyword'}
            }
        }
    },
    priority=100,
    version=1
)

# Create the template
template.create()

# Any new index matching 'logs-*' will use this template
log_index = Index('logs-2023-10')
log_index.create()  # Will automatically apply template settings

Index Aliases

from elasticsearch_dsl import put_alias, get_alias, update_aliases

# Create alias for current month's logs
put_alias(
    index='logs-2023-10',
    name='logs-current',
    body={
        'filter': {
            'range': {
                'timestamp': {
                    'gte': '2023-10-01'
                }
            }
        }
    }
)

# Get alias information
aliases = get_alias(name='logs-current')
print(f"Current logs alias: {aliases}")

# Atomic alias operations - switch to new month
update_aliases({
    'actions': [
        {'remove': {'index': 'logs-2023-10', 'alias': 'logs-current'}},
        {'add': {'index': 'logs-2023-11', 'alias': 'logs-current'}}
    ]
})

Advanced Index Configuration

from elasticsearch_dsl import Index, Document, Text, Keyword, Date

# Define document first
class LogEntry(Document):
    timestamp = Date()
    level = Keyword()
    message = Text(analyzer='log_analyzer')
    host = Keyword()
    service = Keyword()
    
    class Index:
        name = 'application-logs'
        settings = {
            'number_of_shards': 3,
            'number_of_replicas': 2,
            'refresh_interval': '5s',
            'max_result_window': 50000,
            'analysis': {
                'analyzer': {
                    'log_analyzer': {
                        'type': 'custom',
                        'tokenizer': 'standard',
                        'filter': [
                            'lowercase',
                            'stop',
                            'log_patterns'
                        ]
                    }
                },
                'filter': {
                    'log_patterns': {
                        'type': 'pattern_replace',
                        'pattern': r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}',
                        'replacement': 'TIMESTAMP'
                    }
                }
            }
        }

# Initialize index from document
LogEntry.init()

# Or create index manually with same configuration
index = Index('application-logs')
index.settings(**LogEntry._index._settings)
index.mapping(LogEntry._doc_type.mapping.to_dict())
index.create()

Index Lifecycle Management

from elasticsearch_dsl import Index, connections
from datetime import datetime, timedelta

def rotate_logs():
    """Rotate log indices daily."""
    today = datetime.now().strftime('%Y-%m-%d')
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    
    # Create today's index
    today_index = Index(f'logs-{today}')
    today_index.settings(
        number_of_shards=1,
        number_of_replicas=1,
        refresh_interval='30s'
    )
    
    if not today_index.exists():
        today_index.create()
        print(f"Created index: logs-{today}")
    
    # Update alias to point to today's index
    from elasticsearch_dsl import update_aliases
    
    actions = [
        {'add': {'index': f'logs-{today}', 'alias': 'logs-current'}}
    ]
    
    # Remove yesterday's alias if exists
    yesterday_index = Index(f'logs-{yesterday}')
    if yesterday_index.exists():
        actions.append({
            'remove': {'index': f'logs-{yesterday}', 'alias': 'logs-current'}
        })
    
    update_aliases({'actions': actions})
    print(f"Updated logs-current alias to point to logs-{today}")

# Run daily rotation
rotate_logs()

Async Index Operations

import asyncio
from elasticsearch_dsl import AsyncIndex, async_connections

async def async_index_operations():
    # Setup async connection
    await async_connections.create_connection(hosts=['localhost:9200'])
    
    # Create async index
    index = AsyncIndex('async-test')
    index.settings(
        number_of_shards=1,
        number_of_replicas=0
    )
    
    # Async operations
    if not await index.exists():
        create_response = await index.create()
        print(f"Created index: {create_response}")
    
    # Get settings asynchronously
    settings = await index.get_settings()
    print(f"Index settings: {settings}")
    
    # Refresh index
    await index.refresh()
    
    # Close and reopen
    await index.close()
    await index.open()

# Run async operations
asyncio.run(async_index_operations())

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch-dsl

docs

aggregations.md

analysis.md

connections.md

document-operations.md

field-types.md

index-management.md

index.md

search-queries.md

tile.json