Python client for Elasticsearch with comprehensive API coverage and both sync and async support
npx @tessl/cli install tessl/pypi-elasticsearch@9.1.0

The official Python client for Elasticsearch, providing comprehensive access to Elasticsearch's REST APIs through both synchronous and asynchronous interfaces. The client handles connection management, request serialization, error handling, and response processing while maintaining compatibility with all Elasticsearch versions.

pip install elasticsearch

from elasticsearch import Elasticsearch, AsyncElasticsearch

For exceptions and serializers:
from elasticsearch import (
ApiError,
NotFoundError,
ConflictError,
AuthenticationException,
JsonSerializer,
OrjsonSerializer
)

For DSL (Domain Specific Language):

from elasticsearch.dsl import Search, Q, A

For helper functions:
from elasticsearch.helpers import bulk, scan
from elasticsearch.helpers.vectorstore import VectorStore, DenseVectorStrategy

For ES|QL query building:

from elasticsearch.esql import ESQL, and_, or_, not_

from elasticsearch import Elasticsearch
# Connect to Elasticsearch
client = Elasticsearch(
hosts=['http://localhost:9200'],
http_auth=('username', 'password'),
timeout=30
)
# Index a document
response = client.index(
index="my-index",
id=1,
document={"title": "Test document", "content": "This is a test"}
)
# Search for documents
search_response = client.search(
index="my-index",
query={"match": {"title": "test"}}
)
# Get document by ID
doc = client.get(index="my-index", id=1)

import asyncio
from elasticsearch import AsyncElasticsearch
async def main():
# Connect to Elasticsearch asynchronously
client = AsyncElasticsearch(
hosts=['http://localhost:9200'],
http_auth=('username', 'password')
)
# Index a document
await client.index(
index="my-index",
id=1,
document={"title": "Test document", "content": "This is a test"}
)
# Search for documents
response = await client.search(
index="my-index",
query={"match": {"title": "test"}}
)
await client.close()
asyncio.run(main())

The Elasticsearch Python client is built on a layered architecture:
Both synchronous and asynchronous clients share identical APIs, with async methods returning coroutines. The client automatically handles connection pooling, load balancing, node failure detection, and retry logic.
Core document and search operations including indexing, searching, updating, and deleting documents. Provides the fundamental operations for interacting with Elasticsearch clusters.
class Elasticsearch:
def __init__(self, hosts=None, **kwargs): ...
def index(self, index, document, id=None, **kwargs): ...
def get(self, index, id, **kwargs): ...
def search(self, index=None, query=None, **kwargs): ...
def update(self, index, id, document=None, script=None, **kwargs): ...
def delete(self, index, id, **kwargs): ...
def bulk(self, operations, index=None, **kwargs): ...

Index lifecycle operations including creation, deletion, mapping updates, settings configuration, and maintenance operations like refresh and flush.
class IndicesClient:
def create(self, index, mappings=None, settings=None, **kwargs): ...
def delete(self, index, **kwargs): ...
def get(self, index, **kwargs): ...
def put_mapping(self, index, properties=None, **kwargs): ...
def get_mapping(self, index=None, **kwargs): ...
def put_settings(self, index, settings, **kwargs): ...

Advanced search capabilities including query DSL, aggregations, highlighting, sorting, and specialized search types like async search and scroll.
def search(self, index=None, query=None, aggs=None, sort=None, **kwargs): ...
def msearch(self, searches, index=None, **kwargs): ...
def scroll(self, scroll_id, scroll='5m', **kwargs): ...
def search_template(self, index=None, id=None, params=None, **kwargs): ...

Cluster-level operations for monitoring health, managing settings, shard allocation, and node management.
class ClusterClient:
def health(self, index=None, **kwargs): ...
def state(self, index=None, metric=None, **kwargs): ...
def stats(self, node_id=None, **kwargs): ...
def put_settings(self, persistent=None, transient=None, **kwargs): ...

User and role management, API key operations, token management, and security configuration for Elasticsearch clusters with security features enabled.
class SecurityClient:
def authenticate(self, **kwargs): ...
def create_user(self, username, password, roles=None, **kwargs): ...
def create_role(self, name, cluster=None, indices=None, **kwargs): ...
def create_api_key(self, name, role_descriptors=None, **kwargs): ...

Machine learning job management, anomaly detection, data analysis, and model operations for Elasticsearch's ML capabilities.
class MlClient:
def put_job(self, job_id, analysis_config, data_description, **kwargs): ...
def open_job(self, job_id, **kwargs): ...
def put_datafeed(self, datafeed_id, indices, job_id, **kwargs): ...
def start_datafeed(self, datafeed_id, **kwargs): ...

Index lifecycle management (ILM), snapshot lifecycle management (SLM), and data stream operations for automated data management.
class IlmClient:
def put_policy(self, name, policy, **kwargs): ...
def get_policy(self, name=None, **kwargs): ...
def explain_lifecycle(self, index, **kwargs): ...
class SlmClient:
def put_policy(self, name, schedule, name_pattern, repository, **kwargs): ...

The Elasticsearch Python client includes many specialized namespace clients for advanced functionality:
Text Analysis and Data Processing:
client.connector - Elasticsearch connector management for data ingestion
client.text_structure - Text analysis and structure detection
client.synonyms - Synonyms management for search enhancement
client.query_rules - Query rule management for search relevance tuning

Fleet and Monitoring:
client.fleet - Elastic Agent fleet management for observability
client.monitoring - Cluster and node monitoring operations
client.features - Elasticsearch features and capabilities discovery

Search Applications:
client.search_application - Search application management and configuration
client.eql - Event Query Language for security and observability
client.sql - SQL query interface for Elasticsearch

Data Management:
client.transform - Data transformation and pivot operations
client.rollup - Historical data rollup and aggregation
client.ccr - Cross-cluster replication management
client.enrich - Document enrichment with lookup data

Administrative:
client.license - License management and validation
client.ssl - SSL certificate management
client.shutdown - Node shutdown coordination
client.migration - Cluster migration assistance

Advanced Features:
client.graph - Graph analytics and exploration
client.watcher - Alerting and notification system
client.autoscaling - Automatic scaling policies
client.searchable_snapshots - Snapshot-based cold storage

Utility functions for common operations like bulk indexing, scanning large result sets, and reindexing data between indices.
def bulk(client, actions, **kwargs): ...
def scan(client, query=None, scroll='5m', **kwargs): ...
def reindex(client, source_index, target_index, **kwargs): ...

Pythonic query building with the Domain Specific Language for constructing complex Elasticsearch queries, aggregations, and document modeling.
class Search:
def __init__(self, using=None, index=None, **kwargs): ...
def query(self, q): ...
def filter(self, f): ...
def aggregate(self, name, agg): ...
def sort(self, *keys): ...
class Q:
@classmethod
def match(cls, **kwargs): ...
@classmethod
def term(cls, **kwargs): ...
@classmethod
def range(cls, **kwargs): ...

Execute Elasticsearch Query Language (ES|QL) queries with both synchronous and asynchronous support, providing SQL-like syntax for data analysis and reporting.
class EsqlClient:
def query(self, query: str, **kwargs): ...
def async_query(self, query: str, keep_alive: str, **kwargs): ...
def async_query_get(self, id: str, **kwargs): ...
def async_query_delete(self, id: str, **kwargs): ...
def list_queries(self, **kwargs): ...

Machine learning inference operations supporting multiple AI service providers for text embeddings, completions, reranking, and sparse embeddings.
class InferenceClient:
def inference(self, inference_id: str, input: Union[str, List[str]], **kwargs): ...
def text_embedding(self, inference_id: str, input: Union[str, List[str]], **kwargs): ...
def sparse_embedding(self, inference_id: str, input: Union[str, List[str]], **kwargs): ...
def rerank(self, inference_id: str, query: str, input: List[str], **kwargs): ...
def completion(self, inference_id: str, input: Union[str, List[str]], **kwargs): ...
def put_openai(self, inference_id: str, task_type: str, **kwargs): ...
def put_hugging_face(self, inference_id: str, task_type: str, **kwargs): ...
def put_cohere(self, inference_id: str, task_type: str, **kwargs): ...

High-level abstractions for building vector search applications with pluggable retrieval strategies, embedding services, and support for semantic search patterns.
class VectorStore:
def __init__(self, client: Elasticsearch, index: str, retrieval_strategy: RetrievalStrategy, **kwargs): ...
def add_documents(self, documents: List[Dict], vectors: Optional[List[List[float]]] = None, **kwargs): ...
def search(self, query: Optional[str] = None, query_vector: Optional[List[float]] = None, **kwargs): ...
def max_marginal_relevance_search(self, query: str, k: int = 4, **kwargs): ...
class DenseVectorStrategy(RetrievalStrategy): ...
class SparseVectorStrategy(RetrievalStrategy): ...
class BM25Strategy(RetrievalStrategy): ...
class ElasticsearchEmbeddings(EmbeddingService): ...

Comprehensive exception hierarchy for handling various error conditions including HTTP errors, connection issues, and authentication failures.
class ApiError(Exception): ...
class NotFoundError(ApiError): ...
class ConflictError(ApiError): ...
class AuthenticationException(ApiError): ...
class ConnectionError(Exception): ...
class SerializationError(Exception): ...

from typing import Any, Dict, List, Optional, Union
# Common type aliases
Document = Dict[str, Any]
Query = Dict[str, Any]
Mapping = Dict[str, Any]
Settings = Dict[str, Any]
Headers = Dict[str, str]
Hosts = Union[str, List[str], List[Dict[str, Any]]]
# Response types
class ObjectApiResponse:
def __init__(self, body: Any, meta: Any): ...
@property
def body(self) -> Any: ...
@property
def meta(self) -> Any: ...
class ApiResponse:
@property
def status_code(self) -> int: ...
@property
def headers(self) -> Dict[str, str]: ...