# Helper Functions

Utility functions for common Elasticsearch operations including bulk indexing, scanning large result sets, and reindexing data. These helpers simplify complex operations and provide optimized implementations for common use cases.

## Capabilities

### Bulk Operations

Efficient bulk indexing with automatic batching, error handling, and progress tracking.

```python { .api }
def bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    thread_count: int = 4,
    queue_size: int = 4,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Perform bulk indexing operations.

    Parameters:
    - client: Elasticsearch client instance
    - actions: Iterable of action dictionaries or documents
    - index: Default index name for actions
    - doc_type: Default document type (deprecated)
    - routing: Default routing value
    - pipeline: Default ingest pipeline
    - refresh: Refresh policy for operations
    - timeout: Request timeout
    - chunk_size: Number of documents per chunk
    - max_chunk_bytes: Maximum chunk size in bytes
    - thread_count: Number of parallel threads
    - queue_size: Thread pool queue size
    - expand_action_callback: Callback to expand actions
    - raise_on_exception: Whether to raise on exceptions
    - raise_on_error: Whether to raise on API errors
    - ignore_status: HTTP status codes to ignore

    Returns:
    Tuple of (success_count, failed_operations)
    """

def streaming_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
):
    """
    Generator that yields bulk operation results as they complete.

    Parameters: Same as bulk()

    Yields:
    Tuples of (success, info) for each document processed
    """

def parallel_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    thread_count: int = 4,
    queue_size: int = 4,
    expand_action_callback=None,
    ignore_status=(),
    **kwargs
):
    """
    Parallel bulk indexing using multiple threads.

    Parameters: Same as bulk() with additional thread control

    Yields:
    Tuples of (success, info) for each document processed
    """
```
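
The `actions` iterable accepted by `bulk()`, `streaming_bulk()`, and `parallel_bulk()` is a stream of plain dictionaries. The sketch below shows commonly used action shapes following elasticsearch-py conventions; the `_op_type`, `_source`, and `doc` keys are assumptions based on that convention rather than spelled out in the signatures above:

```python
# Illustrative action shapes for the bulk helpers; `_op_type` defaults to "index".
index_action = {
    "_op_type": "index",       # or "create"
    "_index": "products",
    "_id": 42,
    "_source": {"name": "Product 42", "price": 461.58},
}

update_action = {
    "_op_type": "update",
    "_index": "products",
    "_id": 42,
    "doc": {"price": 399.99},  # partial document applied by the update
}

delete_action = {
    "_op_type": "delete",
    "_index": "products",
    "_id": 42,
}
```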

### Scanning Operations

Efficiently iterate through large result sets using the scroll API.

```python { .api }
def scan(
    client,
    query: Optional[Dict[str, Any]] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[Dict] = None,
    **kwargs
):
    """
    Scan and scroll through all documents matching a query.

    Parameters:
    - client: Elasticsearch client instance
    - query: Full search body to execute (default: match_all)
    - scroll: Scroll context timeout
    - raise_on_error: Whether to raise on errors
    - preserve_order: Whether to preserve result ordering
    - size: Number of documents per shard per batch
    - request_timeout: Request timeout
    - clear_scroll: Whether to clear scroll context when done
    - scroll_kwargs: Additional arguments for scroll requests
    - **kwargs: Additional search parameters

    Yields:
    Individual document hits
    """
```

### Reindexing Operations

Copy documents between indices with optional transformation.

```python { .api }
def reindex(
    client,
    source_index: str,
    target_index: str,
    query: Optional[Dict[str, Any]] = None,
    target_client: Optional[object] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    scan_kwargs: Optional[Dict] = None,
    bulk_kwargs: Optional[Dict] = None,
    transform_doc_callback=None,
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Reindex documents from source to target index.

    Parameters:
    - client: Source Elasticsearch client
    - source_index: Source index name
    - target_index: Target index name
    - query: Query to filter source documents
    - target_client: Target client (if different from source)
    - chunk_size: Bulk operation chunk size
    - scroll: Scroll timeout for scanning
    - scan_kwargs: Additional scan arguments
    - bulk_kwargs: Additional bulk arguments
    - transform_doc_callback: Function to transform documents

    Returns:
    Tuple of (success_count, failed_operations)
    """
```

### Asynchronous Helper Functions

Async versions of the helper functions for use with AsyncElasticsearch.

```python { .api }
async def async_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Async version of the bulk operation.

    Parameters: Same as bulk()

    Returns:
    Tuple of (success_count, failed_operations)
    """

async def async_streaming_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
):
    """
    Async generator for streaming bulk operations.

    Parameters: Same as streaming_bulk()

    Yields:
    Tuples of (success, info) for each document processed
    """

async def async_scan(
    client,
    query: Optional[Dict[str, Any]] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[Dict] = None,
    **kwargs
):
    """
    Async version of the scan operation.

    Parameters: Same as scan()

    Yields:
    Individual document hits
    """

async def async_reindex(
    client,
    source_index: str,
    target_index: str,
    query: Optional[Dict[str, Any]] = None,
    target_client: Optional[object] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    scan_kwargs: Optional[Dict] = None,
    bulk_kwargs: Optional[Dict] = None,
    transform_doc_callback=None,
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Async version of the reindex operation.

    Parameters: Same as reindex()

    Returns:
    Tuple of (success_count, failed_operations)
    """
```

### Utility Functions

Additional utility functions for common operations.

```python { .api }
def expand_action(action: Dict[str, Any]) -> Dict[str, Any]:
    """
    Expand a shorthand action dictionary to full format.

    Parameters:
    - action: Action dictionary to expand

    Returns:
    Expanded action dictionary
    """
```

### Helper Exceptions

Exception types specific to helper operations.

```python { .api }
class BulkIndexError(Exception):
    """
    Exception raised when bulk operations encounter errors.

    Attributes:
    - errors: List of individual operation errors
    """
    def __init__(self, message: str, errors: List[Dict]): ...

    @property
    def errors(self) -> List[Dict]: ...

class ScanError(Exception):
    """
    Exception raised during scan operations.

    Attributes:
    - scroll_id: Scroll context ID if available
    """
    def __init__(self, message: str, scroll_id: Optional[str] = None): ...
```
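
With the default `raise_on_error=True`, any failed item aborts the run with `BulkIndexError`. A minimal sketch of the alternative, collecting failures from the returned tuple instead of catching the exception:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

client = Elasticsearch(hosts=['http://localhost:9200'])

docs = (
    {"_index": "products", "_id": i, "_source": {"name": f"Product {i}"}}
    for i in range(100)
)

# With raise_on_error=False (and raise_on_exception=False), failures are
# returned in the second element of the tuple rather than raised.
success_count, failed_ops = bulk(
    client,
    docs,
    raise_on_error=False,
    raise_on_exception=False,
)

print(f"Indexed {success_count} documents, {len(failed_ops)} failures")
for failure in failed_ops:
    print(f"Failed action: {failure}")
```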

## Usage Examples

### Bulk Indexing

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError

client = Elasticsearch(hosts=['http://localhost:9200'])

# Prepare documents for bulk indexing
def generate_docs():
    for i in range(10000):
        yield {
            "_index": "products",
            "_id": i,
            "_source": {
                "name": f"Product {i}",
                "price": i * 10.99,
                "category": f"Category {i % 5}"
            }
        }

try:
    # Bulk index documents
    success_count, failed_ops = bulk(
        client,
        generate_docs(),
        chunk_size=1000,
        timeout='60s',
        refresh='wait_for'
    )

    print(f"Successfully indexed: {success_count} documents")
    if failed_ops:
        print(f"Failed operations: {len(failed_ops)}")

except BulkIndexError as e:
    print(f"Bulk indexing failed: {e}")
    for error in e.errors:
        print(f"Error: {error}")
```

### Streaming Bulk Operations

```python
from elasticsearch.helpers import streaming_bulk

def generate_large_dataset():
    for i in range(100000):
        yield {
            "_index": "logs",
            "_source": {
                "timestamp": f"2024-01-01T{i % 24:02d}:00:00Z",
                "level": "INFO" if i % 2 == 0 else "WARN",
                "message": f"Log message {i}",
                "service": f"service-{i % 10}"
            }
        }

# Stream bulk operations with progress tracking; results are yielded per document
total_indexed = 0
for success, info in streaming_bulk(
    client,
    generate_large_dataset(),
    chunk_size=5000,
    max_chunk_bytes=10 * 1024 * 1024  # 10MB chunks
):
    if success:
        total_indexed += 1
        if total_indexed % 10000 == 0:  # Progress every 10,000 documents
            print(f"Indexed {total_indexed} documents...")
    else:
        print(f"Failed to index document: {info}")

print(f"Total documents indexed: {total_indexed}")
```

### Scanning Large Result Sets

```python
from elasticsearch.helpers import scan

# Scan through all documents in an index
total_docs = 0
for doc in scan(
    client,
    index="products",
    query={"query": {"match_all": {}}},
    scroll='2m',
    size=1000
):
    # Process each document
    total_docs += 1
    product_name = doc['_source']['name']
    product_price = doc['_source']['price']

    # Example: Update price with 10% discount
    if product_price > 100:
        client.update(
            index="products",
            id=doc['_id'],
            doc={"price": product_price * 0.9}
        )

print(f"Processed {total_docs} documents")
```
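
If the scroll context expires or a shard fails partway through, `scan()` raises `ScanError` (with the default `raise_on_error=True`). A minimal sketch of guarding a long-running scan, reusing the `client` from above:

```python
from elasticsearch.helpers import scan, ScanError

processed = 0
try:
    for doc in scan(
        client,
        index="products",
        query={"query": {"match_all": {}}},
        scroll='2m'
    ):
        processed += 1
except ScanError as e:
    # Raised when a scroll request reports shard failures or the scroll
    # context is lost before the scan completes.
    print(f"Scan aborted after {processed} documents: {e}")
```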

### Advanced Scanning with Query

```python
# Scan with a complex query and field filtering
query = {
    "query": {
        "bool": {
            "must": [
                {"range": {"price": {"gte": 50, "lte": 1000}}},
                {"term": {"status": "active"}}
            ],
            "must_not": [
                {"term": {"category.keyword": "discontinued"}}
            ]
        }
    },
    "_source": ["name", "price", "category"]  # Only retrieve specific fields
}

for doc in scan(
    client,
    index="products",
    query=query,
    scroll='5m',
    size=2000,
    preserve_order=True
):
    print(f"Product: {doc['_source']['name']}, Price: {doc['_source']['price']}")
```

### Reindexing with Transformation

```python
from elasticsearch.helpers import reindex

def transform_document(doc):
    """Transform documents during reindex."""
    source = doc['_source']

    # Add new computed field
    source['price_tier'] = 'high' if source['price'] > 1000 else 'low'

    # Rename field
    if 'desc' in source:
        source['description'] = source.pop('desc')

    # Convert price to integer cents
    source['price_cents'] = int(source['price'] * 100)

    return doc

# Reindex with transformation
success_count, failed_ops = reindex(
    client,
    source_index="products_v1",
    target_index="products_v2",
    query={"query": {"term": {"status": "active"}}},  # Only reindex active products
    transform_doc_callback=transform_document,
    chunk_size=1000
)

print(f"Reindexed {success_count} documents")
if failed_ops:
    print(f"Failed operations: {len(failed_ops)}")
```
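
### Reindexing Between Clusters

When the destination index lives on a different cluster, pass a second client as `target_client`: documents are scanned from the source client and bulk-indexed through the target. A minimal sketch; the host URLs are placeholders:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex

source_client = Elasticsearch(hosts=['http://localhost:9200'])
target_client = Elasticsearch(hosts=['http://new-cluster:9200'])

# Copy all documents from the source cluster to the target cluster
success_count, failed_ops = reindex(
    source_client,
    source_index="products_v1",
    target_index="products_v1",
    target_client=target_client,
    chunk_size=500,
    scroll='5m'
)

print(f"Copied {success_count} documents to the target cluster")
```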

### Parallel Bulk Processing

```python
from elasticsearch.helpers import parallel_bulk

def generate_docs_from_database():
    """Generate documents from a database query."""
    # Simulate database connection and query
    for i in range(50000):
        yield {
            "_index": "analytics",
            "_source": {
                "user_id": i % 1000,
                "event": f"action_{i % 10}",
                "timestamp": "2024-01-01T00:00:00Z",
                "value": i * 1.5
            }
        }

# Use parallel bulk for high-throughput indexing; results are yielded per document
results = []
for success, info in parallel_bulk(
    client,
    generate_docs_from_database(),
    chunk_size=2000,
    thread_count=8,  # Use 8 parallel threads
    queue_size=8,
    timeout='120s'
):
    results.append((success, info))
    if len(results) % 10000 == 0:
        print(f"Processed {len(results)} documents")

successful_docs = sum(1 for success, _ in results if success)
print(f"Successfully processed {successful_docs} documents")
```

### Async Helper Usage

```python
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk, async_scan

async def async_bulk_example():
    """Example of async bulk operations."""
    async_client = AsyncElasticsearch(hosts=['http://localhost:9200'])

    # Prepare async documents
    async def async_generate_docs():
        for i in range(1000):
            yield {
                "_index": "async_test",
                "_id": i,
                "_source": {"value": i, "squared": i * i}
            }

    try:
        # Async bulk indexing
        success_count, failed_ops = await async_bulk(
            async_client,
            async_generate_docs(),
            chunk_size=100
        )

        print(f"Async indexed: {success_count} documents")

        # Async scanning
        total_scanned = 0
        async for doc in async_scan(
            async_client,
            index="async_test",
            query={"query": {"match_all": {}}}
        ):
            total_scanned += 1

        print(f"Async scanned: {total_scanned} documents")

    finally:
        await async_client.close()

# Run async example
asyncio.run(async_bulk_example())
```
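
### Async Streaming Bulk

`async_streaming_bulk()` mirrors `streaming_bulk()` but is consumed with `async for`. A minimal sketch, assuming the same local cluster; the `async_logs` index is purely illustrative:

```python
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

async def async_streaming_example():
    async_client = AsyncElasticsearch(hosts=['http://localhost:9200'])

    async def generate_docs():
        for i in range(1000):
            yield {
                "_index": "async_logs",
                "_source": {"message": f"Log message {i}"}
            }

    try:
        indexed = 0
        # Results are yielded per document as each chunk completes
        async for ok, info in async_streaming_bulk(
            async_client,
            generate_docs(),
            chunk_size=200
        ):
            if ok:
                indexed += 1
            else:
                print(f"Failed to index document: {info}")

        print(f"Async streamed {indexed} documents")
    finally:
        await async_client.close()

asyncio.run(async_streaming_example())
```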