CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-elasticsearch5

Python client for Elasticsearch 5.x providing comprehensive access to all Elasticsearch APIs and features.

Pending
Overview
Eval results
Files

docs/cluster-operations.md

Cluster Operations

Cluster-level operations and monitoring through the cluster client. Provides health status, cluster state, node information, statistics, and administrative tasks for cluster management and maintenance.

Capabilities

Cluster Health and Status

Monitor cluster health and get comprehensive state information.

# Accessed via es.cluster
def health(index: str = None, **params) -> dict:
    """
    Get cluster health status and metrics.

    Every option other than `index` is supplied as a keyword argument.

    Parameters:
    - index: Index name(s) to check health (all if None)
    - level: Detail level ('cluster', 'indices', 'shards')
    - local: Execute on local node only
    - master_timeout: Timeout for master node response
    - timeout: Request timeout
    - wait_for_active_shards: Wait for N active shards
    - wait_for_events: Wait for specific events ('immediate', 'urgent', 'high', 'normal', 'low', 'languid')
    - wait_for_no_relocating_shards: Wait for no relocating shards
    - wait_for_nodes: Wait for N nodes (e.g., '>=3')
    - wait_for_status: Wait for specific status ('green', 'yellow', 'red')

    Returns:
    dict: Health information including:
    - cluster_name: Name of the cluster
    - status: Overall health ('green', 'yellow', 'red')
    - timed_out: Whether request timed out
    - number_of_nodes: Total nodes in cluster
    - number_of_data_nodes: Data nodes count
    - active_primary_shards: Primary shards count
    - active_shards: Total active shards
    - relocating_shards: Shards being relocated
    - initializing_shards: Shards being initialized
    - unassigned_shards: Unassigned shards count
    - delayed_unassigned_shards: Delayed unassigned shards
    - number_of_pending_tasks: Pending tasks count
    - number_of_in_flight_fetch: In-flight fetch operations
    - task_max_waiting_in_queue_millis: Max task wait time
    - active_shards_percent_as_number: Active shards percentage
    - indices: Per-index health keyed by index name, each entry carrying
      fields such as status and active_shards (requested with level='indices')
    """

def state(metric: str = None, index: str = None, **params) -> dict:
    """
    Get detailed cluster state information.

    Every option other than `metric` and `index` is supplied as a keyword
    argument.

    Parameters:
    - metric: Specific metrics to retrieve ('_all', 'blocks', 'metadata', 'nodes', 'routing_table', 'routing_nodes', 'master_node', 'version')
    - index: Index name(s) to include in state
    - allow_no_indices: Handle no matching indices
    - expand_wildcards: Wildcard expansion ('open', 'closed', 'none', 'all')
    - flat_settings: Return flattened settings
    - ignore_unavailable: Ignore unavailable indices
    - local: Execute on local node
    - master_timeout: Master node timeout
    - wait_for_metadata_version: Wait for specific metadata version
    - wait_for_timeout: Timeout for waiting

    NOTE(review): wait_for_metadata_version and wait_for_timeout look like
    options introduced after Elasticsearch 5.x; confirm that the 5.x client
    and server accept them before relying on them.

    Returns:
    dict: Comprehensive cluster state including:
    - cluster_name: Cluster name
    - version: State version
    - state_uuid: Unique state identifier
    - master_node: Master node ID
    - blocks: Cluster blocks
    - nodes: Node information
    - metadata: Index metadata and templates
    - routing_table: Shard routing information

    NOTE(review): when `metric` is given, presumably only the requested
    sections are present in the response (e.g. master_node is absent unless
    'master_node' is requested) - confirm against the server version in use.
    """

def stats(node_id: str = None, **params) -> dict:
    """
    Get cluster-wide statistics.

    Every option other than `node_id` is supplied as a keyword argument.

    Parameters:
    - node_id: Specific node(s) to get stats from (all nodes if None)
    - flat_settings: Return flattened settings
    - human: Human-readable format for numbers
    - timeout: Request timeout

    Returns:
    dict: Cluster statistics including:
    - timestamp: Statistics timestamp
    - cluster_name: Cluster name
    - status: Cluster status
    - indices: Index-level statistics
    - nodes: Node statistics
    """

def pending_tasks(**params) -> dict:
    """
    Get information about pending cluster tasks.

    All options are supplied as keyword arguments.

    Parameters:
    - local: Execute on local node only
    - master_timeout: Master node timeout

    Returns:
    dict: Pending tasks information with:
    - tasks: Array of pending tasks with priority, source, time_in_queue
      and time_in_queue_millis (empty when nothing is queued)
    """

Cluster Settings Management

Configure persistent and transient cluster settings.

def get_settings(**params) -> dict:
    """
    Get current cluster settings.

    All options are supplied as keyword arguments.

    Parameters:
    - flat_settings: Return settings in flat format
    - include_defaults: Include default settings
    - master_timeout: Master node timeout
    - timeout: Request timeout

    Returns:
    dict: Cluster settings organized by type:
    - persistent: Persistent settings (survive cluster restart)
    - transient: Transient settings (reset on restart)
    - defaults: Default settings (only present if include_defaults=True)
    """

def put_settings(body: dict = None, **params) -> dict:
    """
    Update cluster settings.

    Every option other than `body` is supplied as a keyword argument.

    Parameters:
    - body: Settings to update, grouped under "persistent" and/or "transient"
    - flat_settings: Accept flat setting format
    - master_timeout: Master node timeout
    - timeout: Request timeout

    Body structure:
    {
        "persistent": {
            "indices.recovery.max_bytes_per_sec": "50mb",
            "cluster.routing.allocation.enable": "all"
        },
        "transient": {
            "logger.discovery": "DEBUG"
        }
    }

    Common settings:
    - cluster.routing.allocation.enable: Enable/disable shard allocation
    - cluster.routing.rebalance.enable: Enable/disable shard rebalancing
    - indices.recovery.max_bytes_per_sec: Recovery speed limit
    - cluster.max_shards_per_node: Maximum shards per node
    - search.default_search_timeout: Default search timeout

    NOTE(review): cluster.max_shards_per_node looks like a setting introduced
    after Elasticsearch 5.x; confirm it exists on the target cluster, since
    unknown settings are presumably rejected by the server.

    Returns:
    dict: Settings update confirmation with acknowledged status
    """

Shard Management

Control shard allocation and routing across the cluster.

def reroute(body: dict = None, **params) -> dict:
    """
    Manually reroute shards in the cluster.

    Every option other than `body` is supplied as a keyword argument.

    Parameters:
    - body: Reroute commands specification
    - dry_run: Show what would happen without executing
    - explain: Provide detailed explanation
    - master_timeout: Master node timeout
    - metric: Metrics to return in response
    - retry_failed: Retry failed shard allocations
    - timeout: Request timeout

    Body structure (shown as JSON; when building the body as a Python dict,
    write the JSON literal false as False):
    {
        "commands": [
            {
                "move": {
                    "index": "my_index",
                    "shard": 0,
                    "from_node": "node1",
                    "to_node": "node2"
                }
            },
            {
                "allocate_replica": {
                    "index": "my_index",
                    "shard": 1,
                    "node": "node3"
                }
            },
            {
                "cancel": {
                    "index": "my_index",
                    "shard": 2,
                    "node": "node1",
                    "allow_primary": false
                }
            }
        ]
    }

    Available commands:
    - move: Move shard to different node
    - allocate_replica: Allocate replica shard
    - allocate_stale_primary: Allocate stale primary
    - allocate_empty_primary: Allocate empty primary
    - cancel: Cancel shard allocation

    Returns:
    dict: Reroute results with an acknowledged flag and state information
    """

def allocation_explain(body: dict = None, **params) -> dict:
    """
    Explain shard allocation decisions.

    Every option other than `body` is supplied as a keyword argument.

    Parameters:
    - body: Shard specification to explain
    - include_disk_info: Include disk usage information
    - include_yes_decisions: Include positive decisions

    Body structure (shown as JSON; when building the body as a Python dict,
    write the JSON literal true as True):
    {
        "index": "my_index",
        "shard": 0,
        "primary": true
    }

    Returns:
    dict: Detailed explanation of allocation decisions including:
    - index: Index name
    - shard: Shard number
    - primary: Whether it's primary shard
    - current_state: Current allocation state
    - unassigned_info: Why shard is unassigned (if applicable)
    - can_allocate: Whether allocation is possible
    - allocate_explanation: Detailed allocation decision
    - can_remain_on_current_node: Whether shard can stay
    - can_rebalance_cluster: Whether rebalancing is possible
    - can_rebalance_to_other_node: Whether rebalancing to other nodes is possible

    NOTE(review): presumably not every key is present in every response -
    the allocation keys apply to unassigned shards and the remain/rebalance
    keys to assigned ones. Confirm before indexing the result directly.
    """

Node Management

Monitor and manage individual nodes through the nodes client.

# Accessed via es.nodes
def info(node_id: str = None, metric: str = None, **params) -> dict:
    """
    Get information about cluster nodes.

    Every option other than `node_id` and `metric` is supplied as a keyword
    argument.

    Parameters:
    - node_id: Specific node(s) to get info ('_local', '_master', 'node1,node2', etc.)
    - metric: Specific metrics ('settings', 'os', 'process', 'jvm', 'thread_pool', 'transport', 'http', 'plugins', 'ingest')
    - flat_settings: Return flattened settings
    - human: Human-readable format
    - timeout: Request timeout

    Returns:
    dict: Node information including:
    - cluster_name: Cluster name
    - nodes: Dictionary keyed by node ID, each value holding:
      - name: Node name
      - transport_address: Network address
      - host: Hostname
      - ip: IP address
      - version: Elasticsearch version
      - build_hash: Build information
      - roles: Node roles (master, data, ingest, etc.)
      - attributes: Custom node attributes
      - settings: Node settings (if requested)
      - os: Operating system info (if requested)
      - process: Process information (if requested)
      - jvm: JVM information (if requested)
    """

def stats(node_id: str = None, metric: str = None, index_metric: str = None, **params) -> dict:
    """
    Get statistics for cluster nodes.

    Every option other than `node_id`, `metric` and `index_metric` is
    supplied as a keyword argument.

    Parameters:
    - node_id: Specific node(s) to get stats
    - metric: Node-level metrics ('indices', 'os', 'process', 'jvm', 'transport', 'http', 'fs', 'thread_pool', 'breaker', 'script', 'discovery', 'ingest')
    - index_metric: Index-level metrics ('completion', 'docs', 'fielddata', 'query_cache', 'flush', 'get', 'indexing', 'merge', 'request_cache', 'search', 'segments', 'store', 'translog', 'warmer')
    - completion_fields: Fields for completion stats
    - fielddata_fields: Fields for fielddata stats
    - fields: Specific fields for stats
    - groups: Stats groups
    - human: Human-readable format
    - level: Statistics level ('node', 'indices', 'shards')
    - timeout: Request timeout
    - types: Document types for stats

    Returns:
    dict: Detailed node statistics including:
    - cluster_name: Cluster name
    - nodes: Per-node statistics keyed by node ID, each value holding:
      - timestamp: Statistics timestamp
      - name: Node name
      - transport_address: Network address
      - host: Hostname
      - roles: Node roles
      - indices: Index statistics (docs, store, indexing, search, etc.)
      - os: OS statistics (CPU, memory, swap)
      - process: Process statistics (CPU, memory)
      - jvm: JVM statistics (heap, GC, threads)
      - thread_pool: Thread pool statistics
      - fs: Filesystem statistics
      - transport: Transport layer statistics
      - http: HTTP statistics
      - breakers: Circuit breaker statistics
    """

def hot_threads(node_id: str = None, **params) -> str:
    """
    Get information about hot threads on nodes.

    Every option other than `node_id` is supplied as a keyword argument.

    Parameters:
    - node_id: Specific node(s) to analyze (all nodes if None)
    - ignore_idle_threads: Ignore idle threads
    - interval: Sampling interval
    - snapshots: Number of thread snapshots
    - threads: Number of hot threads to show
    - timeout: Request timeout
    - type: Thread type ('cpu', 'wait', 'block')

    Returns:
    str: Plain-text hot thread report for performance troubleshooting.
    The hot threads endpoint answers with text rather than JSON, so the
    result is meant to be printed or logged, not indexed like a dict.
    """

Cat API for Human-Readable Output

Quick cluster information in tabular format through the cat client.

# Accessed via es.cat - Returns human-readable tabular data
def health(**params) -> str:
    """
    Get cluster health in tabular format.

    Options are supplied as keyword arguments (e.g. v=True to include a
    header row).

    Returns:
    str: Human-readable table of cluster health
    """

def nodes(**params) -> str:
    """
    Get node information in tabular format.

    Options are supplied as keyword arguments (e.g. v=True for a header
    row, h='name,heap.percent' to choose the columns).

    Returns:
    str: Human-readable table with one row per node
    """

def master(**params) -> str:
    """
    Get master node information.

    Options are supplied as keyword arguments.

    Returns:
    str: Human-readable text describing the master node
    """

def indices(index: str = None, **params) -> str:
    """
    Get index information in tabular format.

    Parameters:
    - index: Index name(s) to list (presumably all indices if None)
    - other options are supplied as keyword arguments (e.g. v=True,
      h='index,docs.count,store.size,health')

    Returns:
    str: Human-readable table of indices
    """

def shards(index: str = None, **params) -> str:
    """
    Get shard information.

    Parameters:
    - index: Index name(s) whose shards to list (presumably all if None)
    - other options are supplied as keyword arguments (e.g. v=True,
      h='index,shard,prirep,state,docs,store,node')

    Returns:
    str: Human-readable table of shards
    """

def allocation(node_id: str = None, **params) -> str:
    """
    Get shard allocation per node.

    Parameters:
    - node_id: Node(s) to report on (presumably all nodes if None)
    - other options are supplied as keyword arguments (e.g. v=True)

    Returns:
    str: Human-readable table of shard allocation per node
    """

def pending_tasks(**params) -> str:
    """
    Get pending cluster tasks.

    Options are supplied as keyword arguments.

    Returns:
    str: Human-readable table of pending cluster tasks
    """

def thread_pool(thread_pool_patterns: str = None, **params) -> str:
    """
    Get thread pool information.

    Parameters:
    - thread_pool_patterns: Thread pool name pattern(s) to include
      (presumably all thread pools if None)
    - other options are supplied as keyword arguments

    Returns:
    str: Human-readable table of thread pool information
    """

Usage Examples

Cluster Health Monitoring

from elasticsearch5 import Elasticsearch

# Client connected to a single local node; all calls below go through it
es = Elasticsearch(['localhost:9200'])

# Basic health check: one call returns the cluster-wide summary
health = es.cluster.health()
print(f"Cluster status: {health['status']}")
print(f"Active shards: {health['active_shards']}")
print(f"Unassigned shards: {health['unassigned_shards']}")

# Wait for green status (blocks for up to 30 seconds)
# NOTE(review): if green is not reached in time the response presumably has
# timed_out=True, and some server versions answer HTTP 408, which the client
# may surface as an exception - confirm for the version in use
health = es.cluster.health(
    wait_for_status='green',
    timeout='30s'
)

# Detailed health per index: level='indices' adds an 'indices' mapping
# (index name -> health entry) to the response
health = es.cluster.health(level='indices')
for index, stats in health['indices'].items():
    print(f"Index {index}: {stats['status']} - {stats['active_shards']} shards")

Cluster Settings Management

# Get current settings (include_defaults=True also returns built-in defaults)
current_settings = es.cluster.get_settings(include_defaults=True)
print("Persistent settings:", current_settings['persistent'])
print("Transient settings:", current_settings['transient'])

# Update cluster settings: persistent values survive a cluster restart,
# transient values are reset on restart
new_settings = {
    "persistent": {
        "cluster.routing.allocation.enable": "all",
        "indices.recovery.max_bytes_per_sec": "100mb"
    },
    "transient": {
        "logger.discovery": "INFO"
    }
}
es.cluster.put_settings(body=new_settings)

# Restrict shard allocation to primaries only (useful before maintenance).
# Note this does not disable allocation entirely: primary shards can still
# be allocated, only replica allocation is held back.
maintenance_settings = {
    "transient": {
        "cluster.routing.allocation.enable": "primaries"
    }
}
es.cluster.put_settings(body=maintenance_settings)

# Re-enable allocation of all shard types after maintenance
enable_settings = {
    "transient": {
        "cluster.routing.allocation.enable": "all"
    }
}
es.cluster.put_settings(body=enable_settings)

Shard Management

# Get allocation explanation for unassigned shard
# (the body is a Python dict, hence True rather than JSON true)
explanation = es.cluster.allocation_explain(
    body={
        "index": "my_index",
        "shard": 0,
        "primary": True
    },
    include_yes_decisions=True
)
# NOTE(review): 'allocate_explanation' is presumably only present when the
# shard is unassigned - confirm before running this against assigned shards
print("Allocation decision:", explanation['allocate_explanation'])

# Manually move a shard
reroute_commands = {
    "commands": [
        {
            "move": {
                "index": "my_index",
                "shard": 0,
                "from_node": "node-1",
                "to_node": "node-2"
            }
        }
    ]
}
# This call is NOT a dry run: the move command is actually submitted
result = es.cluster.reroute(body=reroute_commands)
print("Reroute result:", result['acknowledged'])

# Dry run to see what would happen: dry_run=True only reports the outcome,
# and explain=True asks for the reasoning behind each command
dry_run = es.cluster.reroute(
    body=reroute_commands,
    dry_run=True,
    explain=True
)

Node Monitoring

# Get basic node information; 'nodes' maps node ID -> node description
nodes_info = es.nodes.info()
for node_id, node in nodes_info['nodes'].items():
    print(f"Node: {node['name']} ({node['host']}) - Roles: {node['roles']}")

# Get detailed node statistics, limited to the three sections read below
node_stats = es.nodes.stats(metric='os,jvm,indices')
for node_id, stats in node_stats['nodes'].items():
    node_name = stats['name']
    heap_used = stats['jvm']['mem']['heap_used_percent']
    cpu_percent = stats['os']['cpu']['percent']
    print(f"{node_name}: CPU {cpu_percent}%, Heap {heap_used}%")

# Check for hot threads (performance issues): the 3 busiest CPU threads,
# sampled over a 500ms interval
# NOTE(review): the hot threads endpoint presumably returns plain text, so
# the result is printed as-is rather than indexed - confirm
hot_threads = es.nodes.hot_threads(
    threads=3,
    type='cpu',
    interval='500ms'
)
print("Hot threads analysis:", hot_threads)

Cluster State Analysis

# Get cluster state overview.
# When `metric` is given the response only contains the requested sections,
# so 'master_node' has to be requested explicitly to be readable below.
state = es.cluster.state(metric='master_node,metadata,routing_table')
print(f"Master node: {state['master_node']}")
# In Elasticsearch 5.x the cluster UUID is reported inside the metadata
# section rather than at the top level of the response
print(f"Cluster UUID: {state['metadata']['cluster_uuid']}")

# Check index metadata: shard and replica counts live under settings.index
for index_name, index_info in state['metadata']['indices'].items():
    settings = index_info['settings']['index']
    print(f"Index {index_name}: {settings['number_of_shards']} shards, {settings['number_of_replicas']} replicas")

# Analyze routing table: each shard ID maps to a list of shard copies
# (primary and replicas); 'node' is None for a copy that is unassigned
routing = state['routing_table']['indices']
for index_name, index_routing in routing.items():
    for shard_id, shard_info in index_routing['shards'].items():
        for shard in shard_info:
            print(f"Index {index_name}, shard {shard_id}: {shard['state']} on {shard['node']}")

Using Cat API for Quick Checks

# Quick cluster overview.
# v=True adds a header row; h= selects which columns are printed.
print("=== Cluster Health ===")
print(es.cat.health(v=True))

print("\n=== Nodes ===")
print(es.cat.nodes(v=True, h='name,heap.percent,ram.percent,cpu,load_1m,master'))

print("\n=== Indices ===")
print(es.cat.indices(v=True, h='index,docs.count,store.size,health'))

print("\n=== Shards ===")
print(es.cat.shards(v=True, h='index,shard,prirep,state,docs,store,node'))

print("\n=== Allocation ===")
# The disk usage percentage column of cat allocation is named
# 'disk.percent'; 'disk.used_percent' is not a column of this API
print(es.cat.allocation(v=True, h='node,shards,disk.percent,disk.avail'))

Cluster Troubleshooting

# Troubleshooting pass: only report when the cluster is not fully healthy
health = es.cluster.health(level='shards')
cluster_is_degraded = health['status'] != 'green'

if cluster_is_degraded:
    print(f"Cluster status: {health['status']}")
    print(f"Unassigned shards: {health['unassigned_shards']}")

    # Queued cluster-level tasks can explain a cluster that is slow to recover
    pending = es.cluster.pending_tasks()
    queued_tasks = pending['tasks']
    if queued_tasks:
        print("Pending tasks:")
        for task in queued_tasks:
            print(f"  {task['source']}: {task['time_in_queue_millis']}ms")

    # Ask the cluster why each unassigned shard copy could not be placed
    if health['unassigned_shards'] > 0:
        state = es.cluster.state(metric='routing_table')
        # Lazily walk index -> shard ID -> shard copies, keeping only the
        # unassigned copies, so each explanation is requested as it is found
        unassigned_copies = (
            (index_name, shard_id, shard)
            for index_name, index_routing in state['routing_table']['indices'].items()
            for shard_id, shard_info in index_routing['shards'].items()
            for shard in shard_info
            if shard['state'] == 'UNASSIGNED'
        )
        for index_name, shard_id, shard in unassigned_copies:
            # Routing-table shard IDs are dict keys (strings); the explain
            # request body takes the shard number as an integer
            explain_request = {
                "index": index_name,
                "shard": int(shard_id),
                "primary": shard['primary']
            }
            explanation = es.cluster.allocation_explain(body=explain_request)
            print(f"Unassigned shard {index_name}[{shard_id}]: {explanation['unassigned_info']['reason']}")

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch5

docs

bulk-operations.md

cluster-operations.md

document-operations.md

index-management.md

index.md

search-operations.md

transport-connection.md

tile.json