# Bulk Operations

High-performance operations for processing multiple documents efficiently. Includes bulk indexing, updates, deletions, and specialized helper functions for streaming and parallel processing of large datasets.

## Capabilities

### Bulk Document Operations

Execute multiple document operations (index, create, update, delete) in a single request for improved performance.

```python { .api }
def bulk(body: list, index: str = None, doc_type: str = None, **params) -> dict:
    """
    Execute multiple document operations in a single request.

    Parameters:
    - body: List of operations (action/metadata and optional source)
    - index: Default index for operations without an explicit index
    - doc_type: Default document type
    - pipeline: Default ingest pipeline
    - refresh: Control when changes are visible ('true', 'false', 'wait_for')
    - routing: Default routing value
    - timeout: Request timeout
    - wait_for_active_shards: Wait for N shards to be active

    Body format (list of alternating action lines and document lines):
    [
        {"index": {"_index": "my_index", "_type": "_doc", "_id": "1"}},
        {"title": "Document 1", "content": "Content here"},
        {"create": {"_index": "my_index", "_type": "_doc", "_id": "2"}},
        {"title": "Document 2", "content": "More content"},
        {"update": {"_index": "my_index", "_type": "_doc", "_id": "3"}},
        {"doc": {"title": "Updated Document 3"}},
        {"delete": {"_index": "my_index", "_type": "_doc", "_id": "4"}}
    ]

    Returns:
    dict: Bulk response with 'items' array containing results for each operation
    """
```

### Update by Query

Update multiple documents matching a query using scripts or partial updates.

```python { .api }
def update_by_query(index: str, doc_type: str = None, body: dict = None, **params) -> dict:
    """
    Update documents matching a query.

    Parameters:
    - index: Index name(s) to update
    - doc_type: Document type(s)
    - body: Update specification with query and script
    - _source: Fields to include in response
    - _source_excludes: Fields to exclude
    - _source_includes: Fields to include
    - allow_no_indices: Handle missing indices
    - analyzer: Query analyzer
    - analyze_wildcard: Analyze wildcards
    - conflicts: How to handle version conflicts ('abort', 'proceed')
    - default_operator: Default query operator
    - df: Default field
    - expand_wildcards: Wildcard expansion
    - from_: Starting document
    - ignore_unavailable: Ignore unavailable indices
    - lenient: Ignore query failures
    - pipeline: Ingest pipeline for updated documents
    - preference: Node preference
    - q: Query string
    - refresh: Refresh after operation
    - request_cache: Use request cache
    - requests_per_second: Throttling rate (operations per second)
    - routing: Routing values
    - scroll: Scroll timeout for large updates
    - scroll_size: Scroll batch size
    - search_type: Search type
    - search_timeout: Search timeout
    - size: Maximum documents to update
    - slices: Number of slices for parallel processing
    - sort: Sort specification
    - terminate_after: Terminate after N documents
    - timeout: Request timeout
    - version: Include versions in response
    - version_type: Version type
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion or return task

    Body structure:
    {
        "query": {
            "term": {"status": "draft"}
        },
        "script": {
            "source": "ctx._source.status = 'published'; ctx._source.published_at = params.now",
            "params": {"now": "2023-01-01T12:00:00Z"}
        }
    }

    Returns:
    dict: Update results with 'updated', 'version_conflicts', 'took', etc.
    """
```

### Delete by Query

Delete multiple documents matching a query efficiently.

```python { .api }
def delete_by_query(index: str, body: dict, doc_type: str = None, **params) -> dict:
    """
    Delete documents matching a query.

    Parameters:
    - index: Index name(s) to delete from
    - body: Delete specification with query
    - doc_type: Document type(s)
    - _source: Fields to include in response
    - _source_excludes: Fields to exclude
    - _source_includes: Fields to include
    - allow_no_indices: Handle missing indices
    - analyzer: Query analyzer
    - analyze_wildcard: Analyze wildcards
    - conflicts: How to handle conflicts ('abort', 'proceed')
    - default_operator: Default query operator
    - df: Default field
    - expand_wildcards: Wildcard expansion
    - from_: Starting document
    - ignore_unavailable: Ignore unavailable indices
    - lenient: Ignore query failures
    - preference: Node preference
    - q: Query string
    - refresh: Refresh after operation
    - request_cache: Use request cache
    - requests_per_second: Throttling rate
    - routing: Routing values
    - scroll: Scroll timeout
    - scroll_size: Scroll batch size
    - search_type: Search type
    - search_timeout: Search timeout
    - size: Maximum documents to delete
    - slices: Number of slices for parallel processing
    - sort: Sort specification
    - terminate_after: Terminate after N documents
    - timeout: Request timeout
    - version: Include versions
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion

    Body structure:
    {
        "query": {
            "range": {
                "created_at": {
                    "lt": "2022-01-01"
                }
            }
        }
    }

    Returns:
    dict: Deletion results with 'deleted', 'took', 'version_conflicts', etc.
    """
```
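
For example, a minimal sketch of removing stale documents with this API; the `logs` index and `created_at` field are illustrative, and `es` is an `Elasticsearch` client as in the usage examples below:

```python
# Delete everything created before 2022, continuing past version conflicts
response = es.delete_by_query(
    index='logs',
    body={
        "query": {
            "range": {"created_at": {"lt": "2022-01-01"}}
        }
    },
    conflicts='proceed',
    refresh=True
)

print(f"Deleted {response['deleted']} documents in {response['took']} ms")
```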

### Reindex Operations

Copy and transform documents between indices with optional query filtering and script processing.

```python { .api }
def reindex(body: dict, **params) -> dict:
    """
    Copy documents from source to destination index with optional transformation.

    Parameters:
    - body: Reindex specification with source and destination
    - refresh: Refresh destination index after operation
    - requests_per_second: Throttling rate
    - slices: Number of slices for parallel processing
    - timeout: Request timeout
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion or return task

    Body structure:
    {
        "source": {
            "index": "source_index",
            "type": "_doc",                    # Optional
            "query": {                         # Optional filtering
                "term": {"status": "published"}
            },
            "_source": ["title", "content"],   # Optional field filtering
            "size": 1000                       # Batch size
        },
        "dest": {
            "index": "destination_index",
            "type": "_doc",                    # Optional
            "pipeline": "my_pipeline"          # Optional ingest pipeline
        },
        "script": {                            # Optional transformation
            "source": "ctx._source.new_field = ctx._source.old_field + '_transformed'"
        },
        "conflicts": "proceed"                 # Handle version conflicts
    }

    Returns:
    dict: Reindex results with 'created', 'updated', 'took', etc.
    """

def reindex_rethrottle(task_id: str = None, **params) -> dict:
    """
    Change throttling of a running reindex task.

    Parameters:
    - task_id: Task identifier from reindex operation
    - requests_per_second: New throttling rate

    Returns:
    dict: Updated task information
    """
```
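
A brief sketch of adjusting the throttle on a running reindex task; `reindex_body` stands for any reindex specification like the one shown above, and the rate is illustrative:

```python
# Start the reindex as a background task, then change its throttle
response = es.reindex(body=reindex_body, wait_for_completion=False)
task_id = response['task']

es.reindex_rethrottle(task_id=task_id, requests_per_second=100)
```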

## Helper Functions

High-level helper utilities for common bulk operations with automatic batching, error handling, and progress tracking.

```python { .api }
from elasticsearch5.helpers import bulk, streaming_bulk, parallel_bulk, scan, reindex

def bulk(client, actions, stats_only: bool = False, **kwargs) -> tuple:
    """
    Helper for bulk operations with automatic batching and error handling.

    Parameters:
    - client: Elasticsearch client instance
    - actions: Iterable of document actions
    - stats_only: Return only success count (bool)
    - index: Default index name
    - doc_type: Default document type
    - chunk_size: Documents per batch (default 500)
    - max_chunk_bytes: Maximum batch size in bytes
    - thread_count: Number of threads for parallel processing
    - queue_size: Queue size for threading
    - expand_action_callback: Function to transform actions
    - refresh: Refresh after operation
    - request_timeout: Request timeout
    - max_retries: Maximum retry attempts
    - initial_backoff: Initial retry delay
    - max_backoff: Maximum retry delay

    Action format:
    {
        "_op_type": "index",            # or "create", "update", "delete"
        "_index": "my_index",
        "_type": "_doc",
        "_id": "document_id",
        "_source": {"field": "value"}   # For index/create
    }

    Returns:
    If stats_only=False: (success_count, list_of_errors)
    If stats_only=True: success_count

    Raises:
    BulkIndexError: If errors occurred and not ignored
    """

def streaming_bulk(client, actions, chunk_size: int = 500, **kwargs):
    """
    Generator for streaming bulk operations.

    Parameters: Same as bulk()

    Yields:
    (is_success: bool, action_result: dict) for each action
    """

def parallel_bulk(client, actions, thread_count: int = 4, **kwargs):
    """
    Generator for parallel bulk operations using multiple threads.

    Parameters: Same as bulk() plus thread_count

    Yields:
    (is_success: bool, action_result: dict) for each action
    """

def scan(client, query: dict = None, scroll: str = '5m', **kwargs):
    """
    Generator to efficiently scroll through all matching documents.

    Parameters:
    - client: Elasticsearch client
    - query: Search query (default match_all)
    - scroll: Scroll timeout
    - index: Index name(s)
    - doc_type: Document type(s)
    - raise_on_error: Raise on scroll errors
    - preserve_order: Maintain result ordering
    - size: Documents per scroll batch
    - request_timeout: Request timeout
    - clear_scroll: Clear scroll on completion

    Yields:
    Individual documents from search results
    """

def reindex(client, source_index: str, target_index: str, query: dict = None, **kwargs) -> tuple:
    """
    Helper to reindex documents between indices.

    Parameters:
    - client: Elasticsearch client
    - source_index: Source index name
    - target_index: Target index name
    - query: Optional query to filter documents
    - target_client: Different client for target (for cross-cluster)
    - chunk_size: Documents per batch
    - scroll: Scroll timeout
    - scan_kwargs: Additional scan() parameters
    - bulk_kwargs: Additional bulk() parameters

    Returns:
    (success_count, list_of_errors)
    """
```
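
The two helpers without a dedicated example in the usage section below are `parallel_bulk` and the `reindex` helper. A minimal sketch of both, assuming `es` is an `Elasticsearch` client; the index names and inline document generator are illustrative:

```python
from elasticsearch5.helpers import parallel_bulk, reindex

docs = (
    {"_index": "articles", "_type": "_doc", "_id": str(i),
     "_source": {"title": f"Article {i}", "status": "published"}}
    for i in range(1000)
)

# Index on several worker threads; results stream back as
# (is_success, result) pairs, one per action, like streaming_bulk
for is_success, result in parallel_bulk(es, docs, thread_count=4, chunk_size=250):
    if not is_success:
        print(f"Failed action: {result}")

# Copy published articles into a new index on the same cluster
success, errors = reindex(
    es,
    source_index='articles',
    target_index='articles_v2',
    query={"query": {"term": {"status": "published"}}},
)
print(f"Copied {success} documents, {len(errors)} errors")
```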

### Helper Exceptions

```python { .api }
from elasticsearch5.helpers import BulkIndexError, ScanError

class BulkIndexError(Exception):
    """
    Exception for bulk operation failures.

    Attributes:
    - errors: List of failed actions with error details
    """

class ScanError(Exception):
    """
    Exception for scan operation failures.

    Attributes:
    - scroll_id: Scroll ID for potential resume
    """
```
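
A short sketch of handling `ScanError` when a scroll request fails partway through iteration; the index name and the `handle` callback are hypothetical:

```python
from elasticsearch5.helpers import scan, ScanError

try:
    for doc in scan(es, index='articles', raise_on_error=True):
        handle(doc['_source'])  # hypothetical per-document callback
except ScanError as e:
    # Some shards failed while scrolling; the scroll ID is kept on the exception
    print(f"Scan aborted, scroll_id={e.scroll_id}")
```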

## Usage Examples

### Basic Bulk Operations

```python
from elasticsearch5 import Elasticsearch

es = Elasticsearch(['localhost:9200'])

# Prepare bulk operations: the low-level bulk API takes alternating
# action/metadata lines and document lines
actions = [
    {"index": {"_index": "articles", "_type": "_doc", "_id": "1"}},
    {"title": "Article 1", "content": "Content 1"},
    {"create": {"_index": "articles", "_type": "_doc", "_id": "2"}},
    {"title": "Article 2", "content": "Content 2"},
    {"update": {"_index": "articles", "_type": "_doc", "_id": "3"}},
    {"doc": {"title": "Updated Article 3"}},
    {"delete": {"_index": "articles", "_type": "_doc", "_id": "4"}}
]

# Execute bulk operations
response = es.bulk(body=actions)

# Check for errors
if response['errors']:
    for item in response['items']:
        for operation, result in item.items():
            if 'error' in result:
                print(f"Error in {operation}: {result['error']}")
else:
    print(f"Successfully processed {len(response['items'])} operations")
```

### Using Helper Functions

```python
from elasticsearch5.helpers import bulk, scan, reindex, BulkIndexError

# Generate documents
def generate_docs():
    for i in range(10000):
        yield {
            "_index": "large_index",
            "_type": "_doc",
            "_id": str(i),
            "_source": {
                "title": f"Document {i}",
                "content": f"This is content for document {i}",
                "category": "bulk_test"
            }
        }

# Bulk index with helper
try:
    success, failed = bulk(es, generate_docs(), chunk_size=1000)
    print(f"Successfully indexed {success} documents")
    if failed:
        print(f"Failed to index {len(failed)} documents")
except BulkIndexError as e:
    print(f"Bulk indexing failed: {e}")
    for error in e.errors:
        print(f"Error: {error}")
```

### Streaming Bulk Processing

```python
from elasticsearch5.helpers import streaming_bulk

# Process large dataset with streaming
def process_large_dataset():
    for is_success, result in streaming_bulk(es, generate_docs(), chunk_size=500):
        if not is_success:
            print(f"Failed to process: {result}")
        else:
            # Process successful result
            pass

process_large_dataset()
```

### Update by Query

```python
# Update all draft articles to published
update_body = {
    "query": {
        "term": {"status": "draft"}
    },
    "script": {
        "source": """
            ctx._source.status = 'published';
            ctx._source.published_at = params.now;
            ctx._source.publish_count = (ctx._source.publish_count ?: 0) + 1
        """,
        "params": {
            "now": "2023-01-01T12:00:00Z"
        }
    }
}

response = es.update_by_query(
    index='articles',
    body=update_body,
    conflicts='proceed',  # Continue on version conflicts
    refresh=True
)

print(f"Updated {response['updated']} documents")
print(f"Version conflicts: {response.get('version_conflicts', 0)}")
```

### Reindex with Transformation

```python
# Reindex with field transformation and filtering
reindex_body = {
    "source": {
        "index": "old_articles",
        "query": {
            "range": {
                "created_at": {
                    "gte": "2022-01-01"
                }
            }
        }
    },
    "dest": {
        "index": "new_articles",
        "pipeline": "article_enrichment"
    },
    "script": {
        "source": """
            // Transform old format to new format
            ctx._source.slug = ctx._source.title.toLowerCase().replaceAll('[^a-z0-9]+', '-');
            ctx._source.word_count = ctx._source.content.split(' ').length;
            ctx._source.migrated_at = '2023-01-01T00:00:00Z';
        """
    }
}

response = es.reindex(
    body=reindex_body,
    wait_for_completion=False,  # Run as task
    slices=4  # Parallel processing with a fixed number of slices
)

if 'task' in response:
    task_id = response['task']
    print(f"Reindex started as task: {task_id}")

    # Check task status later
    task_status = es.tasks.get(task_id=task_id)
    print(f"Task status: {task_status}")
```

### Scan and Process Large Datasets

```python
from elasticsearch5.helpers import scan

# Process all documents in an index
query = {
    "query": {
        "range": {
            "created_at": {
                "gte": "2023-01-01"
            }
        }
    }
}

# Scan through all matching documents
processed_count = 0
for doc in scan(es, query=query, index='large_index', size=1000):
    # Process each document
    process_document(doc['_source'])
    processed_count += 1

    if processed_count % 10000 == 0:
        print(f"Processed {processed_count} documents")

print(f"Total processed: {processed_count} documents")
```