CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-elasticsearch-dsl

High-level Python library for Elasticsearch providing an idiomatic way to write and manipulate queries.

Pending
Overview
Eval results
Files

docs/connections.md

Connections

Connection configuration and management for single and multiple Elasticsearch clusters with comprehensive support for authentication, SSL/TLS, connection pooling, retry logic, and both synchronous and asynchronous clients. Enables flexible deployment architectures and secure cluster communication.

Capabilities

Connection Management Functions

Core functions for managing Elasticsearch connections.

def create_connection(alias='default', **kwargs):
    """
    Create and register new Elasticsearch connection.
    
    Args:
        alias (str): Connection alias name (default: 'default')
        **kwargs: Connection parameters passed to Elasticsearch client
        
    Returns:
        Elasticsearch: Elasticsearch client instance
        
    Connection Parameters:
        hosts (list or str): Elasticsearch host(s) as URLs including the
            scheme, e.g. 'http://localhost:9200' or 'https://es.example.com:9200'
        http_auth (tuple): HTTP authentication (username, password)
        http_compress (bool): Enable gzip HTTP compression
        timeout (int): Request timeout in seconds
        max_retries (int): Maximum number of retries
        retry_on_timeout (bool): Retry on timeout errors
        retry_on_status (list): HTTP status codes to retry on
        sniff_on_start (bool): Sniff nodes on startup
        sniff_on_connection_fail (bool): Sniff on connection failure
        sniff_timeout (int): Sniffer timeout
        sniffer_delay (int): Delay between sniff operations
        randomize_hosts (bool): Randomize host selection
        use_ssl (bool): Use SSL/TLS
        verify_certs (bool): Verify SSL certificates
        ssl_show_warn (bool): Show SSL warnings
        ca_certs (str): Path to CA certificates
        client_cert (str): Path to client certificate
        client_key (str): Path to client private key
        ssl_version (str): SSL/TLS version
        ssl_assert_hostname (str or bool): Hostname expected in the node
            certificate, or False to skip hostname matching
        ssl_assert_fingerprint (str): Assert certificate fingerprint
        headers (dict): Default HTTP headers
        connections_per_node (int): Connections per node
        cloud_id (str): Elastic Cloud cluster ID
        api_key (tuple): API key authentication (id, key)
        bearer_auth (str): Bearer token authentication
        opaque_id (str): Opaque ID for request identification

    Note:
        Every keyword is forwarded to the Elasticsearch client constructor,
        so the accepted names depend on the installed elasticsearch-py
        version. NOTE(review): 8.x clients reject `use_ssl` (use an https://
        host URL instead), prefer `basic_auth`/`request_timeout` over the
        deprecated `http_auth`/`timeout`, and call the sniff interval
        `min_delay_between_sniffing` (7.x: `sniffer_timeout`) rather than
        `sniffer_delay` -- verify against your client before copying names.
    """

def add_connection(alias, connection):
    """
    Register an already-constructed client in the connection registry.

    Args:
        alias (str): Name under which the client is stored
        connection: Elasticsearch client instance to register
    """

def remove_connection(alias):
    """
    Unregister the connection stored under the given alias.

    Args:
        alias (str): Name of the connection to drop

    Raises:
        KeyError: When no connection is registered under ``alias``
    """

def get_connection(alias='default'):
    """
    Look up a registered connection by its alias.

    Args:
        alias (str): Name of the connection to fetch (falls back to 'default')

    Returns:
        Elasticsearch: The client registered under ``alias``

    Raises:
        KeyError: When no connection is registered under ``alias``
    """

def configure(**kwargs):
    """
    Configure one or more connections at once.

    Each keyword argument is a connection alias mapped to a dict of
    connection parameters (the same parameters create_connection accepts),
    e.g.::

        configure(
            default={'hosts': ['http://localhost:9200']},
            dev={'hosts': ['http://esdev1.example.com:9200'], 'sniff_on_start': True},
        )

    Args:
        **kwargs: Mapping of alias name -> dict of connection parameters

    Returns:
        None. NOTE(review): elasticsearch-dsl builds the clients lazily on
        the first get_connection(alias) call instead of returning one here;
        confirm against the installed version.
    """

Connection Registry

Global connection registry for managing multiple connections.

class Connections:
    """
    Global connection registry singleton.

    Holds Elasticsearch clients keyed by alias; each method mirrors the
    module-level function of the same name.
    """
    
    def create_connection(self, alias='default', **kwargs):
        """Create a client from ``kwargs`` and register it under ``alias`` (same as module function)."""
    
    def add_connection(self, alias, connection):
        """Add existing connection (same as module function)."""
    
    def remove_connection(self, alias):
        """Remove connection (same as module function); raises KeyError for an unknown alias."""
    
    def get_connection(self, alias='default'):
        """Get connection (same as module function); raises KeyError for an unknown alias."""
    
    def configure(self, **kwargs):
        """
        Configure several connections at once (same as module function).

        Each keyword is an alias mapped to a dict of connection parameters,
        e.g. ``configure(default={'hosts': ['http://localhost:9200']})``.
        """
    
    def all(self):
        """
        Get all registered connections.
        
        Returns:
            dict: Mapping of alias to connection

        NOTE(review): elasticsearch-dsl's Connections class does not appear
        to expose a public ``all()`` method -- confirm it exists in the
        installed version before relying on it.
        """

# Global connections instance: the shared registry the usage examples call as
# `connections.create_connection(...)` / `connections.get_connection(...)`.
connections: Connections

Async Connection Management

Asynchronous connection management for async/await operations.

def create_async_connection(alias='default', **kwargs):
    """
    Build an async Elasticsearch client and register it under an alias.

    This is a plain (synchronous) function, not a coroutine: it hands back
    the client directly, so call it without ``await``.

    Args:
        alias (str): Name to register the client under (default: 'default')
        **kwargs: Connection parameters, identical to create_connection

    Returns:
        AsyncElasticsearch: The newly registered async client
    """

def add_async_connection(alias, connection):
    """
    Register an already-constructed async client in the registry.

    Args:
        alias (str): Name under which the client is stored
        connection: AsyncElasticsearch client instance to register
    """

def remove_async_connection(alias):
    """
    Unregister the async connection stored under the given alias.

    Args:
        alias (str): Name of the async connection to drop

    Raises:
        KeyError: When no async connection is registered under ``alias``
    """

def get_async_connection(alias='default'):
    """
    Look up a registered async connection by its alias.

    Args:
        alias (str): Name of the async connection to fetch (falls back to 'default')

    Returns:
        AsyncElasticsearch: The async client registered under ``alias``

    Raises:
        KeyError: When no async connection is registered under ``alias``
    """

def configure_async(**kwargs):
    """
    Configure one or more async connections at once.

    Each keyword argument is a connection alias mapped to a dict of
    connection parameters (the same parameters create_connection accepts),
    e.g.::

        configure_async(
            default={'hosts': ['http://localhost:9200']},
            analytics={'hosts': ['https://analytics-es.example.com:9200']},
        )

    Args:
        **kwargs: Mapping of alias name -> dict of connection parameters

    Returns:
        None. NOTE(review): elasticsearch-dsl builds the clients lazily on
        the first get_connection(alias) call instead of returning one here;
        confirm against the installed version.
    """

Async Connection Registry

Async connection registry for managing multiple async connections.

class AsyncConnections:
    """
    Global async connection registry singleton.

    Mirrors Connections but holds AsyncElasticsearch clients. Its methods are
    plain (non-async) functions, so they are called without ``await``.
    """
    
    def create_connection(self, alias='default', **kwargs):
        """Create and register async connection; returns the client directly (do not await)."""
    
    def add_connection(self, alias, connection):
        """Add existing async connection."""
    
    def remove_connection(self, alias):
        """Remove async connection; raises KeyError for an unknown alias."""
    
    def get_connection(self, alias='default'):
        """Get async connection; raises KeyError for an unknown alias."""
    
    def configure(self, **kwargs):
        """
        Configure several async connections at once.

        Each keyword is an alias mapped to a dict of connection parameters,
        e.g. ``configure(default={'hosts': ['http://localhost:9200']})``.
        """
    
    def all(self):
        """
        Get all registered async connections.
        
        Returns:
            dict: Mapping of alias to async connection

        NOTE(review): elasticsearch-dsl's connection registry does not appear
        to expose a public ``all()`` method -- confirm it exists in the
        installed version before relying on it.
        """

# Global async connections instance: the shared registry of AsyncElasticsearch
# clients (its methods are plain calls, declared with `def`, not `async def`).
async_connections: AsyncConnections

Usage Examples

Basic Connection Setup

from elasticsearch_dsl import connections, Document, Text

# Simple connection to localhost. Always include the URL scheme:
# elasticsearch-py 8.x rejects bare 'host:port' strings.
connections.create_connection(hosts=['http://localhost:9200'])

# Connection with authentication. The https:// scheme already enables TLS,
# so no separate `use_ssl` flag is needed (8.x clients reject it).
connections.create_connection(
    alias='secure',
    hosts=['https://elasticsearch.example.com:9200'],
    http_auth=('username', 'password'),
    verify_certs=True
)

# Use connection in document operations
class Article(Document):
    """Example document whose instances are stored in the 'articles' index."""
    title = Text()
    content = Text()
    
    class Index:
        # Elasticsearch index backing this document class
        name = 'articles'

# Document operations will use default connection
article = Article(title='Test', content='Content')
article.save()

# Use specific connection. Save a fresh instance: after save() the first object
# carries seq_no/primary_term from the default cluster, and re-saving it on
# another cluster sends them as if_seq_no/if_primary_term, which that cluster
# rejects with a version conflict.
secure_article = Article(title='Test', content='Content')
secure_article.save(using='secure')

Multiple Cluster Configuration

from elasticsearch_dsl import connections

# Production cluster. Hosts carry their scheme: https:// enables TLS (so no
# separate `use_ssl` flag), and elasticsearch-py 8.x requires a scheme anyway.
connections.create_connection(
    alias='production',
    hosts=['https://prod-es-1.example.com:9200', 'https://prod-es-2.example.com:9200'],
    http_auth=('prod_user', 'prod_password'),
    verify_certs=True,
    ca_certs='/path/to/ca.pem',
    timeout=30,
    max_retries=3,
    retry_on_timeout=True,
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniff_timeout=10,
    randomize_hosts=True
)

# Development cluster (plain HTTP)
connections.create_connection(
    alias='development',
    hosts=['http://dev-es.example.com:9200'],
    http_auth=('dev_user', 'dev_password'),
    timeout=10,
    max_retries=1
)

# Analytics cluster (read-only, TLS via the https:// scheme)
connections.create_connection(
    alias='analytics',
    hosts=['https://analytics-es.example.com:9200'],
    http_auth=('analytics_user', 'analytics_password'),
    timeout=60  # Longer timeout for analytics queries
)

# Use different connections for different operations
from elasticsearch_dsl import Search

# Search production data
search = Search(using='production', index='logs')
response = search.execute()

# Run analytics on dedicated cluster
analytics_search = Search(using='analytics', index='metrics')
analytics_response = analytics_search.execute()

SSL/TLS and Authentication

from elasticsearch_dsl import connections

# SSL with client certificates. The https:// scheme turns on TLS. Hostname
# checking is part of certificate verification, so ssl_assert_hostname is only
# needed to expect a *different* hostname (it takes a str, or False to skip).
connections.create_connection(
    alias='ssl_client_cert',
    hosts=['https://secure-es.example.com:9200'],
    verify_certs=True,
    ca_certs='/path/to/ca-certificates.crt',
    client_cert='/path/to/client.crt',
    client_key='/path/to/client.key'
)

# API Key authentication
connections.create_connection(
    alias='api_key',
    hosts=['https://es.example.com:9200'],
    api_key=('api_key_id', 'api_key_secret'),
    verify_certs=True
)

# Bearer token authentication
connections.create_connection(
    alias='bearer_token',
    hosts=['https://es.example.com:9200'],
    bearer_auth='your_bearer_token_here'
)

# Elastic Cloud connection
connections.create_connection(
    alias='elastic_cloud',
    cloud_id='cluster_name:base64_encoded_endpoint',
    http_auth=('elastic_username', 'elastic_password')
)

# Custom headers and opaque ID
connections.create_connection(
    alias='custom_headers',
    hosts=['https://es.example.com:9200'],
    headers={'Custom-Header': 'value'},
    opaque_id='my-application-v1.0',
    http_auth=('username', 'password')
)

Connection Pooling and Performance

from elasticsearch_dsl import connections

# High-performance configuration
connections.create_connection(
    alias='high_performance',
    hosts=[
        'https://es-node-01.example.com:9200',
        'https://es-node-02.example.com:9200',
        'https://es-node-03.example.com:9200'
    ],
    # Connection pooling
    connections_per_node=10,
    
    # Retry configuration
    max_retries=5,
    retry_on_timeout=True,
    retry_on_status=[429, 502, 503, 504],
    
    # Sniffing for node discovery
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniff_timeout=5,
    # Minimum seconds between sniffs; elasticsearch-py 8.x (which the
    # connections_per_node option above also requires) names this
    # min_delay_between_sniffing -- `sniffer_delay` is not a client option.
    min_delay_between_sniffing=60,
    
    # Performance optimizations
    http_compress=True,
    randomize_hosts=True,
    
    # Timeouts
    timeout=20,
    
    # Authentication (TLS comes from the https:// host URLs)
    http_auth=('username', 'password'),
    verify_certs=True
)

# Bulk operations configuration
connections.create_connection(
    alias='bulk_operations',
    hosts=['http://bulk-es.example.com:9200'],
    timeout=300,  # 5 minute timeout for bulk operations
    max_retries=1,  # Fewer retries for bulk
    http_compress=True,  # Important for bulk data
    connections_per_node=20,  # More connections for concurrent bulk
    http_auth=('bulk_user', 'bulk_password')
)

Async Connection Setup

from elasticsearch_dsl import async_connections
import asyncio

async def setup_async_connections():
    # create_connection() is a plain (non-async) method that builds, registers
    # and returns an AsyncElasticsearch client, so it is called without await.

    # Create async connection
    async_connections.create_connection(
        alias='async_default',
        hosts=['http://localhost:9200'],
        timeout=30,
        max_retries=3
    )
    
    # Async connection with authentication (the https:// scheme enables TLS)
    async_connections.create_connection(
        alias='async_secure',
        hosts=['https://es.example.com:9200'],
        http_auth=('username', 'password'),
        verify_certs=True,
        timeout=60
    )

# Use async connections with AsyncDocument and AsyncSearch
from elasticsearch_dsl import AsyncDocument, AsyncSearch, Text

class AsyncArticle(AsyncDocument):
    title = Text()
    content = Text()
    
    class Index:
        name = 'async_articles'

async def async_operations():
    # Create and save document
    article = AsyncArticle(title='Async Test', content='Async content')
    await article.save(using='async_default')
    
    # Async search
    search = AsyncSearch(using='async_secure', index='logs')
    search = search.query('match', message='error')
    response = await search.execute()
    
    for hit in response:
        print(f"Log: {hit.message}")

async def main():
    # Run setup and operations on one event loop, then close the clients so
    # their HTTP sessions are released before the loop shuts down.
    await setup_async_connections()
    try:
        await async_operations()
    finally:
        for alias in ('async_default', 'async_secure'):
            await async_connections.get_connection(alias).close()

# Run async setup and operations
asyncio.run(main())

Connection Health and Monitoring

from elasticsearch_dsl import connections
from elasticsearch.exceptions import ConnectionError, TransportError

def check_connection_health(alias='default'):
    """Check health of Elasticsearch connection.

    Looks up the client registered under ``alias``, prints cluster health and
    per-node details, then runs a one-hit match_all search as a smoke test.
    Any exception is printed rather than raised.

    Returns:
        bool: True if every call succeeded, False otherwise.
    """
    try:
        es = connections.get_connection(alias)

        # Cluster-level summary
        health = es.cluster.health()
        print(f"Cluster status: {health['status']}")
        print(f"Number of nodes: {health['number_of_nodes']}")
        print(f"Active shards: {health['active_shards']}")

        # One line per node
        node_map = es.nodes.info()['nodes']
        for node_id, node_info in node_map.items():
            print(f"Node {node_id}: {node_info['name']} ({node_info['version']})")

        # Smoke-test the search path with a single-hit query
        es.search(index='_all', body={'query': {'match_all': {}}}, size=1)
        print("Connection test successful!")
    except ConnectionError as e:
        print(f"Connection error: {e}")
        return False
    except TransportError as e:
        print(f"Transport error: {e}")
        return False
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False
    else:
        return True

# Check all connections: run the health check for every registered alias.
# NOTE(review): relies on connections.all(); confirm the installed
# elasticsearch-dsl exposes it, otherwise iterate over a known list of aliases.
for alias in connections.all():
    print(f"Checking connection '{alias}':")
    check_connection_health(alias)
    print("-" * 40)

Dynamic Connection Management

from elasticsearch_dsl import connections, Document, Text
import os

class ConfigurableDocument(Document):
    """Document that uses environment-based connection selection.

    The ENVIRONMENT variable ('production', 'staging' or 'development'; any
    other value falls back to the development settings) selects the
    connection alias 'es_<env>', which is created on first use.
    """
    
    title = Text()
    content = Text()
    
    class Index:
        name = 'configurable_docs'
    
    def save(self, **kwargs):
        """Save using the connection for the current ENVIRONMENT.

        An explicit ``using=`` from the caller is honoured instead of being
        passed a second time (which would raise TypeError).
        """
        if kwargs.get('using') is None:
            kwargs['using'] = self._ensure_connection()
        return super().save(**kwargs)
    
    def _ensure_connection(self):
        """Return the alias for the current ENVIRONMENT, creating it if missing."""
        env = os.getenv('ENVIRONMENT', 'development')
        # Unknown environments get the development settings; normalise the name
        # so the alias we save with is the one _setup_connection registers.
        if env not in ('production', 'staging'):
            env = 'development'
        connection_alias = f'es_{env}'
        try:
            connections.get_connection(connection_alias)
        except KeyError:  # not registered yet
            self._setup_connection(env)
        return connection_alias
    
    @staticmethod
    def _hosts_from_env(var_name, default):
        """Parse a comma-separated host list from the environment, skipping blanks."""
        raw_hosts = os.getenv(var_name, default)
        hosts = [host.strip() for host in raw_hosts.split(',') if host.strip()]
        return hosts or [default]
    
    @staticmethod
    def _auth_from_env(user_var, password_var):
        """Return ``{'http_auth': (user, password)}`` when both are set, else ``{}``.

        Prevents passing ``(None, None)`` as credentials when the variables
        are missing.
        """
        user = os.getenv(user_var)
        password = os.getenv(password_var)
        if user and password:
            return {'http_auth': (user, password)}
        return {}
    
    def _setup_connection(self, env):
        """Setup connection based on environment.

        Host lists come from ES_PROD_HOSTS / ES_STAGING_HOSTS (comma-separated
        URLs that include the scheme, e.g. 'https://es1:9200').
        """
        if env == 'production':
            connections.create_connection(
                alias='es_production',
                # https:// hosts enable TLS; no separate use_ssl flag needed
                hosts=self._hosts_from_env('ES_PROD_HOSTS', 'https://localhost:9200'),
                verify_certs=True,
                timeout=30,
                **self._auth_from_env('ES_PROD_USER', 'ES_PROD_PASSWORD'),
            )
        elif env == 'staging':
            connections.create_connection(
                alias='es_staging',
                hosts=self._hosts_from_env('ES_STAGING_HOSTS', 'http://localhost:9200'),
                timeout=20,
                **self._auth_from_env('ES_STAGING_USER', 'ES_STAGING_PASSWORD'),
            )
        else:  # development
            connections.create_connection(
                alias='es_development',
                hosts=['http://localhost:9200'],
                timeout=10
            )

# Usage: save() derives the connection alias 'es_<env>' from the ENVIRONMENT
# variable (default 'development') and creates that connection if needed.
doc = ConfigurableDocument(title='Test', content='Content')
doc.save()  # Will use appropriate connection based on ENVIRONMENT variable

Connection Error Handling

from elasticsearch_dsl import connections, Search
from elasticsearch.exceptions import (
    ConnectionError, ConnectionTimeout, TransportError,
    NotFoundError, RequestError
)
import time

def robust_search_with_retry(index, query, max_retries=3, delay=1, using='default'):
    """Perform search with connection retry logic.

    Runs a ``match`` query on the ``content`` field. It retries on connection
    timeouts, on connection errors, and on transport errors with HTTP status
    429/502/503/504, sleeping ``delay * 2 ** attempt`` seconds between
    attempts. Client errors (NotFoundError, RequestError) and any other
    exception are raised immediately.

    Args:
        index: Index name or pattern to search.
        query: Text matched against the ``content`` field.
        max_retries: Total number of attempts; must be at least 1.
        delay: Base back-off delay in seconds.
        using: Alias of the registered connection to search with.

    Returns:
        The search response.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The last Elasticsearch error once attempts are exhausted.
    """
    if max_retries < 1:
        # range(0) would skip the loop and silently return None
        raise ValueError("max_retries must be at least 1")

    retryable_statuses = {429, 502, 503, 504}

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            search = Search(using=using, index=index)
            search = search.query('match', content=query)
            return search.execute()

        except (NotFoundError, RequestError):
            # Don't retry on client errors. This clause must precede
            # TransportError: in elasticsearch-py 7.x these subclass it, so a
            # later clause would never be reached.
            raise

        except ConnectionTimeout:
            print(f"Attempt {attempt + 1}: Connection timeout")
            if is_last_attempt:
                raise

        except ConnectionError as e:
            # Back off and retry on the same alias. Recreating the connection
            # with hard-coded hosts would discard the configured hosts and
            # credentials; the client's pool is expected to reconnect itself.
            print(f"Attempt {attempt + 1}: Connection error - {e}")
            if is_last_attempt:
                raise

        except TransportError as e:
            # status_code may be absent on some TransportError types (8.x) or
            # a non-int such as 'N/A' for connection-level errors (7.x).
            status = getattr(e, 'status_code', None)
            if status not in retryable_statuses:
                raise
            print(f"Attempt {attempt + 1}: Transport error {status}")
            if is_last_attempt:
                raise

        except Exception as e:
            print(f"Unexpected error: {e}")
            raise

        time.sleep(delay * (2 ** attempt))  # Exponential backoff

# Usage
try:
    results = robust_search_with_retry('articles', 'elasticsearch')
    # len(results) only counts the hits returned in this page (10 by default);
    # hits.total carries the overall number of matches ('gte' = lower bound).
    total = results.hits.total
    qualifier = "at least " if total.relation == 'gte' else ""
    print(f"Found {qualifier}{total.value} results")
except Exception as e:
    print(f"Search failed after retries: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch-dsl

docs

aggregations.md

analysis.md

connections.md

document-operations.md

field-types.md

index-management.md

index.md

search-queries.md

tile.json