Python client for Elasticsearch 5.x providing comprehensive access to all Elasticsearch APIs and features.
—
Essential CRUD operations for working with individual documents in Elasticsearch. These operations provide the foundation for document-based interactions including creation, retrieval, updates, and deletion.
Create new documents with explicit IDs, ensuring the document doesn't already exist.
def create(index: str, doc_type: str, id: str, body: dict, **params) -> dict:
"""
Create a new document with the specified ID.
Parameters:
- index: Index name where the document will be stored
- doc_type: Document type (use '_doc' for Elasticsearch 6.x+ compatibility)
- id: Unique document identifier
- body: Document content as a dictionary
- refresh: Control when changes are visible ('true', 'false', 'wait_for')
- routing: Routing value for document placement
- timeout: Request timeout
- version: Expected document version for optimistic concurrency
- version_type: Version type ('internal', 'external', 'external_gte')
Returns:
dict: Response containing '_index', '_id', '_version', 'result', and '_shards'
Raises:
ConflictError: If document with the same ID already exists
"""

Index documents (create or update) with optional auto-generated IDs.
def index(index: str, doc_type: str, body: dict, id: str = None, **params) -> dict:
"""
Index a document (create new or update existing).
Parameters:
- index: Index name where the document will be stored
- doc_type: Document type
- body: Document content as a dictionary
- id: Document ID (auto-generated if not provided)
- op_type: Operation type ('index', 'create')
- refresh: Control when changes are visible
- routing: Routing value for document placement
- timeout: Request timeout
- version: Expected document version
- version_type: Version type ('internal', 'external', 'external_gte')
- pipeline: Ingest pipeline to process document
Returns:
dict: Response with document metadata and operation result
"""

Retrieve documents by ID with support for field filtering and routing.
def get(index: str, id: str, doc_type: str = '_all', **params) -> dict:
"""
Retrieve a document by its ID.
Parameters:
- index: Index name containing the document
- id: Document identifier
- doc_type: Document type (default '_all' searches all types)
- _source: Fields to include/exclude in response
- _source_exclude: Fields to exclude from _source
- _source_include: Fields to include in _source
- routing: Routing value used when indexing
- preference: Node preference for request execution
- realtime: Whether to retrieve from transaction log (true) or search (false)
- refresh: Refresh index before retrieval
- version: Expected document version
- version_type: Version type for version checking
Returns:
dict: Document with '_source', '_id', '_version', and metadata
Raises:
NotFoundError: If document doesn't exist
"""
def get_source(index: str, doc_type: str, id: str, **params) -> dict:
"""
Retrieve only the document source (_source field).
Parameters:
- index: Index name
- doc_type: Document type
- id: Document identifier
- _source_exclude: Fields to exclude
- _source_include: Fields to include
- routing: Routing value
- preference: Node preference
- realtime: Real-time retrieval flag
- refresh: Refresh before retrieval
- version: Expected version
- version_type: Version type
Returns:
dict: Document source content only
"""

Check if documents exist without retrieving full content.
def exists(index: str, doc_type: str, id: str, **params) -> bool:
"""
Check if a document exists.
Parameters:
- index: Index name
- doc_type: Document type
- id: Document identifier
- routing: Routing value
- preference: Node preference
- realtime: Real-time check flag
- refresh: Refresh before check
- version: Expected version
- version_type: Version type
Returns:
bool: True if document exists, False otherwise
"""
def exists_source(index: str, doc_type: str, id: str, **params) -> bool:
"""
Check if document source exists.
Parameters: Same as exists()
Returns:
bool: True if document source exists
"""

Update existing documents with partial updates or script-based modifications.
def update(index: str, doc_type: str, id: str, body: dict = None, **params) -> dict:
"""
Update an existing document.
Parameters:
- index: Index name
- doc_type: Document type
- id: Document identifier
- body: Update specification with 'doc', 'script', or 'upsert'
- retry_on_conflict: Number of retry attempts on version conflicts
- routing: Routing value
- timeout: Request timeout
- refresh: Control when changes are visible
- _source: Fields to return in response
- version: Expected current version
- version_type: Version type
- wait_for_active_shards: Wait for N shards to be active
Body structure:
{
    "doc": {"field": "new_value"},  # Partial document update
    "script": {  # Script-based update
        "source": "ctx._source.counter += params.increment",
        "params": {"increment": 1}
    },
    "upsert": {"field": "default_value"}  # Create if doesn't exist
}
Returns:
dict: Update result with '_version', 'result', and optionally 'get'
Raises:
NotFoundError: If document doesn't exist and no upsert provided
"""

Delete documents by ID with support for routing and versioning.
def delete(index: str, doc_type: str, id: str, **params) -> dict:
"""
Delete a document by ID.
Parameters:
- index: Index name
- doc_type: Document type
- id: Document identifier
- routing: Routing value used when indexing
- timeout: Request timeout
- refresh: Control when changes are visible
- version: Expected document version
- version_type: Version type
- wait_for_active_shards: Wait for N shards to be active
Returns:
dict: Deletion result with '_version', 'result', and '_shards'
Raises:
NotFoundError: If document doesn't exist
"""

Retrieve multiple documents in a single request for improved performance.
def mget(body: dict, index: str = None, doc_type: str = None, **params) -> dict:
"""
Retrieve multiple documents by their IDs.
Parameters:
- body: Multi-get request specification
- index: Default index name for documents without explicit index
- doc_type: Default document type
- _source: Default fields to include/exclude
- _source_exclude: Default fields to exclude
- _source_include: Default fields to include
- preference: Node preference
- realtime: Real-time retrieval flag
- refresh: Refresh before retrieval
- routing: Default routing value
Body structure:
{
    "docs": [
        {"_index": "my_index", "_type": "_doc", "_id": "1"},
        {"_index": "my_index", "_type": "_doc", "_id": "2", "_source": ["title"]},
        {"_index": "other_index", "_type": "_doc", "_id": "3"}
    ]
}
Or with default index/type:
{
    "ids": ["1", "2", "3"]
}
Returns:
dict: Response with 'docs' array containing each document or error
"""

from elasticsearch5 import Elasticsearch
es = Elasticsearch(['localhost:9200'])
# Create a document
doc = {
'title': 'My Article',
'content': 'This is the article content',
'author': 'John Doe',
'created_at': '2023-01-01T12:00:00'
}
# Index with auto-generated ID
result = es.index(index='articles', doc_type='_doc', body=doc)
doc_id = result['_id']
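# Create with explicit ID (fails if exists).
# ConflictError lives in the exceptions module, not on the client instance.
from elasticsearch5.exceptions import ConflictError

try:
    es.create(index='articles', doc_type='_doc', id='article-1', body=doc)
except ConflictError:
    print("Document already exists")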
# Check if document exists
if es.exists(index='articles', doc_type='_doc', id=doc_id):
    # Get the document
    retrieved = es.get(index='articles', doc_type='_doc', id=doc_id)
    print(f"Document: {retrieved['_source']}")

# Partial document update
update_body = {
'doc': {
'content': 'Updated article content',
'updated_at': '2023-01-02T12:00:00'
}
}
es.update(index='articles', doc_type='_doc', id=doc_id, body=update_body)
# Script-based update
script_update = {
'script': {
'source': 'ctx._source.view_count = (ctx._source.view_count ?: 0) + 1'
}
}
es.update(index='articles', doc_type='_doc', id=doc_id, body=script_update)
# Upsert (update or insert)
upsert_body = {
'doc': {'title': 'New Title'},
'upsert': {'title': 'Default Title', 'created_at': '2023-01-01T00:00:00'}
}
es.update(index='articles', doc_type='_doc', id='new-article', body=upsert_body)

# Retrieve multiple documents
mget_body = {
'docs': [
{'_index': 'articles', '_type': '_doc', '_id': doc_id},
{'_index': 'articles', '_type': '_doc', '_id': 'article-2', '_source': ['title', 'author']}
]
}
results = es.mget(body=mget_body)
for doc in results['docs']:
    if doc['found']:
        print(f"Found: {doc['_source']}")
    else:
        print(f"Not found: {doc['_id']}")

Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch5