
tessl/pypi-elasticsearch5

Python client for Elasticsearch 5.x providing comprehensive access to all Elasticsearch APIs and features.


Transport and Connection

Low-level connection management, transport configuration, and client customization. Provides control over HTTP communication, connection pooling, retry logic, serialization, and advanced client configuration options.

Capabilities

Elasticsearch Client Configuration

Main client class with comprehensive configuration options for production use.

class Elasticsearch:
    def __init__(
        self,
        hosts=None,
        transport_class=Transport,
        **kwargs
    ):
        """
        Initialize Elasticsearch client with connection and transport configuration.
        
        Parameters:
        - hosts: List of Elasticsearch nodes (default: [{'host': 'localhost', 'port': 9200}])
        - transport_class: Transport class to handle communication (default: Transport)
        - **kwargs: Additional arguments passed to Transport constructor
        
        Host specification formats:
        - String: 'localhost:9200'
        - Dict: {'host': 'localhost', 'port': 9200, 'use_ssl': True}
        - List: ['host1:9200', 'host2:9200'] or [{'host': 'host1'}, {'host': 'host2'}]
        
        Examples:
        # Simple connection
        es = Elasticsearch(['localhost:9200'])
        
        # Multiple hosts with SSL
        es = Elasticsearch([
            {'host': 'host1', 'port': 9200, 'use_ssl': True},
            {'host': 'host2', 'port': 9200, 'use_ssl': True}
        ])
        
        # With authentication
        es = Elasticsearch(
            ['https://host1:9200'],
            http_auth=('username', 'password'),
            use_ssl=True,
            verify_certs=True
        )
        """
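
The host formats listed above all end up as dictionaries of connection keyword arguments. The following is a simplified sketch of that normalization using only the standard library; `normalize_host` is a hypothetical helper written for illustration, not the client's own function, and it ignores details such as credentials embedded in the URL.

```python
from urllib.parse import urlparse


def normalize_host(host):
    """Turn a host string or dict into a dict of connection keyword arguments."""
    if isinstance(host, dict):
        return dict(host)  # dicts are used as given (copied to avoid aliasing)

    # Bare 'host:port' strings have no scheme, so prefix '//' to help urlparse.
    parsed = urlparse(host if "://" in host else "//" + host)
    result = {"host": parsed.hostname}
    if parsed.port:
        result["port"] = parsed.port
    if parsed.scheme == "https":
        result["use_ssl"] = True
        result.setdefault("port", 443)  # assumed default port for HTTPS URLs
    if parsed.path and parsed.path != "/":
        result["url_prefix"] = parsed.path
    return result


print(normalize_host("localhost:9200"))
print(normalize_host("https://host1:9200/elasticsearch"))
```

Dict entries pass through unchanged, which is why per-host options such as `use_ssl` can be set directly in the dict form.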

Transport Layer Configuration

Core transport class handling HTTP communication, connection management, and request processing.

class Transport:
    def __init__(
        self,
        hosts,
        connection_class=Urllib3HttpConnection,
        connection_pool_class=ConnectionPool,
        host_info_callback=get_host_info,
        sniff_on_start=False,
        sniffer_timeout=None,
        sniff_on_connection_fail=False,
        sniff_timeout=0.1,
        serializer=JSONSerializer(),
        serializers=None,
        default_mimetype='application/json',
        max_retries=3,
        retry_on_status=(502, 503, 504),
        retry_on_timeout=False,
        send_get_body_as='GET',
        **kwargs
    ):
        """
        Initialize transport layer with connection and retry configuration.
        
        Parameters:
        - hosts: List of host configurations
        - connection_class: HTTP connection implementation (Urllib3HttpConnection, RequestsHttpConnection)
        - connection_pool_class: Connection pool management class (ConnectionPool, DummyConnectionPool)
        - host_info_callback: Function to process discovered host information
        - sniff_on_start: Discover cluster nodes at startup (bool)
        - sniffer_timeout: Interval between automatic node discovery (seconds)
        - sniff_on_connection_fail: Discover nodes when connection fails (bool)
        - sniff_timeout: Timeout for node discovery requests (seconds)
        - serializer: Default request/response serializer
        - serializers: Dict of serializers by mimetype
        - default_mimetype: Default response content type
        - max_retries: Maximum retry attempts for failed requests
        - retry_on_status: HTTP status codes that trigger retries
        - retry_on_timeout: Retry requests that timeout (bool)
        - send_get_body_as: How to send a body with GET requests ('GET', 'POST', or 'source')
        - **kwargs: Additional connection parameters
        
        Connection parameters passed to connection class:
        - timeout: Request timeout in seconds (default: 10)
        - use_ssl: Enable HTTPS connections (bool)
        - verify_certs: Verify SSL certificates (bool)
        - ca_certs: Path to CA certificate file
        - client_cert: Path to client certificate file
        - client_key: Path to client private key file
        - ssl_version: SSL version to use
        - ssl_assert_hostname: Verify SSL hostname (bool)
        - ssl_assert_fingerprint: Expected SSL certificate fingerprint
        - maxsize: Maximum connection pool size
        - http_auth: HTTP authentication tuple ('username', 'password')
        - http_compress: Enable HTTP compression (bool)
        - headers: Default HTTP headers dict
        """

    def perform_request(self, method: str, url: str, params: dict = None, body: dict = None) -> dict:
        """
        Execute HTTP request with automatic retry and connection management.
        
        Parameters:
        - method: HTTP method ('GET', 'POST', 'PUT', 'DELETE', 'HEAD')
        - url: Request URL path
        - params: Query parameters dict
        - body: Request body (will be serialized)
        
        Returns:
        dict: Deserialized response body (bool for HEAD requests)
        
        Raises:
        ConnectionError: Network/connection failures
        TransportError: HTTP errors with status codes
        SerializationError: Request/response serialization failures
        """

    def add_connection(self, host: dict):
        """Add a new connection to the pool."""

    def set_connections(self, hosts: list):
        """Replace all connections with new host list."""

    def get_connection(self):
        """Get an available connection from the pool."""

    def sniff_hosts(self, initial: bool = False):
        """Discover cluster nodes and update connection pool."""

    def mark_dead(self, connection):
        """Mark a connection as failed for dead timeout period."""

    def close(self):
        """Close all connections and clean up resources."""
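
To make the interaction of `max_retries` and `retry_on_status` concrete, here is a minimal, self-contained sketch of a retry loop that moves to another connection after a failure. It is an illustration under stated assumptions, not the transport's actual code: `send`, `FakeConnectionError`, and `perform_with_retries` are hypothetical names, and connections are represented by plain strings.

```python
class FakeConnectionError(Exception):
    """Stand-in for a network failure (hypothetical, for this sketch only)."""


def perform_with_retries(send, connections, max_retries=3,
                         retry_on_status=(502, 503, 504)):
    """Try a request on successive connections until one succeeds.

    `send(connection)` is a hypothetical callable returning (status, body).
    """
    last_error = None
    # One initial attempt plus up to `max_retries` retries.
    for attempt in range(max_retries + 1):
        connection = connections[attempt % len(connections)]
        try:
            status, body = send(connection)
        except FakeConnectionError as exc:
            last_error = exc  # connection-level failure: try the next node
            continue
        if status in retry_on_status:
            last_error = RuntimeError("HTTP %d from %s" % (status, connection))
            continue  # retryable status: try the next node
        return status, body  # success, or a status that is not retried
    raise last_error
```

With `max_retries=3` a request is attempted at most four times in this sketch before the last error is raised.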

Connection Pool Management

Manage multiple connections with failure detection, load balancing, and automatic recovery.

class ConnectionPool:
    def __init__(
        self,
        connections,
        dead_timeout=60,
        timeout_cutoff=5,
        selector_class=RoundRobinSelector,
        randomize_hosts=True,
        **kwargs
    ):
        """
        Initialize connection pool with failure handling and load balancing.
        
        Parameters:
        - connections: List of connection instances
        - dead_timeout: Base number of seconds a failed connection is kept out of rotation; the wait grows with consecutive failures
        - timeout_cutoff: Number of consecutive failures after which the dead timeout stops increasing
        - selector_class: Connection selection strategy class
        - randomize_hosts: Randomize initial connection order (bool)
        - **kwargs: Additional configuration passed to selector
        """

    def get_connection(self):
        """
        Select an available connection using the configured selector.
        
        Returns:
        Connection: Available connection instance
        
        Raises:
        ConnectionError: If no connections are available
        """

    def mark_dead(self, connection, now: float = None):
        """
        Mark connection as failed and start dead timeout.
        
        Parameters:
        - connection: Connection instance to mark as dead
        - now: Current timestamp (auto-generated if None)
        """

    def mark_live(self, connection):
        """Reset connection failure count and mark as available."""

    def resurrect(self, force: bool = False):
        """
        Attempt to revive dead connections after timeout period.
        
        Parameters:
        - force: Revive regardless of timeout (bool)
        """

    def close(self):
        """Close all connections in the pool."""

class DummyConnectionPool:
    """
    Single connection pool for cases with only one Elasticsearch node.
    Provides same interface as ConnectionPool but manages single connection.
    """
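
The relationship between `dead_timeout` and `timeout_cutoff` is easier to see with numbers. The sketch below assumes the wait doubles with each consecutive failure until the cutoff is reached; `dead_timeout_for` is a hypothetical helper, so check the installed client's `ConnectionPool.mark_dead` before relying on the exact values.

```python
def dead_timeout_for(fail_count, dead_timeout=60, timeout_cutoff=5):
    """Seconds a connection stays out of rotation after its Nth consecutive failure."""
    # Assumed formula: exponential growth, capped once fail_count - 1 hits the cutoff.
    return dead_timeout * 2 ** min(fail_count - 1, timeout_cutoff)


for failures in (1, 2, 3, 6, 7):
    print(failures, dead_timeout_for(failures))
```

Under these assumptions the first failure costs 60 seconds, the second 120, and from the sixth failure onward the wait stays at 1920 seconds.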

Connection Selection Strategies

Different strategies for selecting connections from the pool.

class ConnectionSelector:
    def __init__(self, opts: dict):
        """
        Base class for connection selection strategies.
        
        Parameters:
        - opts: Selector configuration options
        """

    def select(self, connections: list):
        """
        Select connection from available connections.
        
        Parameters:
        - connections: List of available connections
        
        Returns:
        Connection: Selected connection instance
        """

class RoundRobinSelector(ConnectionSelector):
    """
    Round-robin connection selection for even load distribution.
    Cycles through available connections in order.
    """

class RandomSelector(ConnectionSelector):
    """
    Random connection selection for simple load balancing.
    Randomly selects from available connections.
    """
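
The two strategies differ only in how `select` picks from the list of live connections. The following standalone sketch mimics that interface with the standard library; `RoundRobinSketch` and `RandomSketch` are illustrative names, not the client's classes, and connections are plain strings here.

```python
import random
from itertools import count


class RoundRobinSketch:
    """Cycle through the connections in order."""

    def __init__(self):
        self._counter = count()

    def select(self, connections):
        # The counter keeps growing; the modulo wraps it onto the current list.
        return connections[next(self._counter) % len(connections)]


class RandomSketch:
    """Pick any connection with equal probability."""

    def select(self, connections):
        return random.choice(connections)


selector = RoundRobinSketch()
nodes = ["host1", "host2", "host3"]
print([selector.select(nodes) for _ in range(4)])
```

A custom strategy (for example, preferring nodes in the same zone) would follow the same shape: accept the list of available connections and return one of them.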

HTTP Connection Implementations

Different HTTP client implementations for various use cases and environments.

class Connection:
    def __init__(
        self,
        host='localhost',
        port=9200,
        use_ssl=False,
        url_prefix='',
        timeout=10,
        **kwargs
    ):
        """
        Base connection class defining the connection interface.
        
        Parameters:
        - host: Elasticsearch host address
        - port: Port number (default: 9200)
        - use_ssl: Enable HTTPS (bool)
        - url_prefix: URL prefix for requests
        - timeout: Request timeout in seconds
        - **kwargs: Implementation-specific parameters
        """

    def perform_request(self, method: str, url: str, params: dict, body: bytes, timeout: float, ignore: tuple, headers: dict):
        """
        Execute HTTP request (implemented by subclasses).
        
        Returns:
        tuple: (status_code, headers_dict, response_body)
        """

    def log_request_success(self, method: str, full_url: str, path: str, body: bytes, status_code: int, response: str, duration: float):
        """Log successful request for debugging and monitoring."""

    def log_request_fail(self, method: str, full_url: str, path: str, body: bytes, duration: float, status_code: int, response: str, exception: Exception):
        """Log failed request for debugging and error tracking."""

class Urllib3HttpConnection(Connection):
    """
    HTTP connection using urllib3 library (default implementation).
    
    Additional parameters:
    - maxsize: Connection pool size (default: 10)
    - block: Block when pool is full (bool, default: False)
    - http_compress: Enable compression (bool, default: False)
    - headers: Default headers dict
    - ssl_context: Custom SSL context
    - assert_same_host: Verify host matches (bool, default: True)
    """

class RequestsHttpConnection(Connection):
    """
    HTTP connection using requests library (alternative implementation).
    
    Additional parameters:
    - pool_connections: Number of connection pools
    - pool_maxsize: Maximum connections per pool
    - max_retries: urllib3 retry configuration
    - pool_block: Block when pool exhausted (bool)
    """
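
The base `Connection` parameters (`host`, `port`, `use_ssl`, `url_prefix`) together determine the URL that each request path is appended to. A minimal sketch of that combination, assuming the prefix is normalized to a single leading slash; `build_base_url` is a hypothetical helper for illustration only.

```python
def build_base_url(host="localhost", port=9200, use_ssl=False, url_prefix=""):
    """Combine connection parameters into the base URL for requests."""
    scheme = "https" if use_ssl else "http"
    # Normalize the prefix so '/es', 'es', and '/es/' all behave the same.
    prefix = url_prefix.strip("/")
    prefix = "/" + prefix if prefix else ""
    return "%s://%s:%s%s" % (scheme, host, port, prefix)


print(build_base_url())
print(build_base_url("host3", 9243, use_ssl=True, url_prefix="/elasticsearch"))
```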

Data Serialization

Handle request/response serialization with support for multiple formats.

class JSONSerializer:
    """
    Default JSON serializer for request/response data.
    
    Attributes:
    - mimetype: 'application/json'
    """

    def loads(self, s: str) -> dict:
        """
        Deserialize JSON string to Python object.
        
        Parameters:
        - s: JSON string to deserialize
        
        Returns:
        dict: Deserialized Python object
        
        Raises:
        SerializationError: If JSON is invalid
        """

    def dumps(self, data: dict) -> str:
        """
        Serialize Python object to JSON string.
        
        Parameters:
        - data: Python object to serialize
        
        Returns:
        str: JSON string representation
        
        Raises:
        SerializationError: If object cannot be serialized
        """

    def default(self, data):
        """
        Handle special data types during serialization.
        Supports: datetime, date, Decimal, UUID objects
        """

class TextSerializer:
    """
    Plain text serializer for string data.
    
    Attributes:
    - mimetype: 'text/plain'
    """

    def loads(self, s: str) -> str:
        """Return string unchanged."""

    def dumps(self, data: str) -> str:
        """Serialize string data only (raises error for other types)."""

class Deserializer:
    def __init__(self, serializers: dict, default_mimetype: str = 'application/json'):
        """
        Response deserializer supporting multiple content types.
        
        Parameters:
        - serializers: Dict mapping mimetypes to serializer instances
        - default_mimetype: Fallback content type
        """

    def loads(self, s: str, mimetype: str = None) -> dict:
        """
        Deserialize response based on content type.
        
        Parameters:
        - s: Response string to deserialize
        - mimetype: Content type (uses default if None)
        
        Returns:
        dict: Deserialized response
        """
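
Deserialization by content type amounts to a dictionary lookup keyed on the mimetype. The sketch below shows the idea with stand-in serializers; `JsonSketch`, `TextSketch`, and `deserialize` are hypothetical names, and the handling of parameters such as `charset` is an assumption about how a response header would be matched.

```python
import json


class JsonSketch:
    mimetype = "application/json"

    def loads(self, s):
        return json.loads(s)


class TextSketch:
    mimetype = "text/plain"

    def loads(self, s):
        return s


def deserialize(serializers, s, mimetype=None, default_mimetype="application/json"):
    """Pick a serializer by content type, ignoring parameters such as charset."""
    if not mimetype:
        mimetype = default_mimetype
    else:
        mimetype = mimetype.split(";", 1)[0].strip()
    try:
        serializer = serializers[mimetype]
    except KeyError:
        raise ValueError("No serializer registered for %r" % mimetype)
    return serializer.loads(s)


registry = {"application/json": JsonSketch(), "text/plain": TextSketch()}
print(deserialize(registry, '{"status": "green"}'))
print(deserialize(registry, "green", "text/plain; charset=UTF-8"))
```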

Host Discovery and Configuration

Automatic node discovery and host configuration management.

def get_host_info(node_info: dict, host: dict) -> dict:
    """
    Default callback to process discovered node information.
    
    Parameters:
    - node_info: Node information from cluster state
    - host: Current host configuration
    
    Returns:
    dict: Host configuration to use for the node, or None to leave the node out of the pool
    
    Extracts:
    - host: IP address or hostname
    - port: HTTP port number
    - use_ssl: Whether node uses HTTPS
    - url_prefix: URL prefix if configured
    """
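
A custom `host_info_callback` has the same two-argument shape as `get_host_info`. The sketch below assumes the node information carries a `roles` list and that returning `None` excludes the node from the pool; `skip_master_only_nodes` is a hypothetical name, and the sample node dictionaries are made up for illustration.

```python
def skip_master_only_nodes(node_info, host):
    """Return the host config, or None to leave the node out of the pool."""
    roles = node_info.get("roles", [])
    if roles == ["master"]:
        return None  # dedicated master: do not send client traffic to it
    return host


print(skip_master_only_nodes({"roles": ["master"]}, {"host": "m1", "port": 9200}))
print(skip_master_only_nodes({"roles": ["master", "data"]}, {"host": "d1", "port": 9200}))
```

Such a function would be passed as `host_info_callback=skip_master_only_nodes` together with the sniffing options.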

Usage Examples

Basic Client Configuration

from elasticsearch5 import Elasticsearch

# Simple connection to local cluster
es = Elasticsearch()

# Multiple hosts for high availability
es = Elasticsearch([
    'host1:9200',
    'host2:9200',
    'host3:9200'
])

# Detailed host configuration
es = Elasticsearch([
    {'host': 'host1', 'port': 9200},
    {'host': 'host2', 'port': 9200, 'use_ssl': True},
    {'host': 'host3', 'port': 9243, 'url_prefix': '/elasticsearch'}
])

SSL and Authentication

# SSL with authentication
es = Elasticsearch(
    ['https://host1:9200'],
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ca_certs='/path/to/ca.crt',
    timeout=30
)

# Client certificate authentication
es = Elasticsearch(
    ['https://host1:9200'],
    use_ssl=True,
    verify_certs=True,
    ca_certs='/path/to/ca.crt',
    client_cert='/path/to/client.crt',
    client_key='/path/to/client.key'
)

# Custom SSL context
import ssl
ssl_context = ssl.create_default_context(cafile='/path/to/ca.crt')
ssl_context.check_hostname = False

es = Elasticsearch(
    ['https://host1:9200'],
    use_ssl=True,
    ssl_context=ssl_context
)

Advanced Transport Configuration

from elasticsearch5 import (
    Elasticsearch, 
    Transport,
    RequestsHttpConnection,
    RoundRobinSelector
)

# Custom transport with requests
es = Elasticsearch(
    ['host1:9200', 'host2:9200'],
    connection_class=RequestsHttpConnection,
    timeout=20,
    max_retries=5,
    retry_on_timeout=True,
    retry_on_status=(429, 502, 503, 504),
    http_compress=True
)

# Node discovery configuration
es = Elasticsearch(
    ['host1:9200'],
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,  # Sniff every 60 seconds
    sniff_timeout=10     # 10 second sniff timeout
)

# Custom connection pool
from elasticsearch5.connection_pool import ConnectionPool, RandomSelector

es = Elasticsearch(
    ['host1:9200', 'host2:9200', 'host3:9200'],
    connection_pool_class=ConnectionPool,
    selector_class=RandomSelector,
    dead_timeout=30,      # Base wait before retrying a dead connection (grows with repeated failures)
    timeout_cutoff=3      # Stop increasing the wait after 3 consecutive failures
)

Custom Serialization

from elasticsearch5 import Elasticsearch, JSONSerializer
import json
from datetime import datetime

class CustomJSONSerializer(JSONSerializer):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

# Use custom serializer
es = Elasticsearch(
    ['localhost:9200'],
    serializer=CustomJSONSerializer()
)

# Multiple serializers for different content types
from elasticsearch5.serializer import TextSerializer

serializers = {
    'application/json': CustomJSONSerializer(),
    'text/plain': TextSerializer(),
    'text/csv': TextSerializer()
}

es = Elasticsearch(
    ['localhost:9200'],
    serializers=serializers,
    default_mimetype='application/json'
)
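
The `default` hook in a JSON serializer is only called for objects the encoder cannot handle itself. The standalone sketch below shows the same mechanism with the standard library's `json.dumps`; `encode_special` is a hypothetical function, and the conversions chosen (ISO strings, floats, plain strings) are assumptions rather than a statement of what the client does.

```python
import json
import uuid
from datetime import date, datetime
from decimal import Decimal


def encode_special(obj):
    """Convert types that json cannot encode; reject everything else."""
    if isinstance(obj, (date, datetime)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, uuid.UUID):
        return str(obj)
    raise TypeError("Unable to serialize %r (type: %s)" % (obj, type(obj)))


doc = {"created": datetime(2017, 1, 2, 3, 4, 5), "price": Decimal("9.5")}
print(json.dumps(doc, default=encode_special, sort_keys=True))
```

Raising `TypeError` for unknown types matters: silently returning `None` would write `null` into the document instead of surfacing the problem.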

Connection Monitoring and Debugging

import logging

from elasticsearch5 import Elasticsearch, Urllib3HttpConnection

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('elasticsearch')
logger.setLevel(logging.DEBUG)

# Custom connection with monitoring
class MonitoredConnection(Urllib3HttpConnection):
    def log_request_success(self, method, full_url, path, body, status_code, response, duration):
        super().log_request_success(method, full_url, path, body, status_code, response, duration)
        print(f"SUCCESS: {method} {path} -> {status_code} ({duration:.2f}s)")
    
    def log_request_fail(self, method, full_url, path, body, duration, status_code, response, exception):
        super().log_request_fail(method, full_url, path, body, duration, status_code, response, exception)
        print(f"FAILED: {method} {path} -> {status_code} ({duration:.2f}s): {exception}")

es = Elasticsearch(
    ['localhost:9200'],
    connection_class=MonitoredConnection
)

Connection Pool Management

# Access transport layer directly
transport = es.transport

# Add new connection at runtime
transport.add_connection({'host': 'new-host', 'port': 9200})

# Update entire connection list
transport.set_connections([
    {'host': 'host1', 'port': 9200},
    {'host': 'host2', 'port': 9200},
    {'host': 'host3', 'port': 9200}
])

# Manual node discovery
transport.sniff_hosts()

# Check connection pool status
pool = transport.connection_pool
print(f"Total connections: {len(pool.connections)}")
print(f"Dead connections: {pool.dead.qsize()}")

# Force resurrection of dead connections
pool.resurrect(force=True)

# Clean shutdown
transport.close()

Error Handling and Retries

from elasticsearch5.exceptions import (
    ConnectionError,
    TransportError,
    SerializationError
)

try:
    es = Elasticsearch(
        ['host1:9200', 'host2:9200'],
        max_retries=3,
        retry_on_status=(429, 502, 503, 504),
        retry_on_timeout=True,
        timeout=10
    )
    
    result = es.search(index='my_index', body={'query': {'match_all': {}}})
    
except ConnectionError as e:
    print(f"Connection failed: {e}")
except TransportError as e:
    print(f"Transport error {e.status_code}: {e.error}")
except SerializationError as e:
    print(f"Serialization error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
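
The client's own retries happen immediately against other nodes. When the whole cluster is briefly unavailable, an application may also want to wait between attempts. The following is a sketch of such an application-level wrapper, not a feature of the client; `call_with_backoff` and its parameters are hypothetical.

```python
import time


def call_with_backoff(func, retriable=(Exception,), attempts=3,
                      base_delay=0.5, sleep=time.sleep):
    """Call func(), waiting base_delay * 2**n seconds after the nth failure."""
    for attempt in range(attempts):
        try:
            return func()
        except retriable:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller see the error
            sleep(base_delay * 2 ** attempt)
```

It could wrap a call such as `lambda: es.search(index='my_index', body={'query': {'match_all': {}}})` with `retriable=(ConnectionError,)`, so that request errors like a malformed query are not retried.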

Production Configuration

# Production-ready configuration
from elasticsearch5 import Elasticsearch, RoundRobinSelector
es = Elasticsearch(
    [
        {'host': 'es-node-1', 'port': 9200},
        {'host': 'es-node-2', 'port': 9200},
        {'host': 'es-node-3', 'port': 9200}
    ],
    
    # Security
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ca_certs='/etc/ssl/certs/elasticsearch-ca.pem',
    
    # Performance
    timeout=30,
    max_retries=3,
    retry_on_status=(429, 502, 503, 504),
    retry_on_timeout=True,
    http_compress=True,
    
    # Reliability
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,
    dead_timeout=30,
    
    # Connection pool
    maxsize=25,  # Max connections per host
    selector_class=RoundRobinSelector,
    
    # Headers
    headers={'User-Agent': 'MyApp/1.0'}
)
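
In a deployed service, values such as hosts and credentials are usually read from the environment rather than written into the source. A minimal sketch of that idea follows; the `ES_*` variable names and the `es_kwargs_from_env` helper are hypothetical, and only keyword arguments already shown above are produced.

```python
import os


def es_kwargs_from_env(environ=None):
    """Build Elasticsearch(...) keyword arguments from environment variables."""
    environ = os.environ if environ is None else environ
    raw_hosts = environ.get("ES_HOSTS", "localhost:9200")
    kwargs = {
        "hosts": [h.strip() for h in raw_hosts.split(",") if h.strip()],
        "timeout": int(environ.get("ES_TIMEOUT", "30")),
    }
    user = environ.get("ES_USER")
    if user:
        kwargs["http_auth"] = (user, environ.get("ES_PASSWORD", ""))
    ca_certs = environ.get("ES_CA_CERTS")
    if ca_certs:
        # Only switch TLS on when a CA bundle is configured.
        kwargs.update(use_ssl=True, verify_certs=True, ca_certs=ca_certs)
    return kwargs


print(es_kwargs_from_env({"ES_HOSTS": "es-node-1:9200, es-node-2:9200"}))
```

The result can be passed as `Elasticsearch(**es_kwargs_from_env())`, with the remaining tuning options added explicitly.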
