CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-elasticsearch

Python client for Elasticsearch with comprehensive API coverage and both sync and async support

Pending
Overview
Eval results
Files

lifecycle-management.mddocs/

Lifecycle Management

Index lifecycle management (ILM), snapshot lifecycle management (SLM), and data stream operations for automated data management. These operations provide comprehensive automation for data retention, archival, and backup strategies.

Index Lifecycle Management (ILM)

Policy Management

Create and manage index lifecycle policies for automated data management.

def put_policy(
    self,
    name: str,
    policy: Dict[str, Any],
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create or update an index lifecycle policy.
    
    Parameters:
    - name: Policy name
    - policy: Policy definition with phases and actions
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with policy creation result
    """

def get_policy(
    self,
    name: Optional[str] = None,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get index lifecycle policies.
    
    Parameters:
    - name: Policy name to retrieve, or None for all policies
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with policy information
    """

def delete_policy(
    self,
    name: str,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete an index lifecycle policy.
    
    Parameters:
    - name: Policy name to delete
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with deletion result
    """

Policy Execution and Monitoring

Monitor and control index lifecycle policy execution.

def explain_lifecycle(
    self,
    index: Union[str, List[str]],
    only_errors: Optional[bool] = None,
    only_managed: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Explain the lifecycle status of indices.
    
    Parameters:
    - index: Index name(s) to explain
    - only_errors: Whether to only show indices with errors
    - only_managed: Whether to only show managed indices
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with lifecycle explanation for each index
    """

def move_to_step(
    self,
    index: str,
    current_step: Dict[str, Any],
    next_step: Dict[str, Any],
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Move an index to a specific lifecycle step.
    
    Parameters:
    - index: Index name to move
    - current_step: Current step information
    - next_step: Target step information
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with move result
    """

def retry(
    self,
    index: Union[str, List[str]],
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Retry failed lifecycle actions.
    
    Parameters:
    - index: Index name(s) to retry
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with retry result
    """

def remove_policy(
    self,
    index: Union[str, List[str]],
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Remove lifecycle policy from indices.
    
    Parameters:
    - index: Index name(s) to remove policy from
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with removal result
    """

def start(
    self,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Start the index lifecycle management service.
    
    Parameters:
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with start result
    """

def stop(
    self,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Stop the index lifecycle management service.
    
    Parameters:
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with stop result
    """

def get_status(
    self,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get the status of index lifecycle management.
    
    Parameters:
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with ILM status
    """

Snapshot Lifecycle Management (SLM)

SLM Policy Management

Create and manage snapshot lifecycle policies for automated backups.

def put_policy(
    self,
    name: str,
    schedule: str,
    name_pattern: str,
    repository: str,
    config: Optional[Dict[str, Any]] = None,
    retention: Optional[Dict[str, Any]] = None,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create or update a snapshot lifecycle policy.
    
    Parameters:
    - name: Policy name
    - schedule: Cron expression for snapshot schedule
    - name_pattern: Pattern for snapshot names
    - repository: Repository name for snapshots
    - config: Snapshot configuration options
    - retention: Retention policy for snapshots
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with policy creation result
    """

def get_policy(
    self,
    name: Optional[Union[str, List[str]]] = None,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get snapshot lifecycle policies.
    
    Parameters:
    - name: Policy name(s) to retrieve, or None for all policies
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with policy information
    """

def delete_policy(
    self,
    name: str,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a snapshot lifecycle policy.
    
    Parameters:
    - name: Policy name to delete
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with deletion result
    """

SLM Execution and Monitoring

Monitor and control snapshot lifecycle policy execution.

def execute_policy(
    self,
    name: str,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Execute a snapshot lifecycle policy immediately.
    
    Parameters:
    - name: Policy name to execute
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with execution result including snapshot name
    """

def get_stats(
    self,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get snapshot lifecycle management statistics.
    
    Parameters:
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with SLM statistics and policy execution history
    """

def get_status(
    self,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get the status of snapshot lifecycle management.
    
    Parameters:
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with SLM status
    """

def start(
    self,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Start the snapshot lifecycle management service.
    
    Parameters:
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with start result
    """

def stop(
    self,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Stop the snapshot lifecycle management service.
    
    Parameters:
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    
    Returns:
    ObjectApiResponse with stop result
    """

Usage Examples

Index Lifecycle Management

from elasticsearch import Elasticsearch

client = Elasticsearch(hosts=['http://localhost:9200'])

# Create a comprehensive ILM policy for log data
client.ilm.put_policy(
    name="logs_policy",
    policy={
        "phases": {
            "hot": {
                "min_age": "0ms",
                "actions": {
                    "rollover": {
                        "max_size": "5GB",
                        "max_age": "1d",
                        "max_docs": 10000000
                    },
                    "set_priority": {
                        "priority": 100
                    }
                }
            },
            "warm": {
                "min_age": "7d",
                "actions": {
                    "set_priority": {
                        "priority": 50
                    },
                    "allocate": {
                        "number_of_replicas": 0,
                        "require": {
                            "data_tier": "warm"
                        }
                    },
                    "forcemerge": {
                        "max_num_segments": 1
                    }
                }
            },
            "cold": {
                "min_age": "30d",
                "actions": {
                    "set_priority": {
                        "priority": 0
                    },
                    "allocate": {
                        "number_of_replicas": 0,
                        "require": {
                            "data_tier": "cold"
                        }
                    }
                }
            },
            "frozen": {
                "min_age": "90d",
                "actions": {
                    "searchable_snapshot": {
                        "snapshot_repository": "cold_repository"
                    }
                }
            },
            "delete": {
                "min_age": "365d",
                "actions": {
                    "delete": {}
                }
            }
        }
    }
)

# Create index template with ILM policy
client.indices.put_index_template(
    name="logs_template",
    index_patterns=["logs-*"],
    template={
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1,
            "index.lifecycle.name": "logs_policy",
            "index.lifecycle.rollover_alias": "logs"
        },
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "level": {"type": "keyword"},
                "message": {"type": "text"},
                "service": {"type": "keyword"}
            }
        }
    }
)

# Create the initial index and alias
client.indices.create(
    index="logs-000001",
    settings={
        "index.lifecycle.name": "logs_policy",
        "index.lifecycle.rollover_alias": "logs"
    },
    aliases={"logs": {"is_write_index": True}}
)

# Monitor ILM execution
explain_result = client.ilm.explain_lifecycle(index="logs-*")
for index_name, index_info in explain_result.body['indices'].items():
    print(f"Index: {index_name}")
    print(f"  Policy: {index_info.get('policy', 'None')}")
    print(f"  Phase: {index_info.get('phase', 'N/A')}")
    print(f"  Action: {index_info.get('action', 'N/A')}")
    
    if 'phase_execution' in index_info:
        execution = index_info['phase_execution']
        print(f"  Phase time: {execution.get('phase_time_millis', 'N/A')}ms")

Snapshot Lifecycle Management

# First, create a snapshot repository
client.snapshot.create_repository(
    repository="backup_repository",
    settings={
        "type": "fs",
        "settings": {
            "location": "/mount/backups/elasticsearch"
        }
    }
)

# Create an SLM policy for automated backups
client.slm.put_policy(
    name="daily_backup_policy",
    schedule="0 2 * * *",  # Daily at 2 AM
    name_pattern="backup-{now/d}",
    repository="backup_repository",
    config={
        "indices": ["important-*", "logs-*"],
        "ignore_unavailable": True,
        "include_global_state": False,
        "metadata": {
            "taken_by": "automated_slm",
            "purpose": "daily_backup"
        }
    },
    retention={
        "expire_after": "30d",
        "min_count": 5,
        "max_count": 50
    }
)

# Create a weekly backup policy with longer retention
client.slm.put_policy(
    name="weekly_backup_policy",
    schedule="0 3 * * 0",  # Weekly on Sunday at 3 AM
    name_pattern="weekly-backup-{now/w}",
    repository="backup_repository",
    config={
        "indices": ["*"],
        "ignore_unavailable": True,
        "include_global_state": True,
        "metadata": {
            "taken_by": "automated_slm",
            "purpose": "weekly_full_backup"
        }
    },
    retention={
        "expire_after": "180d",
        "min_count": 10,
        "max_count": 100
    }
)

# Execute a policy immediately for testing
execute_result = client.slm.execute_policy(name="daily_backup_policy")
snapshot_name = execute_result.body['snapshot_name']
print(f"Snapshot created: {snapshot_name}")

# Monitor SLM statistics
slm_stats = client.slm.get_stats()
print("SLM Statistics:")
print(f"  Retention runs: {slm_stats.body['retention_runs']}")
print(f"  Retention failed: {slm_stats.body['retention_failed']}")
print(f"  Retention timed out: {slm_stats.body['retention_timed_out']}")
print(f"  Total snapshots taken: {slm_stats.body['total_snapshots_taken']}")
print(f"  Total snapshots failed: {slm_stats.body['total_snapshots_failed']}")

# Check individual policy statistics
for policy_name, policy_stats in slm_stats.body['policy_stats'].items():
    print(f"Policy: {policy_name}")
    print(f"  Snapshots taken: {policy_stats['snapshots_taken']}")
    print(f"  Snapshots failed: {policy_stats['snapshots_failed']}")
    if 'last_success' in policy_stats:
        print(f"  Last success: {policy_stats['last_success']['snapshot_name']}")
    if 'last_failure' in policy_stats:
        print(f"  Last failure: {policy_stats['last_failure']['snapshot_name']}")

Advanced Lifecycle Management

# Create a complex ILM policy with multiple conditions
client.ilm.put_policy(
    name="complex_logs_policy",
    policy={
        "phases": {
            "hot": {
                "min_age": "0ms",
                "actions": {
                    "rollover": {
                        "max_size": "2GB",
                        "max_age": "6h",
                        "max_docs": 5000000
                    },
                    "set_priority": {
                        "priority": 100
                    }
                }
            },
            "warm": {
                "min_age": "12h",
                "actions": {
                    "readonly": {},
                    "set_priority": {
                        "priority": 50
                    },
                    "allocate": {
                        "number_of_replicas": 0,
                        "require": {
                            "data_tier": "warm"
                        }
                    },
                    "forcemerge": {
                        "max_num_segments": 1
                    },
                    "shrink": {
                        "number_of_shards": 1,
                        "allow_write_after_shrink": False
                    }
                }
            },
            "cold": {
                "min_age": "3d",
                "actions": {
                    "set_priority": {
                        "priority": 0
                    },
                    "allocate": {
                        "number_of_replicas": 0,
                        "require": {
                            "data_tier": "cold"
                        }
                    }
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": {
                    "wait_for_snapshot": {
                        "policy": "daily_backup_policy"
                    },
                    "delete": {}
                }
            }
        }
    }
)

# Move an index to a specific phase manually
client.ilm.move_to_step(
    index="logs-000005",
    current_step={
        "phase": "hot",
        "action": "rollover",
        "name": "check-rollover-ready"
    },
    next_step={
        "phase": "warm",
        "action": "allocate",
        "name": "allocate"
    }
)

# Handle failed lifecycle actions
explain_result = client.ilm.explain_lifecycle(index="logs-*", only_errors=True)
failed_indices = []

for index_name, index_info in explain_result.body['indices'].items():
    if index_info.get('step_info', {}).get('type') == 'error':
        failed_indices.append(index_name)
        print(f"Failed index: {index_name}")
        print(f"  Error: {index_info['step_info']['reason']}")

# Retry failed actions
if failed_indices:
    retry_result = client.ilm.retry(index=failed_indices)
    print(f"Retried {len(failed_indices)} indices")

# Temporarily stop ILM for maintenance
client.ilm.stop()
print("ILM stopped")

# Perform maintenance operations...

# Resume ILM
client.ilm.start()
print("ILM resumed")

# Get ILM status
status = client.ilm.get_status()
print(f"ILM operation mode: {status.body['operation_mode']}")

Monitoring and Troubleshooting

# Comprehensive lifecycle monitoring function
def monitor_lifecycle_health():
    # Check ILM status
    ilm_status = client.ilm.get_status()
    print(f"ILM Status: {ilm_status.body['operation_mode']}")
    
    # Check SLM status
    slm_status = client.slm.get_status()
    print(f"SLM Status: {slm_status.body['operation_mode']}")
    
    # Get all ILM policies
    policies = client.ilm.get_policy()
    print(f"Active ILM policies: {len(policies.body)}")
    
    # Check for indices with lifecycle errors
    error_explain = client.ilm.explain_lifecycle(
        index="*",
        only_errors=True,
        only_managed=True
    )
    
    error_count = len(error_explain.body['indices'])
    if error_count > 0:
        print(f"WARNING: {error_count} indices have lifecycle errors")
        for index_name, index_info in error_explain.body['indices'].items():
            error_info = index_info.get('step_info', {})
            print(f"  {index_name}: {error_info.get('reason', 'Unknown error')}")
    else:
        print("No lifecycle errors detected")
    
    # Check SLM policy execution
    slm_stats = client.slm.get_stats()
    total_failed = slm_stats.body['total_snapshots_failed']
    if total_failed > 0:
        print(f"WARNING: {total_failed} snapshot(s) have failed")
        for policy_name, stats in slm_stats.body['policy_stats'].items():
            if stats['snapshots_failed'] > 0:
                print(f"  Policy '{policy_name}': {stats['snapshots_failed']} failures")
    
    return {
        'ilm_healthy': ilm_status.body['operation_mode'] == 'RUNNING',
        'slm_healthy': slm_status.body['operation_mode'] == 'RUNNING',
        'lifecycle_errors': error_count,
        'snapshot_failures': total_failed
    }

# Run monitoring
health_status = monitor_lifecycle_health()
print(f"Overall health: {'HEALTHY' if all(health_status.values()) else 'ISSUES DETECTED'}")

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch

docs

client-operations.md

cluster-management.md

esql-operations.md

exception-handling.md

helper-functions.md

index-management.md

index.md

inference-api.md

lifecycle-management.md

machine-learning.md

query-dsl.md

search-operations.md

security-operations.md

vectorstore-helpers.md

tile.json