Python client for Elasticsearch with comprehensive API coverage and both sync and async support
—
Machine learning job management, anomaly detection, data analysis, and model operations for Elasticsearch's ML capabilities. These operations provide comprehensive machine learning functionality for detecting anomalies, forecasting, and data analysis.
Create and manage machine learning jobs for anomaly detection.
def put_job(
    self,
    job_id: str,
    analysis_config: Dict[str, Any],
    data_description: Dict[str, Any],
    description: Optional[str] = None,
    results_index_name: Optional[str] = None,
    groups: Optional[List[str]] = None,
    model_snapshot_retention_days: Optional[int] = None,
    daily_model_snapshot_retention_after_days: Optional[int] = None,
    analysis_limits: Optional[Dict[str, Any]] = None,
    background_persist_interval: Optional[str] = None,
    custom_settings: Optional[Dict[str, Any]] = None,
    model_plot_config: Optional[Dict[str, Any]] = None,
    renormalization_window_days: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a machine learning (anomaly detection) job.

    Parameters:
    - job_id: Unique identifier for the job
    - analysis_config: Analysis configuration including detectors
    - data_description: Description of input data format (e.g. time field and format)
    - description: Human-readable job description
    - results_index_name: Index for storing results
    - groups: Job groups for organization
    - model_snapshot_retention_days: Retention period for model snapshots
    - daily_model_snapshot_retention_after_days: Daily snapshot retention threshold
    - analysis_limits: Memory and processing limits
    - background_persist_interval: Interval for persisting model updates
    - custom_settings: Custom job settings
    - model_plot_config: Model plot configuration
    - renormalization_window_days: Window for model renormalization
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with job creation result
    """
def get_jobs(
    self,
    job_id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get machine learning job information.

    Parameters:
    - job_id: Job ID or pattern to retrieve (optional; presumably describes all
      jobs when omitted — confirm against the server API)
    - allow_no_match: Whether to ignore if no jobs match
    - exclude_generated: Whether to exclude generated configurations
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with job information
    """
def delete_job(
    self,
    job_id: str,
    force: Optional[bool] = None,
    wait_for_completion: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a machine learning job.

    Parameters:
    - job_id: Job ID to delete
    - force: Whether to force deletion of a running job
    - wait_for_completion: Whether to wait for deletion to complete
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with deletion result
    """
def open_job(
    self,
    job_id: str,
    ignore_downtime: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Open a machine learning job so it can receive and analyze data.

    Parameters:
    - job_id: Job ID to open
    - ignore_downtime: Whether to ignore downtime when opening
    - timeout: Timeout for opening the job
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with job opening result
    """
def close_job(
    self,
    job_id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Close a machine learning job.

    Parameters:
    - job_id: Job ID to close
    - allow_no_match: Whether to ignore if the job doesn't exist
    - force: Whether to force-close a running job
    - timeout: Timeout for closing the job
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with job closing result
    """
def update_job(
    self,
    job_id: str,
    description: Optional[str] = None,
    analysis_limits: Optional[Dict[str, Any]] = None,
    background_persist_interval: Optional[str] = None,
    custom_settings: Optional[Dict[str, Any]] = None,
    model_plot_config: Optional[Dict[str, Any]] = None,
    model_snapshot_retention_days: Optional[int] = None,
    daily_model_snapshot_retention_after_days: Optional[int] = None,
    groups: Optional[List[str]] = None,
    detectors: Optional[List[Dict[str, Any]]] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Update selected properties of an existing machine learning job.

    Parameters:
    - job_id: Job ID to update
    - description: Updated job description
    - analysis_limits: Updated analysis limits
    - background_persist_interval: Updated persist interval
    - custom_settings: Updated custom settings
    - model_plot_config: Updated model plot configuration
    - model_snapshot_retention_days: Updated snapshot retention
    - daily_model_snapshot_retention_after_days: Updated daily retention
    - groups: Updated job groups
    - detectors: Updated detector configurations
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with job update result
    """

Manage datafeeds that supply data to machine learning jobs.
def put_datafeed(
    self,
    datafeed_id: str,
    job_id: str,
    indices: List[str],
    aggregations: Optional[Dict[str, Any]] = None,
    chunking_config: Optional[Dict[str, Any]] = None,
    frequency: Optional[str] = None,
    query: Optional[Dict[str, Any]] = None,
    query_delay: Optional[str] = None,
    runtime_mappings: Optional[Dict[str, Any]] = None,
    script_fields: Optional[Dict[str, Any]] = None,
    scroll_size: Optional[int] = None,
    delayed_data_check_config: Optional[Dict[str, Any]] = None,
    max_empty_searches: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a datafeed that feeds index data into a machine learning job.

    Parameters:
    - datafeed_id: Unique identifier for the datafeed
    - job_id: Associated job ID
    - indices: List of indices to read from
    - aggregations: Aggregations to apply to data
    - chunking_config: Data chunking configuration
    - frequency: Frequency of data checks
    - query: Query to filter data
    - query_delay: Delay between data time and analysis
    - runtime_mappings: Runtime field mappings
    - script_fields: Script-based field definitions
    - scroll_size: Scroll size for data retrieval
    - delayed_data_check_config: Configuration for delayed data checks
    - max_empty_searches: Maximum consecutive empty searches
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with datafeed creation result
    """
def get_datafeeds(
    self,
    datafeed_id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get datafeed information.

    Parameters:
    - datafeed_id: Datafeed ID or pattern to retrieve
    - allow_no_match: Whether to ignore if no datafeeds match
    - exclude_generated: Whether to exclude generated configurations
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with datafeed information
    """
def start_datafeed(
    self,
    datafeed_id: str,
    start: Optional[str] = None,
    end: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Start a datafeed.

    Parameters:
    - datafeed_id: Datafeed ID to start
    - start: Start time for data processing
    - end: End time for data processing
    - timeout: Timeout for starting the datafeed
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with datafeed start result
    """
def stop_datafeed(
    self,
    datafeed_id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Stop a datafeed.

    Parameters:
    - datafeed_id: Datafeed ID to stop
    - allow_no_match: Whether to ignore if the datafeed doesn't exist
    - force: Whether to force stop
    - timeout: Timeout for stopping the datafeed
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with datafeed stop result
    """
def delete_datafeed(
    self,
    datafeed_id: str,
    force: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a datafeed.

    Parameters:
    - datafeed_id: Datafeed ID to delete
    - force: Whether to force deletion
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with deletion result
    """

Manage machine learning models and snapshots.
def get_model_snapshots(
    self,
    job_id: str,
    snapshot_id: Optional[str] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get model snapshots for a job.

    Parameters:
    - job_id: Job ID to get snapshots for
    - snapshot_id: Specific snapshot ID
    - from_: Starting offset for results (trailing underscore avoids the Python
      keyword; presumably maps to the API's ``from`` parameter — confirm)
    - size: Number of results to return
    - start: Start time for snapshot range
    - end: End time for snapshot range
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with model snapshots
    """
def update_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    description: Optional[str] = None,
    retain: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Update a model snapshot.

    Parameters:
    - job_id: Job ID containing the snapshot
    - snapshot_id: Snapshot ID to update
    - description: Updated description
    - retain: Whether to retain the snapshot
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with update result
    """
def delete_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a model snapshot.

    Parameters:
    - job_id: Job ID containing the snapshot
    - snapshot_id: Snapshot ID to delete
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with deletion result
    """
def revert_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    delete_intervening_results: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Revert a job to a previous model snapshot.

    Parameters:
    - job_id: Job ID to revert
    - snapshot_id: Snapshot ID to revert to
    - delete_intervening_results: Whether to delete results generated after the snapshot
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with revert result
    """

Retrieve and analyze anomaly detection results.
def get_buckets(
    self,
    job_id: str,
    timestamp: Optional[str] = None,
    anomaly_score: Optional[float] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    exclude_interim: Optional[bool] = None,
    expand: Optional[bool] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get anomaly detection buckets for a job.

    Parameters:
    - job_id: Job ID to get buckets for
    - timestamp: Specific timestamp to retrieve
    - anomaly_score: Minimum anomaly score threshold
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for bucket range
    - end: End time for bucket range
    - exclude_interim: Whether to exclude interim results
    - expand: Whether to expand bucket details
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with anomaly buckets
    """
def get_records(
    self,
    job_id: str,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    record_score: Optional[float] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    exclude_interim: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get anomaly records for a job.

    Parameters:
    - job_id: Job ID to get records for
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for record range
    - end: End time for record range
    - record_score: Minimum record score threshold
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    - exclude_interim: Whether to exclude interim results
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with anomaly records
    """
def get_influencers(
    self,
    job_id: str,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    influencer_score: Optional[float] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    exclude_interim: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get influencers for a job.

    Parameters:
    - job_id: Job ID to get influencers for
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for influencer range
    - end: End time for influencer range
    - influencer_score: Minimum influencer score threshold
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    - exclude_interim: Whether to exclude interim results
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with influencers
    """
def get_categories(
    self,
    job_id: str,
    category_id: Optional[str] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    partition_field_value: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get categorization results for a job.

    Parameters:
    - job_id: Job ID to get categories for
    - category_id: Specific category ID
    - from_: Starting offset for results
    - size: Number of results to return
    - partition_field_value: Partition field value filter
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with categories
    """

Manage data frame analytics jobs for supervised learning.
def put_data_frame_analytics(
    self,
    id: str,
    source: Dict[str, Any],
    dest: Dict[str, Any],
    analysis: Dict[str, Any],
    description: Optional[str] = None,
    model_memory_limit: Optional[str] = None,
    max_num_threads: Optional[int] = None,
    analyzed_fields: Optional[Dict[str, Any]] = None,
    allow_lazy_start: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a data frame analytics job.

    Parameters:
    - id: Unique identifier for the analytics job
    - source: Source configuration including index and query
    - dest: Destination configuration for results
    - analysis: Analysis configuration (classification, regression, outlier_detection)
    - description: Human-readable job description
    - model_memory_limit: Memory limit for analysis
    - max_num_threads: Maximum number of threads
    - analyzed_fields: Fields to include/exclude from analysis
    - allow_lazy_start: Whether to allow lazy start
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with job creation result
    """
def get_data_frame_analytics(
    self,
    id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get data frame analytics job information.

    Parameters:
    - id: Job ID or pattern to retrieve
    - allow_no_match: Whether to ignore if no jobs match
    - from_: Starting offset for results
    - size: Number of results to return
    - exclude_generated: Whether to exclude generated configurations
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with analytics job information
    """
def start_data_frame_analytics(
    self,
    id: str,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Start a data frame analytics job.

    Parameters:
    - id: Job ID to start
    - timeout: Timeout for starting the job
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with start result
    """
def stop_data_frame_analytics(
    self,
    id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Stop a data frame analytics job.

    Parameters:
    - id: Job ID to stop
    - allow_no_match: Whether to ignore if the job doesn't exist
    - force: Whether to force stop
    - timeout: Timeout for stopping the job
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with stop result
    """
def delete_data_frame_analytics(
    self,
    id: str,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a data frame analytics job.

    Parameters:
    - id: Job ID to delete
    - force: Whether to force deletion
    - timeout: Timeout for deletion
    - **kwargs: Additional parameters forwarded to the underlying API request

    Returns:
        ObjectApiResponse with deletion result
    """

from elasticsearch import Elasticsearch
# Example: end-to-end anomaly detection workflow (requires a running
# Elasticsearch cluster with ML enabled at localhost:9200).
client = Elasticsearch(hosts=['http://localhost:9200'])

# Create an anomaly detection job: mean(amount) per user plus high_count
# per merchant category, bucketed every 15 minutes.
client.ml.put_job(
    job_id="transaction_anomalies",
    description="Detect anomalies in transaction amounts",
    analysis_config={
        "bucket_span": "15m",
        "detectors": [
            {
                "function": "mean",
                "field_name": "amount",
                "by_field_name": "user_id"
            },
            {
                "function": "high_count",
                "by_field_name": "merchant_category"
            }
        ],
        "influencers": ["user_id", "merchant_category"]
    },
    data_description={
        "time_field": "@timestamp",
        "time_format": "epoch_ms"
    },
    analysis_limits={
        "model_memory_limit": "100mb"
    },
    model_plot_config={
        "enabled": True,
        "terms": "user_id"
    }
)

# Create a datafeed for the job: only positive-amount, completed transactions.
client.ml.put_datafeed(
    datafeed_id="transaction_anomalies_feed",
    job_id="transaction_anomalies",
    indices=["transactions-*"],
    query={
        "bool": {
            "must": [
                {"range": {"amount": {"gt": 0}}},
                {"term": {"status": "completed"}}
            ]
        }
    },
    frequency="5m",
    query_delay="30s",
    scroll_size=1000
)

# Open the job and start the datafeed.
client.ml.open_job(job_id="transaction_anomalies")
client.ml.start_datafeed(
    datafeed_id="transaction_anomalies_feed",
    start="2024-01-01T00:00:00Z"
)

# Wait for analysis, then get results.
import time
time.sleep(60)  # Allow some processing time

# Get anomaly buckets, highest scores first.
buckets = client.ml.get_buckets(
    job_id="transaction_anomalies",
    anomaly_score=75,  # Only high-score anomalies
    size=10,
    sort="anomaly_score",
    desc=True
)
for bucket in buckets.body['buckets']:
    print(f"Anomaly at {bucket['timestamp']}, score: {bucket['anomaly_score']}")

# Get detailed anomaly records.
records = client.ml.get_records(
    job_id="transaction_anomalies",
    record_score=50,
    size=20
)
for record in records.body['records']:
    print(f"Record anomaly: {record['function']} on {record['field_name']}")
    print(f" Typical: {record.get('typical', 'N/A')}, Actual: {record.get('actual', 'N/A')}")
    print(f" User: {record.get('by_field_value', 'N/A')}")
# Create a classification job to predict customer churn
# Example: supervised classification with data frame analytics.
client.ml.put_data_frame_analytics(
    id="customer_churn_prediction",
    description="Predict customer churn based on usage patterns",
    source={
        "index": ["customer_data"],
        "query": {
            "bool": {
                "must": [
                    {"range": {"account_age_days": {"gte": 30}}},
                    {"exists": {"field": "churned"}}
                ]
            }
        }
    },
    dest={
        "index": "customer_churn_results",
        "results_field": "ml_results"
    },
    analysis={
        "classification": {
            "dependent_variable": "churned",
            "training_percent": 80,
            "num_top_feature_importance_values": 5,
            "prediction_field_name": "churn_prediction"
        }
    },
    analyzed_fields={
        "includes": [
            "monthly_spend", "support_tickets", "login_frequency",
            "feature_usage_score", "account_age_days", "churned"
        ]
    },
    model_memory_limit="200mb"
)

# Start the analytics job.
client.ml.start_data_frame_analytics(id="customer_churn_prediction")

# Monitor job progress.
job_stats = client.ml.get_data_frame_analytics_stats(id="customer_churn_prediction")
progress = job_stats.body['data_frame_analytics'][0]['progress']
print(f"Progress: {progress}")

# Once complete, examine the highest-probability churn predictions.
results = client.search(
    index="customer_churn_results",
    query={"match_all": {}},
    size=10,
    sort=[{"ml_results.churn_prediction_probability": "desc"}]
)
for hit in results.body['hits']['hits']:
    customer = hit['_source']
    ml_results = customer['ml_results']
    print(f"Customer ID: {customer.get('customer_id', 'N/A')}")
    print(f" Churn prediction: {ml_results['churn_prediction']}")
    print(f" Probability: {ml_results['churn_prediction_probability']:.3f}")
    print(f" Top features: {ml_results.get('feature_importance', [])[:3]}")
# Create outlier detection job for fraud detection
# Example: unsupervised outlier detection over the last 7 days of transactions.
client.ml.put_data_frame_analytics(
    id="fraud_outlier_detection",
    description="Detect fraudulent transactions using outlier detection",
    source={
        "index": ["transactions"],
        "query": {
            "range": {
                "@timestamp": {
                    "gte": "now-7d"
                }
            }
        }
    },
    dest={
        "index": "fraud_outliers",
        "results_field": "outlier_score"
    },
    analysis={
        "outlier_detection": {
            "n_neighbors": 20,
            "method": "lof",  # Local Outlier Factor
            "feature_influence_threshold": 0.1,
            "outlier_fraction": 0.05
        }
    },
    analyzed_fields={
        "includes": [
            "amount", "transaction_hour", "merchant_category_code",
            "days_since_last_transaction", "amount_deviation_from_avg"
        ]
    },
    model_memory_limit="150mb"
)

# Start outlier detection.
client.ml.start_data_frame_analytics(id="fraud_outlier_detection")

# Query for high outlier scores (potential fraud).
outliers = client.search(
    index="fraud_outliers",
    query={
        "range": {
            "outlier_score.outlier_score": {"gte": 0.7}
        }
    },
    sort=[{"outlier_score.outlier_score": "desc"}],
    size=50
)
for hit in outliers.body['hits']['hits']:
    transaction = hit['_source']
    score = transaction['outlier_score']['outlier_score']
    print(f"Transaction ID: {transaction.get('transaction_id', 'N/A')}")
    print(f" Outlier score: {score:.3f}")
    print(f" Amount: ${transaction.get('amount', 'N/A')}")
    print(f" Merchant: {transaction.get('merchant_name', 'N/A')}")
# Get job statistics and health
# Example: job monitoring, snapshot inspection, reconfiguration, and teardown.
job_stats = client.ml.get_job_stats(job_id="transaction_anomalies")
job_info = job_stats.body['jobs'][0]
print(f"Job state: {job_info['state']}")
print(f"Data counts: {job_info['data_counts']}")
print(f"Model size: {job_info['model_size_stats']}")
print(f"Processed records: {job_info['data_counts']['processed_record_count']}")

# Get the five most recent model snapshots.
snapshots = client.ml.get_model_snapshots(
    job_id="transaction_anomalies",
    size=5,
    sort="timestamp",
    desc=True
)
latest_snapshot = snapshots.body['model_snapshots'][0]
print(f"Latest snapshot: {latest_snapshot['snapshot_id']}")
print(f"Model size: {latest_snapshot['model_size_stats']['total_by_field_count']}")

# Update job configuration in place.
client.ml.update_job(
    job_id="transaction_anomalies",
    description="Updated: Detect anomalies in transaction amounts with enhanced settings",
    analysis_limits={
        "model_memory_limit": "150mb"  # Increase memory limit
    },
    model_plot_config={
        "enabled": True,
        "terms": "user_id,merchant_category"  # Add more terms
    }
)

# Close job and datafeed when done (stop the datafeed before closing the job).
client.ml.stop_datafeed(datafeed_id="transaction_anomalies_feed")
client.ml.close_job(job_id="transaction_anomalies")

Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch