Python client for Elasticsearch 5.x providing comprehensive access to all Elasticsearch APIs and features.

Connection management: this module covers low-level connection handling, transport configuration, and client customization. It provides control over HTTP communication, connection pooling, retry logic, serialization, and advanced client configuration options. The main client class below exposes comprehensive configuration options for production use.
class Elasticsearch:
    """Main Elasticsearch 5.x client class.

    All communication is delegated to a Transport instance built from the
    constructor arguments.
    """

    def __init__(
        self,
        hosts=None,
        transport_class=Transport,
        **kwargs
    ):
        """
        Initialize Elasticsearch client with connection and transport
        configuration.

        Parameters:
        - hosts: List of Elasticsearch nodes
          (default: [{'host': 'localhost', 'port': 9200}])
        - transport_class: Transport class to handle communication
          (default: Transport)
        - **kwargs: Additional arguments passed to the Transport constructor

        Host specification formats:
        - String: 'localhost:9200'
        - Dict: {'host': 'localhost', 'port': 9200, 'use_ssl': True}
        - List: ['host1:9200', 'host2:9200']
          or [{'host': 'host1'}, {'host': 'host2'}]

        Examples:
            # Simple connection
            es = Elasticsearch(['localhost:9200'])

            # Multiple hosts with SSL
            es = Elasticsearch([
                {'host': 'host1', 'port': 9200, 'use_ssl': True},
                {'host': 'host2', 'port': 9200, 'use_ssl': True}
            ])

            # With authentication
            es = Elasticsearch(
                ['https://host1:9200'],
                http_auth=('username', 'password'),
                use_ssl=True,
                verify_certs=True
            )
        """


# Core transport class handling HTTP communication, connection management,
# and request processing.
class Transport:
    """Transport layer handling HTTP communication, connection management,
    and request processing for the client."""

    def __init__(
        self,
        hosts,
        connection_class=Urllib3HttpConnection,
        connection_pool_class=ConnectionPool,
        host_info_callback=get_host_info,
        sniff_on_start=False,
        sniffer_timeout=None,
        sniff_on_connection_fail=False,
        sniff_timeout=0.1,
        serializer=JSONSerializer(),
        serializers=None,
        default_mimetype='application/json',
        max_retries=3,
        retry_on_status=(502, 503, 504),
        retry_on_timeout=False,
        send_get_body_as='GET',
        **kwargs
    ):
        """
        Initialize transport layer with connection and retry configuration.

        Parameters:
        - hosts: List of host configurations
        - connection_class: HTTP connection implementation
          (Urllib3HttpConnection, RequestsHttpConnection)
        - connection_pool_class: Connection pool management class
          (ConnectionPool, DummyConnectionPool)
        - host_info_callback: Function to process discovered host information
        - sniff_on_start: Discover cluster nodes at startup (bool)
        - sniffer_timeout: Interval between automatic node discovery (seconds)
        - sniff_on_connection_fail: Discover nodes when a connection fails (bool)
        - sniff_timeout: Timeout for node discovery requests (seconds)
        - serializer: Default request/response serializer
        - serializers: Dict of serializers keyed by mimetype
        - default_mimetype: Default response content type
        - max_retries: Maximum retry attempts for failed requests
        - retry_on_status: HTTP status codes that trigger retries
        - retry_on_timeout: Retry requests that time out (bool)
        - send_get_body_as: Method for GET requests with a body
          ('GET' or 'POST')
        - **kwargs: Additional connection parameters

        Connection parameters passed through to the connection class:
        - timeout: Request timeout in seconds (default: 10)
        - use_ssl: Enable HTTPS connections (bool)
        - verify_certs: Verify SSL certificates (bool)
        - ca_certs: Path to CA certificate file
        - client_cert: Path to client certificate file
        - client_key: Path to client private key file
        - ssl_version: SSL version to use
        - ssl_assert_hostname: Verify SSL hostname (bool)
        - ssl_assert_fingerprint: Expected SSL certificate fingerprint
        - maxsize: Maximum connection pool size
        - http_auth: HTTP authentication tuple ('username', 'password')
        - http_compress: Enable HTTP compression (bool)
        - headers: Default HTTP headers dict
        """

    def perform_request(self, method: str, url: str, params: dict = None, body: dict = None) -> dict:
        """
        Execute HTTP request with automatic retry and connection management.

        Parameters:
        - method: HTTP method ('GET', 'POST', 'PUT', 'DELETE', 'HEAD')
        - url: Request URL path
        - params: Query parameters dict
        - body: Request body (will be serialized)

        Returns:
            dict: Deserialized response body

        Raises:
            ConnectionError: Network/connection failures
            TransportError: HTTP errors with status codes
            SerializationError: Request/response serialization failures
        """

    def add_connection(self, host: dict):
        """Add a new connection to the pool."""

    def set_connections(self, hosts: list):
        """Replace all connections with a new host list."""

    def get_connection(self):
        """Get an available connection from the pool."""

    def sniff_hosts(self, initial: bool = False):
        """Discover cluster nodes and update the connection pool."""

    def mark_dead(self, connection):
        """Mark a connection as failed for the dead-timeout period."""

    def close(self):
        """Close all connections and clean up resources."""


# Manage multiple connections with failure detection, load balancing,
# and automatic recovery.
class ConnectionPool:
    """Pool of connections with failure detection, load balancing, and
    automatic recovery of dead nodes."""

    def __init__(
        self,
        connections,
        dead_timeout=60,
        timeout_cutoff=5,
        selector_class=RoundRobinSelector,
        randomize_hosts=True,
        **kwargs
    ):
        """
        Initialize connection pool with failure handling and load balancing.

        Parameters:
        - connections: List of connection instances
        - dead_timeout: Seconds to wait before retrying failed connections
        - timeout_cutoff: Number of failures needed to mark a connection dead
        - selector_class: Connection selection strategy class
        - randomize_hosts: Randomize initial connection order (bool)
        - **kwargs: Additional configuration passed to the selector
        """

    def get_connection(self):
        """
        Select an available connection using the configured selector.

        Returns:
            Connection: Available connection instance

        Raises:
            ConnectionError: If no connections are available
        """

    def mark_dead(self, connection, now: float = None):
        """
        Mark connection as failed and start its dead timeout.

        Parameters:
        - connection: Connection instance to mark as dead
        - now: Current timestamp (auto-generated if None)
        """

    def mark_live(self, connection):
        """Reset connection failure count and mark it as available."""

    def resurrect(self, force: bool = False):
        """
        Attempt to revive dead connections after the timeout period.

        Parameters:
        - force: Revive regardless of timeout (bool)
        """

    def close(self):
        """Close all connections in the pool."""
class DummyConnectionPool:
    """
    Single-connection pool for deployments with only one Elasticsearch node.

    Provides the same interface as ConnectionPool but manages a single
    connection.
    """


# Different strategies for selecting connections from the pool.
class ConnectionSelector:
    """Base class for connection selection strategies."""

    def __init__(self, opts: dict):
        """
        Initialize the selector.

        Parameters:
        - opts: Selector configuration options
        """

    def select(self, connections: list):
        """
        Select a connection from the available connections.

        Parameters:
        - connections: List of available connections

        Returns:
            Connection: Selected connection instance
        """
class RoundRobinSelector(ConnectionSelector):
    """
    Round-robin connection selection for even load distribution.

    Cycles through available connections in order.
    """
class RandomSelector(ConnectionSelector):
    """
    Random connection selection for simple load balancing.

    Randomly selects from available connections.
    """


# Different HTTP client implementations for various use cases and
# environments.
class Connection:
    """Base connection class defining the HTTP connection interface."""

    def __init__(
        self,
        host='localhost',
        port=9200,
        use_ssl=False,
        url_prefix='',
        timeout=10,
        **kwargs
    ):
        """
        Initialize the base connection.

        Parameters:
        - host: Elasticsearch host address
        - port: Port number (default: 9200)
        - use_ssl: Enable HTTPS (bool)
        - url_prefix: URL prefix for requests
        - timeout: Request timeout in seconds
        - **kwargs: Implementation-specific parameters
        """

    def perform_request(self, method: str, url: str, params: dict, body: bytes, timeout: float, ignore: tuple, headers: dict):
        """
        Execute HTTP request (implemented by subclasses).

        Returns:
            tuple: (status_code, headers_dict, response_body)
        """

    def log_request_success(self, method: str, full_url: str, path: str, body: bytes, status_code: int, response: str, duration: float):
        """Log successful request for debugging and monitoring."""

    def log_request_fail(self, method: str, full_url: str, path: str, body: bytes, duration: float, status_code: int, response: str, exception: Exception):
        """Log failed request for debugging and error tracking."""
class Urllib3HttpConnection(Connection):
    """
    HTTP connection using the urllib3 library (default implementation).

    Additional parameters:
    - maxsize: Connection pool size (default: 10)
    - block: Block when pool is full (bool, default: False)
    - http_compress: Enable compression (bool, default: False)
    - headers: Default headers dict
    - ssl_context: Custom SSL context
    - assert_same_host: Verify host matches (bool, default: True)
    """
class RequestsHttpConnection(Connection):
    """
    HTTP connection using the requests library (alternative implementation).

    Additional parameters:
    - pool_connections: Number of connection pools
    - pool_maxsize: Maximum connections per pool
    - max_retries: urllib3 retry configuration
    - pool_block: Block when pool exhausted (bool)
    """


# Handle request/response serialization with support for multiple formats.
class JSONSerializer:
    """
    Default JSON serializer for request/response data.

    Attributes:
    - mimetype: 'application/json'
    """

    def loads(self, s: str) -> dict:
        """
        Deserialize a JSON string to a Python object.

        Parameters:
        - s: JSON string to deserialize

        Returns:
            dict: Deserialized Python object

        Raises:
            SerializationError: If the JSON is invalid
        """

    def dumps(self, data: dict) -> str:
        """
        Serialize a Python object to a JSON string.

        Parameters:
        - data: Python object to serialize

        Returns:
            str: JSON string representation

        Raises:
            SerializationError: If the object cannot be serialized
        """

    def default(self, data):
        """
        Handle special data types during serialization.

        Supports: datetime, date, Decimal, UUID objects.
        """
class TextSerializer:
    """
    Plain text serializer for string data.

    Attributes:
    - mimetype: 'text/plain'
    """

    def loads(self, s: str) -> str:
        """Return the string unchanged."""

    def dumps(self, data: str) -> str:
        """Serialize string data only (raises an error for other types)."""
class Deserializer:
    """Response deserializer supporting multiple content types."""

    def __init__(self, serializers: dict, default_mimetype: str = 'application/json'):
        """
        Initialize the deserializer.

        Parameters:
        - serializers: Dict mapping mimetypes to serializer instances
        - default_mimetype: Fallback content type
        """

    def loads(self, s: str, mimetype: str = None) -> dict:
        """
        Deserialize a response based on its content type.

        Parameters:
        - s: Response string to deserialize
        - mimetype: Content type (uses default if None)

        Returns:
            dict: Deserialized response
        """


# Automatic node discovery and host configuration management.
def get_host_info(node_info: dict, host: dict) -> dict:
    """
    Default callback to process discovered node information.

    Parameters:
    - node_info: Node information from cluster state
    - host: Current host configuration

    Returns:
        dict: Updated host configuration with discovered information

    Extracts:
    - host: IP address or hostname
    - port: HTTP port number
    - use_ssl: Whether the node uses HTTPS
    - url_prefix: URL prefix if configured
    """


from elasticsearch5 import Elasticsearch
# ---------------------------------------------------------------------------
# Usage examples
# ---------------------------------------------------------------------------

# Simple connection to local cluster
es = Elasticsearch()

# Multiple hosts for high availability
es = Elasticsearch([
    'host1:9200',
    'host2:9200',
    'host3:9200'
])

# Detailed host configuration
es = Elasticsearch([
    {'host': 'host1', 'port': 9200},
    {'host': 'host2', 'port': 9200, 'use_ssl': True},
    {'host': 'host3', 'port': 9243, 'url_prefix': '/elasticsearch'}
])

# SSL with authentication
es = Elasticsearch(
    ['https://host1:9200'],
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ca_certs='/path/to/ca.crt',
    timeout=30
)

# Client certificate authentication
es = Elasticsearch(
    ['https://host1:9200'],
    use_ssl=True,
    verify_certs=True,
    ca_certs='/path/to/ca.crt',
    client_cert='/path/to/client.crt',
    client_key='/path/to/client.key'
)

# Custom SSL context
import ssl
ssl_context = ssl.create_default_context(cafile='/path/to/ca.crt')
ssl_context.check_hostname = False
es = Elasticsearch(
    ['https://host1:9200'],
    use_ssl=True,
    ssl_context=ssl_context
)

from elasticsearch5 import (
    Elasticsearch,
    Transport,
    RequestsHttpConnection,
    RoundRobinSelector
)

# Custom transport with requests
es = Elasticsearch(
    ['host1:9200', 'host2:9200'],
    connection_class=RequestsHttpConnection,
    timeout=20,
    max_retries=5,
    retry_on_timeout=True,
    retry_on_status=(429, 502, 503, 504),
    http_compress=True
)

# Node discovery configuration
es = Elasticsearch(
    ['host1:9200'],
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,  # Sniff every 60 seconds
    sniff_timeout=10     # 10 second sniff timeout
)

# Custom connection pool
from elasticsearch5 import ConnectionPool, RandomSelector
es = Elasticsearch(
    ['host1:9200', 'host2:9200', 'host3:9200'],
    connection_pool_class=ConnectionPool,
    selector_class=RandomSelector,
    dead_timeout=30,   # Retry dead connections after 30s
    timeout_cutoff=3   # Mark dead after 3 failures
)

from elasticsearch5 import Elasticsearch, JSONSerializer
import json
from datetime import datetime


class CustomJSONSerializer(JSONSerializer):
    # Extends the default serializer with datetime support.
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)


# Use custom serializer
es = Elasticsearch(
    ['localhost:9200'],
    serializer=CustomJSONSerializer()
)

# Multiple serializers for different content types
from elasticsearch5.serializers import TextSerializer
serializers = {
    'application/json': CustomJSONSerializer(),
    'text/plain': TextSerializer(),
    'text/csv': TextSerializer()
}
es = Elasticsearch(
    ['localhost:9200'],
    serializers=serializers,
    default_mimetype='application/json'
)

import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('elasticsearch')
logger.setLevel(logging.DEBUG)


# Custom connection with monitoring: hooks the logging callbacks to print
# per-request metrics.
class MonitoredConnection(Urllib3HttpConnection):
    def log_request_success(self, method, full_url, path, body, status_code, response, duration):
        super().log_request_success(method, full_url, path, body, status_code, response, duration)
        print(f"SUCCESS: {method} {path} -> {status_code} ({duration:.2f}s)")

    def log_request_fail(self, method, full_url, path, body, duration, status_code, response, exception):
        super().log_request_fail(method, full_url, path, body, duration, status_code, response, exception)
        print(f"FAILED: {method} {path} -> {status_code} ({duration:.2f}s): {exception}")


es = Elasticsearch(
    ['localhost:9200'],
    connection_class=MonitoredConnection
)

# Access transport layer directly
transport = es.transport

# Add new connection at runtime
transport.add_connection({'host': 'new-host', 'port': 9200})

# Update entire connection list
transport.set_connections([
    {'host': 'host1', 'port': 9200},
    {'host': 'host2', 'port': 9200},
    {'host': 'host3', 'port': 9200}
])

# Manual node discovery
transport.sniff_hosts()

# Check connection pool status
pool = transport.connection_pool
print(f"Total connections: {len(pool.connections)}")
print(f"Dead connections: {len(pool.dead_count)}")

# Force resurrection of dead connections
pool.resurrect(force=True)

# Clean shutdown
transport.close()

from elasticsearch5.exceptions import (
    ConnectionError,
    TransportError,
    SerializationError
)

try:
    es = Elasticsearch(
        ['host1:9200', 'host2:9200'],
        max_retries=3,
        retry_on_status=(429, 502, 503, 504),
        retry_on_timeout=True,
        timeout=10
    )
    result = es.search(index='my_index', body={'query': {'match_all': {}}})
except ConnectionError as e:
    print(f"Connection failed: {e}")
except TransportError as e:
    print(f"Transport error {e.status_code}: {e.error}")
except SerializationError as e:
    print(f"Serialization error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

# Production-ready configuration
es = Elasticsearch(
    [
        {'host': 'es-node-1', 'port': 9200},
        {'host': 'es-node-2', 'port': 9200},
        {'host': 'es-node-3', 'port': 9200}
    ],
    # Security
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ca_certs='/etc/ssl/certs/elasticsearch-ca.pem',
    # Performance
    timeout=30,
    max_retries=3,
    retry_on_status=(429, 502, 503, 504),
    retry_on_timeout=True,
    http_compress=True,
    # Reliability
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,
    dead_timeout=30,
    # Connection pool
    maxsize=25,  # Max connections per host
    selector_class=RoundRobinSelector,
    # Headers
    headers={'User-Agent': 'MyApp/1.0'}
)

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-elasticsearch5