# PyES Bulk Operations

## Overview

Bulk operations in PyES provide high-performance batch processing for indexing, updating, and deleting large numbers of documents. Rather than sending individual requests to ElasticSearch, bulk operations combine multiple operations into a single request, dramatically improving throughput and reducing network overhead. This is essential for applications that need to process large volumes of data efficiently.
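
Under the hood, a bulk request body is newline-delimited JSON (NDJSON): each operation contributes an action/metadata header line, followed by a document line for index and update operations (delete takes no body). A sketch of the payload a client assembles:

```python
# NDJSON payload for one index and one delete operation
bulk_body = (
    '{"index": {"_index": "test_index", "_type": "doc", "_id": "1"}}\n'
    '{"title": "Document 1", "content": "..."}\n'
    '{"delete": {"_index": "test_index", "_type": "doc", "_id": "2"}}\n'
)
```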

## Core Bulk Classes

### ES Bulk Methods

```python { .api }
class ES:
    """
    Main ES class bulk operation methods.
    """

    def index_raw_bulk(self, header, document):
        """
        Add a raw, pre-serialized bulk operation to the buffer.

        Args:
            header (str): Serialized bulk action header (index, update,
                delete), newline-terminated
            document (str): Serialized document body, newline-terminated
                (empty for delete operations)
        """
        pass

    def flush_bulk(self, forced=False):
        """
        Flush buffered bulk operations to ElasticSearch.

        Args:
            forced (bool): Force flush even if bulk_size not reached. Default: False

        Returns:
            Bulk operation results

        Raises:
            BulkOperationException: If bulk operation fails
        """
        pass

    def force_bulk(self):
        """
        Force immediate flush of all buffered bulk operations.

        Returns:
            Bulk operation results
        """
        pass

    def create_bulker(self):
        """
        Create a new bulker instance for managing bulk operations.

        Returns:
            Bulker: New bulker instance
        """
        pass

    @property
    def bulk_size(self):
        """
        Get the current bulk size setting.

        Returns:
            int: Current bulk size
        """
        pass

    @bulk_size.setter
    def bulk_size(self, size):
        """
        Set the bulk size for automatic flushing.

        Args:
            size (int): Number of operations per bulk request
        """
        pass

# Basic bulk operations setup
from pyes import ES

es = ES('localhost:9200')

# Configure bulk processing
es.bulk_size = 1000  # Process 1000 operations per bulk request

# Bulk indexing with automatic flushing
for i in range(5000):
    doc = {"title": f"Document {i}", "content": f"Content for document {i}"}
    es.index(doc, "test_index", "doc", id=str(i), bulk=True)
    # Automatically flushes when bulk_size (1000) is reached

# Manual flush for remaining documents
es.flush_bulk(forced=True)
```
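
A minimal sketch of `index_raw_bulk` for documents you have already serialized, assuming the bulker expects newline-terminated JSON strings for both header and body:

```python
import json

from pyes import ES

es = ES('localhost:9200')

def raw_bulk_index(es, docs, index, doc_type):
    # Each entry: newline-terminated action header plus document body
    for doc in docs:
        header = json.dumps({"index": {"_index": index, "_type": doc_type,
                                       "_id": str(doc["id"])}}) + "\n"
        body = json.dumps(doc) + "\n"
        es.index_raw_bulk(header, body)
    es.flush_bulk(forced=True)

raw_bulk_index(es, [{"id": 1, "title": "Raw document"}], "test_index", "doc")
```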

## Bulk Document Operations

### Bulk Indexing

```python { .api }
import time

def bulk_index_documents(es, documents, index, doc_type):
    """
    Efficiently index large numbers of documents using bulk operations.

    Args:
        es (ES): ElasticSearch client instance
        documents (list): List of documents to index
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Bulk operation results and statistics
    """

    # Configure bulk settings for optimal performance
    es.bulk_size = 1000

    stats = {
        "total_docs": len(documents),
        "processed": 0,
        "errors": [],
        "start_time": time.time()
    }

    try:
        for i, doc in enumerate(documents):
            # Add document to bulk buffer
            es.index(doc, index, doc_type, id=doc.get('id', str(i)), bulk=True)
            stats["processed"] += 1

            # Progress reporting
            if stats["processed"] % 10000 == 0:
                print(f"Processed {stats['processed']}/{stats['total_docs']} documents")

        # Flush any remaining documents
        es.flush_bulk(forced=True)

        stats["end_time"] = time.time()
        stats["duration"] = stats["end_time"] - stats["start_time"]
        stats["docs_per_second"] = stats["processed"] / stats["duration"]

        return stats

    except Exception as e:
        stats["errors"].append(str(e))
        return stats

# Large dataset indexing example

# Generate sample documents
documents = []
for i in range(50000):
    doc = {
        "id": i,
        "title": f"Document {i}",
        "content": f"This is the content of document number {i}",
        "category": f"category_{i % 10}",
        "timestamp": int(time.time()) + i,
        "views": i * 10,
        "rating": (i % 5) + 1
    }
    documents.append(doc)

# Bulk index all documents
results = bulk_index_documents(es, documents, "bulk_test", "document")
print(f"Indexed {results['processed']} documents in {results['duration']:.2f} seconds")
print(f"Throughput: {results['docs_per_second']:.2f} docs/second")
```
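
Materializing 50,000 documents in a list keeps the whole dataset in memory. When the source is a file or database cursor, a generator can feed the same bulk buffer incrementally; a sketch (the JSON-lines reader `iter_documents` is a hypothetical helper):

```python
import json

def iter_documents(path):
    """Hypothetical reader: yields one JSON document per line."""
    with open(path) as fh:
        for line in fh:
            yield json.loads(line)

def bulk_index_stream(es, doc_iter, index, doc_type):
    processed = 0
    for doc in doc_iter:
        es.index(doc, index, doc_type, id=str(doc.get("id", processed)), bulk=True)
        processed += 1
    es.flush_bulk(forced=True)  # flush the final partial batch
    return processed

# count = bulk_index_stream(es, iter_documents("docs.jsonl"), "bulk_test", "document")
```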

### Bulk Updates

```python { .api }
def bulk_update_documents(es, updates, index, doc_type):
    """
    Perform bulk updates on documents.

    Args:
        es (ES): ElasticSearch client instance
        updates (list): List of update operations
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Update results and statistics
    """

    stats = {"updated": 0, "errors": []}

    try:
        for update_op in updates:
            doc_id = update_op["id"]

            if "script" in update_op:
                # Script-based update
                es.update(index, doc_type, doc_id,
                          script=update_op["script"],
                          params=update_op.get("params", {}),
                          bulk=True)
            else:
                # Document-based update
                es.partial_update(index, doc_type, doc_id,
                                  doc=update_op["doc"],
                                  bulk=True)

            stats["updated"] += 1

        # Flush bulk updates
        es.flush_bulk(forced=True)

    except Exception as e:
        stats["errors"].append(str(e))

    return stats

# Bulk update examples
# Note: script syntax depends on the scripting language your ElasticSearch
# version uses (MVEL in the pyes era, where params are bound as variables)
update_operations = [
    {
        "id": "1",
        "script": "ctx._source.views += params.increment",
        "params": {"increment": 10}
    },
    {
        "id": "2",
        "doc": {"category": "updated_category", "last_modified": "2023-12-01"}
    },
    {
        "id": "3",
        "script": "ctx._source.rating = Math.max(ctx._source.rating, params.min_rating)",
        "params": {"min_rating": 3}
    }
]

update_results = bulk_update_documents(es, update_operations, "bulk_test", "document")
print(f"Updated {update_results['updated']} documents")
```

### Bulk Deletions

```python { .api }
def bulk_delete_documents(es, doc_ids, index, doc_type):
    """
    Perform bulk deletion of documents.

    Args:
        es (ES): ElasticSearch client instance
        doc_ids (list): List of document IDs to delete
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Deletion results
    """

    stats = {"deleted": 0, "not_found": 0, "errors": []}

    try:
        for doc_id in doc_ids:
            es.delete(index, doc_type, doc_id, bulk=True)
            stats["deleted"] += 1

        # Flush bulk deletions
        # Note: only the final flush is inspected here; auto-flushes
        # triggered while looping return their results internally
        results = es.flush_bulk(forced=True)

        # Process results for detailed statistics
        if results and "items" in results:
            for item in results["items"]:
                if "delete" in item:
                    delete_result = item["delete"]
                    if delete_result.get("status") == 404:
                        stats["not_found"] += 1
                        stats["deleted"] -= 1

    except Exception as e:
        stats["errors"].append(str(e))

    return stats

# Bulk deletion example
doc_ids_to_delete = [str(i) for i in range(1000, 2000)]  # Delete docs 1000-1999
deletion_results = bulk_delete_documents(es, doc_ids_to_delete, "bulk_test", "document")

print(f"Deleted: {deletion_results['deleted']}")
print(f"Not found: {deletion_results['not_found']}")
print(f"Errors: {len(deletion_results['errors'])}")
```
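
When deletions are driven by a condition rather than an explicit ID list, pushing the work server-side avoids enumerating IDs client-side. A hedged sketch assuming pyes's `delete_by_query` with the `RangeQuery`/`ESRange` helpers (check your pyes version for the exact signatures):

```python
from pyes.query import RangeQuery
from pyes.utils import ESRange

# Delete every document whose numeric id falls in [1000, 1999] in one call
q = RangeQuery(ESRange("id", from_value=1000, to_value=1999))
es.delete_by_query(["bulk_test"], ["document"], q)
```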

## Advanced Bulk Operations

### Mixed Bulk Operations

```python { .api }
def mixed_bulk_operations(es, operations):
    """
    Execute mixed bulk operations (index, update, delete) in a single batch.

    Args:
        es (ES): ElasticSearch client instance
        operations (list): List of mixed operation dictionaries

    Returns:
        dict: Operation results
    """

    stats = {
        "index_ops": 0,
        "update_ops": 0,
        "delete_ops": 0,
        "errors": []
    }

    try:
        for op in operations:
            op_type = op["operation"]

            if op_type == "index":
                es.index(op["doc"], op["index"], op["type"],
                         id=op.get("id"), bulk=True)
                stats["index_ops"] += 1

            elif op_type == "update":
                if "script" in op:
                    es.update(op["index"], op["type"], op["id"],
                              script=op["script"],
                              params=op.get("params", {}),
                              bulk=True)
                else:
                    es.partial_update(op["index"], op["type"], op["id"],
                                      doc=op["doc"], bulk=True)
                stats["update_ops"] += 1

            elif op_type == "delete":
                es.delete(op["index"], op["type"], op["id"], bulk=True)
                stats["delete_ops"] += 1

        # Execute all operations
        results = es.flush_bulk(forced=True)

        # Process results for error handling
        if results and "errors" in results and results["errors"]:
            for item in results.get("items", []):
                for action, result in item.items():
                    if "error" in result:
                        stats["errors"].append({
                            "action": action,
                            "id": result.get("_id"),
                            "error": result["error"]
                        })

    except Exception as e:
        stats["errors"].append({"general_error": str(e)})

    return stats

# Mixed operations example
mixed_ops = [
    # Index new documents
    {
        "operation": "index",
        "index": "mixed_test",
        "type": "doc",
        "id": "new_1",
        "doc": {"title": "New Document 1", "status": "active"}
    },
    {
        "operation": "index",
        "index": "mixed_test",
        "type": "doc",
        "id": "new_2",
        "doc": {"title": "New Document 2", "status": "active"}
    },

    # Update existing documents
    {
        "operation": "update",
        "index": "mixed_test",
        "type": "doc",
        "id": "existing_1",
        "doc": {"last_updated": "2023-12-01", "status": "updated"}
    },
    {
        "operation": "update",
        "index": "mixed_test",
        "type": "doc",
        "id": "existing_2",
        "script": "ctx._source.view_count += 1"
    },

    # Delete documents
    {
        "operation": "delete",
        "index": "mixed_test",
        "type": "doc",
        "id": "old_1"
    },
    {
        "operation": "delete",
        "index": "mixed_test",
        "type": "doc",
        "id": "old_2"
    }
]

mixed_results = mixed_bulk_operations(es, mixed_ops)
print(f"Index operations: {mixed_results['index_ops']}")
print(f"Update operations: {mixed_results['update_ops']}")
print(f"Delete operations: {mixed_results['delete_ops']}")
print(f"Errors: {len(mixed_results['errors'])}")
```
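
The result parsing above depends on the shape of the bulk response. A sketch of a typical response (values illustrative; in older ElasticSearch versions `error` is a string, in newer ones an object):

```python
example_bulk_response = {
    "took": 30,
    "errors": True,
    "items": [
        # Successful index operation
        {"index": {"_index": "mixed_test", "_type": "doc",
                   "_id": "new_1", "_version": 1, "status": 201}},
        # Failed index operation (error shape varies by ES version)
        {"index": {"_index": "mixed_test", "_type": "doc", "_id": "new_2",
                   "status": 400, "error": "MapperParsingException[...]"}},
        # Delete of a missing document reports status 404, not an error
        {"delete": {"_index": "mixed_test", "_type": "doc", "_id": "old_1",
                    "status": 404, "found": False}},
    ],
}
```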

### Upsert Operations

```python { .api }
def bulk_upsert_documents(es, upsert_ops, index, doc_type):
    """
    Perform bulk upsert operations (update if exists, create if not).

    Args:
        es (ES): ElasticSearch client instance
        upsert_ops (list): List of upsert operations
        index (str): Target index
        doc_type (str): Document type

    Returns:
        dict: Upsert results
    """

    stats = {"upserted": 0, "errors": []}

    try:
        for upsert_op in upsert_ops:
            doc_id = upsert_op["id"]

            # Use update with upsert; an update takes either a partial
            # document or a script, not both
            if "script" in upsert_op:
                es.update(index, doc_type, doc_id,
                          script=upsert_op["script"],
                          params=upsert_op.get("params", {}),
                          upsert=upsert_op.get("upsert", {}),
                          bulk=True)
            else:
                es.update(index, doc_type, doc_id,
                          document=upsert_op["doc"],
                          upsert=upsert_op.get("upsert", upsert_op["doc"]),
                          bulk=True)

            stats["upserted"] += 1

        # Flush upsert operations
        es.flush_bulk(forced=True)

    except Exception as e:
        stats["errors"].append(str(e))

    return stats

# Upsert examples
upsert_operations = [
    {
        "id": "user_123",
        "doc": {"name": "John Doe", "last_seen": "2023-12-01"},
        "upsert": {"name": "John Doe", "created": "2023-12-01", "last_seen": "2023-12-01"}
    },
    {
        "id": "user_456",
        "script": "if (ctx._source.containsKey('visit_count')) { ctx._source.visit_count += 1 } else { ctx._source.visit_count = 1 }",
        "upsert": {"visit_count": 1, "created": "2023-12-01"}
    }
]

upsert_results = bulk_upsert_documents(es, upsert_operations, "users", "profile")
```
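
Each upsert above maps onto the ElasticSearch update API: if the document exists, the partial `doc` (or `script`) is applied; otherwise the `upsert` body is indexed as a new document. A sketch of the request body behind the first operation:

```python
# Conceptual request: POST /users/profile/user_123/_update
upsert_request_body = {
    "doc": {"name": "John Doe", "last_seen": "2023-12-01"},
    "upsert": {"name": "John Doe", "created": "2023-12-01",
               "last_seen": "2023-12-01"}
}
```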

## Bulk Operation Patterns

### Streaming Bulk Processor

```python { .api }
import time
import uuid

class StreamingBulkProcessor:
    """
    Streaming bulk processor for continuous data ingestion.

    Provides automatic batching, error handling, and performance monitoring
    for high-volume data streams.
    """

    def __init__(self, es_client, bulk_size=1000, flush_interval=30,
                 max_retries=3, retry_delay=5):
        """
        Initialize StreamingBulkProcessor.

        Args:
            es_client (ES): ElasticSearch client
            bulk_size (int): Documents per batch. Default: 1000
            flush_interval (int): Auto-flush interval in seconds. Default: 30
            max_retries (int): Maximum retry attempts. Default: 3
            retry_delay (int): Delay between retries in seconds. Default: 5
        """
        self.es = es_client
        self.bulk_size = bulk_size
        self.flush_interval = flush_interval
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        self.buffer = []
        self.start_time = time.time()
        self.last_flush = time.time()
        self.stats = {
            "processed": 0,
            "errors": 0,
            "retries": 0,
            "flushes": 0
        }

    def add_document(self, doc, index, doc_type, doc_id=None, operation="index"):
        """
        Add document to processing buffer.

        Args:
            doc (dict): Document to process
            index (str): Target index
            doc_type (str): Document type
            doc_id (str, optional): Document ID
            operation (str): Operation type (index, update, delete). Default: "index"
        """

        self.buffer.append({
            "operation": operation,
            "index": index,
            "type": doc_type,
            "id": doc_id or str(uuid.uuid4()),
            "doc": doc
        })

        # Auto-flush if buffer is full or time interval exceeded
        if (len(self.buffer) >= self.bulk_size or
                time.time() - self.last_flush > self.flush_interval):
            self.flush()

    def flush(self):
        """Flush buffered operations to ElasticSearch."""

        if not self.buffer:
            return

        retry_count = 0
        success = False

        while not success and retry_count <= self.max_retries:
            try:
                # Process buffer operations
                for op in self.buffer:
                    if op["operation"] == "index":
                        self.es.index(op["doc"], op["index"], op["type"],
                                      id=op["id"], bulk=True)
                    elif op["operation"] == "update":
                        self.es.partial_update(op["index"], op["type"],
                                               op["id"], doc=op["doc"], bulk=True)
                    elif op["operation"] == "delete":
                        self.es.delete(op["index"], op["type"], op["id"], bulk=True)

                # Execute bulk operations
                self.es.flush_bulk(forced=True)

                # Update statistics
                self.stats["processed"] += len(self.buffer)
                self.stats["flushes"] += 1

                # Clear buffer and update timestamp
                self.buffer.clear()
                self.last_flush = time.time()
                success = True

            except Exception as e:
                retry_count += 1
                self.stats["retries"] += 1

                if retry_count <= self.max_retries:
                    print(f"Bulk operation failed, retrying ({retry_count}/{self.max_retries}): {e}")
                    time.sleep(self.retry_delay)
                else:
                    self.stats["errors"] += len(self.buffer)
                    print(f"Bulk operation failed after {self.max_retries} retries: {e}")
                    # Could save failed operations to a dead letter queue here
                    self.buffer.clear()

    def get_stats(self):
        """Get processor statistics."""
        return {
            **self.stats,
            "buffer_size": len(self.buffer),
            "last_flush": self.last_flush,
            "uptime": time.time() - self.start_time
        }

    def close(self):
        """Flush remaining operations and close processor."""
        self.flush()

# Usage example
processor = StreamingBulkProcessor(es, bulk_size=2000, flush_interval=60)

# Simulate continuous data stream
def simulate_data_stream():
    """Simulate continuous data ingestion."""

    for i in range(100000):
        doc = {
            "timestamp": time.time(),
            "event_type": f"event_{i % 10}",
            "user_id": f"user_{i % 1000}",
            "data": {"value": i, "category": f"cat_{i % 5}"}
        }

        processor.add_document(doc, "events", "event")

        # Periodic progress report with a short simulated processing delay
        if i % 1000 == 0:
            stats = processor.get_stats()
            print(f"Processed: {stats['processed']}, Buffer: {stats['buffer_size']}")
            time.sleep(0.1)

    # Close processor
    processor.close()
    final_stats = processor.get_stats()
    print(f"Final stats: {final_stats}")

# Run simulation
simulate_data_stream()
```
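
Because the processor exposes `close()`, it pairs naturally with `contextlib.closing`, which guarantees the final partial batch is flushed even if the producing loop raises:

```python
from contextlib import closing

with closing(StreamingBulkProcessor(es, bulk_size=2000)) as proc:
    for i in range(10):
        proc.add_document({"value": i}, "events", "event")
# close() -> flush() runs on exit, even when an exception is raised
```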

### Parallel Bulk Processing

```python { .api }
import concurrent.futures
import queue
import time
import uuid

from pyes import ES

class ParallelBulkProcessor:
    """
    Parallel bulk processor for maximum throughput.

    Uses multiple threads to process bulk operations concurrently.
    """

    def __init__(self, es_client, num_workers=4, bulk_size=1000, queue_size=10000):
        """
        Initialize ParallelBulkProcessor.

        Args:
            es_client (ES): ElasticSearch client
            num_workers (int): Number of worker threads. Default: 4
            bulk_size (int): Documents per bulk request. Default: 1000
            queue_size (int): Maximum queue size. Default: 10000
        """
        self.es = es_client
        self.num_workers = num_workers
        self.bulk_size = bulk_size

        # Thread-safe queue for operations
        self.operation_queue = queue.Queue(maxsize=queue_size)
        self.result_queue = queue.Queue()

        # Worker management
        self.workers = []
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
        self.running = False

        # Statistics (updated from several threads; treat the counters as
        # approximate unless guarded with a lock)
        self.stats = {
            "queued": 0,
            "processed": 0,
            "errors": 0,
            "active_workers": 0
        }

    def start(self):
        """Start the parallel processor."""

        self.running = True

        # Start worker threads
        for i in range(self.num_workers):
            future = self.executor.submit(self._worker_loop, i)
            self.workers.append(future)

    def _worker_loop(self, worker_id):
        """Worker thread main loop."""

        batch = []

        while self.running or not self.operation_queue.empty():
            try:
                # Get operation from queue (with timeout)
                operation = self.operation_queue.get(timeout=1.0)
                batch.append(operation)

                # Process batch when full
                if len(batch) >= self.bulk_size:
                    self._process_batch(batch, worker_id)
                    batch = []

                self.operation_queue.task_done()

            except queue.Empty:
                # Process remaining batch if shutting down
                if batch and not self.running:
                    self._process_batch(batch, worker_id)
                    batch = []
                continue
            except Exception as e:
                self.stats["errors"] += 1
                print(f"Worker {worker_id} error: {e}")

        # Process final batch
        if batch:
            self._process_batch(batch, worker_id)

    def _process_batch(self, batch, worker_id):
        """Process a batch of operations."""

        try:
            self.stats["active_workers"] += 1

            # Create a separate ES client for this worker so threads don't
            # share one bulk buffer (assumes the client exposes the server
            # address it was built with; otherwise pass it in explicitly)
            worker_es = ES(self.es.server)
            worker_es.bulk_size = len(batch)

            for op in batch:
                if op["operation"] == "index":
                    worker_es.index(op["doc"], op["index"], op["type"],
                                    id=op["id"], bulk=True)
                elif op["operation"] == "update":
                    worker_es.partial_update(op["index"], op["type"],
                                             op["id"], doc=op["doc"], bulk=True)
                elif op["operation"] == "delete":
                    worker_es.delete(op["index"], op["type"], op["id"], bulk=True)

            # Execute bulk operations
            results = worker_es.flush_bulk(forced=True)

            # Update statistics
            self.stats["processed"] += len(batch)

            # Queue results for monitoring
            self.result_queue.put({
                "worker_id": worker_id,
                "batch_size": len(batch),
                "success": True,
                "timestamp": time.time()
            })

        except Exception as e:
            self.stats["errors"] += len(batch)
            self.result_queue.put({
                "worker_id": worker_id,
                "batch_size": len(batch),
                "success": False,
                "error": str(e),
                "timestamp": time.time()
            })
        finally:
            self.stats["active_workers"] -= 1

    def add_operation(self, operation, index, doc_type, doc_id=None, doc=None):
        """
        Add operation to processing queue.

        Args:
            operation (str): Operation type (index, update, delete)
            index (str): Target index
            doc_type (str): Document type
            doc_id (str, optional): Document ID
            doc (dict, optional): Document data
        """

        op = {
            "operation": operation,
            "index": index,
            "type": doc_type,
            "id": doc_id or str(uuid.uuid4()),
            "doc": doc
        }

        self.operation_queue.put(op)
        self.stats["queued"] += 1

    def stop(self, timeout=60):
        """
        Stop the processor and wait for completion.

        Args:
            timeout (int): Maximum wait time for worker threads in seconds.
                Default: 60
        """

        # Signal workers to stop
        self.running = False

        # Wait for queue to empty
        self.operation_queue.join()

        # Wait for workers to finish, then shut the executor down
        # (ThreadPoolExecutor.shutdown has no timeout parameter)
        concurrent.futures.wait(self.workers, timeout=timeout)
        self.executor.shutdown(wait=True)

    def get_stats(self):
        """Get current processing statistics."""
        return {
            **self.stats,
            "queue_size": self.operation_queue.qsize(),
            "result_queue_size": self.result_queue.qsize()
        }

# Usage example
def parallel_bulk_example():
    """Example of parallel bulk processing."""

    # Create parallel processor
    processor = ParallelBulkProcessor(es, num_workers=8, bulk_size=1500)

    # Start processing
    processor.start()

    try:
        # Add large number of operations
        for i in range(100000):
            doc = {
                "id": i,
                "title": f"Parallel Document {i}",
                "content": f"Content for parallel processing test {i}",
                "batch": i // 1000,
                "timestamp": time.time()
            }

            processor.add_operation("index", "parallel_test", "doc", str(i), doc)

            # Monitor progress
            if i % 10000 == 0:
                stats = processor.get_stats()
                print(f"Queued: {stats['queued']}, Processed: {stats['processed']}, "
                      f"Active Workers: {stats['active_workers']}")

    finally:
        # Stop processor and wait for completion
        processor.stop(timeout=120)

        final_stats = processor.get_stats()
        print(f"Final stats: {final_stats}")

# Run parallel processing example
parallel_bulk_example()
```

## Error Handling and Recovery

### Robust Bulk Error Handling

```python { .api }
import logging
import time

from pyes.exceptions import BulkOperationException

class RobustBulkProcessor:
    """
    Bulk processor with comprehensive error handling and recovery.
    """

    def __init__(self, es_client, bulk_size=1000):
        self.es = es_client
        self.bulk_size = bulk_size
        self.logger = logging.getLogger("bulk_processor")

        # Error tracking
        self.error_stats = {
            "version_conflicts": 0,
            "document_missing": 0,
            "index_missing": 0,
            "mapping_errors": 0,
            "timeout_errors": 0,
            "other_errors": 0
        }

        # Failed operations for retry
        self.failed_operations = []
        self.dead_letter_queue = []

    def process_with_error_handling(self, operations):
        """
        Process operations with comprehensive error handling.

        Args:
            operations (list): List of bulk operations

        Returns:
            dict: Processing results with error details
        """

        results = {
            "successful": 0,
            "failed": 0,
            "errors": [],
            "retry_needed": []
        }

        try:
            # Execute bulk operations
            for op in operations:
                self._add_operation_to_bulk(op)

            bulk_results = self.es.flush_bulk(forced=True)

            # Process results for error handling
            if bulk_results and "items" in bulk_results:
                self._process_bulk_results(bulk_results["items"], operations, results)

            # Keep retryable failures for retry_failed_operations()
            self.failed_operations.extend(results["retry_needed"])

        except BulkOperationException as e:
            self.logger.error(f"Bulk operation exception: {e}")
            results["errors"].append({"type": "bulk_exception", "message": str(e)})

            # Handle specific bulk errors
            if hasattr(e, 'errors') and e.errors:
                for error_item in e.errors:
                    self._categorize_error(error_item)

        except Exception as e:
            self.logger.error(f"Unexpected error during bulk processing: {e}")
            results["errors"].append({"type": "unexpected", "message": str(e)})

        return results

    def _add_operation_to_bulk(self, op):
        """Add operation to bulk buffer."""

        if op["operation"] == "index":
            self.es.index(op["doc"], op["index"], op["type"],
                          id=op["id"], bulk=True)
        elif op["operation"] == "update":
            self.es.partial_update(op["index"], op["type"], op["id"],
                                   doc=op["doc"], bulk=True)
        elif op["operation"] == "delete":
            self.es.delete(op["index"], op["type"], op["id"], bulk=True)

    def _process_bulk_results(self, items, operations, results):
        """Process individual item results from a bulk response.

        Bulk responses preserve request order, so items can be paired with
        the operations that produced them.
        """

        for op, item in zip(operations, items):
            for action, result in item.items():
                if "error" in result:
                    # Operation failed
                    results["failed"] += 1
                    error_info = {
                        "action": action,
                        "id": result.get("_id"),
                        "status": result.get("status"),
                        "error": result["error"],
                        "operation": op  # keep the original op for retries
                    }

                    # Categorize error for appropriate handling
                    if self._is_retryable_error(result):
                        results["retry_needed"].append(error_info)
                    else:
                        results["errors"].append(error_info)
                        self.dead_letter_queue.append(error_info)

                    self._update_error_stats(result["error"])
                else:
                    # Operation successful
                    results["successful"] += 1

    def _is_retryable_error(self, result):
        """Determine if error is retryable."""

        error = result.get("error", {})
        error_type = error.get("type", "")
        status = result.get("status", 0)

        # Retryable conditions
        if status in [429, 503, 504]:  # Rate limited or service unavailable
            return True
        if error_type in ["timeout_exception", "connect_timeout_exception"]:
            return True
        if "circuit_breaking_exception" in error_type:
            return True

        return False

    def _categorize_error(self, error_item):
        """Categorize error for statistics and handling."""

        self._update_error_stats(error_item.get("error", {}))

    def _update_error_stats(self, error):
        """Update error statistics."""

        error_type = error.get("type", "")

        if "version_conflict" in error_type:
            self.error_stats["version_conflicts"] += 1
        elif "document_missing" in error_type:
            self.error_stats["document_missing"] += 1
        elif "index_not_found" in error_type:
            self.error_stats["index_missing"] += 1
        elif "mapper_parsing" in error_type:
            self.error_stats["mapping_errors"] += 1
        elif "timeout" in error_type:
            self.error_stats["timeout_errors"] += 1
        else:
            self.error_stats["other_errors"] += 1

    def retry_failed_operations(self, max_retries=3, retry_delay=5):
        """
        Retry operations that failed with retryable errors.

        Args:
            max_retries (int): Maximum retry attempts. Default: 3
            retry_delay (int): Delay between retries in seconds. Default: 5

        Returns:
            dict: Retry results
        """

        retry_results = {"successful_retries": 0, "permanent_failures": 0}

        for attempt in range(max_retries):
            if not self.failed_operations:
                break

            self.logger.info(f"Retry attempt {attempt + 1} for {len(self.failed_operations)} operations")

            # Retry the original operations behind each failure
            retry_batch = [e["operation"] for e in self.failed_operations]
            self.failed_operations.clear()

            results = self.process_with_error_handling(retry_batch)

            retry_results["successful_retries"] += results["successful"]

            # process_with_error_handling re-queues retryable failures
            if not self.failed_operations:
                break

            time.sleep(retry_delay)

        # Move permanently failed operations to dead letter queue
        retry_results["permanent_failures"] = len(self.failed_operations)
        self.dead_letter_queue.extend(self.failed_operations)
        self.failed_operations.clear()

        return retry_results

    def get_error_summary(self):
        """Get comprehensive error summary."""

        return {
            "error_stats": self.error_stats,
            "failed_operations": len(self.failed_operations),
            "dead_letter_queue": len(self.dead_letter_queue),
            "total_errors": sum(self.error_stats.values())
        }

# Usage example with error handling
def robust_bulk_processing_example():
    """Example of robust bulk processing with error handling."""

    processor = RobustBulkProcessor(es, bulk_size=1000)

    # Create problematic operations to test error handling
    operations = []

    for i in range(5000):
        # Mix of good and problematic operations
        if i % 100 == 0:
            # Update of a non-existent document (document missing error)
            operations.append({
                "operation": "update",
                "index": "test_index",
                "type": "doc",
                "id": f"missing_{i}",
                "doc": {"field": "value"}
            })
        elif i % 150 == 0:
            # Invalid document for an index with strict mapping
            operations.append({
                "operation": "index",
                "index": "strict_index",  # Index with strict mapping
                "type": "doc",
                "id": str(i),
                "doc": {"invalid_field": "value"}  # Will cause a mapping error
            })
        else:
            # Valid operation
            operations.append({
                "operation": "index",
                "index": "test_index",
                "type": "doc",
                "id": str(i),
                "doc": {"title": f"Document {i}", "content": f"Content {i}"}
            })

    # Process with error handling
    results = processor.process_with_error_handling(operations)

    print(f"Successful operations: {results['successful']}")
    print(f"Failed operations: {results['failed']}")
    print(f"Operations needing retry: {len(results['retry_needed'])}")

    # Retry failed operations
    retry_results = processor.retry_failed_operations()
    print(f"Successful retries: {retry_results['successful_retries']}")
    print(f"Permanent failures: {retry_results['permanent_failures']}")

    # Error summary
    error_summary = processor.get_error_summary()
    print(f"Error summary: {error_summary}")

# Run robust processing example
robust_bulk_processing_example()
```
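
The dead letter queue above lives only in memory. A minimal sketch that persists permanently failed operations as JSON lines for offline inspection or replay (the file name and one-line-per-entry format are illustrative choices):

```python
import json

def persist_dead_letters(processor, path="bulk_dead_letters.jsonl"):
    """Append failed operations to a JSON-lines file and clear the queue."""
    with open(path, "a") as fh:
        for entry in processor.dead_letter_queue:
            fh.write(json.dumps(entry, default=str) + "\n")
    processor.dead_letter_queue.clear()

# persist_dead_letters(processor)
```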

## Performance Optimization

### Bulk Performance Tuning

```python { .api }
import gc
import time

from pyes import ES

def optimize_bulk_performance():
    """Comprehensive bulk performance optimization strategies."""

    # 1. Optimal bulk size calculation
    def calculate_optimal_bulk_size(avg_doc_size_kb, available_memory_mb,
                                    network_latency_ms):
        """Calculate optimal bulk size based on system characteristics."""

        # Target: 5-15MB per bulk request
        target_bulk_mb = 10

        # Calculate based on document size
        docs_per_mb = 1024 / avg_doc_size_kb
        base_bulk_size = int(target_bulk_mb * docs_per_mb)

        # Adjust for available memory (use at most 10% for the bulk buffer)
        memory_limit = (available_memory_mb * 0.1 * 1024) / avg_doc_size_kb
        memory_limited_size = int(min(base_bulk_size, memory_limit))

        # Adjust for network latency
        if network_latency_ms > 100:
            # High latency: larger bulks to amortize network cost
            latency_adjusted = min(memory_limited_size * 2, 10000)
        elif network_latency_ms < 20:
            # Low latency: smaller bulks for faster feedback
            latency_adjusted = max(memory_limited_size // 2, 500)
        else:
            latency_adjusted = memory_limited_size

        return max(100, min(latency_adjusted, 10000))  # Reasonable bounds

    # 2. Connection optimization
    def optimize_es_connection():
        """Optimize the ElasticSearch connection for bulk operations."""

        optimized_es = ES(
            server=["es1.example.com:9200", "es2.example.com:9200"],  # Multiple nodes
            timeout=120.0,   # Longer timeout for bulk operations
            max_retries=3,   # Retry failed requests
            retry_time=30,   # Wait between retries
            bulk_size=5000,  # Optimized bulk size
            # Hypothetical connection-pooling kwargs; adjust to whatever
            # your client version actually supports
            connection_pool_size=10,
            connection_keep_alive=True
        )

        return optimized_es

    # 3. Document preparation optimization
    def prepare_documents_efficiently(raw_docs):
        """Efficiently prepare documents for bulk indexing."""

        prepared_docs = []

        # Batch process documents
        for doc in raw_docs:
            # Minimize document size
            optimized_doc = {
                # Only include necessary fields
                k: v for k, v in doc.items()
                if v is not None and v != "" and k != "_internal"
            }

            # Optimize field values
            if "timestamp" in optimized_doc:
                # Use epoch time instead of ISO string (smaller)
                optimized_doc["timestamp"] = int(optimized_doc["timestamp"])

            # Compress large text fields if beneficial
            if "content" in optimized_doc and len(optimized_doc["content"]) > 1000:
                # Could implement compression here
                pass

            prepared_docs.append(optimized_doc)

        return prepared_docs

    # 4. Memory management
    def memory_efficient_bulk_processing(documents, es_client):
        """Process documents with memory efficiency."""

        batch_size = 10000  # Process in memory-friendly batches
        total_processed = 0

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]

            # Process batch
            for doc in batch:
                es_client.index(doc, "optimized_index", "doc", bulk=True)

            # Flush and cleanup
            es_client.flush_bulk(forced=True)
            total_processed += len(batch)

            # Force garbage collection to free memory
            if total_processed % (batch_size * 5) == 0:
                gc.collect()

            print(f"Processed {total_processed}/{len(documents)} documents")

        return total_processed

    # 5. Index optimization for bulk operations
    def optimize_index_for_bulk_operations(es_client, index_name):
        """Optimize index settings for bulk operations."""

        # Temporarily disable refresh for faster indexing
        es_client.indices.update_settings(index_name, {
            "refresh_interval": "-1",                 # Disable auto-refresh
            "number_of_replicas": 0,                  # Reduce replicas during bulk load
            "translog.flush_threshold_size": "1gb",   # Larger translog
            "merge.policy.max_merged_segment": "5gb"  # Larger segments
        })

        # Assumed defaults to restore after the bulk load
        return {
            "refresh_interval": "1s",
            "number_of_replicas": 1,
            "translog.flush_threshold_size": "512mb",
            "merge.policy.max_merged_segment": "5gb"
        }

    # 6. Post-bulk optimization
    def post_bulk_optimization(es_client, index_name, original_settings):
        """Restore optimal settings after bulk operations."""

        # Restore original settings
        es_client.indices.update_settings(index_name, original_settings)

        # Force refresh to make documents searchable
        es_client.indices.refresh(index_name)

        # Force merge to optimize segments
        es_client.indices.optimize(index_name, max_num_segments=1)

    return {
        "calculate_bulk_size": calculate_optimal_bulk_size,
        "optimize_connection": optimize_es_connection,
        "prepare_documents": prepare_documents_efficiently,
        "memory_efficient_processing": memory_efficient_bulk_processing,
        "optimize_index": optimize_index_for_bulk_operations,
        "post_optimization": post_bulk_optimization
    }

# Example of comprehensive bulk optimization
def optimized_bulk_pipeline():
    """Complete optimized bulk processing pipeline."""

    # Get optimization functions
    optimizers = optimize_bulk_performance()

    # Calculate optimal bulk size (example values)
    optimal_size = optimizers["calculate_bulk_size"](
        avg_doc_size_kb=2.5,       # 2.5KB average document size
        available_memory_mb=8192,  # 8GB available memory
        network_latency_ms=50      # 50ms network latency
    )
    print(f"Calculated optimal bulk size: {optimal_size}")

    # Optimize ES connection
    optimized_es = optimizers["optimize_connection"]()
    optimized_es.bulk_size = optimal_size

    # Prepare index for bulk operations
    index_name = "optimized_bulk_index"
    original_settings = optimizers["optimize_index"](optimized_es, index_name)

    try:
        # Generate and prepare documents
        raw_documents = [
            {"id": i, "title": f"Optimized Document {i}",
             "content": f"Optimized content for document {i}" * 10,
             "timestamp": time.time() + i}
            for i in range(100000)
        ]

        prepared_docs = optimizers["prepare_documents"](raw_documents)

        # Process with memory efficiency
        processed_count = optimizers["memory_efficient_processing"](
            prepared_docs, optimized_es
        )

        print(f"Successfully processed {processed_count} documents")

    finally:
        # Restore optimal settings
        optimizers["post_optimization"](optimized_es, index_name, original_settings)

# Run optimized pipeline
optimized_bulk_pipeline()
```

PyES bulk operations provide powerful capabilities for high-performance data processing, with comprehensive support for error handling, parallel processing, and performance optimization to handle large-scale data ingestion efficiently.