CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-elasticsearch

Python client for Elasticsearch with comprehensive API coverage and both sync and async support

Pending
Overview
Eval results
Files

docs/cluster-management.md

Cluster Management

Cluster-level operations for monitoring health, managing settings, shard allocation, and node management. These operations provide comprehensive control over Elasticsearch cluster behavior and configuration.

Capabilities

Cluster Health Monitoring

Monitor cluster health status, index health, and shard allocation.

def health(
    self,
    index: Optional[Union[str, List[str]]] = None,
    level: Optional[str] = None,
    local: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    wait_for_active_shards: Optional[str] = None,
    wait_for_events: Optional[str] = None,
    wait_for_no_initializing_shards: Optional[bool] = None,
    wait_for_no_relocating_shards: Optional[bool] = None,
    wait_for_nodes: Optional[str] = None,
    wait_for_status: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get cluster health information.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - index: Index name, or list of index names, to check health for
    - level: Level of detail to report (cluster, indices, shards)
    - local: Whether to return local information only
    - master_timeout: Timeout for master node response, as a string
    - timeout: Request timeout, as a string (e.g. '30s')
    - wait_for_active_shards: Number of active shards to wait for, as a string
    - wait_for_events: Wait for specific events to complete
    - wait_for_no_initializing_shards: Wait until no shards are initializing
    - wait_for_no_relocating_shards: Wait until no shards are relocating
    - wait_for_nodes: Number of nodes to wait for, as a string
    - wait_for_status: Cluster status to wait for (green, yellow, red)

    Returns:
    ObjectApiResponse with cluster health information
    """

Cluster State Management

Retrieve and monitor cluster state information.

def state(
    self,
    metric: Optional[Union[str, List[str]]] = None,
    index: Optional[Union[str, List[str]]] = None,
    allow_no_indices: Optional[bool] = None,
    expand_wildcards: Optional[str] = None,
    flat_settings: Optional[bool] = None,
    ignore_unavailable: Optional[bool] = None,
    local: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    wait_for_metadata_version: Optional[int] = None,
    wait_for_timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get cluster state information.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - metric: Metric name, or list of metric names, to return
      (metadata, routing_table, etc.)
    - index: Index name, or list of index names, to include in state
    - allow_no_indices: Whether empty index list is acceptable
    - expand_wildcards: How to expand wildcard patterns
    - flat_settings: Whether to return settings in flat format
    - ignore_unavailable: Whether to ignore unavailable indices
    - local: Whether to return local information only
    - master_timeout: Timeout for master node response, as a string
    - wait_for_metadata_version: Metadata version (integer) to wait for
    - wait_for_timeout: Timeout for wait conditions, as a string (e.g. '30s')

    Returns:
    ObjectApiResponse with cluster state
    """

Cluster Statistics

Retrieve comprehensive cluster statistics and metrics.

def stats(
    self,
    node_id: Optional[Union[str, List[str]]] = None,
    flat_settings: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get cluster statistics.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - node_id: Node ID, or list of node IDs, to include in statistics
    - flat_settings: Whether to return settings in flat format
    - timeout: Request timeout, as a string

    Returns:
    ObjectApiResponse with cluster statistics
    """

Cluster Settings Management

Configure cluster-wide settings for behavior control.

def put_settings(
    self,
    persistent: Optional[Dict[str, Any]] = None,
    transient: Optional[Dict[str, Any]] = None,
    flat_settings: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Update cluster settings.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - persistent: Mapping of persistent cluster settings (survive cluster restart)
    - transient: Mapping of transient cluster settings (reset on restart)
    - flat_settings: Whether to return settings in flat format
    - master_timeout: Timeout for master node response, as a string
    - timeout: Request timeout, as a string

    Returns:
    ObjectApiResponse with settings update result
    """

def get_settings(
    self,
    flat_settings: Optional[bool] = None,
    include_defaults: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get cluster settings.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - flat_settings: Whether to return settings in flat format
    - include_defaults: Whether to include default settings
    - master_timeout: Timeout for master node response, as a string
    - timeout: Request timeout, as a string

    Returns:
    ObjectApiResponse with cluster settings
    """

Shard Allocation Management

Control and monitor shard allocation across cluster nodes.

def allocation_explain(
    self,
    current_node: Optional[str] = None,
    index: Optional[str] = None,
    primary: Optional[bool] = None,
    shard: Optional[int] = None,
    include_disk_info: Optional[bool] = None,
    include_yes_decisions: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Explain shard allocation decisions.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - current_node: Node where shard currently resides
    - index: Name of the index the shard belongs to
    - primary: Whether to explain primary shard allocation
    - shard: Shard number (integer) to explain
    - include_disk_info: Whether to include disk usage information
    - include_yes_decisions: Whether to include positive decisions

    Returns:
    ObjectApiResponse with allocation explanation
    """

def reroute(
    self,
    commands: Optional[List[Dict[str, Any]]] = None,
    dry_run: Optional[bool] = None,
    explain: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    metric: Optional[Union[str, List[str]]] = None,
    retry_failed: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Manually reroute shards in the cluster.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - commands: List of reroute commands to execute, each given as a mapping
    - dry_run: Whether to perform dry run without executing
    - explain: Whether to return explanation of routing
    - master_timeout: Timeout for master node response, as a string
    - metric: Metric name, or list of metric names, to return
    - retry_failed: Whether to retry failed shard allocations
    - timeout: Request timeout, as a string

    Returns:
    ObjectApiResponse with reroute result
    """

Task Management

Monitor and manage cluster-level tasks.

def pending_tasks(
    self,
    local: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get pending cluster tasks.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - local: Whether to return local information only
    - master_timeout: Timeout for master node response, as a string

    Returns:
    ObjectApiResponse with pending tasks
    """

Component Template Management

Manage component templates for composable index templates.

def put_component_template(
    self,
    name: str,
    template: Dict[str, Any],
    create: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create or update a component template.

    `name` and `template` are required; the remaining named parameters are
    optional and default to None. Additional keyword arguments are accepted
    through **kwargs.

    Parameters:
    - name: Component template name
    - template: Template definition mapping with settings, mappings, aliases
    - create: Whether to fail if template already exists
    - master_timeout: Timeout for master node response, as a string

    Returns:
    ObjectApiResponse with template creation result
    """

def get_component_template(
    self,
    name: Optional[str] = None,
    local: Optional[bool] = None,
    master_timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get component templates.

    Every named parameter is optional and defaults to None. Additional
    keyword arguments are accepted through **kwargs.

    Parameters:
    - name: Component template name (or pattern)
    - local: Whether to return local information only
    - master_timeout: Timeout for master node response, as a string

    Returns:
    ObjectApiResponse with component templates
    """

def delete_component_template(
    self,
    name: str,
    master_timeout: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a component template.

    `name` is required; the remaining named parameters are optional and
    default to None. Additional keyword arguments are accepted through
    **kwargs.

    Parameters:
    - name: Component template name
    - master_timeout: Timeout for master node response, as a string
    - timeout: Request timeout, as a string

    Returns:
    ObjectApiResponse with deletion result
    """

Usage Examples

Basic Cluster Monitoring

from elasticsearch import Elasticsearch

client = Elasticsearch(hosts=['http://localhost:9200'])

# Fetch the cluster-wide health summary and report its headline fields
health = client.cluster.health()
summary = health.body
print(f"Cluster status: {summary['status']}")
print(f"Active shards: {summary['active_shards']}")
print(f"Number of nodes: {summary['number_of_nodes']}")

# Ask for health again, waiting for yellow status with a 30s timeout
health = client.cluster.health(wait_for_status='yellow', timeout='30s')

# Request index-level detail, restricted to the 'products' index
index_health = client.cluster.health(index='products', level='indices')
products_health = index_health.body['indices']['products']
print(f"Products index status: {products_health['status']}")

Cluster Settings Management

# Update cluster settings: persistent values survive a cluster restart,
# transient values are reset on restart.
client.cluster.put_settings(
    persistent={
        "cluster.routing.allocation.enable": "all",
        "cluster.max_shards_per_node": 2000
    },
    transient={
        "indices.recovery.max_bytes_per_sec": "100mb"
    }
)

# Get current settings. flat_settings=True returns dotted setting names as
# keys, so a single setting can be looked up directly instead of printing the
# whole persistent block under a "routing allocation" label.
settings = client.cluster.get_settings(include_defaults=True, flat_settings=True)
routing_allocation = settings.body['persistent'].get('cluster.routing.allocation.enable')
print(f"Current routing allocation: {routing_allocation}")

# Reset transient settings: assigning None removes the override
client.cluster.put_settings(
    transient={
        "indices.recovery.max_bytes_per_sec": None
    }
)

Shard Allocation Analysis

# Get allocation explanation for unassigned shard
explanation = client.cluster.allocation_explain(
    index="my_index",
    shard=0,
    primary=True,
    include_disk_info=True
)

# The overall decision for an unassigned shard is reported under
# 'can_allocate'. .get() keeps the example from raising KeyError when the
# shard turns out to be assigned and the field is absent.
print(f"Allocation decision: {explanation.body.get('can_allocate')}")

# Each per-node entry carries its own summary in 'node_decision'. Reading it
# avoids indexing into 'deciders', which can be missing or empty for a node.
for decision in explanation.body.get('node_allocation_decisions', []):
    node_name = decision['node_name']
    decision_type = decision['node_decision']
    print(f"Node {node_name}: {decision_type}")

# Manual shard reroute: move shard 0 of my_index from node-1 to node-2 and
# ask for an explanation of the routing
reroute_result = client.cluster.reroute(
    commands=[
        {
            "move": {
                "index": "my_index",
                "shard": 0,
                "from_node": "node-1",
                "to_node": "node-2"
            }
        }
    ],
    explain=True
)

Advanced Cluster Management

# Fetch the metadata and routing-table parts of the cluster state, waiting
# (up to 30s) for metadata version 10
current_state = client.cluster.state(
    metric=['metadata', 'routing_table'],
    wait_for_metadata_version=10,
    wait_for_timeout='30s'
)

# Report document count, store size and node count from the cluster statistics
stats = client.cluster.stats()
indices_stats = stats.body['indices']
print(f"Total documents: {indices_stats['docs']['count']}")
print(f"Total store size: {indices_stats['store']['size_in_bytes']}")
print(f"Available nodes: {stats.body['nodes']['count']['total']}")

# List queued cluster-level tasks, or say so when the queue is empty
pending = client.cluster.pending_tasks()
queued_tasks = pending.body['tasks']
if not queued_tasks:
    print("No pending cluster tasks")
else:
    for task in queued_tasks:
        print(f"Pending task: {task['source']} - {task['time_in_queue']}")

Component Template Management

# Component template holding the shared index settings
logs_settings_template = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "index.lifecycle.name": "logs_policy"
    }
}
client.cluster.put_component_template(
    name="logs_settings",
    template=logs_settings_template
)

# Component template holding the shared field mappings
logs_field_properties = {
    "@timestamp": {"type": "date"},
    "level": {"type": "keyword"},
    "message": {"type": "text"},
    "service": {"type": "keyword"}
}
client.cluster.put_component_template(
    name="logs_mappings",
    template={"mappings": {"properties": logs_field_properties}}
)

# Composable index template assembled from the two components above
component_names = ["logs_settings", "logs_mappings"]
client.indices.put_index_template(
    name="logs_template",
    index_patterns=["logs-*"],
    composed_of=component_names,
    priority=200
)

# Print the name of every component template entry in the response
templates = client.cluster.get_component_template()
for entry in templates.body['component_templates']:
    print(f"Component template: {entry['name']}")

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch

docs

client-operations.md

cluster-management.md

esql-operations.md

exception-handling.md

helper-functions.md

index-management.md

index.md

inference-api.md

lifecycle-management.md

machine-learning.md

query-dsl.md

search-operations.md

security-operations.md

vectorstore-helpers.md

tile.json