# Helper Functions

High-level utility functions for common operations such as bulk indexing, scanning large result sets, and reindexing data, with built-in error handling and performance optimizations. These functions provide simplified interfaces for otherwise complex operations.

## Capabilities

### Bulk Operations

Efficient bulk processing for high-throughput document operations.

```python { .api }
def bulk(client, actions, index=None, doc_type=None, **kwargs):
    """
    Perform bulk indexing, updating, and deleting operations.

    Parameters:
    - client: OpenSearch client instance
    - actions: Iterable of action dictionaries or a generator
    - index (str, optional): Default index for actions without _index
    - doc_type (str, optional): Default document type (deprecated)
    - chunk_size (int, optional): Number of docs per chunk (default: 500)
    - max_chunk_bytes (int, optional): Maximum size per chunk in bytes
    - refresh (str/bool, optional): Refresh policy for operations
    - timeout (str, optional): Request timeout
    - max_retries (int, optional): Maximum number of retries (default: 0)
    - initial_backoff (int, optional): Initial backoff time in seconds (default: 2)
    - max_backoff (int, optional): Maximum backoff time in seconds (default: 600)
    - yield_ok (bool, optional): Yield successful operations (default: True)

    Action format:
    {
        '_op_type': 'index',  # 'index', 'create', 'update', 'delete'
        '_index': 'my-index',
        '_id': 'doc-id',
        '_source': {'field': 'value'}  # For index/create/update
    }

    Returns:
        Tuple of (success_count, failed_actions)

    Raises:
        BulkIndexError: If there are failed operations and errors are not ignored
    """

def async_bulk(client, actions, **kwargs):
    """
    Asynchronous version of bulk operations.

    Parameters: Same as bulk()

    Returns:
        Tuple of (success_count, failed_actions)
    """

def streaming_bulk(client, actions, **kwargs):
    """
    Streaming bulk operations that yield results as they complete.

    Parameters: Same as bulk()

    Yields:
        Tuples of (success, action_result) for each operation
    """

def async_streaming_bulk(client, actions, **kwargs):
    """
    Asynchronous streaming bulk operations.

    Parameters: Same as bulk()

    Async yields:
        Tuples of (success, action_result) for each operation
    """

def parallel_bulk(client, actions, thread_count=4, **kwargs):
    """
    Parallel bulk operations using threading for improved throughput.

    Parameters:
    - client: OpenSearch client instance
    - actions: Iterable of action dictionaries
    - thread_count (int): Number of parallel threads (default: 4)
    - queue_size (int): Size of the task queue (default: 4)
    - Other parameters same as bulk()

    Yields:
        Tuples of (success, action_result) for each operation
    """
```

### Scanning Operations

Efficient scanning for processing large result sets.

```python { .api }
def scan(client, query=None, scroll='5m', **kwargs):
    """
    Scan search results for large datasets using the scroll API.

    Parameters:
    - client: OpenSearch client instance
    - query (dict, optional): Search query body
    - scroll (str, optional): Scroll timeout (default: '5m')
    - index (str/list, optional): Index name(s)
    - doc_type (str/list, optional): Document type(s) (deprecated)
    - size (int, optional): Number of results per shard per batch (default: 1000)
    - request_timeout (float, optional): Request timeout in seconds
    - clear_scroll (bool, optional): Clear the scroll context on completion (default: True)
    - scroll_kwargs (dict, optional): Additional scroll parameters
    - preserve_order (bool, optional): Preserve result order (default: False)

    Query format:
    {
        'query': {
            'match_all': {}
        },
        'sort': ['_doc']  # Recommended for performance
    }

    Yields:
        Individual document hits from the search results

    Raises:
        ScanError: If the scan operation fails
    """

def async_scan(client, query=None, scroll='5m', **kwargs):
    """
    Asynchronous version of scan operations.

    Parameters: Same as scan()

    Async yields:
        Individual document hits from the search results
    """
```

### Reindexing Operations

Copy documents between indices with optional transformations.

```python { .api }
def reindex(client, source_index, target_index, query=None, **kwargs):
    """
    Reindex documents from a source to a target index.

    Parameters:
    - client: OpenSearch client instance
    - source_index (str): Source index name
    - target_index (str): Target index name
    - query (dict, optional): Query to filter source documents
    - chunk_size (int, optional): Bulk operation chunk size (default: 500)
    - scroll (str, optional): Scroll timeout (default: '5m')
    - op_type (str, optional): Operation type ('index' or 'create', default: 'index')
    - transform (callable, optional): Function to transform documents
    - target_client (optional): Separate client for the target index

    Transform function signature:
    def transform_doc(doc):
        # Modify doc['_source'], doc['_id'], etc.
        return doc

    Returns:
        Tuple of (success_count, failed_operations)

    Raises:
        ReindexError: If reindexing fails
    """

def async_reindex(client, source_index, target_index, **kwargs):
    """
    Asynchronous version of reindex operations.

    Parameters: Same as reindex()

    Returns:
        Tuple of (success_count, failed_operations)
    """
```

### Utility Functions

Additional helper functions for common operations.

```python { .api }
def expand_action(data):
    """
    Expand a single document or action into bulk protocol format.

    Parameters:
    - data: Document data or action dictionary

    Returns:
        Tuple of (action, data) lines ready for the bulk API
    """

def _chunk_actions(actions, chunk_size, max_chunk_bytes):
    """
    Internal function that chunks actions for bulk operations.

    Parameters:
    - actions: Iterable of actions
    - chunk_size: Maximum actions per chunk
    - max_chunk_bytes: Maximum bytes per chunk

    Yields:
        Chunks of actions
    """

def _process_bulk_chunk(client, chunk, **kwargs):
    """
    Internal function that processes a single bulk chunk.

    Parameters:
    - client: OpenSearch client instance
    - chunk: List of actions to process
    - kwargs: Additional bulk parameters

    Returns:
        Processed results
    """
```

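For plain documents that lack bulk metadata, the expansion step splits each item into the action line and the optional source line required by the bulk protocol. The sketch below mimics that behavior in plain Python to make the split concrete; `to_bulk_action` and its set of metadata keys are illustrative, not part of the library:

```python
def to_bulk_action(data):
    """Split a document or action dict into (action, source) lines,
    mirroring what the bulk protocol expects."""
    if not isinstance(data, dict):
        # Already-serialized payloads pass through as bare index actions
        return {'index': {}}, data

    op_type = data.pop('_op_type', 'index')
    # Metadata keys move to the action line; everything else is source
    meta_keys = {'_index', '_id', '_routing', '_version'}
    action = {op_type: {k: v for k, v in data.items() if k in meta_keys}}

    if op_type == 'delete':
        return action, None  # Delete actions carry no source line

    source = data.get('_source', {k: v for k, v in data.items()
                                  if k not in meta_keys})
    return action, source

action, source = to_bulk_action(
    {'_op_type': 'index', '_index': 'products', '_id': '1',
     '_source': {'title': 'Laptop'}}
)
# action -> {'index': {'_index': 'products', '_id': '1'}}
# source -> {'title': 'Laptop'}
```

The same split explains why delete actions are cheaper to send: they contribute a single line to the bulk request body instead of two.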
## Usage Examples

### Basic Bulk Operations

```python
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

client = OpenSearch([{'host': 'localhost', 'port': 9200}])

# Prepare bulk actions
actions = [
    {
        '_op_type': 'index',
        '_index': 'products',
        '_id': '1',
        '_source': {
            'title': 'Laptop Computer',
            'price': 999.99,
            'category': 'Electronics'
        }
    },
    {
        '_op_type': 'index',
        '_index': 'products',
        '_id': '2',
        '_source': {
            'title': 'Wireless Mouse',
            'price': 29.99,
            'category': 'Electronics'
        }
    },
    {
        '_op_type': 'update',
        '_index': 'products',
        '_id': '1',
        '_source': {
            'doc': {
                'in_stock': True
            }
        }
    },
    {
        '_op_type': 'delete',
        '_index': 'products',
        '_id': '3'
    }
]

# Execute bulk operations
successes, failures = bulk(
    client,
    actions,
    chunk_size=100,
    timeout='60s'
)

print(f"Successful operations: {successes}")
if failures:
    print(f"Failed operations: {len(failures)}")
    for failure in failures:
        print(f"  Error: {failure}")
```

### Streaming Bulk with Generator

```python
from opensearchpy.helpers import streaming_bulk

def generate_docs():
    """Generate documents from a data source."""
    for i in range(10000):
        yield {
            '_op_type': 'index',
            '_index': 'large-dataset',
            '_id': str(i),
            '_source': {
                'id': i,
                'value': f'Document {i}',
                'timestamp': '2024-01-01T00:00:00Z'
            }
        }

# Stream bulk operations
for success, info in streaming_bulk(
    client,
    generate_docs(),
    chunk_size=500,
    max_retries=3
):
    if not success:
        print(f"Failed to index: {info}")
    else:
        print(f"Indexed document: {info['index']['_id']}")
```

### Large Dataset Scanning

```python
from opensearchpy.helpers import scan

# Scan all documents matching a date range
query = {
    'query': {
        'range': {
            'timestamp': {
                'gte': '2024-01-01',
                'lte': '2024-12-31'
            }
        }
    },
    'sort': ['_doc']  # More efficient than default scoring
}

total_processed = 0
for doc in scan(
    client,
    query=query,
    index='large-index',
    size=1000,  # Documents per shard per request
    scroll='10m'
):
    # Process each document
    process_document(doc['_source'])
    total_processed += 1

    if total_processed % 10000 == 0:
        print(f"Processed {total_processed} documents")

print(f"Total processed: {total_processed} documents")
```

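The same scan pattern works asynchronously with `async_scan`. A minimal sketch, assuming a local cluster at `localhost:9200` and an index named `large-index`; the `make_range_query` helper is illustrative, not part of the library:

```python
import asyncio

def make_range_query(gte, lte, field='timestamp'):
    """Build a filtered scan query; sorting by '_doc' skips scoring."""
    return {
        'query': {'range': {field: {'gte': gte, 'lte': lte}}},
        'sort': ['_doc'],
    }

async def scan_all(index='large-index'):
    # Imported here so the pure helper above is usable without a cluster
    from opensearchpy import AsyncOpenSearch
    from opensearchpy.helpers import async_scan

    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])
    total = 0
    try:
        async for doc in async_scan(
            client,
            query=make_range_query('2024-01-01', '2024-12-31'),
            index=index,
            size=1000,
        ):
            total += 1  # Replace with real per-document processing
    finally:
        await client.close()
    return total

# Requires a running cluster:
# print(asyncio.run(scan_all()))
```

The `try`/`finally` ensures the client's connection pool is closed even if the scroll context expires mid-scan.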
### Reindexing with Transformation

```python
from opensearchpy.helpers import reindex

def transform_document(doc):
    """Transform a document during reindexing."""
    # Add new fields
    doc['_source']['processed_at'] = '2024-01-01T00:00:00Z'

    # Rename fields
    if 'old_field' in doc['_source']:
        doc['_source']['new_field'] = doc['_source'].pop('old_field')

    # Drop unwanted fields
    doc['_source'].pop('temp_field', None)

    # Change the document ID format
    doc['_id'] = f"new_{doc['_id']}"

    return doc

# Reindex with transformation
query = {
    'query': {
        'bool': {
            'must': [
                {'term': {'status': 'active'}}
            ]
        }
    }
}

success_count, failed_ops = reindex(
    client,
    source_index='old-index',
    target_index='new-index',
    query=query,
    transform=transform_document,
    chunk_size=200
)

print(f"Successfully reindexed: {success_count} documents")
if failed_ops:
    print(f"Failed operations: {len(failed_ops)}")
```

### Parallel Bulk Processing

```python
from opensearchpy.helpers import parallel_bulk
import json

def read_json_file(filename):
    """Read documents from a JSON Lines file."""
    with open(filename, 'r') as f:
        for line in f:
            doc = json.loads(line)
            yield {
                '_op_type': 'index',
                '_index': 'imported-data',
                '_source': doc
            }

# Process a large file with parallel bulk
processed = 0
errors = []

# Note: retry parameters (max_retries, backoff) apply to streaming_bulk,
# not parallel_bulk; handle failures via the yielded results instead
for success, info in parallel_bulk(
    client,
    read_json_file('large_dataset.jsonl'),
    thread_count=8,
    chunk_size=1000
):
    if success:
        processed += 1
    else:
        errors.append(info)

    if processed % 10000 == 0:
        print(f"Processed: {processed}, Errors: {len(errors)}")

print(f"Final: Processed {processed}, Errors: {len(errors)}")
```

### Async Bulk Operations

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

async def async_bulk_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    actions = [
        {
            '_op_type': 'index',
            '_index': 'async-index',
            '_id': str(i),
            '_source': {'value': i}
        }
        for i in range(1000)
    ]

    # Async bulk operations
    success_count, failed_ops = await async_bulk(
        client,
        actions,
        chunk_size=100
    )

    print(f"Async bulk: {success_count} successful, {len(failed_ops)} failed")

    await client.close()

# Run the async example
asyncio.run(async_bulk_example())
```

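`async_streaming_bulk` combines the two async patterns: actions stream in and per-operation results stream back without buffering the whole set in memory. A sketch under the same local-cluster assumption; the `events` index name is illustrative:

```python
import asyncio

def generate_actions(count):
    """Plain (sync) generators are accepted alongside async iterables."""
    for i in range(count):
        yield {
            '_op_type': 'index',
            '_index': 'events',
            '_id': str(i),
            '_source': {'value': i},
        }

async def stream_index(count=1000):
    # Imported here so the pure generator above is usable without a cluster
    from opensearchpy import AsyncOpenSearch
    from opensearchpy.helpers import async_streaming_bulk

    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])
    ok = failed = 0
    try:
        async for success, info in async_streaming_bulk(
            client, generate_actions(count), chunk_size=100
        ):
            if success:
                ok += 1
            else:
                failed += 1
                print(f"Failed: {info}")
    finally:
        await client.close()
    return ok, failed

# Requires a running cluster:
# print(asyncio.run(stream_index()))
```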
### Error Handling and Retry Logic

```python
import time

from opensearchpy.exceptions import ConnectionError
from opensearchpy.helpers import bulk, BulkIndexError

def robust_bulk_index(client, documents, max_attempts=3):
    """Robust bulk indexing with retry logic."""
    actions = [
        {
            '_op_type': 'index',
            '_index': 'robust-index',
            '_id': str(doc['id']),
            '_source': doc
        }
        for doc in documents
    ]

    success_count, failed_ops = 0, []
    for attempt in range(max_attempts):
        try:
            success_count, failed_ops = bulk(
                client,
                actions,
                raise_on_error=False,  # Return failures instead of raising
                max_retries=2,
                initial_backoff=2,
                max_backoff=60
            )

            if not failed_ops:
                print(f"All {success_count} documents indexed successfully")
                return success_count, []

            # failed_ops contains error responses, not resubmittable
            # actions -- rebuild the action list from the failed IDs
            failed_ids = {
                item[op]['_id'] for item in failed_ops for op in item
            }
            actions = [a for a in actions if a['_id'] in failed_ids]
            print(f"Attempt {attempt + 1}: {len(failed_ops)} operations failed, retrying...")

        except (BulkIndexError, ConnectionError) as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_attempts - 1:
                raise

            # Exponential backoff before the next attempt
            time.sleep(2 ** attempt)

    return success_count, failed_ops

# Use robust bulk indexing
documents = [{'id': i, 'data': f'value_{i}'} for i in range(1000)]
success, failures = robust_bulk_index(client, documents)
```