# Cluster Operations

Cluster-level operations and monitoring through the `cluster` client. It provides health status, cluster state, node information, statistics, and administrative tasks for cluster management and maintenance.

## Capabilities

### Cluster Health and Status

Monitor cluster health and retrieve comprehensive state information.

```python { .api }
# Accessed via es.cluster
def health(index: str = None, **params) -> dict:
    """
    Get cluster health status and metrics.

    Parameters:
    - index: Index name(s) to check health for (all indices if None)
    - level: Detail level ('cluster', 'indices', 'shards')
    - local: Execute on the local node only
    - master_timeout: Timeout for the master node response
    - timeout: Request timeout
    - wait_for_active_shards: Wait until N shards are active
    - wait_for_events: Wait until all queued events of the given priority are processed ('immediate', 'urgent', 'high', 'normal', 'low', 'languid')
    - wait_for_no_relocating_shards: Wait until no shards are relocating
    - wait_for_nodes: Wait until N nodes are available (e.g., '>=3')
    - wait_for_status: Wait until the cluster reaches the given status or better ('green', 'yellow', 'red')

    Returns:
    dict: Health information including:
    - cluster_name: Name of the cluster
    - status: Overall health ('green', 'yellow', 'red')
    - timed_out: Whether the request timed out
    - number_of_nodes: Total nodes in the cluster
    - number_of_data_nodes: Number of data nodes
    - active_primary_shards: Number of active primary shards
    - active_shards: Total active shards
    - relocating_shards: Shards being relocated
    - initializing_shards: Shards being initialized
    - unassigned_shards: Number of unassigned shards
    - delayed_unassigned_shards: Unassigned shards whose allocation is delayed
    - number_of_pending_tasks: Number of pending cluster tasks
    - number_of_in_flight_fetch: In-flight shard fetch operations
    - task_max_waiting_in_queue_millis: Longest time a task has waited in the queue
    - active_shards_percent_as_number: Percentage of shards that are active
    """

def state(metric: str = None, index: str = None, **params) -> dict:
    """
    Get detailed cluster state information.

    Parameters:
    - metric: Specific metrics to retrieve ('_all', 'blocks', 'metadata', 'nodes', 'routing_table', 'routing_nodes', 'master_node', 'version')
    - index: Index name(s) to include in the state
    - allow_no_indices: Whether to tolerate index expressions that match no indices
    - expand_wildcards: Wildcard expansion ('open', 'closed', 'none', 'all')
    - flat_settings: Return flattened settings
    - ignore_unavailable: Ignore unavailable indices
    - local: Execute on the local node
    - master_timeout: Master node timeout
    - wait_for_metadata_version: Wait for a specific metadata version
    - wait_for_timeout: Timeout for waiting

    Returns:
    dict: Cluster state containing cluster_name plus only the sections selected by `metric` (all of them by default):
    - cluster_name: Cluster name
    - version: State version
    - state_uuid: Unique state identifier
    - master_node: Master node ID
    - blocks: Cluster blocks
    - nodes: Node information
    - metadata: Index metadata and templates
    - routing_table: Shard routing information
    """

def stats(node_id: str = None, **params) -> dict:
    """
    Get cluster-wide statistics.

    Parameters:
    - node_id: Specific node(s) to get stats from
    - flat_settings: Return flattened settings
    - human: Human-readable format for numbers
    - timeout: Request timeout

    Returns:
    dict: Cluster statistics including:
    - timestamp: Statistics timestamp
    - cluster_name: Cluster name
    - status: Cluster status
    - indices: Index-level statistics
    - nodes: Node statistics
    """

def pending_tasks(**params) -> dict:
    """
    Get information about pending cluster tasks.

    Parameters:
    - local: Execute on the local node only
    - master_timeout: Master node timeout

    Returns:
    dict: Pending tasks information with:
    - tasks: List of pending tasks, each with insert_order, priority, source, time_in_queue_millis, and time_in_queue
    """
```
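
Because the health response is a flat dictionary, turning it into an alert list is mostly a matter of checking a few counters. The sketch below is a hypothetical helper (`health_problems` is not part of the client) that takes the dict returned by `es.cluster.health()` and reads only the keys documented above; which conditions count as "problems" is an assumption you may want to tune.

```python
def health_problems(health: dict) -> list:
    """Return human-readable warnings for a cluster.health() response.

    Hypothetical helper: it only reads keys documented for cluster.health().
    """
    problems = []
    if health.get("timed_out"):
        problems.append("health request timed out")
    status = health.get("status")
    if status != "green":
        problems.append(f"cluster status is {status}")
    # Non-zero shard movement counters are worth surfacing
    for key in ("unassigned_shards", "initializing_shards", "relocating_shards"):
        count = health.get(key, 0)
        if count:
            problems.append(f"{key.replace('_', ' ')}: {count}")
    if health.get("number_of_pending_tasks", 0):
        problems.append(f"pending tasks: {health['number_of_pending_tasks']}")
    return problems
```

Usage: `for problem in health_problems(es.cluster.health()): print(problem)`.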

### Cluster Settings Management

Configure persistent and transient cluster settings.

```python { .api }
def get_settings(**params) -> dict:
    """
    Get current cluster settings.

    Parameters:
    - flat_settings: Return settings in flat format
    - include_defaults: Include default settings
    - master_timeout: Master node timeout
    - timeout: Request timeout

    Returns:
    dict: Cluster settings organized by type:
    - persistent: Persistent settings (survive a full cluster restart)
    - transient: Transient settings (reset on a full cluster restart)
    - defaults: Default settings (if include_defaults=True)
    """

def put_settings(body: dict = None, **params) -> dict:
    """
    Update cluster settings.

    Parameters:
    - body: Settings to update
    - flat_settings: Accept flat setting format
    - master_timeout: Master node timeout
    - timeout: Request timeout

    Body structure:
    {
        "persistent": {
            "indices.recovery.max_bytes_per_sec": "50mb",
            "cluster.routing.allocation.enable": "all"
        },
        "transient": {
            "logger.org.elasticsearch.discovery": "DEBUG"
        }
    }

    Common settings:
    - cluster.routing.allocation.enable: Control shard allocation ('all', 'primaries', 'new_primaries', 'none')
    - cluster.routing.rebalance.enable: Control shard rebalancing ('all', 'primaries', 'replicas', 'none')
    - indices.recovery.max_bytes_per_sec: Recovery speed limit
    - cluster.max_shards_per_node: Maximum shards per node (Elasticsearch 6.5 and later only)
    - search.default_search_timeout: Default search timeout

    Returns:
    dict: Settings update confirmation with acknowledged status
    """
```
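
Most calls to `put_settings` differ only in scope and in one or two keys, so small body builders can keep typos out of setting names. The sketch below uses hypothetical helpers (`allocation_enable_body`, `reset_settings_body`; neither is part of the client). It relies on two documented Elasticsearch behaviours: the four valid values of `cluster.routing.allocation.enable`, and that sending `null` (`None` in Python) for a setting removes it from that scope.

```python
VALID_ALLOCATION_MODES = ("all", "primaries", "new_primaries", "none")


def allocation_enable_body(mode: str, persistent: bool = False) -> dict:
    """Build a put_settings body for cluster.routing.allocation.enable."""
    if mode not in VALID_ALLOCATION_MODES:
        raise ValueError(f"unknown allocation mode: {mode!r}")
    scope = "persistent" if persistent else "transient"
    return {scope: {"cluster.routing.allocation.enable": mode}}


def reset_settings_body(*names: str, persistent: bool = False) -> dict:
    """Build a put_settings body that resets settings to their defaults.

    A null value (None in Python) removes the setting from the chosen scope.
    """
    scope = "persistent" if persistent else "transient"
    return {scope: {name: None for name in names}}
```

Usage: `es.cluster.put_settings(body=allocation_enable_body("primaries"))`.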

### Shard Management

Control shard allocation and routing across the cluster.

```python { .api }
def reroute(body: dict = None, **params) -> dict:
    """
    Manually reroute shards in the cluster.

    Parameters:
    - body: Reroute commands specification
    - dry_run: Show what would happen without executing
    - explain: Provide a detailed explanation of each command
    - master_timeout: Master node timeout
    - metric: Metrics to return in the response
    - retry_failed: Retry shard allocations that have failed too many times
    - timeout: Request timeout

    Body structure:
    {
        "commands": [
            {
                "move": {
                    "index": "my_index",
                    "shard": 0,
                    "from_node": "node1",
                    "to_node": "node2"
                }
            },
            {
                "allocate_replica": {
                    "index": "my_index",
                    "shard": 1,
                    "node": "node3"
                }
            },
            {
                "cancel": {
                    "index": "my_index",
                    "shard": 2,
                    "node": "node1",
                    "allow_primary": false
                }
            }
        ]
    }

    Available commands:
    - move: Move a started shard to a different node
    - allocate_replica: Allocate an unassigned replica shard to a node
    - allocate_stale_primary: Allocate a primary on a node holding a stale copy (requires "accept_data_loss": true)
    - allocate_empty_primary: Allocate an empty primary, discarding any existing data (requires "accept_data_loss": true)
    - cancel: Cancel allocation or recovery of a shard (cancelling a primary requires "allow_primary": true)

    Returns:
    dict: Reroute result with acknowledged, the resulting cluster state (unless filtered via metric), and explanations when explain=True
    """

def allocation_explain(body: dict = None, **params) -> dict:
    """
    Explain shard allocation decisions.

    Parameters:
    - body: Shard copy to explain; if omitted, the first unassigned shard found is explained (an error is returned if there is none)
    - include_disk_info: Include disk usage information
    - include_yes_decisions: Include positive decisions

    Body structure:
    {
        "index": "my_index",
        "shard": 0,
        "primary": true
    }

    Returns:
    dict: Detailed explanation of allocation decisions. Which fields appear depends on whether the shard is unassigned or assigned:
    - index: Index name
    - shard: Shard number
    - primary: Whether it is a primary shard
    - current_state: Current allocation state
    - unassigned_info: Why the shard is unassigned (unassigned shards only)
    - can_allocate: Whether allocation is possible (unassigned shards only)
    - allocate_explanation: Detailed allocation decision (unassigned shards only)
    - can_remain_on_current_node: Whether the shard can stay where it is (assigned shards only)
    - can_rebalance_cluster: Whether rebalancing is possible (assigned shards only)
    - can_rebalance_to_other_node: Whether rebalancing to another node is possible (assigned shards only)
    """
```
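
Reroute bodies are easy to get subtly wrong, for example by forgetting `accept_data_loss` on the primary-allocation commands. The hypothetical builders below (none of these names come from the client) produce the command shapes listed above and always set `accept_data_loss` to true for `allocate_stale_primary` and `allocate_empty_primary`, because Elasticsearch rejects those commands otherwise.

```python
def move_command(index: str, shard: int, from_node: str, to_node: str) -> dict:
    """One 'move' command for a reroute body."""
    return {"move": {"index": index, "shard": shard,
                     "from_node": from_node, "to_node": to_node}}


def allocate_replica_command(index: str, shard: int, node: str) -> dict:
    """One 'allocate_replica' command for a reroute body."""
    return {"allocate_replica": {"index": index, "shard": shard, "node": node}}


def allocate_primary_command(index: str, shard: int, node: str, stale: bool) -> dict:
    """allocate_stale_primary or allocate_empty_primary.

    Both can lose data, so Elasticsearch requires accept_data_loss to be true.
    """
    name = "allocate_stale_primary" if stale else "allocate_empty_primary"
    return {name: {"index": index, "shard": shard, "node": node,
                   "accept_data_loss": True}}


def reroute_body(*commands: dict) -> dict:
    """Wrap commands in the {"commands": [...]} envelope used by reroute()."""
    return {"commands": list(commands)}
```

Usage: `es.cluster.reroute(body=reroute_body(move_command("my_index", 0, "node1", "node2")), dry_run=True)`.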

## Node Management

Monitor and manage individual nodes through the `nodes` client.

```python { .api }
# Accessed via es.nodes
def info(node_id: str = None, metric: str = None, **params) -> dict:
    """
    Get information about cluster nodes.

    Parameters:
    - node_id: Specific node(s) to get info for ('_local', '_master', 'node1,node2', etc.)
    - metric: Specific metrics ('settings', 'os', 'process', 'jvm', 'thread_pool', 'transport', 'http', 'plugins', 'ingest')
    - flat_settings: Return flattened settings
    - human: Human-readable format
    - timeout: Request timeout

    Returns:
    dict: Node information including:
    - cluster_name: Cluster name
    - nodes: Dictionary of node information keyed by node ID, each with:
        - name: Node name
        - transport_address: Network address
        - host: Hostname
        - ip: IP address
        - version: Elasticsearch version
        - build_hash: Build information
        - roles: Node roles (master, data, ingest, etc.)
        - attributes: Custom node attributes
        - settings: Node settings (if requested)
        - os: Operating system info (if requested)
        - process: Process information (if requested)
        - jvm: JVM information (if requested)
    """

def stats(node_id: str = None, metric: str = None, index_metric: str = None, **params) -> dict:
    """
    Get statistics for cluster nodes.

    Parameters:
    - node_id: Specific node(s) to get stats for
    - metric: Node-level metrics ('indices', 'os', 'process', 'jvm', 'transport', 'http', 'fs', 'thread_pool', 'breaker', 'script', 'discovery', 'ingest')
    - index_metric: Index-level metrics ('completion', 'docs', 'fielddata', 'query_cache', 'flush', 'get', 'indexing', 'merge', 'request_cache', 'search', 'segments', 'store', 'translog', 'warmer')
    - completion_fields: Fields for completion stats
    - fielddata_fields: Fields for fielddata stats
    - fields: Specific fields for stats
    - groups: Search stats groups
    - human: Human-readable format
    - level: Statistics level ('node', 'indices', 'shards')
    - timeout: Request timeout
    - types: Document types for indexing stats

    Returns:
    dict: Detailed node statistics including:
    - cluster_name: Cluster name
    - nodes: Per-node statistics keyed by node ID, each with:
        - timestamp: Statistics timestamp
        - name: Node name
        - transport_address: Network address
        - host: Hostname
        - roles: Node roles
        - indices: Index statistics (docs, store, indexing, search, etc.)
        - os: OS statistics (CPU, memory, swap)
        - process: Process statistics (CPU, memory)
        - jvm: JVM statistics (heap, GC, threads)
        - thread_pool: Thread pool statistics
        - fs: Filesystem statistics
        - transport: Transport layer statistics
        - http: HTTP statistics
        - breakers: Circuit breaker statistics
    """

def hot_threads(node_id: str = None, **params) -> str:
    """
    Get information about hot threads on nodes.

    Parameters:
    - node_id: Specific node(s) to analyze
    - ignore_idle_threads: Ignore idle threads
    - interval: Sampling interval
    - snapshots: Number of thread snapshots
    - threads: Number of hot threads to show
    - timeout: Request timeout
    - type: Thread type ('cpu', 'wait', 'block')

    Returns:
    str: Plain-text hot thread report for performance troubleshooting (not JSON)
    """
```
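
When scanning many nodes, it helps to reduce the nested `nodes.stats()` response to the few numbers you watch. The sketch below uses hypothetical helpers (`node_resource_summary`, `nodes_over_heap`) and assumes the response was requested with a `metric` that includes `jvm` and `os`; the 85% default threshold is an arbitrary example value, not an Elasticsearch recommendation.

```python
def node_resource_summary(node_stats: dict) -> dict:
    """Map node name -> (heap_used_percent, cpu_percent) from nodes.stats().

    Missing metrics (e.g. not requested) come back as None.
    """
    summary = {}
    for node_id, stats in node_stats.get("nodes", {}).items():
        heap = stats.get("jvm", {}).get("mem", {}).get("heap_used_percent")
        cpu = stats.get("os", {}).get("cpu", {}).get("percent")
        # Fall back to the node ID if the name is absent
        summary[stats.get("name", node_id)] = (heap, cpu)
    return summary


def nodes_over_heap(node_stats: dict, threshold: int = 85) -> list:
    """Sorted names of nodes whose JVM heap usage is at or above threshold."""
    return sorted(
        name
        for name, (heap, _cpu) in node_resource_summary(node_stats).items()
        if heap is not None and heap >= threshold
    )
```

Usage: `nodes_over_heap(es.nodes.stats(metric='jvm,os'))`.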

## Cat API for Human-Readable Output

Quick cluster information in tabular format through the `cat` client.

```python { .api }
# Accessed via es.cat - returns human-readable tabular text
def health(**params) -> str:
    """Get cluster health in tabular format."""

def nodes(**params) -> str:
    """Get node information in tabular format."""

def master(**params) -> str:
    """Get master node information."""

def indices(index: str = None, **params) -> str:
    """Get index information in tabular format."""

def shards(index: str = None, **params) -> str:
    """Get shard information."""

def allocation(node_id: str = None, **params) -> str:
    """Get shard allocation per node."""

def pending_tasks(**params) -> str:
    """Get pending cluster tasks."""

def thread_pool(thread_pool_patterns: str = None, **params) -> str:
    """Get thread pool information."""
```
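
Cat output is meant for people, but with `v=True` the first line is a header row, so simple tables can be turned back into records. The sketch below is a hypothetical parser (`parse_cat_table` is not part of the client) that assumes no cell contains whitespace. Empty cells, such as the node of an unassigned shard, shift later columns, so choose `h=` column lists that avoid them or put such columns last.

```python
def parse_cat_table(text: str) -> list:
    """Turn cat output requested with v=True into a list of dicts.

    Assumes the first non-empty line is the header row and that cells contain
    no whitespace; cat pads columns with spaces, so split() recovers the cells.
    All values stay strings.
    """
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return []
    headers = lines[0].split()
    return [dict(zip(headers, line.split())) for line in lines[1:]]
```

Usage: `parse_cat_table(es.cat.allocation(v=True, h='node,shards,disk.percent'))`.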

## Usage Examples

### Cluster Health Monitoring

```python
from elasticsearch5 import Elasticsearch

es = Elasticsearch(['localhost:9200'])

# Basic health check
health = es.cluster.health()
print(f"Cluster status: {health['status']}")
print(f"Active shards: {health['active_shards']}")
print(f"Unassigned shards: {health['unassigned_shards']}")

# Wait up to 30s for green status. If the wait times out, Elasticsearch
# answers with HTTP 408; ignore=408 returns the body instead of raising.
health = es.cluster.health(
    wait_for_status='green',
    timeout='30s',
    ignore=408
)
if health['timed_out']:
    print(f"Still {health['status']} after 30s")

# Detailed health per index
health = es.cluster.health(level='indices')
for index, stats in health['indices'].items():
    print(f"Index {index}: {stats['status']} - {stats['active_shards']} shards")
```

### Cluster Settings Management

```python
# Get current settings, including defaults
current_settings = es.cluster.get_settings(include_defaults=True)
print("Persistent settings:", current_settings['persistent'])
print("Transient settings:", current_settings['transient'])

# Update cluster settings
new_settings = {
    "persistent": {
        "cluster.routing.allocation.enable": "all",
        "indices.recovery.max_bytes_per_sec": "100mb"
    },
    "transient": {
        "logger.org.elasticsearch.discovery": "INFO"
    }
}
es.cluster.put_settings(body=new_settings)

# Restrict allocation to primary shards (a common step before a rolling restart)
maintenance_settings = {
    "transient": {
        "cluster.routing.allocation.enable": "primaries"
    }
}
es.cluster.put_settings(body=maintenance_settings)

# Re-enable full allocation after maintenance
enable_settings = {
    "transient": {
        "cluster.routing.allocation.enable": "all"
    }
}
es.cluster.put_settings(body=enable_settings)
```
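
If the maintenance step raises, the explicit restore call above never runs and allocation stays restricted. One way to guard against that is a context manager; the sketch below is a hypothetical helper (`primaries_only_allocation` is not part of the client) that accepts any object exposing `cluster.put_settings(body=...)` and, in its `finally` clause, sends `null` to drop the transient override instead of hard-coding `"all"`.

```python
from contextlib import contextmanager


@contextmanager
def primaries_only_allocation(es):
    """Restrict shard allocation to primaries for the duration of a with-block.

    `es` is any client exposing cluster.put_settings(body=...), such as an
    Elasticsearch instance.
    """
    es.cluster.put_settings(
        body={"transient": {"cluster.routing.allocation.enable": "primaries"}}
    )
    try:
        yield
    finally:
        # null removes the transient override, so the persistent value (or the
        # default, "all") applies again even if the with-block raised
        es.cluster.put_settings(
            body={"transient": {"cluster.routing.allocation.enable": None}}
        )
```

Usage: wrap the node restart in `with primaries_only_allocation(es): ...`.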

### Shard Management

```python
# Explain allocation for a specific shard copy
explanation = es.cluster.allocation_explain(
    body={
        "index": "my_index",
        "shard": 0,
        "primary": True
    },
    include_yes_decisions=True
)
# allocate_explanation is reported for unassigned shards; assigned shards
# report can_remain_on_current_node instead, so use .get()
print("Allocation decision:", explanation.get('allocate_explanation'))

# Move a shard from one node to another
reroute_commands = {
    "commands": [
        {
            "move": {
                "index": "my_index",
                "shard": 0,
                "from_node": "node-1",
                "to_node": "node-2"
            }
        }
    ]
}

# Dry run first to see what would happen without applying the commands
dry_run = es.cluster.reroute(
    body=reroute_commands,
    dry_run=True,
    explain=True
)

# Apply the commands
result = es.cluster.reroute(body=reroute_commands)
print("Reroute acknowledged:", result['acknowledged'])
```
474
475
### Node Monitoring
476
477
```python
# Get basic node information
nodes_info = es.nodes.info()
for node_id, node in nodes_info['nodes'].items():
    print(f"Node: {node['name']} ({node['host']}) - Roles: {node['roles']}")

# Get detailed node statistics
node_stats = es.nodes.stats(metric='os,jvm,indices')
for node_id, stats in node_stats['nodes'].items():
    node_name = stats['name']
    heap_used = stats['jvm']['mem']['heap_used_percent']
    cpu_percent = stats['os']['cpu']['percent']
    print(f"{node_name}: CPU {cpu_percent}%, Heap {heap_used}%")

# Check for hot threads (performance issues); the report is plain text
hot_threads = es.nodes.hot_threads(
    threads=3,
    type='cpu',
    interval='500ms'
)
print(hot_threads)
```

### Cluster State Analysis

```python
# Get a cluster state overview; only the requested metrics are returned,
# so master_node must be listed explicitly
state = es.cluster.state(metric='master_node,metadata,routing_table')
print(f"Master node: {state['master_node']}")
print(f"Cluster UUID: {state['metadata']['cluster_uuid']}")

# Check index metadata
for index_name, index_info in state['metadata']['indices'].items():
    settings = index_info['settings']['index']
    print(f"Index {index_name}: {settings['number_of_shards']} shards, {settings['number_of_replicas']} replicas")

# Analyze the routing table; each shard ID maps to a list of copies
routing = state['routing_table']['indices']
for index_name, index_routing in routing.items():
    for shard_id, shard_info in index_routing['shards'].items():
        for shard in shard_info:
            print(f"Index {index_name}, shard {shard_id}: {shard['state']} on {shard['node']}")
```
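
Printing every shard copy does not scale to large clusters; a tally by routing state is often enough to spot trouble. The sketch below is a hypothetical helper (`shard_state_counts` is not part of the client) that walks the same `routing_table` structure as the loop above, assuming the state was requested with the `routing_table` metric.

```python
from collections import Counter


def shard_state_counts(state: dict) -> Counter:
    """Count shard copies by routing state in a cluster.state() response.

    Each shard ID in the routing table maps to a list with one entry per copy
    (the primary and its replicas).
    """
    counts = Counter()
    indices = state.get("routing_table", {}).get("indices", {})
    for index_routing in indices.values():
        for copies in index_routing["shards"].values():
            for copy in copies:
                counts[copy["state"]] += 1
    return counts
```

Usage: `shard_state_counts(es.cluster.state(metric='routing_table'))` yields counts keyed by states such as `STARTED` or `UNASSIGNED`.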

### Using Cat API for Quick Checks

```python
# Quick cluster overview
print("=== Cluster Health ===")
print(es.cat.health(v=True))

print("\n=== Nodes ===")
print(es.cat.nodes(v=True, h='name,heap.percent,ram.percent,cpu,load_1m,master'))

print("\n=== Indices ===")
print(es.cat.indices(v=True, h='index,docs.count,store.size,health'))

print("\n=== Shards ===")
print(es.cat.shards(v=True, h='index,shard,prirep,state,docs,store,node'))

print("\n=== Allocation ===")
print(es.cat.allocation(v=True, h='node,shards,disk.percent,disk.avail'))
```

### Cluster Troubleshooting

```python
# Check for common issues
health = es.cluster.health(level='shards')

if health['status'] != 'green':
    print(f"Cluster status: {health['status']}")
    print(f"Unassigned shards: {health['unassigned_shards']}")

# Get pending tasks that might be causing issues
pending = es.cluster.pending_tasks()
if pending['tasks']:
    print("Pending tasks:")
    for task in pending['tasks']:
        print(f"  {task['source']}: {task['time_in_queue_millis']}ms")

# Check allocation explanations for unassigned shards
if health['unassigned_shards'] > 0:
    # Find unassigned shards in the routing table
    state = es.cluster.state(metric='routing_table')
    for index_name, index_routing in state['routing_table']['indices'].items():
        for shard_id, shard_info in index_routing['shards'].items():
            for shard in shard_info:
                if shard['state'] == 'UNASSIGNED':
                    explanation = es.cluster.allocation_explain(
                        body={
                            "index": index_name,
                            "shard": int(shard_id),
                            "primary": shard['primary']
                        }
                    )
                    print(f"Unassigned shard {index_name}[{shard_id}]: {explanation['unassigned_info']['reason']}")
```
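
The loop above calls `allocation_explain` once per unassigned copy, but that API identifies a copy only by index, shard, and primary flag, so several unassigned replicas of one shard repeat the same request. The sketch below is a hypothetical helper (`unassigned_shards` is not part of the client) that collects each unique `(index, shard, primary)` combination once, assuming a state requested with the `routing_table` metric.

```python
def unassigned_shards(state: dict) -> list:
    """Sorted unique (index, shard, primary) tuples for unassigned copies."""
    seen = set()
    indices = state.get("routing_table", {}).get("indices", {})
    for index_name, index_routing in indices.items():
        for shard_id, copies in index_routing["shards"].items():
            for copy in copies:
                if copy["state"] == "UNASSIGNED":
                    # Shard IDs are string keys in the routing table
                    seen.add((index_name, int(shard_id), copy["primary"]))
    return sorted(seen)
```

Usage: `for index, shard, primary in unassigned_shards(state): es.cluster.allocation_explain(body={"index": index, "shard": shard, "primary": primary})`.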