# Plugin APIs

Dedicated client interfaces for OpenSearch plugins providing advanced functionality like machine learning, security analytics, alerting, and specialized search capabilities. These plugin APIs are accessible as properties on both `OpenSearch` and `AsyncOpenSearch` client instances.

## Capabilities

### Machine Learning Plugin

Advanced machine learning capabilities including model management, training, and inference operations.

```python { .api }
class MlClient:
    def register_model(self, body, **kwargs):
        """
        Register a machine learning model.

        Parameters:
        - body (dict): Model registration configuration

        Body format:
        {
            "name": "my-model",
            "version": "1.0.0",
            "model_format": "TORCH_SCRIPT",
            "model_config": {
                "model_type": "bert",
                "embedding_dimension": 768,
                "framework_type": "sentence_transformers"
            }
        }

        Returns:
        dict: Model registration response with task_id
        """

    def get_model(self, model_id, **kwargs):
        """Get model information and status."""

    def delete_model(self, model_id, **kwargs):
        """Delete a registered model."""

    def deploy_model(self, model_id, **kwargs):
        """Deploy a model to make it available for inference."""

    def undeploy_model(self, model_id, **kwargs):
        """Undeploy a model to stop inference."""

    def predict(self, model_id, body, **kwargs):
        """Run inference using a deployed model."""

    def get_task(self, task_id, **kwargs):
        """Get ML task status and results."""

    def search_models(self, body=None, **kwargs):
        """Search for registered models."""
```

### Neural Search Plugin

Neural search statistics and information for neural network-based search capabilities.

```python { .api }
class NeuralClient:
    def stats(self, node_id=None, stat=None, **kwargs):
        """
        Provide information about the current status of the neural-search plugin.

        Parameters:
        - node_id (str, optional): Comma-separated list of node IDs or names
        - stat (str, optional): Comma-separated list of stats to retrieve
        - flat_stat_paths (bool): Return stats in flat form for readability
        - include_all_nodes (bool): Include aggregated statistics across all nodes
        - include_individual_nodes (bool): Include statistics for individual nodes
        - include_info (bool): Include cluster-wide information
        - include_metadata (bool): Return stat metadata instead of raw values

        Returns:
        dict: Neural search plugin statistics and status information
        """
```

### K-NN Search Plugin

K-nearest neighbor search for vector similarity operations.

```python { .api }
class KnnClient:
    def search(self, body, index=None, **kwargs):
        """
        Perform k-NN vector search.

        Parameters:
        - body (dict): k-NN search query body
        - index (str/list, optional): Index name(s)

        Body format:
        {
            "query": {
                "knn": {
                    "vector_field": {
                        "vector": [0.1, 0.2, 0.3, ...],
                        "k": 10
                    }
                }
            }
        }

        Returns:
        dict: k-NN search results with similarity scores
        """

    def train_model(self, model_id, body, **kwargs):
        """Train a k-NN model for vector search optimization."""

    def get_model(self, model_id, **kwargs):
        """Get k-NN model information."""

    def delete_model(self, model_id, **kwargs):
        """Delete a k-NN model."""

    def warmup(self, index, **kwargs):
        """Warmup k-NN indices for improved performance."""

    def stats(self, node_id=None, stat_name=None, **kwargs):
        """Get k-NN plugin statistics."""
```

### Security Analytics Plugin

Security threat detection and analytics capabilities.

```python { .api }
class SecurityAnalyticsClient:
    def create_detector(self, body, **kwargs):
        """
        Create a security analytics detector.

        Parameters:
        - body (dict): Detector configuration

        Body format:
        {
            "name": "network-detector",
            "detector_type": "network",
            "enabled": true,
            "schedule": {
                "period": {
                    "interval": 5,
                    "unit": "MINUTES"
                }
            },
            "inputs": [
                {
                    "detector_input": {
                        "description": "Network logs",
                        "indices": ["network-logs*"],
                        "rules": []
                    }
                }
            ]
        }

        Returns:
        dict: Detector creation response
        """

    def get_detector(self, detector_id, **kwargs):
        """Get detector configuration and status."""

    def update_detector(self, detector_id, body, **kwargs):
        """Update detector configuration."""

    def delete_detector(self, detector_id, **kwargs):
        """Delete a detector."""

    def search_detectors(self, body=None, **kwargs):
        """Search for detectors."""

    def get_findings(self, **kwargs):
        """Get security findings from detectors."""

    def acknowledge_alerts(self, body, **kwargs):
        """Acknowledge security alerts."""
```

### Alerting Plugin

Comprehensive alerting system for monitoring and notifications.

```python { .api }
class AlertingClient:
    def create_monitor(self, body, **kwargs):
        """
        Create an alerting monitor.

        Parameters:
        - body (dict): Monitor configuration

        Body format:
        {
            "name": "high-error-rate-monitor",
            "type": "monitor",
            "monitor_type": "query_level_monitor",
            "enabled": true,
            "schedule": {
                "period": {
                    "interval": 1,
                    "unit": "MINUTES"
                }
            },
            "inputs": [
                {
                    "search": {
                        "indices": ["application-logs*"],
                        "query": {
                            "query": {
                                "bool": {
                                    "filter": [
                                        {
                                            "range": {
                                                "@timestamp": {
                                                    "gte": "now-5m"
                                                }
                                            }
                                        },
                                        {
                                            "term": {
                                                "level": "ERROR"
                                            }
                                        }
                                    ]
                                }
                            },
                            "aggs": {
                                "error_count": {
                                    "value_count": {
                                        "field": "level"
                                    }
                                }
                            }
                        }
                    }
                }
            ],
            "triggers": [
                {
                    "name": "high-error-trigger",
                    "severity": "2",
                    "condition": {
                        "script": {
                            "source": "ctx.results[0].aggregations.error_count.value > 10"
                        }
                    },
                    "actions": [
                        {
                            "name": "send-email",
                            "destination_id": "email-destination",
                            "message_template": {
                                "source": "High error rate detected: {{ctx.results.0.aggregations.error_count.value}} errors in the last 5 minutes"
                            }
                        }
                    ]
                }
            ]
        }

        Returns:
        dict: Monitor creation response
        """

    def get_monitor(self, monitor_id, **kwargs):
        """Get monitor configuration and status."""

    def update_monitor(self, monitor_id, body, **kwargs):
        """Update monitor configuration."""

    def delete_monitor(self, monitor_id, **kwargs):
        """Delete a monitor."""

    def search_monitors(self, body=None, **kwargs):
        """Search for monitors."""

    def run_monitor(self, monitor_id, **kwargs):
        """Manually run a monitor."""

    def acknowledge_alert(self, monitor_id, body=None, **kwargs):
        """Acknowledge alerts from a monitor."""

    def create_destination(self, body, **kwargs):
        """Create notification destination."""

    def get_destination(self, destination_id, **kwargs):
        """Get notification destination."""

    def update_destination(self, destination_id, body, **kwargs):
        """Update notification destination."""

    def delete_destination(self, destination_id, **kwargs):
        """Delete notification destination."""
```

### SQL Plugin

SQL query interface for OpenSearch data.

```python { .api }
class SqlClient:
    def query(self, body, **kwargs):
        """
        Execute SQL query against OpenSearch indices.

        Parameters:
        - body (dict): SQL query request

        Body format:
        {
            "query": "SELECT customer_name, order_total FROM orders WHERE order_date >= '2024-01-01' ORDER BY order_total DESC LIMIT 10",
            "fetch_size": 1000,
            "format": "json"
        }

        Returns:
        dict: Query results in specified format
        """

    def explain(self, body, **kwargs):
        """Explain SQL query execution plan."""

    def close(self, cursor, **kwargs):
        """Close SQL cursor for pagination."""

    def get_stats(self, **kwargs):
        """Get SQL plugin statistics."""

    def post_stats(self, body, **kwargs):
        """Update SQL plugin statistics."""
```

### PPL (Piped Processing Language) Plugin

Piped processing language for data analysis and transformation.

```python { .api }
class PplClient:
    def query(self, body, **kwargs):
        """
        Execute PPL query for data processing.

        Parameters:
        - body (dict): PPL query request

        Body format:
        {
            "query": "source=logs | where level='ERROR' | stats count() by service | sort count desc",
            "format": "json"
        }

        Returns:
        dict: PPL query results
        """

    def explain(self, body, **kwargs):
        """Explain PPL query execution plan."""
```

### Additional Plugin APIs

Extended plugin capabilities for specialized use cases.

```python { .api }
class AsynchronousSearchClient:
    def submit(self, body, index=None, **kwargs):
        """Submit asynchronous search request."""

    def get(self, id, **kwargs):
        """Get asynchronous search results."""

    def delete(self, id, **kwargs):
        """Delete asynchronous search."""

    def stats(self, **kwargs):
        """Get asynchronous search statistics."""

class FlowFrameworkClient:
    def create(self, body, **kwargs):
        """Create a workflow template."""

    def get_template(self, workflow_id, **kwargs):
        """Get workflow template."""

    def provision(self, workflow_id, **kwargs):
        """Provision resources for workflow."""

    def deprovision(self, workflow_id, **kwargs):
        """Deprovision workflow resources."""

class IndexManagementClient:
    def create_policy(self, policy_id, body, **kwargs):
        """Create index management policy."""

    def get_policy(self, policy_id, **kwargs):
        """Get index management policy."""

    def update_policy(self, policy_id, body, **kwargs):
        """Update index management policy."""

    def delete_policy(self, policy_id, **kwargs):
        """Delete index management policy."""

    def add_policy(self, index, body, **kwargs):
        """Add policy to index."""

    def remove_policy(self, index, **kwargs):
        """Remove policy from index."""

    def explain_index(self, index, **kwargs):
        """Explain index management status."""

class NotificationsClient:
    def create_config(self, body, **kwargs):
        """Create notification configuration."""

    def get_configs(self, **kwargs):
        """Get notification configurations."""

    def update_config(self, config_id, body, **kwargs):
        """Update notification configuration."""

    def delete_config(self, config_id, **kwargs):
        """Delete notification configuration."""

    def send_test(self, config_id, **kwargs):
        """Send test notification."""

class RollupsClient:
    def put(self, rollup_id, body, **kwargs):
        """Create rollup job."""

    def get(self, rollup_id=None, **kwargs):
        """Get rollup job."""

    def delete(self, rollup_id, **kwargs):
        """Delete rollup job."""

    def start(self, rollup_id, **kwargs):
        """Start rollup job."""

    def stop(self, rollup_id, **kwargs):
        """Stop rollup job."""

    def explain(self, rollup_id, **kwargs):
        """Explain rollup job status."""

class GeospatialClient:
    def put_geojson(self, index, body, **kwargs):
        """Index GeoJSON data for geospatial operations."""

    def search_geospatial(self, body, index=None, **kwargs):
        """Search using geospatial queries and filters."""

    def get_stats(self, **kwargs):
        """Get geospatial plugin statistics."""

class ObservabilityClient:
    def create_object(self, body, **kwargs):
        """Create observability object (dashboard, visualization, etc.)."""

    def get_object(self, object_id, **kwargs):
        """Get observability object configuration."""

    def list_objects(self, **kwargs):
        """List observability objects."""

    def delete_object(self, object_id, **kwargs):
        """Delete observability object."""

class ReplicationClient:
    def start_replication(self, leader_index, follower_index, body, **kwargs):
        """Start cross-cluster replication."""

    def stop_replication(self, follower_index, **kwargs):
        """Stop replication on follower index."""

    def pause_replication(self, follower_index, **kwargs):
        """Pause replication."""

    def resume_replication(self, follower_index, **kwargs):
        """Resume paused replication."""

    def get_replication_status(self, index=None, **kwargs):
        """Get replication status."""

class SearchRelevanceClient:
    def search_relevance(self, body, **kwargs):
        """Execute search relevance evaluation."""

    def compare_search_results(self, body, **kwargs):
        """Compare search results between different configurations."""

class TransformsClient:
    def put(self, transform_id, body, **kwargs):
        """Create transform job."""

    def get(self, transform_id=None, **kwargs):
        """Get transform job."""

    def delete(self, transform_id, **kwargs):
        """Delete transform job."""

    def start(self, transform_id, **kwargs):
        """Start transform job."""

    def stop(self, transform_id, **kwargs):
        """Stop transform job."""

    def preview(self, body, **kwargs):
        """Preview transform results."""
```

## Usage Examples

### Machine Learning Operations

```python
from opensearchpy import OpenSearch

client = OpenSearch([{'host': 'localhost', 'port': 9200}])

# Register a pre-trained model
model_config = {
    "name": "sentence-transformer-model",
    "version": "1.0.0",
    "model_format": "TORCH_SCRIPT",
    "model_config": {
        "model_type": "bert",
        "embedding_dimension": 768,
        "framework_type": "sentence_transformers"
    },
    "url": "https://example.com/model.zip"
}

# Register the model
response = client.ml.register_model(body=model_config)
task_id = response['task_id']

# Check registration status
task_status = client.ml.get_task(task_id=task_id)
print(f"Registration status: {task_status['state']}")

# Deploy the model once registration is complete
if task_status['state'] == 'COMPLETED':
    model_id = task_status['model_id']
    client.ml.deploy_model(model_id=model_id)

    # Use model for inference
    inference_body = {
        "text_docs": ["What is machine learning?", "How does AI work?"]
    }

    results = client.ml.predict(model_id=model_id, body=inference_body)
    print(f"Embeddings: {results['inference_results']}")
```

### K-NN Vector Search

```python
# Perform k-NN search with vector embeddings
knn_query = {
    "size": 10,
    "query": {
        "knn": {
            "document_embedding": {
                "vector": [0.1, 0.2, 0.3, 0.4, 0.5],  # 5-dimensional vector
                "k": 10,
                "filter": {
                    "term": {
                        "category": "technology"
                    }
                }
            }
        }
    }
}

results = client.knn.search(index='documents', body=knn_query)
for hit in results['hits']['hits']:
    print(f"Score: {hit['_score']}, Document: {hit['_source']['title']}")
```

### Alerting Configuration

```python
# Create email notification destination
email_destination = {
    "name": "operations-email",
    "type": "email",
    "email": {
        "email_account_id": "default-email-account",
        "recipients": ["ops-team@company.com"],
        "subject": "OpenSearch Alert: {{ctx.monitor.name}}"
    }
}

destination_response = client.alerting.create_destination(body=email_destination)
destination_id = destination_response['_id']

# Create error rate monitor
monitor_config = {
    "name": "application-error-monitor",
    "type": "monitor",
    "monitor_type": "query_level_monitor",
    "enabled": True,
    "schedule": {
        "period": {
            "interval": 5,
            "unit": "MINUTES"
        }
    },
    "inputs": [
        {
            "search": {
                "indices": ["application-logs*"],
                "query": {
                    "query": {
                        "bool": {
                            "filter": [
                                {
                                    "range": {
                                        "@timestamp": {
                                            "gte": "now-5m"
                                        }
                                    }
                                },
                                {
                                    "term": {
                                        "level": "ERROR"
                                    }
                                }
                            ]
                        }
                    },
                    "aggs": {
                        "error_count": {
                            "value_count": {
                                "field": "level"
                            }
                        }
                    }
                }
            }
        }
    ],
    "triggers": [
        {
            "name": "high-error-rate",
            "severity": "2",
            "condition": {
                "script": {
                    "source": "ctx.results[0].aggregations.error_count.value > 50"
                }
            },
            "actions": [
                {
                    "name": "notify-ops-team",
                    "destination_id": destination_id,
                    "message_template": {
                        "source": "Alert: {{ctx.results.0.aggregations.error_count.value}} errors detected in the last 5 minutes"
                    }
                }
            ]
        }
    ]
}

monitor_response = client.alerting.create_monitor(body=monitor_config)
print(f"Monitor created: {monitor_response['_id']}")
```

### SQL Queries

```python
# Execute SQL query
sql_query = {
    "query": """
        SELECT
            customer_name,
            COUNT(*) as order_count,
            SUM(order_total) as total_spent
        FROM orders
        WHERE order_date >= '2024-01-01'
        GROUP BY customer_name
        ORDER BY total_spent DESC
        LIMIT 10
    """,
    "format": "json",
    "fetch_size": 1000
}

results = client.sql.query(body=sql_query)

# Process results
for row in results['datarows']:
    customer, count, total = row
    print(f"Customer: {customer}, Orders: {count}, Total: ${total}")

# Get query execution plan
explain_query = {
    "query": "SELECT * FROM orders WHERE customer_id = 123",
    "format": "json"
}

plan = client.sql.explain(body=explain_query)
print(f"Execution plan: {plan}")
```

### Index Management Policies

```python
# Create index lifecycle policy
policy_body = {
    "policy": {
        "description": "Log retention policy",
        "default_state": "hot",
        "states": [
            {
                "name": "hot",
                "actions": [
                    {
                        "rollover": {
                            "min_size": "50gb",
                            "min_doc_count": 1000000,
                            "min_index_age": "7d"
                        }
                    }
                ],
                "transitions": [
                    {
                        "state_name": "warm",
                        "conditions": {
                            "min_index_age": "7d"
                        }
                    }
                ]
            },
            {
                "name": "warm",
                "actions": [
                    {
                        "replica_count": {
                            "number_of_replicas": 0
                        }
                    }
                ],
                "transitions": [
                    {
                        "state_name": "delete",
                        "conditions": {
                            "min_index_age": "30d"
                        }
                    }
                ]
            },
            {
                "name": "delete",
                "actions": [
                    {
                        "delete": {}
                    }
                ]
            }
        ]
    }
}

# Create the policy
policy_response = client.index_management.create_policy(
    policy_id='log-retention-policy',
    body=policy_body
)

# Apply policy to index pattern
apply_policy = {
    "policy_id": "log-retention-policy"
}

client.index_management.add_policy(
    index='logs-*',
    body=apply_policy
)

# Check policy status
status = client.index_management.explain_index(index='logs-2024-01-01')
print(f"Policy status: {status}")
```

### Learning to Rank (LTR) Plugin

Machine learning-based relevance ranking with feature stores, feature sets, and ranking models for advanced search result optimization.

```python { .api }
class LtrClient:
    def cache_stats(self, **kwargs):
        """Retrieve cache statistics for all feature stores."""

    def clear_cache(self, store=None, **kwargs):
        """Clear the store caches."""

    def create_default_store(self, **kwargs):
        """Create the default feature store."""

    def create_store(self, store, body, **kwargs):
        """Create a new feature store."""

    def delete_default_store(self, **kwargs):
        """Delete the default feature store."""

    def delete_store(self, store, **kwargs):
        """Delete a feature store."""

    def get_store(self, store, **kwargs):
        """Get information about a feature store."""

    def list_stores(self, **kwargs):
        """List all feature stores."""

    def stats(self, store=None, **kwargs):
        """Get feature store statistics."""

    def add_features_to_set(self, store, featureset, body, **kwargs):
        """Add features to an existing feature set."""

    def add_features_to_set_by_query(self, store, featureset, body, **kwargs):
        """Add features to a feature set based on a query."""

    def create_feature(self, store, name, body, **kwargs):
        """Create a new feature in a feature store."""

    def create_featureset(self, store, name, body, **kwargs):
        """Create a new feature set in a feature store."""

    def create_model(self, store, name, body, **kwargs):
        """Create a new ranking model in a feature store."""

    def create_model_from_set(self, store, featureset, name, body, **kwargs):
        """Create a ranking model from a feature set."""

    def delete_feature(self, store, name, **kwargs):
        """Delete a feature from a feature store."""

    def delete_featureset(self, store, name, **kwargs):
        """Delete a feature set from a feature store."""

    def delete_model(self, store, name, **kwargs):
        """Delete a ranking model from a feature store."""

    def get_feature(self, store, name, **kwargs):
        """Get information about a specific feature."""

    def get_featureset(self, store, name, **kwargs):
        """Get information about a specific feature set."""

    def get_model(self, store, name, **kwargs):
        """Get information about a specific ranking model."""

    def search_features(self, store, body=None, **kwargs):
        """Search for features in a feature store."""

    def search_featuresets(self, store, body=None, **kwargs):
        """Search for feature sets in a feature store."""

    def search_models(self, store, body=None, **kwargs):
        """Search for ranking models in a feature store."""

    def update_feature(self, store, name, body, **kwargs):
        """Update an existing feature in a feature store."""

    def update_featureset(self, store, name, body, **kwargs):
        """Update an existing feature set in a feature store."""
```

### Query Data Sources Plugin

Query data source management for connecting external data sources to OpenSearch.

```python { .api }
class QueryClient:
    def create_data_source(self, **kwargs):
        """Create a new data source connection."""

    def update_data_source(self, datasource_name, **kwargs):
        """Update an existing data source connection."""

    def get_data_source(self, datasource_name, **kwargs):
        """Get information about a data source connection."""

    def get_data_sources(self, **kwargs):
        """List all data source connections."""

    def delete_data_source(self, datasource_name, **kwargs):
        """Delete a data source connection."""
```