High-level Python library for Elasticsearch providing an idiomatic way to write and manipulate queries.
—
Connection configuration and management for single and multiple Elasticsearch clusters with comprehensive support for authentication, SSL/TLS, connection pooling, retry logic, and both synchronous and asynchronous clients. Enables flexible deployment architectures and secure cluster communication.
Core functions for managing Elasticsearch connections.
def create_connection(alias='default', **kwargs):
"""
Create and register new Elasticsearch connection.
Args:
alias (str): Connection alias name (default: 'default')
**kwargs: Connection parameters passed to Elasticsearch client
Returns:
Elasticsearch: Elasticsearch client instance
Connection Parameters:
hosts (list or str): Elasticsearch host(s)
http_auth (tuple): HTTP authentication (username, password)
http_compress (bool): Enable HTTP compression
timeout (int): Request timeout in seconds
max_retries (int): Maximum number of retries
retry_on_timeout (bool): Retry on timeout errors
retry_on_status (list): HTTP status codes to retry on
sniff_on_start (bool): Sniff nodes on startup
sniff_on_connection_fail (bool): Sniff on connection failure
sniff_timeout (int): Sniffer timeout
sniffer_delay (int): Delay between sniff operations
randomize_hosts (bool): Randomize host selection
use_ssl (bool): Use SSL/TLS
verify_certs (bool): Verify SSL certificates
ssl_show_warn (bool): Show SSL warnings
ca_certs (str): Path to CA certificates
client_cert (str): Path to client certificate
client_key (str): Path to client private key
ssl_version (str): SSL/TLS version
ssl_assert_hostname (bool): Assert hostname in certificate
ssl_assert_fingerprint (str): Assert certificate fingerprint
headers (dict): Default HTTP headers
connections_per_node (int): Connections per node
http_compress (bool): Enable gzip compression
cloud_id (str): Elastic Cloud cluster ID
api_key (tuple): API key authentication (id, key)
bearer_auth (str): Bearer token authentication
opaque_id (str): Opaque ID for request identification
"""
def add_connection(alias: str, connection):
    """
    Add existing connection to registry.

    Args:
        alias (str): Connection alias name
        connection: Elasticsearch client instance
    """
def remove_connection(alias: str):
    """
    Remove connection from registry.

    Args:
        alias (str): Connection alias name

    Raises:
        KeyError: If connection alias doesn't exist
    """
def get_connection(alias='default'):
"""
Retrieve connection by alias.
Args:
alias (str): Connection alias name (default: 'default')
Returns:
Elasticsearch: Elasticsearch client instance
Raises:
KeyError: If connection alias doesn't exist
"""
def configure(**kwargs):
"""
Configure default connection with given parameters.
Args:
**kwargs: Connection parameters (same as create_connection)
Returns:
Elasticsearch: Configured default connection
"""Global connection registry for managing multiple connections.
class Connections:
    """
    Global connection registry singleton.

    Instance methods mirror the module-level connection functions, scoped
    to this registry instance.
    """

    def create_connection(self, alias: str = 'default', **kwargs):
        """Create and register connection (same as module function)."""

    def add_connection(self, alias: str, connection):
        """Add existing connection (same as module function)."""

    def remove_connection(self, alias: str):
        """Remove connection (same as module function)."""

    def get_connection(self, alias: str = 'default'):
        """Get connection (same as module function)."""

    def configure(self, **kwargs):
        """Configure default connection (same as module function)."""

    def all(self):
        """
        Get all registered connections.

        Returns:
            dict: Mapping of alias to connection
        """
# Global connections instance
connections: Connections
Asynchronous connection management for async/await operations.
def create_async_connection(alias='default', **kwargs):
"""
Create and register new async Elasticsearch connection.
Args:
alias (str): Connection alias name (default: 'default')
**kwargs: Connection parameters (same as create_connection)
Returns:
AsyncElasticsearch: Async Elasticsearch client instance
"""
def add_async_connection(alias: str, connection):
    """
    Add existing async connection to registry.

    Args:
        alias (str): Connection alias name
        connection: AsyncElasticsearch client instance
    """
def remove_async_connection(alias: str):
    """
    Remove async connection from registry.

    Args:
        alias (str): Connection alias name

    Raises:
        KeyError: If connection alias doesn't exist
    """
def get_async_connection(alias='default'):
"""
Retrieve async connection by alias.
Args:
alias (str): Connection alias name (default: 'default')
Returns:
AsyncElasticsearch: Async Elasticsearch client instance
Raises:
KeyError: If connection alias doesn't exist
"""
def configure_async(**kwargs):
"""
Configure default async connection with given parameters.
Args:
**kwargs: Connection parameters (same as create_connection)
Returns:
AsyncElasticsearch: Configured default async connection
"""Async connection registry for managing multiple async connections.
class AsyncConnections:
    """
    Global async connection registry singleton.

    Instance methods mirror the module-level async connection functions,
    scoped to this registry instance.
    """

    def create_connection(self, alias: str = 'default', **kwargs):
        """Create and register async connection."""

    def add_connection(self, alias: str, connection):
        """Add existing async connection."""

    def remove_connection(self, alias: str):
        """Remove async connection."""

    def get_connection(self, alias: str = 'default'):
        """Get async connection."""

    def configure(self, **kwargs):
        """Configure default async connection."""

    def all(self):
        """
        Get all registered async connections.

        Returns:
            dict: Mapping of alias to async connection
        """
# Global async connections instance
async_connections: AsyncConnections
from elasticsearch_dsl import connections, Document, Text
# Simple connection to localhost
connections.create_connection(hosts=['localhost:9200'])
# Connection with authentication
connections.create_connection(
alias='secure',
hosts=['https://elasticsearch.example.com:9200'],
http_auth=('username', 'password'),
use_ssl=True,
verify_certs=True
)
# Use connection in document operations
class Article(Document):
title = Text()
content = Text()
class Index:
name = 'articles'
# Document operations will use default connection
article = Article(title='Test', content='Content')
article.save()
# Use specific connection
article.save(using='secure')
from elasticsearch_dsl import connections
# Production cluster
connections.create_connection(
alias='production',
hosts=['prod-es-1.example.com:9200', 'prod-es-2.example.com:9200'],
http_auth=('prod_user', 'prod_password'),
use_ssl=True,
verify_certs=True,
ca_certs='/path/to/ca.pem',
timeout=30,
max_retries=3,
retry_on_timeout=True,
sniff_on_start=True,
sniff_on_connection_fail=True,
sniff_timeout=10,
randomize_hosts=True
)
# Development cluster
connections.create_connection(
alias='development',
hosts=['dev-es.example.com:9200'],
http_auth=('dev_user', 'dev_password'),
timeout=10,
max_retries=1
)
# Analytics cluster (read-only)
connections.create_connection(
alias='analytics',
hosts=['analytics-es.example.com:9200'],
http_auth=('analytics_user', 'analytics_password'),
use_ssl=True,
timeout=60 # Longer timeout for analytics queries
)
# Use different connections for different operations
from elasticsearch_dsl import Search
# Search production data
search = Search(using='production', index='logs')
response = search.execute()
# Run analytics on dedicated cluster
analytics_search = Search(using='analytics', index='metrics')
analytics_response = analytics_search.execute()
from elasticsearch_dsl import connections
# SSL with client certificates
connections.create_connection(
alias='ssl_client_cert',
hosts=['https://secure-es.example.com:9200'],
use_ssl=True,
verify_certs=True,
ca_certs='/path/to/ca-certificates.crt',
client_cert='/path/to/client.crt',
client_key='/path/to/client.key',
ssl_assert_hostname=True
)
# API Key authentication
connections.create_connection(
alias='api_key',
hosts=['https://es.example.com:9200'],
api_key=('api_key_id', 'api_key_secret'),
use_ssl=True,
verify_certs=True
)
# Bearer token authentication
connections.create_connection(
alias='bearer_token',
hosts=['https://es.example.com:9200'],
bearer_auth='your_bearer_token_here',
use_ssl=True
)
# Elastic Cloud connection
connections.create_connection(
alias='elastic_cloud',
cloud_id='cluster_name:base64_encoded_endpoint',
http_auth=('elastic_username', 'elastic_password')
)
# Custom headers and opaque ID
connections.create_connection(
alias='custom_headers',
hosts=['https://es.example.com:9200'],
headers={'Custom-Header': 'value'},
opaque_id='my-application-v1.0',
http_auth=('username', 'password')
)
from elasticsearch_dsl import connections
# High-performance configuration
connections.create_connection(
alias='high_performance',
hosts=[
'es-node-01.example.com:9200',
'es-node-02.example.com:9200',
'es-node-03.example.com:9200'
],
# Connection pooling
connections_per_node=10,
# Retry configuration
max_retries=5,
retry_on_timeout=True,
retry_on_status=[429, 502, 503, 504],
# Sniffing for node discovery
sniff_on_start=True,
sniff_on_connection_fail=True,
sniff_timeout=5,
sniffer_delay=60,
# Performance optimizations
http_compress=True,
randomize_hosts=True,
# Timeouts
timeout=20,
# Authentication
http_auth=('username', 'password'),
use_ssl=True,
verify_certs=True
)
# Bulk operations configuration
connections.create_connection(
alias='bulk_operations',
hosts=['bulk-es.example.com:9200'],
timeout=300, # 5 minute timeout for bulk operations
max_retries=1, # Fewer retries for bulk
http_compress=True, # Important for bulk data
connections_per_node=20, # More connections for concurrent bulk
http_auth=('bulk_user', 'bulk_password')
)
from elasticsearch_dsl import async_connections
import asyncio
async def setup_async_connections():
# Create async connection
await async_connections.create_connection(
alias='async_default',
hosts=['localhost:9200'],
timeout=30,
max_retries=3
)
# Async connection with authentication
await async_connections.create_connection(
alias='async_secure',
hosts=['https://es.example.com:9200'],
http_auth=('username', 'password'),
use_ssl=True,
verify_certs=True,
timeout=60
)
# Run async setup
asyncio.run(setup_async_connections())
# Use async connections with AsyncDocument and AsyncSearch
from elasticsearch_dsl import AsyncDocument, AsyncSearch
class AsyncArticle(AsyncDocument):
title = Text()
content = Text()
class Index:
name = 'async_articles'
async def async_operations():
# Create and save document
article = AsyncArticle(title='Async Test', content='Async content')
await article.save(using='async_default')
# Async search
search = AsyncSearch(using='async_secure', index='logs')
search = search.query('match', message='error')
response = await search.execute()
for hit in response:
print(f"Log: {hit.message}")
# Run async operations
asyncio.run(async_operations())
from elasticsearch_dsl import connections
from elasticsearch.exceptions import ConnectionError, TransportError
def check_connection_health(alias='default'):
    """Check health of Elasticsearch connection.

    Prints cluster health, per-node details, and runs a trivial search as
    a connectivity smoke test. Returns True on success, False otherwise.
    """
    try:
        client = connections.get_connection(alias)

        # Cluster-level health summary.
        health = client.cluster.health()
        print(f"Cluster status: {health['status']}")
        print(f"Number of nodes: {health['number_of_nodes']}")
        print(f"Active shards: {health['active_shards']}")

        # Per-node details.
        for node_id, node_info in client.nodes.info()['nodes'].items():
            print(f"Node {node_id}: {node_info['name']} ({node_info['version']})")

        # Trivial search proves the connection can serve queries.
        client.search(index='_all', body={'query': {'match_all': {}}}, size=1)
        print("Connection test successful!")
        return True
    except ConnectionError as e:
        print(f"Connection error: {e}")
        return False
    except TransportError as e:
        print(f"Transport error: {e}")
        return False
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False
# Check all connections
for alias in connections.all():
print(f"Checking connection '{alias}':")
check_connection_health(alias)
print("-" * 40)from elasticsearch_dsl import connections, Document, Text
import os
class ConfigurableDocument(Document):
    """Document that uses environment-based connection selection."""

    title = Text()
    content = Text()

    class Index:
        name = 'configurable_docs'

    def save(self, **kwargs):
        # Route the save through the connection matching ENVIRONMENT.
        env = os.getenv('ENVIRONMENT', 'development')
        connection_alias = f'es_{env}'
        # Lazily create the connection the first time it is needed.
        if connection_alias not in connections.all():
            self._setup_connection(env)
        return super().save(using=connection_alias, **kwargs)

    def _setup_connection(self, env):
        """Setup connection based on environment."""
        if env == 'production':
            connections.create_connection(
                alias='es_production',
                hosts=os.getenv('ES_PROD_HOSTS', 'localhost:9200').split(','),
                http_auth=(os.getenv('ES_PROD_USER'), os.getenv('ES_PROD_PASSWORD')),
                use_ssl=True,
                verify_certs=True,
                timeout=30,
            )
        elif env == 'staging':
            connections.create_connection(
                alias='es_staging',
                hosts=os.getenv('ES_STAGING_HOSTS', 'localhost:9200').split(','),
                http_auth=(os.getenv('ES_STAGING_USER'), os.getenv('ES_STAGING_PASSWORD')),
                timeout=20,
            )
        else:  # development
            connections.create_connection(
                alias='es_development',
                hosts=['localhost:9200'],
                timeout=10,
            )
# Usage
doc = ConfigurableDocument(title='Test', content='Content')
doc.save()  # Will use appropriate connection based on ENVIRONMENT variable
from elasticsearch_dsl import connections, Search
from elasticsearch.exceptions import (
ConnectionError, ConnectionTimeout, TransportError,
NotFoundError, RequestError
)
import time
def robust_search_with_retry(index, query, max_retries=3, delay=1):
    """Perform search with connection retry logic.

    Retries transient failures (timeouts, connection drops, throttling /
    gateway statuses) up to max_retries times; client errors are raised
    immediately.
    """
    for attempt in range(max_retries):
        retries_left = attempt < max_retries - 1
        try:
            request = Search(index=index).query('match', content=query)
            return request.execute()
        except ConnectionTimeout:
            print(f"Attempt {attempt + 1}: Connection timeout")
            if not retries_left:
                raise
            time.sleep(delay * (2 ** attempt))  # Exponential backoff
        except ConnectionError as e:
            print(f"Attempt {attempt + 1}: Connection error - {e}")
            if not retries_left:
                raise
            # Rebuild the default connection before the next attempt.
            try:
                connections.remove_connection('default')
            except KeyError:
                pass
            connections.create_connection(hosts=['localhost:9200'])
            time.sleep(delay)
        except TransportError as e:
            # Only throttling / gateway statuses are worth retrying.
            if e.status_code not in [429, 502, 503, 504]:
                raise
            print(f"Attempt {attempt + 1}: Transport error {e.status_code}")
            if not retries_left:
                raise
            time.sleep(delay * (2 ** attempt))
        except (NotFoundError, RequestError):
            # Client errors won't succeed on retry.
            raise
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise
# Usage
try:
results = robust_search_with_retry('articles', 'elasticsearch')
print(f"Found {len(results)} results")
except Exception as e:
print(f"Search failed after retries: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch-dsl