A Python client for Elasticsearch with comprehensive API coverage and both synchronous and asynchronous support.
—
The Inference API provides machine learning inference capabilities within Elasticsearch, supporting various AI services and models for text embeddings, completions, reranking, and sparse embeddings. It offers a unified interface for integrating with multiple AI providers.
Execute inference tasks using configured inference endpoints for various AI/ML tasks.
def inference(
    self,
    *,
    inference_id: str,
    input: Optional[Union[str, List[str]]] = None,
    query: Optional[str] = None,
    task_settings: Optional[Any] = None,
    timeout: Optional[Union[str, int]] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Run a general inference task against a configured inference endpoint.

    Args:
        inference_id: Identifier of the inference endpoint to invoke.
        input: Text or list of texts to run inference on.
        query: Query string, used by rerank-style tasks.
        task_settings: Opaque, task-specific settings forwarded to the service.
        timeout: How long to wait for the request to complete.

    Returns:
        ObjectApiResponse[Any]: The raw inference results.
    """
def text_embedding(
    self,
    *,
    inference_id: str,
    input: Optional[Union[str, List[str]]] = None,
    task_settings: Optional[Any] = None,
    timeout: Optional[Union[str, int]] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Produce dense text embeddings via the configured embedding service.

    Args:
        inference_id: ID of the embedding model endpoint.
        input: A single text or a list of texts to embed.
        task_settings: Model-specific embedding options.
        timeout: How long to wait for the request to complete.

    Returns:
        ObjectApiResponse[Any]: Embedding vectors for the input text(s).
    """
def sparse_embedding(
    self,
    *,
    inference_id: str,
    input: Optional[Union[str, List[str]]] = None,
    task_settings: Optional[Any] = None,
    timeout: Optional[Union[str, int]] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Produce sparse embeddings (e.g. SPLADE-style) via the configured service.

    Args:
        inference_id: ID of the sparse embedding model endpoint.
        input: A single text or a list of texts to embed.
        task_settings: Model-specific options.
        timeout: How long to wait for the request to complete.

    Returns:
        ObjectApiResponse[Any]: Sparse embedding vectors for the input text(s).
    """
def rerank(
    self,
    *,
    inference_id: str,
    input: Optional[List[str]] = None,
    query: Optional[str] = None,
    task_settings: Optional[Any] = None,
    timeout: Optional[Union[str, int]] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Reorder candidate documents by relevance to a query.

    Args:
        inference_id: ID of the reranking model endpoint.
        input: Candidate documents to rerank.
        query: Query text the documents are scored against.
        task_settings: Reranker-specific options.
        timeout: How long to wait for the request to complete.

    Returns:
        ObjectApiResponse[Any]: Reranked document order with relevance scores.
    """
def completion(
    self,
    *,
    inference_id: str,
    input: Optional[Union[str, List[str]]] = None,
    task_settings: Optional[Any] = None,
    timeout: Optional[Union[str, int]] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Generate text completions using the specified language model.

    Args:
        inference_id: The completion model inference ID.
        input: Prompt or list of prompts to complete.
        task_settings: Generation settings (temperature, max_tokens, etc.).
        timeout: Request timeout.

    Returns:
        ObjectApiResponse[Any]: The generated completions.
    """

from elasticsearch import Elasticsearch
client = Elasticsearch(['http://localhost:9200'])
# Text embeddings for semantic search
embedding_response = client.inference.text_embedding(
inference_id="my-embedding-model",
input=["Hello world", "Machine learning is fascinating"]
)
embeddings = embedding_response.body['embeddings']
# Single text embedding
single_embedding = client.inference.text_embedding(
inference_id="sentence-transformers",
input="This is a sample document for embedding"
)
# Sparse embeddings for keyword-aware search
sparse_response = client.inference.sparse_embedding(
inference_id="splade-model",
input="Natural language processing with transformers"
)
# Document reranking for search relevance
rerank_response = client.inference.rerank(
inference_id="cross-encoder-model",
query="machine learning algorithms",
input=[
"Introduction to machine learning",
"Deep learning with neural networks",
"Statistical analysis methods",
"Reinforcement learning concepts"
]
)
ranked_docs = rerank_response.body['reranked']
# Text completion/generation
completion_response = client.inference.completion(
inference_id="gpt-model",
input="Explain quantum computing in simple terms:",
task_settings={
"max_tokens": 150,
"temperature": 0.7
}
)
generated_text = completion_response.body['completion']Create, update, and manage inference endpoints for various AI services.
def put(
    self,
    *,
    inference_id: str,
    task_type: str,
    inference_config: Dict[str, Any],
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create or update a generic inference endpoint.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: One of "text_embedding", "completion", "rerank", "sparse_embedding".
        inference_config: Service-specific configuration payload.

    Returns:
        ObjectApiResponse[Any]: Acknowledgement of the created/updated endpoint.
    """
def get(
    self,
    *,
    inference_id: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Fetch one inference endpoint's configuration, or all of them.

    Args:
        inference_id: Endpoint to fetch; omit to list every endpoint.

    Returns:
        ObjectApiResponse[Any]: The endpoint configuration(s).
    """
def delete(
    self,
    *,
    inference_id: str,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Remove an inference endpoint.

    Args:
        inference_id: Endpoint to delete.

    Returns:
        ObjectApiResponse[Any]: Acknowledgement of the deletion.
    """
def update(
self,
*,
inference_id: str,
inference_config: Optional[Dict[str, Any]] = None,
**kwargs
) -> ObjectApiResponse[Any]:
"""
Update an existing inference endpoint configuration.
Parameters:
- inference_id: The inference endpoint ID to update
- inference_config: Updated configuration
Returns:
ObjectApiResponse confirming update
"""The Inference API provides specialized methods for configuring popular AI service providers.
def put_openai(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: Optional[str] = None,
    model_id: Optional[str] = None,
    organization_id: Optional[str] = None,
    url: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint backed by the OpenAI service.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: OpenAI API key.
        model_id: OpenAI model to use.
        organization_id: OpenAI organization identifier.
        url: Override for the OpenAI API URL.
    """
def put_azureopenai(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: Optional[str] = None,
    api_version: Optional[str] = None,
    deployment_id: Optional[str] = None,
    resource_name: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint backed by Azure OpenAI.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Azure OpenAI API key.
        api_version: Azure OpenAI API version string.
        deployment_id: Azure deployment to target.
        resource_name: Azure resource name.
    """
def put_hugging_face(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: Optional[str] = None,
    model_id: Optional[str] = None,
    url: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint backed by Hugging Face.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Hugging Face access token.
        model_id: Hugging Face model to use.
        url: Override for the inference URL.
    """
def put_cohere(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: Optional[str] = None,
    model_id: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint backed by Cohere.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Cohere API key.
        model_id: Cohere model to use.
    """
def put_anthropic(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: Optional[str] = None,
    model_id: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint backed by Anthropic.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Anthropic API key.
        model_id: Anthropic model to use.
    """
def put_amazonbedrock(
    self,
    *,
    inference_id: str,
    task_type: str,
    access_key_id: Optional[str] = None,
    secret_access_key: Optional[str] = None,
    region: Optional[str] = None,
    model_id: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint backed by Amazon Bedrock.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        access_key_id: AWS access key ID.
        secret_access_key: AWS secret access key.
        region: AWS region of the Bedrock service.
        model_id: Bedrock model to use.
    """
def put_googlevertexai(
    self,
    *,
    inference_id: str,
    task_type: str,
    service_account_json: Optional[str] = None,
    project_id: Optional[str] = None,
    location: Optional[str] = None,
    model_id: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint backed by Google Vertex AI.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        service_account_json: GCP service-account credentials as a JSON string.
        project_id: GCP project identifier.
        location: GCP location/region.
        model_id: Vertex AI model to use.
    """
def put_googleaistudio(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: Optional[str] = None,
    model_id: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint backed by Google AI Studio.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Google AI Studio API key.
        model_id: Model to use.
    """
def put_elasticsearch(
    self,
    *,
    inference_id: str,
    task_type: str,
    model_id: str,
    num_allocations: Optional[int] = None,
    num_threads: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an inference endpoint served by a model hosted in Elasticsearch.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        model_id: ID of the deployed Elasticsearch model.
        num_allocations: Number of model allocations.
        num_threads: Threads per allocation.
    """
def put_elser(
    self,
    *,
    inference_id: str,
    num_allocations: Optional[int] = None,
    num_threads: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """Create an Elasticsearch Learned Sparse Encoder (ELSER) endpoint.

    Args:
        inference_id: Unique identifier for the endpoint.
        num_allocations: Number of model allocations.
        num_threads: Threads per allocation.

    Returns:
        ObjectApiResponse[Any]: Acknowledgement of the created endpoint.
    """

# OpenAI embeddings
client.inference.put_openai(
inference_id="openai-embeddings",
task_type="text_embedding",
api_key="sk-...",
model_id="text-embedding-ada-002"
)
# Azure OpenAI completions
client.inference.put_azureopenai(
inference_id="azure-gpt4",
task_type="completion",
api_key="...",
api_version="2024-02-01",
resource_name="my-resource",
deployment_id="gpt-4-deployment"
)
# Hugging Face sentence transformers
client.inference.put_hugging_face(
inference_id="sentence-transformers",
task_type="text_embedding",
api_key="hf_...",
model_id="sentence-transformers/all-MiniLM-L6-v2"
)
# Cohere reranking
client.inference.put_cohere(
inference_id="cohere-rerank",
task_type="rerank",
api_key="...",
model_id="rerank-english-v2.0"
)
# Elasticsearch ELSER for sparse embeddings
client.inference.put_elser(
inference_id="elser-sparse",
num_allocations=1,
num_threads=2
)
# Amazon Bedrock
client.inference.put_amazonbedrock(
inference_id="bedrock-titan",
task_type="text_embedding",
access_key_id="AKIA...",
secret_access_key="...",
region="us-east-1",
model_id="amazon.titan-embed-text-v1"
)
# Google Vertex AI
client.inference.put_googlevertexai(
inference_id="vertex-palm",
task_type="completion",
service_account_json='{"type": "service_account", ...}',
project_id="my-project",
location="us-central1",
model_id="text-bison@001"
)Extended support for more AI service providers:
def put_mistral(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: str,
    model_id: str,
    **kwargs
):
    """Create an inference endpoint backed by Mistral AI.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Mistral AI API key.
        model_id: Mistral model to use.
    """
def put_voyageai(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: str,
    model_id: str,
    **kwargs
):
    """Create an inference endpoint backed by VoyageAI.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: VoyageAI API key.
        model_id: VoyageAI model to use.
    """
def put_jinaai(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: str,
    model_id: str,
    **kwargs
):
    """Create an inference endpoint backed by Jina AI.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Jina AI API key.
        model_id: Jina AI model to use.
    """
def put_deepseek(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: str,
    model_id: str,
    **kwargs
):
    """Create an inference endpoint backed by DeepSeek.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: DeepSeek API key.
        model_id: DeepSeek model to use.
    """
def put_watsonx(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: str,
    project_id: str,
    model_id: str,
    **kwargs
):
    """Create an inference endpoint backed by IBM watsonx.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: IBM watsonx API key.
        project_id: watsonx project identifier.
        model_id: Model to use.
    """
def put_azureaistudio(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: str,
    target: str,
    **kwargs
):
    """Create an inference endpoint backed by Azure AI Studio.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Azure AI Studio API key.
        target: Target deployment URI/name.
    """
def put_alibabacloud(
    self,
    *,
    inference_id: str,
    task_type: str,
    api_key: str,
    model_id: str,
    **kwargs
):
    """Create an inference endpoint backed by Alibaba Cloud.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        api_key: Alibaba Cloud API key.
        model_id: Model to use.
    """
def put_amazonsagemaker(
    self,
    *,
    inference_id: str,
    task_type: str,
    access_key_id: str,
    secret_access_key: str,
    region: str,
    endpoint_name: str,
    **kwargs
):
    """Create an inference endpoint backed by Amazon SageMaker.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        access_key_id: AWS access key ID.
        secret_access_key: AWS secret access key.
        region: AWS region of the SageMaker endpoint.
        endpoint_name: Name of the SageMaker endpoint.
    """
def put_custom(
    self,
    *,
    inference_id: str,
    task_type: str,
    url: str,
    **kwargs
):
    """Create a custom inference endpoint pointing at an arbitrary service URL.

    Args:
        inference_id: Unique identifier for the endpoint.
        task_type: Inference task type the endpoint serves.
        url: URL of the custom inference service.
    """

# 1. Configure embedding service
client.inference.put_openai(
    inference_id="embeddings",
    task_type="text_embedding",
    api_key="sk-...",
    model_id="text-embedding-ada-002"
)

# 2. Create index with dense vector field
client.indices.create(
    index="documents",
    mappings={
        "properties": {
            "content": {"type": "text"},
            "embedding": {
                "type": "dense_vector",
                "dims": 1536,
                "index": True,
                "similarity": "cosine"
            }
        }
    }
)

# 3. Index documents with embeddings
doc = "Machine learning transforms data into insights"
embedding = client.inference.text_embedding(
    inference_id="embeddings",
    input=doc
)
client.index(
    index="documents",
    document={
        "content": doc,
        "embedding": embedding.body['embeddings'][0]['embedding']
    }
)

# 4. Search with semantic similarity
query_embedding = client.inference.text_embedding(
    inference_id="embeddings",
    input="AI and data analysis"
)
results = client.search(
    index="documents",
    knn={
        "field": "embedding",
        "query_vector": query_embedding.body['embeddings'][0]['embedding'],
        "k": 10,
        "num_candidates": 100
    }
)

# 1. Retrieve relevant documents
query = "What is quantum computing?"
query_embedding = client.inference.text_embedding(
    inference_id="embeddings",
    input=query
)
search_results = client.search(
    index="knowledge_base",
    knn={
        "field": "embedding",
        "query_vector": query_embedding.body['embeddings'][0]['embedding'],
        "k": 5
    }
)

# 2. Rerank results for better relevance
documents = [hit['_source']['content'] for hit in search_results.body['hits']['hits']]
reranked = client.inference.rerank(
    inference_id="cohere-rerank",
    query=query,
    input=documents
)

# 3. Generate response with context
top_docs = [documents[idx] for idx in reranked.body['reranked'][:3]]
context = "\n\n".join(top_docs)
prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
response = client.inference.completion(
    inference_id="gpt-4",
    input=prompt,
    task_settings={"max_tokens": 200, "temperature": 0.3}
)
answer = response.body['completion']

from typing import Any, Dict, List, Literal, Optional, Union
# Task types
TaskType = Literal["text_embedding", "sparse_embedding", "completion", "rerank"]

# Service configurations
class InferenceConfig:
    service: str  # Service provider name
    service_settings: Dict[str, Any]  # Provider-specific settings
    task_settings: Dict[str, Any]  # Task-specific settings

# Response types
class EmbeddingResponse:
    embeddings: List[Dict[str, Any]]  # Embedding vectors with metadata

class CompletionResponse:
    completion: str  # Generated text
    usage: Optional[Dict[str, int]]  # Token usage statistics

class RerankResponse:
    reranked: List[int]  # Reordered document indices
    scores: List[float]  # Relevance scores

class SparseEmbeddingResponse:
    embeddings: List[Dict[str, Dict[str, float]]]  # Sparse vector representations

# Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch