CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-elasticsearch

Python client for Elasticsearch with comprehensive API coverage and both sync and async support

Pending
Overview
Eval results
Files

machine-learning.mddocs/

Machine Learning

Machine learning job management, anomaly detection, data analysis, and model operations for Elasticsearch's ML capabilities. These operations provide comprehensive machine learning functionality for detecting anomalies, forecasting, and data analysis.

Capabilities

Job Management

Create and manage machine learning jobs for anomaly detection.

def put_job(
    self,
    job_id: str,
    analysis_config: Dict[str, Any],
    data_description: Dict[str, Any],
    description: Optional[str] = None,
    results_index_name: Optional[str] = None,
    groups: Optional[List[str]] = None,
    model_snapshot_retention_days: Optional[int] = None,
    daily_model_snapshot_retention_after_days: Optional[int] = None,
    analysis_limits: Optional[Dict[str, Any]] = None,
    background_persist_interval: Optional[str] = None,
    custom_settings: Optional[Dict[str, Any]] = None,
    model_plot_config: Optional[Dict[str, Any]] = None,
    renormalization_window_days: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a machine learning job.
    
    Parameters:
    - job_id: Unique identifier for the job
    - analysis_config: Analysis configuration including detectors
    - data_description: Description of input data format
    - description: Human-readable job description
    - results_index_name: Index for storing results
    - groups: Job groups for organization
    - model_snapshot_retention_days: Retention period for model snapshots
    - daily_model_snapshot_retention_after_days: Daily snapshot retention threshold
    - analysis_limits: Memory and processing limits
    - background_persist_interval: Interval for persisting model updates
    - custom_settings: Custom job settings
    - model_plot_config: Model plot configuration
    - renormalization_window_days: Window for model renormalization
    
    Returns:
    ObjectApiResponse with job creation result
    """

def get_jobs(
    self,
    job_id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get machine learning job information.
    
    Parameters:
    - job_id: Job ID or pattern to retrieve
    - allow_no_match: Whether to ignore if no jobs match
    - exclude_generated: Whether to exclude generated configurations
    
    Returns:
    ObjectApiResponse with job information
    """

def delete_job(
    self,
    job_id: str,
    force: Optional[bool] = None,
    wait_for_completion: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a machine learning job.
    
    Parameters:
    - job_id: Job ID to delete
    - force: Whether to force deletion of running job
    - wait_for_completion: Whether to wait for deletion to complete
    
    Returns:
    ObjectApiResponse with deletion result
    """

def open_job(
    self,
    job_id: str,
    ignore_downtime: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Open a machine learning job.
    
    Parameters:
    - job_id: Job ID to open
    - ignore_downtime: Whether to ignore downtime when opening
    - timeout: Timeout for opening job
    
    Returns:
    ObjectApiResponse with job opening result
    """

def close_job(
    self,
    job_id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Close a machine learning job.
    
    Parameters:
    - job_id: Job ID to close
    - allow_no_match: Whether to ignore if job doesn't exist
    - force: Whether to force close running job
    - timeout: Timeout for closing job
    
    Returns:
    ObjectApiResponse with job closing result
    """

def update_job(
    self,
    job_id: str,
    description: Optional[str] = None,
    analysis_limits: Optional[Dict[str, Any]] = None,
    background_persist_interval: Optional[str] = None,
    custom_settings: Optional[Dict[str, Any]] = None,
    model_plot_config: Optional[Dict[str, Any]] = None,
    model_snapshot_retention_days: Optional[int] = None,
    daily_model_snapshot_retention_after_days: Optional[int] = None,
    groups: Optional[List[str]] = None,
    detectors: Optional[List[Dict[str, Any]]] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Update a machine learning job.
    
    Parameters:
    - job_id: Job ID to update
    - description: Updated job description
    - analysis_limits: Updated analysis limits
    - background_persist_interval: Updated persist interval
    - custom_settings: Updated custom settings
    - model_plot_config: Updated model plot configuration
    - model_snapshot_retention_days: Updated snapshot retention
    - daily_model_snapshot_retention_after_days: Updated daily retention
    - groups: Updated job groups
    - detectors: Updated detector configurations
    
    Returns:
    ObjectApiResponse with job update result
    """

Datafeed Management

Manage datafeeds that supply data to machine learning jobs.

def put_datafeed(
    self,
    datafeed_id: str,
    job_id: str,
    indices: List[str],
    aggregations: Optional[Dict[str, Any]] = None,
    chunking_config: Optional[Dict[str, Any]] = None,
    frequency: Optional[str] = None,
    query: Optional[Dict[str, Any]] = None,
    query_delay: Optional[str] = None,
    runtime_mappings: Optional[Dict[str, Any]] = None,
    script_fields: Optional[Dict[str, Any]] = None,
    scroll_size: Optional[int] = None,
    delayed_data_check_config: Optional[Dict[str, Any]] = None,
    max_empty_searches: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a datafeed for a machine learning job.
    
    Parameters:
    - datafeed_id: Unique identifier for the datafeed
    - job_id: Associated job ID
    - indices: List of indices to read from
    - aggregations: Aggregations to apply to data
    - chunking_config: Data chunking configuration
    - frequency: Frequency of data checks
    - query: Query to filter data
    - query_delay: Delay between data time and analysis
    - runtime_mappings: Runtime field mappings
    - script_fields: Script-based field definitions
    - scroll_size: Scroll size for data retrieval
    - delayed_data_check_config: Configuration for delayed data checks
    - max_empty_searches: Maximum consecutive empty searches
    
    Returns:
    ObjectApiResponse with datafeed creation result
    """

def get_datafeeds(
    self,
    datafeed_id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get datafeed information.
    
    Parameters:
    - datafeed_id: Datafeed ID or pattern to retrieve
    - allow_no_match: Whether to ignore if no datafeeds match
    - exclude_generated: Whether to exclude generated configurations
    
    Returns:
    ObjectApiResponse with datafeed information
    """

def start_datafeed(
    self,
    datafeed_id: str,
    start: Optional[str] = None,
    end: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Start a datafeed.
    
    Parameters:
    - datafeed_id: Datafeed ID to start
    - start: Start time for data processing
    - end: End time for data processing
    - timeout: Timeout for starting datafeed
    
    Returns:
    ObjectApiResponse with datafeed start result
    """

def stop_datafeed(
    self,
    datafeed_id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Stop a datafeed.
    
    Parameters:
    - datafeed_id: Datafeed ID to stop
    - allow_no_match: Whether to ignore if datafeed doesn't exist
    - force: Whether to force stop
    - timeout: Timeout for stopping datafeed
    
    Returns:
    ObjectApiResponse with datafeed stop result
    """

def delete_datafeed(
    self,
    datafeed_id: str,
    force: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a datafeed.
    
    Parameters:
    - datafeed_id: Datafeed ID to delete
    - force: Whether to force deletion
    
    Returns:
    ObjectApiResponse with deletion result
    """

Model Management

Manage machine learning models and snapshots.

def get_model_snapshots(
    self,
    job_id: str,
    snapshot_id: Optional[str] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get model snapshots for a job.
    
    Parameters:
    - job_id: Job ID to get snapshots for
    - snapshot_id: Specific snapshot ID
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for snapshot range
    - end: End time for snapshot range
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    
    Returns:
    ObjectApiResponse with model snapshots
    """

def update_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    description: Optional[str] = None,
    retain: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Update a model snapshot.
    
    Parameters:
    - job_id: Job ID containing the snapshot
    - snapshot_id: Snapshot ID to update
    - description: Updated description
    - retain: Whether to retain the snapshot
    
    Returns:
    ObjectApiResponse with update result
    """

def delete_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a model snapshot.
    
    Parameters:
    - job_id: Job ID containing the snapshot
    - snapshot_id: Snapshot ID to delete
    
    Returns:
    ObjectApiResponse with deletion result
    """

def revert_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    delete_intervening_results: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Revert to a previous model snapshot.
    
    Parameters:
    - job_id: Job ID to revert
    - snapshot_id: Snapshot ID to revert to
    - delete_intervening_results: Whether to delete results after snapshot
    
    Returns:
    ObjectApiResponse with revert result
    """

Anomaly Detection Results

Retrieve and analyze anomaly detection results.

def get_buckets(
    self,
    job_id: str,
    timestamp: Optional[str] = None,
    anomaly_score: Optional[float] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    exclude_interim: Optional[bool] = None,
    expand: Optional[bool] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get anomaly detection buckets for a job.
    
    Parameters:
    - job_id: Job ID to get buckets for
    - timestamp: Specific timestamp to retrieve
    - anomaly_score: Minimum anomaly score threshold
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for bucket range
    - end: End time for bucket range
    - exclude_interim: Whether to exclude interim results
    - expand: Whether to expand bucket details
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    
    Returns:
    ObjectApiResponse with anomaly buckets
    """

def get_records(
    self,
    job_id: str,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    record_score: Optional[float] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    exclude_interim: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get anomaly records for a job.
    
    Parameters:
    - job_id: Job ID to get records for
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for record range
    - end: End time for record range
    - record_score: Minimum record score threshold
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    - exclude_interim: Whether to exclude interim results
    
    Returns:
    ObjectApiResponse with anomaly records
    """

def get_influencers(
    self,
    job_id: str,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    influencer_score: Optional[float] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    exclude_interim: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get influencers for a job.
    
    Parameters:
    - job_id: Job ID to get influencers for
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for influencer range
    - end: End time for influencer range
    - influencer_score: Minimum influencer score threshold
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    - exclude_interim: Whether to exclude interim results
    
    Returns:
    ObjectApiResponse with influencers
    """

def get_categories(
    self,
    job_id: str,
    category_id: Optional[str] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    partition_field_value: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get categories for a job.
    
    Parameters:
    - job_id: Job ID to get categories for
    - category_id: Specific category ID
    - from_: Starting offset for results
    - size: Number of results to return
    - partition_field_value: Partition field value filter
    
    Returns:
    ObjectApiResponse with categories
    """

Data Frame Analytics

Manage data frame analytics jobs for supervised learning.

def put_data_frame_analytics(
    self,
    id: str,
    source: Dict[str, Any],
    dest: Dict[str, Any],
    analysis: Dict[str, Any],
    description: Optional[str] = None,
    model_memory_limit: Optional[str] = None,
    max_num_threads: Optional[int] = None,
    analyzed_fields: Optional[Dict[str, Any]] = None,
    allow_lazy_start: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a data frame analytics job.
    
    Parameters:
    - id: Unique identifier for the analytics job
    - source: Source configuration including index and query
    - dest: Destination configuration for results
    - analysis: Analysis configuration (classification, regression, outlier_detection)
    - description: Human-readable job description
    - model_memory_limit: Memory limit for analysis
    - max_num_threads: Maximum number of threads
    - analyzed_fields: Fields to include/exclude from analysis
    - allow_lazy_start: Whether to allow lazy start
    
    Returns:
    ObjectApiResponse with job creation result
    """

def get_data_frame_analytics(
    self,
    id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get data frame analytics job information.
    
    Parameters:
    - id: Job ID or pattern to retrieve
    - allow_no_match: Whether to ignore if no jobs match
    - from_: Starting offset for results
    - size: Number of results to return
    - exclude_generated: Whether to exclude generated configurations
    
    Returns:
    ObjectApiResponse with analytics job information
    """

def start_data_frame_analytics(
    self,
    id: str,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Start a data frame analytics job.
    
    Parameters:
    - id: Job ID to start
    - timeout: Timeout for starting job
    
    Returns:
    ObjectApiResponse with start result
    """

def stop_data_frame_analytics(
    self,
    id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Stop a data frame analytics job.
    
    Parameters:
    - id: Job ID to stop
    - allow_no_match: Whether to ignore if job doesn't exist
    - force: Whether to force stop
    - timeout: Timeout for stopping job
    
    Returns:
    ObjectApiResponse with stop result
    """

def delete_data_frame_analytics(
    self,
    id: str,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a data frame analytics job.
    
    Parameters:
    - id: Job ID to delete
    - force: Whether to force deletion
    - timeout: Timeout for deletion
    
    Returns:
    ObjectApiResponse with deletion result
    """

Usage Examples

Anomaly Detection Job

from elasticsearch import Elasticsearch

client = Elasticsearch(hosts=['http://localhost:9200'])

# Create an anomaly detection job
client.ml.put_job(
    job_id="transaction_anomalies",
    description="Detect anomalies in transaction amounts",
    analysis_config={
        "bucket_span": "15m",
        "detectors": [
            {
                "function": "mean",
                "field_name": "amount",
                "by_field_name": "user_id"
            },
            {
                "function": "high_count",
                "by_field_name": "merchant_category"
            }
        ],
        "influencers": ["user_id", "merchant_category"]
    },
    data_description={
        "time_field": "@timestamp",
        "time_format": "epoch_ms"
    },
    analysis_limits={
        "model_memory_limit": "100mb"
    },
    model_plot_config={
        "enabled": True,
        "terms": "user_id"
    }
)

# Create datafeed for the job
client.ml.put_datafeed(
    datafeed_id="transaction_anomalies_feed",
    job_id="transaction_anomalies",
    indices=["transactions-*"],
    query={
        "bool": {
            "must": [
                {"range": {"amount": {"gt": 0}}},
                {"term": {"status": "completed"}}
            ]
        }
    },
    frequency="5m",
    query_delay="30s",
    scroll_size=1000
)

# Open the job and start the datafeed
client.ml.open_job(job_id="transaction_anomalies")
client.ml.start_datafeed(
    datafeed_id="transaction_anomalies_feed",
    start="2024-01-01T00:00:00Z"
)

# Wait for analysis, then get results
import time
time.sleep(60)  # Allow some processing time

# Get anomaly buckets
buckets = client.ml.get_buckets(
    job_id="transaction_anomalies",
    anomaly_score=75,  # Only high-score anomalies
    size=10,
    sort="anomaly_score",
    desc=True
)

for bucket in buckets.body['buckets']:
    print(f"Anomaly at {bucket['timestamp']}, score: {bucket['anomaly_score']}")

# Get detailed anomaly records
records = client.ml.get_records(
    job_id="transaction_anomalies",
    record_score=50,
    size=20
)

for record in records.body['records']:
    print(f"Record anomaly: {record['function']} on {record['field_name']}")
    print(f"  Typical: {record.get('typical', 'N/A')}, Actual: {record.get('actual', 'N/A')}")
    print(f"  User: {record.get('by_field_value', 'N/A')}")

Data Frame Analytics for Classification

# Create a classification job to predict customer churn
client.ml.put_data_frame_analytics(
    id="customer_churn_prediction",
    description="Predict customer churn based on usage patterns",
    source={
        "index": ["customer_data"],
        "query": {
            "bool": {
                "must": [
                    {"range": {"account_age_days": {"gte": 30}}},
                    {"exists": {"field": "churned"}}
                ]
            }
        }
    },
    dest={
        "index": "customer_churn_results",
        "results_field": "ml_results"
    },
    analysis={
        "classification": {
            "dependent_variable": "churned",
            "training_percent": 80,
            "num_top_feature_importance_values": 5,
            "prediction_field_name": "churn_prediction"
        }
    },
    analyzed_fields={
        "includes": [
            "monthly_spend", "support_tickets", "login_frequency",
            "feature_usage_score", "account_age_days", "churned"
        ]
    },
    model_memory_limit="200mb"
)

# Start the analytics job
client.ml.start_data_frame_analytics(id="customer_churn_prediction")

# Monitor job progress
job_stats = client.ml.get_data_frame_analytics_stats(id="customer_churn_prediction")
progress = job_stats.body['data_frame_analytics'][0]['progress']
print(f"Progress: {progress}")

# Once complete, examine results
results = client.search(
    index="customer_churn_results",
    query={"match_all": {}},
    size=10,
    sort=[{"ml_results.churn_prediction_probability": "desc"}]
)

for hit in results.body['hits']['hits']:
    customer = hit['_source']
    ml_results = customer['ml_results']
    print(f"Customer ID: {customer.get('customer_id', 'N/A')}")
    print(f"  Churn prediction: {ml_results['churn_prediction']}")
    print(f"  Probability: {ml_results['churn_prediction_probability']:.3f}")
    print(f"  Top features: {ml_results.get('feature_importance', [])[:3]}")

Outlier Detection

# Create outlier detection job for fraud detection
client.ml.put_data_frame_analytics(
    id="fraud_outlier_detection",
    description="Detect fraudulent transactions using outlier detection",
    source={
        "index": ["transactions"],
        "query": {
            "range": {
                "@timestamp": {
                    "gte": "now-7d"
                }
            }
        }
    },
    dest={
        "index": "fraud_outliers",
        "results_field": "outlier_score"
    },
    analysis={
        "outlier_detection": {
            "n_neighbors": 20,
            "method": "lof",  # Local Outlier Factor
            "feature_influence_threshold": 0.1,
            "outlier_fraction": 0.05
        }
    },
    analyzed_fields={
        "includes": [
            "amount", "transaction_hour", "merchant_category_code",
            "days_since_last_transaction", "amount_deviation_from_avg"
        ]
    },
    model_memory_limit="150mb"
)

# Start outlier detection
client.ml.start_data_frame_analytics(id="fraud_outlier_detection")

# Query for high outlier scores (potential fraud)
outliers = client.search(
    index="fraud_outliers",
    query={
        "range": {
            "outlier_score.outlier_score": {"gte": 0.7}
        }
    },
    sort=[{"outlier_score.outlier_score": "desc"}],
    size=50
)

for hit in outliers.body['hits']['hits']:
    transaction = hit['_source']
    score = transaction['outlier_score']['outlier_score']
    print(f"Transaction ID: {transaction.get('transaction_id', 'N/A')}")
    print(f"  Outlier score: {score:.3f}")
    print(f"  Amount: ${transaction.get('amount', 'N/A')}")
    print(f"  Merchant: {transaction.get('merchant_name', 'N/A')}")

Model Management and Monitoring

# Get job statistics and health
job_stats = client.ml.get_job_stats(job_id="transaction_anomalies")
job_info = job_stats.body['jobs'][0]

print(f"Job state: {job_info['state']}")
print(f"Data counts: {job_info['data_counts']}")
print(f"Model size: {job_info['model_size_stats']}")
print(f"Processed records: {job_info['data_counts']['processed_record_count']}")

# Get model snapshots
snapshots = client.ml.get_model_snapshots(
    job_id="transaction_anomalies",
    size=5,
    sort="timestamp",
    desc=True
)

latest_snapshot = snapshots.body['model_snapshots'][0]
print(f"Latest snapshot: {latest_snapshot['snapshot_id']}")
print(f"Model size: {latest_snapshot['model_size_stats']['total_by_field_count']}")

# Update job configuration
client.ml.update_job(
    job_id="transaction_anomalies",
    description="Updated: Detect anomalies in transaction amounts with enhanced settings",
    analysis_limits={
        "model_memory_limit": "150mb"  # Increase memory limit
    },
    model_plot_config={
        "enabled": True,
        "terms": "user_id,merchant_category"  # Add more terms
    }
)

# Close job and datafeed when done
client.ml.stop_datafeed(datafeed_id="transaction_anomalies_feed")
client.ml.close_job(job_id="transaction_anomalies")

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch

docs

client-operations.md

cluster-management.md

esql-operations.md

exception-handling.md

helper-functions.md

index-management.md

index.md

inference-api.md

lifecycle-management.md

machine-learning.md

query-dsl.md

search-operations.md

security-operations.md

vectorstore-helpers.md

tile.json