# Namespaced APIs

Specialized client interfaces for different aspects of OpenSearch cluster management. These namespaced APIs are accessible as properties on both `OpenSearch` and `AsyncOpenSearch` client instances, providing organized access to related functionality.

## Capabilities

### Indices Management

Comprehensive index lifecycle management including creation, deletion, mapping management, and settings configuration.

```python { .api }
class IndicesClient:
    def create(self, index, body=None, **kwargs):
        """
        Create an index with optional settings and mappings.

        Parameters:
        - index (str): Index name
        - body (dict, optional): Index settings and mappings
        - wait_for_active_shards (str/int, optional): Wait for active shards
        - timeout (str, optional): Operation timeout
        - master_timeout (str, optional): Master node timeout

        Returns:
        dict: Index creation response
        """

    def delete(self, index, **kwargs):
        """Delete one or more indices."""

    def exists(self, index, **kwargs):
        """Check if an index exists."""

    def get(self, index, **kwargs):
        """Get index information including settings and mappings."""

    def get_mapping(self, index=None, **kwargs):
        """Get mapping definitions for indices."""

    def put_mapping(self, index, body, **kwargs):
        """Update mapping for existing indices."""

    def get_settings(self, index=None, name=None, **kwargs):
        """Get index settings."""

    def put_settings(self, body, index=None, **kwargs):
        """Update index settings."""

    def refresh(self, index=None, **kwargs):
        """Refresh indices to make recent changes visible."""

    def flush(self, index=None, **kwargs):
        """Flush indices to ensure data is written to disk."""

    def forcemerge(self, index=None, **kwargs):
        """Force merge index segments."""

    def open(self, index, **kwargs):
        """Open a closed index."""

    def close(self, index, **kwargs):
        """Close an index."""

    def clear_cache(self, index=None, **kwargs):
        """Clear index caches."""

    def analyze(self, body=None, index=None, **kwargs):
        """Analyze text using index analyzers."""

    def validate_query(self, body=None, index=None, **kwargs):
        """Validate a query without executing it."""
```

### Cluster Management

Cluster-wide operations for monitoring health, managing settings, and administrative tasks.

```python { .api }
class ClusterClient:
    def health(self, index=None, **kwargs):
        """
        Get cluster health status.

        Parameters:
        - index (str/list, optional): Index name(s) to check
        - level (str, optional): Detail level ('cluster', 'indices', 'shards')
        - local (bool, optional): Return local cluster state
        - master_timeout (str, optional): Master node timeout
        - timeout (str, optional): Operation timeout
        - wait_for_active_shards (str/int, optional): Wait for active shards
        - wait_for_nodes (str, optional): Wait for N nodes
        - wait_for_events (str, optional): Wait for priority events
        - wait_for_no_relocating_shards (bool, optional): Wait for no relocating shards
        - wait_for_no_initializing_shards (bool, optional): Wait for no initializing shards
        - wait_for_status (str, optional): Wait for status ('green', 'yellow', 'red')

        Returns:
        dict: Cluster health information
        """

    def state(self, metric=None, index=None, **kwargs):
        """Get comprehensive cluster state information."""

    def stats(self, node_id=None, **kwargs):
        """Get cluster statistics."""

    def pending_tasks(self, **kwargs):
        """Get cluster pending tasks."""

    def get_settings(self, **kwargs):
        """Get cluster settings."""

    def put_settings(self, body, **kwargs):
        """Update cluster settings."""

    def reroute(self, body=None, **kwargs):
        """Manually reroute cluster shards."""

    def allocation_explain(self, body=None, **kwargs):
        """Explain shard allocation decisions."""

    def remote_info(self, **kwargs):
        """Get remote cluster information."""
```

### Node Information

Node-level operations for monitoring and managing individual cluster nodes.

```python { .api }
class NodesClient:
    def info(self, node_id=None, metric=None, **kwargs):
        """
        Get node information.

        Parameters:
        - node_id (str/list, optional): Node ID(s) or names
        - metric (str/list, optional): Information metrics to retrieve
        - flat_settings (bool, optional): Return flat settings format
        - timeout (str, optional): Operation timeout

        Returns:
        dict: Node information
        """

    def stats(self, node_id=None, metric=None, **kwargs):
        """Get node statistics."""

    def hot_threads(self, node_id=None, **kwargs):
        """Get hot threads information for nodes."""

    def usage(self, node_id=None, metric=None, **kwargs):
        """Get node feature usage statistics."""

    def reload_secure_settings(self, node_id=None, body=None, **kwargs):
        """Reload secure settings on nodes."""
```

### Cat APIs

Human-readable cluster information in tabular format, useful for monitoring and debugging.

```python { .api }
class CatClient:
    def aliases(self, name=None, **kwargs):
        """Show index aliases in tabular format."""

    def allocation(self, node_id=None, **kwargs):
        """Show shard allocation across nodes."""

    def count(self, index=None, **kwargs):
        """Show document counts per index."""

    def health(self, **kwargs):
        """Show cluster health in tabular format."""

    def indices(self, index=None, **kwargs):
        """
        Show indices information in tabular format.

        Parameters:
        - index (str/list, optional): Index name(s)
        - bytes (str, optional): Unit for byte values ('b', 'k', 'm', 'g', 't', 'p')
        - format (str, optional): Response format ('json', 'yaml')
        - h (str/list, optional): Column headers to display
        - help (bool, optional): Show column descriptions
        - local (bool, optional): Return local information
        - master_timeout (str, optional): Master node timeout
        - pri (bool, optional): Show only primary shards
        - s (str/list, optional): Sort columns
        - time (str, optional): Time unit ('d', 'h', 'm', 's', 'ms', 'micros', 'nanos')
        - v (bool, optional): Verbose output with headers

        Returns:
        str: Tabular index information
        """

    def master(self, **kwargs):
        """Show master node information."""

    def nodes(self, **kwargs):
        """Show node information in tabular format."""

    def pending_tasks(self, **kwargs):
        """Show pending cluster tasks."""

    def plugins(self, **kwargs):
        """Show installed plugins per node."""

    def recovery(self, index=None, **kwargs):
        """Show index recovery information."""

    def repositories(self, **kwargs):
        """Show snapshot repositories."""

    def segments(self, index=None, **kwargs):
        """Show index segment information."""

    def shards(self, index=None, **kwargs):
        """Show shard information."""

    def snapshots(self, repository=None, **kwargs):
        """Show snapshot information."""

    def tasks(self, **kwargs):
        """Show currently running tasks."""

    def templates(self, name=None, **kwargs):
        """Show index templates."""

    def thread_pool(self, thread_pool_patterns=None, **kwargs):
        """Show thread pool statistics."""
```

### Ingest Pipelines

Management of ingest pipelines for preprocessing documents before indexing.

```python { .api }
class IngestClient:
    def put_pipeline(self, id, body, **kwargs):
        """
        Create or update an ingest pipeline.

        Parameters:
        - id (str): Pipeline ID
        - body (dict): Pipeline definition with processors
        - master_timeout (str, optional): Master node timeout
        - timeout (str, optional): Operation timeout

        Body format:
        {
            "description": "Pipeline description",
            "processors": [
                {
                    "set": {
                        "field": "processed",
                        "value": true
                    }
                }
            ]
        }

        Returns:
        dict: Pipeline creation response
        """

    def get_pipeline(self, id=None, **kwargs):
        """Get ingest pipeline definitions."""

    def delete_pipeline(self, id, **kwargs):
        """Delete an ingest pipeline."""

    def simulate(self, body, id=None, **kwargs):
        """Simulate pipeline execution on sample documents."""

    def processor_grok(self, **kwargs):
        """Get grok processor patterns."""
```

### Snapshot and Restore

Backup and restore operations for indices and cluster state.

```python { .api }
class SnapshotClient:
    def create_repository(self, repository, body, **kwargs):
        """
        Create a snapshot repository.

        Parameters:
        - repository (str): Repository name
        - body (dict): Repository configuration
        - master_timeout (str, optional): Master node timeout
        - timeout (str, optional): Operation timeout
        - verify (bool, optional): Verify repository after creation

        Returns:
        dict: Repository creation response
        """

    def get_repository(self, repository=None, **kwargs):
        """Get snapshot repository information."""

    def delete_repository(self, repository, **kwargs):
        """Delete a snapshot repository."""

    def verify_repository(self, repository, **kwargs):
        """Verify repository integrity."""

    def create(self, repository, snapshot, body=None, **kwargs):
        """Create a snapshot of indices."""

    def get(self, repository, snapshot, **kwargs):
        """Get snapshot information."""

    def delete(self, repository, snapshot, **kwargs):
        """Delete a snapshot."""

    def restore(self, repository, snapshot, body=None, **kwargs):
        """Restore indices from a snapshot."""

    def status(self, repository=None, snapshot=None, **kwargs):
        """Get snapshot status information."""

    def cleanup_repository(self, repository, **kwargs):
        """Clean up stale repository data."""
```

### Task Management

Monitor and manage long-running operations and background tasks.

```python { .api }
class TasksClient:
    def list(self, **kwargs):
        """
        List currently running tasks.

        Parameters:
        - nodes (str/list, optional): Node IDs to query
        - actions (str/list, optional): Action types to filter
        - detailed (bool, optional): Return detailed task information
        - parent_task_id (str, optional): Filter by parent task ID
        - wait_for_completion (bool, optional): Wait for task completion
        - timeout (str, optional): Operation timeout
        - group_by (str, optional): Group tasks by ('nodes', 'parents', 'none')

        Returns:
        dict: Task information
        """

    def get(self, task_id, **kwargs):
        """Get information about a specific task."""

    def cancel(self, task_id=None, **kwargs):
        """Cancel tasks."""
```

### Security Management

Security-related operations when the security plugin is installed.

```python { .api }
class SecurityClient:
    def get_account_details(self, **kwargs):
        """Get current user account details."""

    def change_password(self, body, **kwargs):
        """Change current user password."""

    def get_user(self, username=None, **kwargs):
        """Get user information."""

    def create_user(self, username, body, **kwargs):
        """Create a new user."""

    def patch_user(self, username, body, **kwargs):
        """Update user attributes."""

    def delete_user(self, username, **kwargs):
        """Delete a user."""

    def get_role(self, role=None, **kwargs):
        """Get role information."""

    def create_role(self, role, body, **kwargs):
        """Create a new role."""

    def patch_role(self, role, body, **kwargs):
        """Update role permissions."""

    def delete_role(self, role, **kwargs):
        """Delete a role."""

    def get_role_mapping(self, role=None, **kwargs):
        """Get role mapping information."""

    def create_role_mapping(self, role, body, **kwargs):
        """Create role mapping."""

    def patch_role_mapping(self, role, body, **kwargs):
        """Update role mapping."""

    def delete_role_mapping(self, role, **kwargs):
        """Delete role mapping."""
```

### Extended Namespaced APIs

Additional specialized APIs for advanced OpenSearch features.

```python { .api }
class DanglingIndicesClient:
    def list_dangling_indices(self, **kwargs):
        """List dangling indices."""

    def import_dangling_index(self, index_uuid, **kwargs):
        """Import a dangling index."""

    def delete_dangling_index(self, index_uuid, **kwargs):
        """Delete a dangling index."""

class FeaturesClient:
    def get_features(self, **kwargs):
        """Get available features."""

    def reset_features(self, **kwargs):
        """Reset features to default state."""

class RemoteClient:
    def info(self, **kwargs):
        """Get remote cluster information."""

class SearchPipelineClient:
    def put(self, id, body, **kwargs):
        """Create or update a search pipeline."""

    def get(self, id=None, **kwargs):
        """Get search pipeline definitions."""

    def delete(self, id, **kwargs):
        """Delete a search pipeline."""

class WlmClient:
    def create_query_group(self, body, **kwargs):
        """Create a workload management query group."""

    def get_query_group(self, name=None, **kwargs):
        """Get query group information."""

    def update_query_group(self, name, body, **kwargs):
        """Update query group settings."""

    def delete_query_group(self, name, **kwargs):
        """Delete a query group."""
```

## Usage Examples

### Index Management

```python
from opensearchpy import OpenSearch

client = OpenSearch([{'host': 'localhost', 'port': 9200}])

# Create an index with settings and mappings
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "analysis": {
            "analyzer": {
                "custom_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "stop"]
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "custom_analyzer"
            },
            "publish_date": {
                "type": "date"
            },
            "category": {
                "type": "keyword"
            }
        }
    }
}

response = client.indices.create(index='my-index', body=index_body)
print(f"Index created: {response['acknowledged']}")

# Update index settings
settings_update = {
    "settings": {
        "refresh_interval": "30s"
    }
}
client.indices.put_settings(index='my-index', body=settings_update)

# Add a field to existing mapping
mapping_update = {
    "properties": {
        "tags": {
            "type": "keyword"
        }
    }
}
client.indices.put_mapping(index='my-index', body=mapping_update)

# Check if index exists
if client.indices.exists(index='my-index'):
    print("Index exists")

# Get index information
index_info = client.indices.get(index='my-index')
print(f"Index settings: {index_info['my-index']['settings']}")
```

### Cluster Monitoring

```python
# Check cluster health
health = client.cluster.health()
print(f"Cluster status: {health['status']}")
print(f"Active shards: {health['active_shards']}")

# Get detailed cluster state
state = client.cluster.state(metric=['nodes', 'routing_table'])
print(f"Master node: {state['master_node']}")

# Monitor cluster statistics
stats = client.cluster.stats()
print(f"Total indices: {stats['indices']['count']}")
print(f"Total nodes: {stats['nodes']['count']['total']}")

# Check pending tasks
tasks = client.cluster.pending_tasks()
if tasks['tasks']:
    print(f"Pending tasks: {len(tasks['tasks'])}")
```

### Node Information

```python
# Get all node information
nodes_info = client.nodes.info()
for node_id, node_info in nodes_info['nodes'].items():
    print(f"Node: {node_info['name']} ({node_info['version']})")

# Get node statistics
nodes_stats = client.nodes.stats(metric=['jvm', 'indices'])
for node_id, stats in nodes_stats['nodes'].items():
    jvm_mem = stats['jvm']['mem']
    print(f"Node JVM heap used: {jvm_mem['heap_used_percent']}%")
```

### Cat API Usage

```python
# Get indices in tabular format
indices_info = client.cat.indices(v=True, h=['index', 'docs.count', 'store.size'])
print(indices_info)

# Monitor cluster health
health_info = client.cat.health(v=True)
print(health_info)

# Check shard allocation
allocation_info = client.cat.allocation(v=True, h=['node', 'shards', 'disk.used_percent'])
print(allocation_info)
```

### Ingest Pipeline Management

```python
# Create an ingest pipeline
pipeline_body = {
    "description": "Log processing pipeline",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"]
            }
        },
        {
            "date": {
                "field": "timestamp",
                "formats": ["ISO8601"]
            }
        },
        {
            "lowercase": {
                "field": "level"
            }
        }
    ]
}

client.ingest.put_pipeline(id='log-pipeline', body=pipeline_body)

# Test the pipeline with sample data
simulate_body = {
    "pipeline": pipeline_body,
    "docs": [
        {
            "_source": {
                "message": "2024-01-01T10:00:00.000Z ERROR Database connection failed"
            }
        }
    ]
}

result = client.ingest.simulate(body=simulate_body)
print(f"Processed document: {result['docs'][0]['doc']['_source']}")
```

### Snapshot Operations

```python
# Create a filesystem snapshot repository
repo_body = {
    "type": "fs",
    "settings": {
        "location": "/backup/opensearch",
        "compress": True
    }
}

client.snapshot.create_repository(repository='backup-repo', body=repo_body)

# Create a snapshot of specific indices
snapshot_body = {
    "indices": "my-index,another-index",
    "ignore_unavailable": True,
    "include_global_state": False,
    "metadata": {
        "taken_by": "python-client",
        "taken_because": "daily backup"
    }
}

client.snapshot.create(
    repository='backup-repo',
    snapshot='daily-snapshot-2024-01-01',
    body=snapshot_body,
    wait_for_completion=True
)

# List snapshots
snapshots = client.snapshot.get(repository='backup-repo', snapshot='*')
for snapshot in snapshots['snapshots']:
    print(f"Snapshot: {snapshot['snapshot']} - State: {snapshot['state']}")
```