Python client for Elasticsearch with comprehensive API coverage and both sync and async support
—
Comprehensive exception hierarchy for handling various error conditions including HTTP errors, connection issues, authentication failures, and serialization problems. The client provides specific exception types for different error scenarios to enable proper error handling.
class ApiError(Exception):
    """Base class for all API-related errors.

    Attributes:
        message: Error message.
        meta: API response metadata.
        body: Response body containing error details.
    """

    def __init__(self, message, meta=None, body=None): ...

    @property
    def status_code(self) -> int: ...

    @property
    def error(self) -> str: ...

    @property
    def info(self) -> Dict[str, Any]: ...
class TransportError(Exception):
    """Transport layer errors from elastic-transport library.

    Attributes:
        message: Error description.
        errors: List of underlying errors.
    """

    def __init__(self, message, errors=None): ...
class SerializationError(Exception):
    """Errors during JSON serialization/deserialization.

    Attributes:
        message: Error description.
        original_exception: Underlying serialization error.
    """

    def __init__(self, message, original_exception=None): ...
class ConnectionError(TransportError):
    """Connection-related errors including network issues.

    Attributes:
        message: Error description.
        errors: Connection error details.
    """
class SSLError(ConnectionError):
    """SSL/TLS certificate and connection errors.

    Attributes:
        message: SSL error description.
        errors: SSL error details.
    """
class ConnectionTimeout(ConnectionError):
    """Connection timeout errors.

    Attributes:
        message: Timeout error description.
        timeout: Timeout value that was exceeded.
    """


class BadRequestError(ApiError):
    """HTTP 400 Bad Request errors.

    Raised when:
        - Invalid query syntax
        - Missing required parameters
        - Invalid parameter values
        - Malformed request body
    """
class AuthenticationException(ApiError):
    """HTTP 401 Unauthorized errors.

    Raised when:
        - Invalid credentials
        - Missing authentication
        - Expired tokens
        - Invalid API keys
    """
class AuthorizationException(ApiError):
    """HTTP 403 Forbidden errors.

    Raised when:
        - Insufficient privileges
        - Access to restricted resources
        - Security policy violations
    """
class NotFoundError(ApiError):
    """HTTP 404 Not Found errors.

    Raised when:
        - Index does not exist
        - Document not found
        - Template not found
        - Alias not found
    """
class ConflictError(ApiError):
    """HTTP 409 Conflict errors.

    Raised when:
        - Version conflicts during updates
        - Resource already exists
        - Concurrent modification conflicts
        - Index already exists
    """
class UnsupportedProductError(ApiError):
    """Error when connecting to unsupported Elasticsearch products.

    Raised when:
        - Connecting to incompatible server
        - Server doesn't identify as Elasticsearch
        - Unsupported server version
    """


class ElasticsearchWarning(UserWarning):
    """Deprecation and usage warnings from Elasticsearch.

    Attributes:
        message: Warning message.
        category: Warning category.
    """
# Legacy alias
ElasticsearchDeprecationWarning = ElasticsearchWarning

# Legacy aliases for backward compatibility
RequestError = BadRequestError


class ErrorInfo:
    """Elasticsearch error information structure.

    Attributes:
        type: Error type (e.g., 'index_not_found_exception').
        reason: Human-readable error description.
        index: Index name (if applicable).
        shard: Shard information (if applicable).
        caused_by: Nested error information.
        root_cause: Root cause of the error.
        stack_trace: Server-side stack trace (if available).
    """

    def __init__(self, error_dict: Dict[str, Any]): ...

    @property
    def type(self) -> str: ...

    @property
    def reason(self) -> str: ...

    @property
    def index(self) -> Optional[str]: ...


from elasticsearch import Elasticsearch, NotFoundError, ConflictError, ApiError
client = Elasticsearch(hosts=['http://localhost:9200'])

try:
    # Attempt to get a document
    response = client.get(index="products", id="123")
    print(response.body['_source'])
except NotFoundError as e:
    print(f"Document not found: {e.message}")
    # The 404 body of a missing document uses underscore-prefixed metadata
    # keys ("_index", "_id"); the bare names are never present.
    print(f"Index: {e.info.get('_index')}")
    print(f"ID: {e.info.get('_id')}")
except ApiError as e:
    # Any other non-2xx response ends up here.
    print(f"API error: {e.message}")
    print(f"Status code: {e.status_code}")
    print(f"Error type: {e.error}")

from elasticsearch import ConflictError
try:
    # Update with version control
    # NOTE: partial updates are passed as `doc=`; `document=` belongs to
    # index()/create() and is rejected by update().
    client.update(
        index="products",
        id="123",
        doc={"price": 99.99},
        if_seq_no=10,
        if_primary_term=1,
    )
except ConflictError as e:
    print(f"Version conflict: {e.message}")

    # Get current document version
    current_doc = client.get(index="products", id="123")
    current_seq_no = current_doc.body['_seq_no']
    current_primary_term = current_doc.body['_primary_term']

    # Retry with current version
    try:
        client.update(
            index="products",
            id="123",
            doc={"price": 99.99},
            if_seq_no=current_seq_no,
            if_primary_term=current_primary_term,
        )
    except ConflictError:
        print("Still conflicting, manual intervention required")

from elasticsearch import ConnectionError, ConnectionTimeout, SSLError
try:
    client = Elasticsearch(
        hosts=['https://elasticsearch.example.com:9200'],
        basic_auth=('username', 'password'),
        request_timeout=30,
    )
    # ping() catches transport/API errors and simply returns False, so the
    # handlers below would never run; info() lets those errors propagate.
    response = client.info()
except ConnectionTimeout as e:
    print(f"Connection timed out: {e.message}")
    print("Try increasing the timeout or check network connectivity")
except SSLError as e:
    # Listed before ConnectionError because SSLError is a subclass of it.
    print(f"SSL error: {e.message}")
    print("Check SSL certificate configuration")
except ConnectionError as e:
    print(f"Connection failed: {e.message}")
    print("Check if Elasticsearch is running and accessible")

from elasticsearch.helpers import BulkIndexError, bulk
def safe_bulk_index(client, actions):
    """Safely perform bulk indexing with error handling.

    Runs the ``bulk`` helper with ``raise_on_error`` and
    ``raise_on_exception`` disabled and prints a summary of the outcome.

    Args:
        client: Client object handed to ``bulk``.
        actions: Iterable of bulk actions handed to ``bulk``.

    Returns:
        None. Results are reported via ``print`` only.
    """
    try:
        success_count, failed_items = bulk(
            client,
            actions,
            raise_on_error=False,
            raise_on_exception=False,
        )
    except BulkIndexError as e:
        print(f"Bulk indexing failed: {e}")
        # Process individual errors
        for error in e.errors:
            # Each entry is keyed by its operation type; walking the values
            # covers index/create/update and also delete.
            for action in error.values():
                if not isinstance(action, dict) or 'error' not in action:
                    continue
                cause = action['error']
                # The error may be a structured dict or a plain string.
                if isinstance(cause, dict):
                    reason = cause.get('reason', cause)
                else:
                    reason = cause
                print(f"Error in document {action.get('_id', 'unknown')}: {reason}")
    else:
        print(f"Successfully indexed: {success_count} documents")
        if failed_items:
            print(f"Failed to index: {len(failed_items)} documents")
            for item in failed_items:
                print(f"Failed item: {item}")
# Usage
actions = [
    {
        "_index": "products",
        "_id": "1",
        "_source": {"name": "Product 1", "price": 10.99},
    },
    {
        "_index": "products",
        "_id": "2",
        "_source": {"name": "Product 2", "price": 20.99},
    },
]

safe_bulk_index(client, actions)

from elasticsearch import AuthenticationException, AuthorizationException
try:
    client = Elasticsearch(
        hosts=['http://localhost:9200'],
        # `http_auth` is deprecated; `basic_auth` is the current option.
        basic_auth=('username', 'wrong_password'),
    )

    # Attempt an operation that requires authentication
    client.cluster.health()
except AuthenticationException as e:
    print(f"Authentication failed: {e.message}")
    print("Check username and password")
except AuthorizationException as e:
    print(f"Authorization failed: {e.message}")
    print("User lacks required privileges")

from elasticsearch import (
    Elasticsearch,
    ApiError,
    ConnectionError,
    NotFoundError,
    ConflictError,
    AuthenticationException,
    SerializationError,
)
import logging
class ElasticsearchClient:
    """Wrapper with comprehensive error handling."""

    def __init__(self, **kwargs):
        """Create the wrapped client, forwarding every keyword argument."""
        self.client = Elasticsearch(**kwargs)
        self.logger = logging.getLogger(__name__)

    def safe_get(self, index, doc_id, **kwargs):
        """Safely get a document with proper error handling.

        Args:
            index: Index name passed to the client's ``get``.
            doc_id: Document id passed to the client's ``get`` as ``id``.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            The client's response, or ``None`` when ``NotFoundError`` is raised.

        Raises:
            ConnectionError: Logged, then re-raised.
            ApiError: Logged with its status code, then re-raised.
        """
        try:
            return self.client.get(index=index, id=doc_id, **kwargs)
        except NotFoundError:
            self.logger.info("Document %s not found in index %s", doc_id, index)
            return None
        except ConnectionError as e:
            self.logger.error("Connection error: %s", e)
            raise
        except ApiError as e:
            self.logger.error(
                "API error: %s (status: %s)", e.message, e.status_code
            )
            raise

    def safe_index(self, index, doc_id, document, *, max_retries=3, **kwargs):
        """Safely index a document with retry logic.

        Args:
            index: Index name passed to the client's ``index``.
            doc_id: Document id passed to the client's ``index`` as ``id``.
            document: Document body passed as ``document``.
            max_retries: Total number of attempts made when ``ConflictError``
                is raised; must be at least 1. Defaults to 3.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            The client's response from the first successful attempt.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            ConflictError: Re-raised once the final attempt also conflicts.
            SerializationError: Logged, then re-raised immediately.
            ApiError: Logged, then re-raised immediately.
        """
        if max_retries < 1:
            # Without this guard the loop would not run and None would be
            # returned silently.
            raise ValueError("max_retries must be at least 1")

        for attempt in range(1, max_retries + 1):
            try:
                return self.client.index(
                    index=index,
                    id=doc_id,
                    document=document,
                    **kwargs,
                )
            except ConflictError:
                if attempt == max_retries:
                    self.logger.error(
                        "Version conflict after %s retries", max_retries
                    )
                    raise
                self.logger.warning(
                    "Version conflict, retrying... (%s/%s)", attempt, max_retries
                )
            except SerializationError as e:
                self.logger.error("Serialization error: %s", e)
                raise
            except ApiError as e:
                self.logger.error("API error during indexing: %s", e.message)
                raise
# Usage
client = ElasticsearchClient(hosts=['http://localhost:9200'])

# Safe operations
doc = client.safe_get("products", "123")
if doc:
    print(f"Found document: {doc.body['_source']}")

client.safe_index(
    "products",
    "124",
    {"name": "New Product", "price": 29.99},
)

# Install with Tessl CLI
#   npx tessl i tessl/pypi-elasticsearch