or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-operations.md, client.md, facets-aggregations.md, filters.md, index.md, mappings.md, query-dsl.md, rivers.md

docs/bulk-operations.md

0

# PyES Bulk Operations

1

2

## Overview

3

4

Bulk operations in PyES provide high-performance batch processing for indexing, updating, and deleting large numbers of documents. Rather than sending individual requests to ElasticSearch, bulk operations combine multiple operations into single requests, dramatically improving throughput and reducing network overhead. This is essential for applications that need to process large volumes of data efficiently.

5

6

## Core Bulk Classes

7

8

### ES Bulk Methods

9

10

```python { .api }

11

class ES:
    """
    Bulk-operation surface of the main ES class.

    This is an interface stub: every member documents its contract and
    has no implementation of its own.
    """

    def index_raw_bulk(self, header, document):
        """
        Add raw bulk operation to buffer.

        Args:
            header (dict): Bulk operation header (index, update, delete)
            document (dict, optional): Document body (not needed for delete)
        """

    def flush_bulk(self, forced=False):
        """
        Flush buffered bulk operations to ElasticSearch.

        Args:
            forced (bool): Force flush even if bulk_size not reached. Default: False

        Returns:
            Bulk operation results

        Raises:
            BulkOperationException: If bulk operation fails
        """

    def force_bulk(self):
        """
        Force immediate flush of all buffered bulk operations.

        Returns:
            Bulk operation results
        """

    def create_bulker(self):
        """
        Create new bulker instance for managing bulk operations.

        Returns:
            Bulker: New bulker instance
        """

    @property
    def bulk_size(self):
        """
        Get current bulk size setting.

        Returns:
            int: Current bulk size
        """

    @bulk_size.setter
    def bulk_size(self, size):
        """
        Set bulk size for automatic flushing.

        Args:
            size (int): Number of operations per bulk request
        """

78

79

# Basic bulk operations setup

80

from pyes import ES

81

82

es = ES('localhost:9200')

83

84

# Configure bulk processing

85

es.bulk_size = 1000 # Process 1000 operations per bulk request

86

87

# Bulk indexing with automatic flushing

88

for i in range(5000):

89

doc = {"title": f"Document {i}", "content": f"Content for document {i}"}

90

es.index(doc, "test_index", "doc", id=str(i), bulk=True)

91

# Automatically flushes when bulk_size (1000) is reached

92

93

# Manual flush for remaining documents

94

es.flush_bulk(forced=True)

95

```

96

97

## Bulk Document Operations

98

99

### Bulk Indexing

100

101

```python { .api }

102

def bulk_index_documents(es, documents, index, doc_type):
    """
    Efficiently index large numbers of documents using bulk operations.

    Timing fields are always present in the returned statistics, including
    when indexing fails part-way, so callers can report on them
    unconditionally.

    Args:
        es (ES): ElasticSearch client instance
        documents (list): List of documents to index
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Bulk operation results and statistics with the keys
        ``total_docs``, ``processed``, ``errors``, ``start_time``,
        ``end_time``, ``duration`` and ``docs_per_second``.
    """
    # Configure bulk settings for optimal performance
    es.bulk_size = 1000

    stats = {
        "total_docs": len(documents),
        "processed": 0,
        "errors": [],
        "start_time": time.time(),
    }

    try:
        for i, doc in enumerate(documents):
            # Add document to bulk buffer; fall back to the position as id.
            es.index(doc, index, doc_type, id=doc.get('id', str(i)), bulk=True)
            stats["processed"] += 1

            # Progress reporting
            if stats["processed"] % 10000 == 0:
                print(f"Processed {stats['processed']}/{stats['total_docs']} documents")

        # Flush any remaining documents
        es.flush_bulk(forced=True)
    except Exception as e:
        # Best-effort helper: report the failure instead of raising.
        stats["errors"].append(str(e))
    finally:
        # Computed on every path so the keys exist even after a failure.
        stats["end_time"] = time.time()
        stats["duration"] = stats["end_time"] - stats["start_time"]
        # A coarse clock can report a zero duration for tiny batches.
        stats["docs_per_second"] = (
            stats["processed"] / stats["duration"] if stats["duration"] > 0 else 0.0
        )

    return stats

148

149

# Large dataset indexing example

150

import time

151

152

# Generate sample documents

153

documents = []

154

for i in range(50000):

155

doc = {

156

"id": i,

157

"title": f"Document {i}",

158

"content": f"This is the content of document number {i}",

159

"category": f"category_{i % 10}",

160

"timestamp": int(time.time()) + i,

161

"views": i * 10,

162

"rating": (i % 5) + 1

163

}

164

documents.append(doc)

165

166

# Bulk index all documents

167

results = bulk_index_documents(es, documents, "bulk_test", "document")

168

print(f"Indexed {results['processed']} documents in {results['duration']:.2f} seconds")

169

print(f"Throughput: {results['docs_per_second']:.2f} docs/second")

170

```

171

172

### Bulk Updates

173

174

```python { .api }

175

def bulk_update_documents(es, updates, index, doc_type):
    """
    Queue a batch of document updates and flush them in one bulk request.

    Each entry must carry an ``id``. An entry with a ``script`` key is sent
    as a scripted update (with optional ``params``); any other entry is
    sent as a partial update using its ``doc`` key.

    Args:
        es (ES): ElasticSearch client instance
        updates (list): List of update operations
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: ``{"updated": <queued count>, "errors": [<message>, ...]}``
    """
    outcome = {"updated": 0, "errors": []}

    try:
        for entry in updates:
            target_id = entry["id"]

            if "script" not in entry:
                # Document-based update
                es.partial_update(index, doc_type, target_id,
                                  doc=entry["doc"],
                                  bulk=True)
            else:
                # Script-based update
                es.update(index, doc_type, target_id,
                          script=entry["script"],
                          params=entry.get("params", {}),
                          bulk=True)

            outcome["updated"] += 1

        # Send everything that was buffered above.
        es.flush_bulk(forced=True)
    except Exception as exc:
        outcome["errors"].append(str(exc))

    return outcome

216

217

# Bulk update examples

218

update_operations = [

219

{

220

"id": "1",

221

"script": "ctx._source.views += params.increment",

222

"params": {"increment": 10}

223

},

224

{

225

"id": "2",

226

"doc": {"category": "updated_category", "last_modified": "2023-12-01"}

227

},

228

{

229

"id": "3",

230

"script": "ctx._source.rating = Math.max(ctx._source.rating, params.min_rating)",

231

"params": {"min_rating": 3}

232

}

233

]

234

235

update_results = bulk_update_documents(es, update_operations, "bulk_test", "document")

236

print(f"Updated {update_results['updated']} documents")

237

```

238

239

### Bulk Deletions

240

241

```python { .api }

242

def bulk_delete_documents(es, doc_ids, index, doc_type):
    """
    Perform bulk deletion of documents.

    Every id is first counted as deleted when it is queued. After the
    flush the per-item results are inspected and the count is corrected:
    items answered with status 404 move to ``not_found``, and items that
    carry an ``error`` entry are reported in ``errors``.

    Args:
        es (ES): ElasticSearch client instance
        doc_ids (list): List of document IDs to delete
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: ``{"deleted": int, "not_found": int, "errors": list}``.
        ``errors`` holds a message string for an exception raised while
        queuing or flushing, and a dict (``id``, ``status``, ``error``)
        for each item the bulk response reported as failed.
    """
    stats = {"deleted": 0, "not_found": 0, "errors": []}

    try:
        for doc_id in doc_ids:
            es.delete(index, doc_type, doc_id, bulk=True)
            stats["deleted"] += 1

        # Flush bulk deletions
        results = es.flush_bulk(forced=True)

        # Process results for detailed statistics
        if results and "items" in results:
            for item in results["items"]:
                if "delete" not in item:
                    continue
                delete_result = item["delete"]

                if delete_result.get("status") == 404:
                    stats["not_found"] += 1
                    stats["deleted"] -= 1
                elif "error" in delete_result:
                    # A failed delete must not stay counted as deleted.
                    stats["deleted"] -= 1
                    stats["errors"].append({
                        "id": delete_result.get("_id"),
                        "status": delete_result.get("status"),
                        "error": delete_result["error"],
                    })
    except Exception as e:
        stats["errors"].append(str(e))

    return stats

279

280

# Bulk deletion example

281

doc_ids_to_delete = [str(i) for i in range(1000, 2000)] # Delete docs 1000-1999

282

deletion_results = bulk_delete_documents(es, doc_ids_to_delete, "bulk_test", "document")

283

284

print(f"Deleted: {deletion_results['deleted']}")

285

print(f"Not found: {deletion_results['not_found']}")

286

print(f"Errors: {len(deletion_results['errors'])}")

287

```

288

289

## Advanced Bulk Operations

290

291

### Mixed Bulk Operations

292

293

```python { .api }

294

def mixed_bulk_operations(es, operations):
    """
    Execute mixed bulk operations (index, update, delete) in a single batch.

    Each operation is a dict with an ``operation`` key plus ``index``,
    ``type`` and, depending on the kind, ``id``, ``doc``, ``script`` and
    ``params``. Operations of an unrecognised kind are not sent; they are
    reported in ``errors`` so that they cannot disappear unnoticed.

    Args:
        es (ES): ElasticSearch client instance
        operations (list): List of mixed operation dictionaries

    Returns:
        dict: Operation results: the number of queued ``index_ops``,
        ``update_ops`` and ``delete_ops`` and a list of ``errors``.
    """
    stats = {
        "index_ops": 0,
        "update_ops": 0,
        "delete_ops": 0,
        "errors": [],
    }

    try:
        for op in operations:
            op_type = op["operation"]

            if op_type == "index":
                es.index(op["doc"], op["index"], op["type"],
                         id=op.get("id"), bulk=True)
                stats["index_ops"] += 1

            elif op_type == "update":
                if "script" in op:
                    es.update(op["index"], op["type"], op["id"],
                              script=op["script"],
                              params=op.get("params", {}),
                              bulk=True)
                else:
                    es.partial_update(op["index"], op["type"], op["id"],
                                      doc=op["doc"], bulk=True)
                stats["update_ops"] += 1

            elif op_type == "delete":
                es.delete(op["index"], op["type"], op["id"], bulk=True)
                stats["delete_ops"] += 1

            else:
                # Surface typos such as "remove" instead of dropping them.
                stats["errors"].append({
                    "action": op_type,
                    "id": op.get("id"),
                    "error": "unsupported operation",
                })

        # Execute all operations
        results = es.flush_bulk(forced=True)

        # Only walk the items when the response flags at least one failure.
        if results and "errors" in results and results["errors"]:
            for item in results.get("items", []):
                for action, result in item.items():
                    if "error" in result:
                        stats["errors"].append({
                            "action": action,
                            "id": result.get("_id"),
                            "error": result["error"],
                        })

    except Exception as e:
        stats["errors"].append({"general_error": str(e)})

    return stats

355

356

# Mixed operations example

357

mixed_ops = [

358

# Index new documents

359

{

360

"operation": "index",

361

"index": "mixed_test",

362

"type": "doc",

363

"id": "new_1",

364

"doc": {"title": "New Document 1", "status": "active"}

365

},

366

{

367

"operation": "index",

368

"index": "mixed_test",

369

"type": "doc",

370

"id": "new_2",

371

"doc": {"title": "New Document 2", "status": "active"}

372

},

373

374

# Update existing documents

375

{

376

"operation": "update",

377

"index": "mixed_test",

378

"type": "doc",

379

"id": "existing_1",

380

"doc": {"last_updated": "2023-12-01", "status": "updated"}

381

},

382

{

383

"operation": "update",

384

"index": "mixed_test",

385

"type": "doc",

386

"id": "existing_2",

387

"script": "ctx._source.view_count += 1"

388

},

389

390

# Delete documents

391

{

392

"operation": "delete",

393

"index": "mixed_test",

394

"type": "doc",

395

"id": "old_1"

396

},

397

{

398

"operation": "delete",

399

"index": "mixed_test",

400

"type": "doc",

401

"id": "old_2"

402

}

403

]

404

405

mixed_results = mixed_bulk_operations(es, mixed_ops)

406

print(f"Index operations: {mixed_results['index_ops']}")

407

print(f"Update operations: {mixed_results['update_ops']}")

408

print(f"Delete operations: {mixed_results['delete_ops']}")

409

print(f"Errors: {len(mixed_results['errors'])}")

410

```

411

412

### Upsert Operations

413

414

```python { .api }

415

def bulk_upsert_documents(es, upsert_ops, index, doc_type):
    """
    Queue update-or-create requests and flush them as one bulk request.

    For each entry the ``doc`` value (default ``{}``) is sent as the update
    document, and the ``upsert`` value (falling back to ``doc``) as the
    document to create when none exists. ``script`` and ``params`` are
    passed through when present.

    Args:
        es (ES): ElasticSearch client instance
        upsert_ops (list): List of upsert operations
        index (str): Target index
        doc_type (str): Document type

    Returns:
        dict: ``{"upserted": <queued count>, "errors": [<message>, ...]}``
    """
    summary = {"upserted": 0, "errors": []}

    try:
        for spec in upsert_ops:
            # Use update with upsert; "id" is mandatory and looked up first.
            request = {
                "index": index,
                "doc_type": doc_type,
                "id": spec["id"],
                "document": spec.get("doc", {}),
                "upsert": spec.get("upsert", spec.get("doc", {})),
                "script": spec.get("script"),
                "params": spec.get("params", {}),
                "bulk": True,
            }
            es.update(**request)
            summary["upserted"] += 1

        # Flush upsert operations
        es.flush_bulk(forced=True)
    except Exception as exc:
        summary["errors"].append(str(exc))

    return summary

456

457

# Upsert examples

458

upsert_operations = [

459

{

460

"id": "user_123",

461

"doc": {"name": "John Doe", "last_seen": "2023-12-01"},

462

"upsert": {"name": "John Doe", "created": "2023-12-01", "last_seen": "2023-12-01"}

463

},

464

{

465

"id": "user_456",

466

"script": "if (ctx._source.containsKey('visit_count')) { ctx._source.visit_count += 1 } else { ctx._source.visit_count = 1 }",

467

"upsert": {"visit_count": 1, "created": "2023-12-01"}

468

}

469

]

470

471

upsert_results = bulk_upsert_documents(es, upsert_operations, "users", "profile")

472

```

473

474

## Bulk Operation Patterns

475

476

### Streaming Bulk Processor

477

478

```python { .api }

479

class StreamingBulkProcessor:
    """
    Streaming bulk processor for continuous data ingestion.

    Provides automatic batching, error handling, and performance monitoring
    for high-volume data streams.
    """

    def __init__(self, es_client, bulk_size=1000, flush_interval=30,
                 max_retries=3, retry_delay=5):
        """
        Initialize StreamingBulkProcessor.

        Args:
            es_client (ES): ElasticSearch client
            bulk_size (int): Documents per batch. Default: 1000
            flush_interval (int): Auto-flush interval in seconds. Default: 30
            max_retries (int): Maximum retry attempts. Default: 3
            retry_delay (int): Delay between retries in seconds. Default: 5
        """
        self.es = es_client
        self.bulk_size = bulk_size
        self.flush_interval = flush_interval
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        self.buffer = []
        # Recorded once so get_stats() can report a meaningful uptime.
        self.start_time = time.time()
        self.last_flush = self.start_time
        self.stats = {
            "processed": 0,
            "errors": 0,
            "retries": 0,
            "flushes": 0,
        }

    def add_document(self, doc, index, doc_type, doc_id=None, operation="index"):
        """
        Add document to processing buffer.

        The buffer is flushed automatically once it holds ``bulk_size``
        entries or more than ``flush_interval`` seconds have passed since
        the last flush.

        Args:
            doc (dict): Document to process
            index (str): Target index
            doc_type (str): Document type
            doc_id (str, optional): Document ID; a random UUID4 string is
                generated when omitted
            operation (str): Operation type (index, update, delete). Default: "index"
        """
        self.buffer.append({
            "operation": operation,
            "index": index,
            "type": doc_type,
            "id": doc_id or str(uuid.uuid4()),
            "doc": doc,
        })

        # Auto-flush if buffer is full or time interval exceeded
        if (len(self.buffer) >= self.bulk_size or
                time.time() - self.last_flush > self.flush_interval):
            self.flush()

    def flush(self):
        """
        Flush buffered operations to ElasticSearch.

        The whole buffer is submitted and, on failure, retried up to
        ``max_retries`` times with ``retry_delay`` seconds between attempts.
        When every attempt fails the buffered operations are counted as
        errors and discarded.
        """
        if not self.buffer:
            return

        retry_count = 0
        success = False

        while not success and retry_count <= self.max_retries:
            try:
                # NOTE(review): every attempt re-submits the whole buffer to
                # the client. Whether the client's own bulk buffer still
                # holds operations from a failed attempt is not visible
                # here -- verify to rule out duplicates.
                for op in self.buffer:
                    if op["operation"] == "index":
                        self.es.index(op["doc"], op["index"], op["type"],
                                      id=op["id"], bulk=True)
                    elif op["operation"] == "update":
                        self.es.partial_update(op["index"], op["type"],
                                               op["id"], doc=op["doc"], bulk=True)
                    elif op["operation"] == "delete":
                        self.es.delete(op["index"], op["type"], op["id"], bulk=True)

                # Execute bulk operations
                self.es.flush_bulk(forced=True)

                # Update statistics
                self.stats["processed"] += len(self.buffer)
                self.stats["flushes"] += 1

                # Clear buffer and update timestamp
                self.buffer.clear()
                self.last_flush = time.time()
                success = True

            except Exception as e:
                retry_count += 1
                self.stats["retries"] += 1

                if retry_count <= self.max_retries:
                    print(f"Bulk operation failed, retrying ({retry_count}/{self.max_retries}): {e}")
                    time.sleep(self.retry_delay)
                else:
                    self.stats["errors"] += len(self.buffer)
                    print(f"Bulk operation failed after {self.max_retries} retries: {e}")
                    # Could save failed operations to dead letter queue here
                    self.buffer.clear()

    def get_stats(self):
        """
        Get processor statistics.

        Returns:
            dict: The running counters plus ``buffer_size``, ``last_flush``
            and ``uptime`` (seconds since the processor was created).
        """
        return {
            **self.stats,
            "buffer_size": len(self.buffer),
            "last_flush": self.last_flush,
            "uptime": time.time() - self.start_time,
        }

    def close(self):
        """Flush remaining operations and close processor."""
        self.flush()

598

599

# Usage example

600

import uuid

601

import time

602

import json

603

604

processor = StreamingBulkProcessor(es, bulk_size=2000, flush_interval=60)

605

606

# Simulate continuous data stream

607

def simulate_data_stream():
    """
    Simulate continuous data ingestion.

    Pushes 100,000 synthetic events through the module-level ``processor``,
    printing progress and pausing briefly every 1,000 events, then closes
    the processor and prints its final statistics.
    """
    for seq in range(100000):
        event = {
            "timestamp": time.time(),
            "event_type": f"event_{seq % 10}",
            "user_id": f"user_{seq % 1000}",
            "data": {"value": seq, "category": f"cat_{seq % 5}"},
        }
        processor.add_document(event, "events", "event")

        if seq % 1000 != 0:
            continue

        # Simulate processing delay
        snapshot = processor.get_stats()
        print(f"Processed: {snapshot['processed']}, Buffer: {snapshot['buffer_size']}")
        time.sleep(0.1)

    # Close processor
    processor.close()
    print(f"Final stats: {processor.get_stats()}")

630

631

# Run simulation

632

simulate_data_stream()

633

```

634

635

### Parallel Bulk Processing

636

637

```python { .api }

638

import threading

639

import queue

640

import concurrent.futures

641

642

class ParallelBulkProcessor:
    """
    Parallel bulk processor for maximum throughput.

    Uses multiple threads to process bulk operations concurrently.
    Operations are queued with :meth:`add_operation`; worker threads take
    them off the queue, group them into batches of ``bulk_size`` and send
    each batch through a client of their own.
    """

    def __init__(self, es_client, num_workers=4, bulk_size=1000, queue_size=10000):
        """
        Initialize ParallelBulkProcessor.

        Args:
            es_client (ES): ElasticSearch client
            num_workers (int): Number of worker threads. Default: 4
            bulk_size (int): Documents per bulk request. Default: 1000
            queue_size (int): Maximum queue size. Default: 10000
        """
        self.es = es_client
        self.num_workers = num_workers
        self.bulk_size = bulk_size

        # Thread-safe queue for operations
        self.operation_queue = queue.Queue(maxsize=queue_size)
        self.result_queue = queue.Queue()

        # Worker management
        self.workers = []
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
        self.running = False

        # Statistics; updated by several threads, hence the lock.
        self.stats = {
            "queued": 0,
            "processed": 0,
            "errors": 0,
            "active_workers": 0,
        }
        self._stats_lock = threading.Lock()

    def _bump(self, key, amount=1):
        """Add ``amount`` to one statistics counter under the lock."""
        with self._stats_lock:
            self.stats[key] += amount

    def start(self):
        """Start the parallel processor by launching one loop per worker."""
        self.running = True

        # Start worker threads
        for i in range(self.num_workers):
            future = self.executor.submit(self._worker_loop, i)
            self.workers.append(future)

    def _worker_loop(self, worker_id):
        """
        Worker thread main loop.

        Runs until the processor has been stopped and the queue is empty,
        then sends whatever partial batch is left.
        """
        batch = []

        while self.running or not self.operation_queue.empty():
            try:
                # Get operation from queue (with timeout) so the loop can
                # notice a stop request while the queue is idle.
                operation = self.operation_queue.get(timeout=1.0)
                batch.append(operation)

                # Process batch when full
                if len(batch) >= self.bulk_size:
                    self._process_batch(batch, worker_id)
                    batch = []

                self.operation_queue.task_done()

            except queue.Empty:
                # Process remaining batch if any
                if batch and not self.running:
                    self._process_batch(batch, worker_id)
                    batch = []
                continue
            except Exception as e:
                self._bump("errors")
                print(f"Worker {worker_id} error: {e}")

        # Process final batch
        if batch:
            self._process_batch(batch, worker_id)

    def _process_batch(self, batch, worker_id):
        """
        Process a batch of operations.

        Failures are not raised: the batch is counted in ``errors`` and an
        unsuccessful entry is put on ``result_queue``.
        """
        self._bump("active_workers")
        try:
            # Create separate ES client for this worker to avoid conflicts
            worker_es = ES(self.es.server)
            worker_es.bulk_size = len(batch)

            for op in batch:
                if op["operation"] == "index":
                    worker_es.index(op["doc"], op["index"], op["type"],
                                    id=op["id"], bulk=True)
                elif op["operation"] == "update":
                    worker_es.partial_update(op["index"], op["type"],
                                             op["id"], doc=op["doc"], bulk=True)
                elif op["operation"] == "delete":
                    worker_es.delete(op["index"], op["type"], op["id"], bulk=True)

            # Execute bulk operations
            worker_es.flush_bulk(forced=True)

            # Update statistics
            self._bump("processed", len(batch))

            # Queue results for monitoring
            self.result_queue.put({
                "worker_id": worker_id,
                "batch_size": len(batch),
                "success": True,
                "timestamp": time.time(),
            })

        except Exception as e:
            self._bump("errors", len(batch))
            self.result_queue.put({
                "worker_id": worker_id,
                "batch_size": len(batch),
                "success": False,
                "error": str(e),
                "timestamp": time.time(),
            })
        finally:
            self._bump("active_workers", -1)

    def add_operation(self, operation, index, doc_type, doc_id=None, doc=None):
        """
        Add operation to processing queue.

        Blocks while the queue is full.

        Args:
            operation (str): Operation type (index, update, delete)
            index (str): Target index
            doc_type (str): Document type
            doc_id (str, optional): Document ID; a random UUID4 string is
                generated when omitted
            doc (dict, optional): Document data
        """
        op = {
            "operation": operation,
            "index": index,
            "type": doc_type,
            "id": doc_id or str(uuid.uuid4()),
            "doc": doc,
        }

        self.operation_queue.put(op)
        self._bump("queued")

    def stop(self, timeout=60):
        """
        Stop the processor and wait for completion.

        Workers drain the queue and send their last partial batch before
        they finish, so waiting for the worker futures covers both.

        Args:
            timeout (int): Maximum wait time in seconds. Default: 60
        """
        # Signal workers to stop
        self.running = False

        # Executor.shutdown() accepts no timeout, so the bounded wait is
        # done on the worker futures instead. Waiting on the futures rather
        # than joining the queue also cannot hang when no worker was
        # ever started.
        concurrent.futures.wait(self.workers, timeout=timeout)

        # Shutdown executor without blocking past the timeout.
        self.executor.shutdown(wait=False)

    def get_stats(self):
        """
        Get current processing statistics.

        Returns:
            dict: A snapshot of the counters plus ``queue_size`` and
            ``result_queue_size``.
        """
        with self._stats_lock:
            snapshot = dict(self.stats)
        snapshot["queue_size"] = self.operation_queue.qsize()
        snapshot["result_queue_size"] = self.result_queue.qsize()
        return snapshot

815

816

# Usage example

817

def parallel_bulk_example():
    """
    Example of parallel bulk processing.

    Queues 100,000 index operations on an eight-worker processor, prints
    progress every 10,000 operations, and always stops the processor before
    printing the final statistics.
    """
    # Create parallel processor and start processing
    processor = ParallelBulkProcessor(es, num_workers=8, bulk_size=1500)
    processor.start()

    try:
        # Add large number of operations
        for n in range(100000):
            payload = {
                "id": n,
                "title": f"Parallel Document {n}",
                "content": f"Content for parallel processing test {n}",
                "batch": n // 1000,
                "timestamp": time.time(),
            }
            processor.add_operation("index", "parallel_test", "doc", str(n), payload)

            # Monitor progress
            if n % 10000 != 0:
                continue
            progress = processor.get_stats()
            print(f"Queued: {progress['queued']}, Processed: {progress['processed']}, "
                  f"Active Workers: {progress['active_workers']}")
    finally:
        # Stop processor and wait for completion
        processor.stop(timeout=120)

    print(f"Final stats: {processor.get_stats()}")

851

852

# Run parallel processing example

853

parallel_bulk_example()

854

```

855

856

## Error Handling and Recovery

857

858

### Robust Bulk Error Handling

859

860

```python { .api }

861

from pyes import BulkOperationException

862

import logging

863

864

class RobustBulkProcessor:
    """
    Bulk processor with comprehensive error handling and recovery.

    Per-item failures from a bulk response are split into retryable ones,
    whose original operation is kept in ``failed_operations`` for
    :meth:`retry_failed_operations`, and permanent ones, which are recorded
    in ``dead_letter_queue``.
    """

    # (substring of the error type, error_stats counter), checked in order.
    _ERROR_BUCKETS = (
        ("version_conflict", "version_conflicts"),
        ("document_missing", "document_missing"),
        ("index_not_found", "index_missing"),
        ("mapper_parsing", "mapping_errors"),
        ("timeout", "timeout_errors"),
    )

    def __init__(self, es_client, bulk_size=1000):
        """
        Initialize RobustBulkProcessor.

        Args:
            es_client (ES): ElasticSearch client
            bulk_size (int): Stored on the instance. Default: 1000
        """
        self.es = es_client
        self.bulk_size = bulk_size
        self.logger = logging.getLogger("bulk_processor")

        # Error tracking
        self.error_stats = {
            "version_conflicts": 0,
            "document_missing": 0,
            "index_missing": 0,
            "mapping_errors": 0,
            "timeout_errors": 0,
            "other_errors": 0,
        }

        # Original operations awaiting a retry, and permanent failures.
        self.failed_operations = []
        self.dead_letter_queue = []

    def process_with_error_handling(self, operations):
        """
        Process operations with comprehensive error handling.

        Args:
            operations (list): List of bulk operations

        Returns:
            dict: Processing results with error details: ``successful`` and
            ``failed`` counts, permanent ``errors``, and ``retry_needed``
            (error details of the retryable failures).
        """
        results = {
            "successful": 0,
            "failed": 0,
            "errors": [],
            "retry_needed": [],
        }
        # Materialise once so response items can be matched to operations.
        operations = list(operations)

        try:
            # Execute bulk operations
            for op in operations:
                self._add_operation_to_bulk(op)

            bulk_results = self.es.flush_bulk(forced=True)

            # Process results for error handling
            if bulk_results and "items" in bulk_results:
                self._process_bulk_results(bulk_results["items"], results, operations)

        except BulkOperationException as e:
            self.logger.error("Bulk operation exception: %s", e)
            results["errors"].append({"type": "bulk_exception", "message": str(e)})

            # Handle specific bulk errors
            if hasattr(e, 'errors') and e.errors:
                for error_item in e.errors:
                    self._categorize_error(error_item, results)

        except Exception as e:
            self.logger.error("Unexpected error during bulk processing: %s", e)
            results["errors"].append({"type": "unexpected", "message": str(e)})

        return results

    def _add_operation_to_bulk(self, op):
        """Add operation to bulk buffer."""
        if op["operation"] == "index":
            self.es.index(op["doc"], op["index"], op["type"],
                          id=op["id"], bulk=True)
        elif op["operation"] == "update":
            self.es.partial_update(op["index"], op["type"], op["id"],
                                   doc=op["doc"], bulk=True)
        elif op["operation"] == "delete":
            self.es.delete(op["index"], op["type"], op["id"], bulk=True)

    def _process_bulk_results(self, items, results, operations=None):
        """
        Process individual item results from bulk response.

        Args:
            items (list): Per-operation entries of the bulk response
            results (dict): Result accumulator, updated in place
            operations (list, optional): Operations that produced ``items``.
                When given, each retryable failure is traced back to its
                operation, which is stored in ``failed_operations``.
        """
        items = list(items)
        # A response covering the whole batch lines up with it by position.
        # Otherwise (e.g. part of the batch was flushed automatically) fall
        # back to matching on the document id.
        positional = operations is not None and len(operations) == len(items)
        by_id = {}
        if operations and not positional:
            by_id = {str(op.get("id")): op for op in operations}

        for position, item in enumerate(items):
            for action, result in item.items():
                if "error" not in result:
                    # Operation successful
                    results["successful"] += 1
                    continue

                # Operation failed
                results["failed"] += 1
                error_info = {
                    "action": action,
                    "id": result.get("_id"),
                    "status": result.get("status"),
                    "error": result["error"],
                }

                if positional:
                    source_op = operations[position]
                else:
                    source_op = by_id.get(str(result.get("_id")))

                # Categorize error for appropriate handling
                if self._is_retryable_error(result):
                    results["retry_needed"].append(error_info)
                    if source_op is not None:
                        self.failed_operations.append(source_op)
                    else:
                        # Nothing to replay: keep the details, don't drop them.
                        self.dead_letter_queue.append(error_info)
                else:
                    results["errors"].append(error_info)
                    self.dead_letter_queue.append(error_info)

                self._update_error_stats(result["error"])

    @staticmethod
    def _error_type(error):
        """Return the error's type name; a non-dict error is used as text."""
        if isinstance(error, dict):
            return error.get("type", "") or ""
        return str(error)

    def _is_retryable_error(self, result):
        """Determine if error is retryable."""
        error_type = self._error_type(result.get("error", {}))
        status = result.get("status", 0)

        # Retryable conditions
        if status in [429, 503, 504]:  # Rate limited or service unavailable
            return True
        if error_type in ["timeout_exception", "connect_timeout_exception"]:
            return True
        if "circuit_breaking_exception" in error_type:
            return True

        return False

    def _categorize_error(self, error_item, results):
        """
        Categorize error for statistics and handling.

        ``results`` is accepted for interface compatibility and not modified.
        """
        if isinstance(error_item, dict):
            error = error_item.get("error", {})
        else:
            error = error_item
        self._update_error_stats(error)

    def _update_error_stats(self, error):
        """Update error statistics by incrementing the matching counter."""
        error_type = self._error_type(error)

        for needle, bucket in self._ERROR_BUCKETS:
            if needle in error_type:
                self.error_stats[bucket] += 1
                return
        self.error_stats["other_errors"] += 1

    def retry_failed_operations(self, max_retries=3, retry_delay=5):
        """
        Retry operations that failed with retryable errors.

        Args:
            max_retries (int): Maximum retry attempts. Default: 3
            retry_delay (int): Delay between retries in seconds. Default: 5

        Returns:
            dict: Retry results with ``successful_retries`` and
            ``permanent_failures`` counts.
        """
        import time  # local import: this example block does not import it

        retry_results = {"successful_retries": 0, "permanent_failures": 0}

        for attempt in range(max_retries):
            if not self.failed_operations:
                break

            self.logger.info("Retry attempt %d for %d operations",
                             attempt + 1, len(self.failed_operations))

            # Retry failed operations
            retry_batch = self.failed_operations.copy()
            self.failed_operations.clear()

            results = self.process_with_error_handling(retry_batch)
            retry_results["successful_retries"] += results["successful"]

            # Operations that failed again with a retryable error were put
            # back into failed_operations while the results were processed.
            if not self.failed_operations:
                break

            # No point in sleeping after the last attempt.
            if attempt < max_retries - 1:
                time.sleep(retry_delay)

        # Move permanently failed operations to dead letter queue
        retry_results["permanent_failures"] = len(self.failed_operations)
        self.dead_letter_queue.extend(self.failed_operations)
        self.failed_operations.clear()

        return retry_results

    def get_error_summary(self):
        """Get comprehensive error summary."""
        return {
            "error_stats": self.error_stats,
            "failed_operations": len(self.failed_operations),
            "dead_letter_queue": len(self.dead_letter_queue),
            "total_errors": sum(self.error_stats.values()),
        }

1077

1078

# Usage example with error handling

1079

def robust_bulk_processing_example():
    """Example of robust bulk processing with error handling.

    Builds 5000 operations (mostly valid ones, plus some that are meant to
    fail), runs them through a ``RobustBulkProcessor``, retries whatever
    failed, and prints the outcome of every stage.
    """

    def build_operation(i):
        """Return the operation dict for position ``i`` of the test set."""
        if i % 100 == 0:
            # Versioned update of a document that was never indexed;
            # intended to provoke a version conflict.
            return {
                "operation": "update",
                "index": "test_index",
                "type": "doc",
                "id": f"conflict_{i}",
                "doc": {"field": "value"},
                "version": 99
            }
        if i % 150 == 0:
            # Document aimed at an index with strict mapping; intended to
            # provoke a mapping error.
            return {
                "operation": "index",
                "index": "strict_index",
                "type": "doc",
                "id": str(i),
                "doc": {"invalid_field": "value"}
            }
        # Everything else is a plain, valid index operation.
        return {
            "operation": "index",
            "index": "test_index",
            "type": "doc",
            "id": str(i),
            "doc": {"title": f"Document {i}", "content": f"Content {i}"}
        }

    processor = RobustBulkProcessor(es, bulk_size=1000)

    # Mix of good and problematic operations
    operations = [build_operation(i) for i in range(5000)]

    # First pass over the whole set
    results = processor.process_with_error_handling(operations)

    print(f"Successful operations: {results['successful']}")
    print(f"Failed operations: {results['failed']}")
    print(f"Operations needing retry: {len(results['retry_needed'])}")

    # Second chance for whatever the processor queued as failed
    retry_results = processor.retry_failed_operations()
    print(f"Successful retries: {retry_results['successful_retries']}")
    print(f"Permanent failures: {retry_results['permanent_failures']}")

    # Final tally
    error_summary = processor.get_error_summary()
    print(f"Error summary: {error_summary}")

1133

1134

# Run robust processing example.
# This is a module-level call, so the example executes as soon as the snippet
# is run. The function references `es` and `RobustBulkProcessor`, which it
# does not define itself.

1135

robust_bulk_processing_example()

1136

```

1137

1138

## Performance Optimization

1139

1140

### Bulk Performance Tuning

1141

1142

```python { .api }

1143

def optimize_bulk_performance():
    """Comprehensive bulk performance optimization strategies.

    Returns:
        dict: Helper callables keyed by ``"calculate_bulk_size"``,
            ``"optimize_connection"``, ``"prepare_documents"``,
            ``"memory_efficient_processing"``, ``"optimize_index"`` and
            ``"post_optimization"``
    """

    # 1. Optimal bulk size calculation
    def calculate_optimal_bulk_size(avg_doc_size_kb, available_memory_mb,
                                    network_latency_ms):
        """Calculate optimal bulk size based on system characteristics.

        Args:
            avg_doc_size_kb (float): Average document size in KB; must be > 0
            available_memory_mb (float): Memory available to the client in MB
            network_latency_ms (float): Network latency in milliseconds

        Returns:
            int: Documents per bulk request, always within 100..10000. The
                latency adjustment and the lower bound of 100 may lift the
                result above the memory-derived limit.

        Raises:
            ValueError: If ``avg_doc_size_kb`` is not greater than 0
        """
        # Both divisions below use the document size as divisor.
        if avg_doc_size_kb <= 0:
            raise ValueError("avg_doc_size_kb must be greater than 0")

        # Target: 5-15MB per bulk request
        target_bulk_mb = 10

        # Calculate based on document size
        docs_per_mb = 1024 / avg_doc_size_kb
        base_bulk_size = int(target_bulk_mb * docs_per_mb)

        # Adjust for available memory (use max 10% for bulk buffer)
        memory_limit = (available_memory_mb * 0.1 * 1024) / avg_doc_size_kb
        memory_limited_size = int(min(base_bulk_size, memory_limit))

        # Adjust for network latency
        if network_latency_ms > 100:
            # High latency: larger bulks to amortize network cost
            latency_adjusted = min(memory_limited_size * 2, 10000)
        elif network_latency_ms < 20:
            # Low latency: smaller bulks for faster feedback
            latency_adjusted = max(memory_limited_size // 2, 500)
        else:
            latency_adjusted = memory_limited_size

        return max(100, min(latency_adjusted, 10000))  # Reasonable bounds

    # 2. Connection optimization
    def optimize_es_connection():
        """Optimize ElasticSearch connection for bulk operations.

        Returns:
            ES: Client configured with two servers, a 120 second timeout,
                3 retries, a retry time of 30 and a bulk size of 5000
        """
        # NOTE(review): connection_pool_size and connection_keep_alive are
        # marked "implementation-specific" - confirm that the ES constructor
        # in use accepts them, otherwise this call raises TypeError.
        optimized_es = ES(
            server=["es1.example.com:9200", "es2.example.com:9200"],  # Multiple nodes
            timeout=120.0,  # Longer timeout for bulk operations
            max_retries=3,  # Retry failed requests
            retry_time=30,  # Wait between retries
            bulk_size=5000,  # Optimized bulk size
            # Connection pooling (implementation-specific)
            connection_pool_size=10,
            connection_keep_alive=True
        )

        return optimized_es

    # 3. Document preparation optimization
    def prepare_documents_efficiently(raw_docs):
        """Efficiently prepare documents for bulk indexing.

        Drops the ``"_internal"`` key and every field whose value is ``None``
        or ``""``, and converts a ``"timestamp"`` field to whole epoch
        seconds.

        Args:
            raw_docs (iterable of dict): Source documents (left unmodified)

        Returns:
            list of dict: Newly built, slimmed-down documents

        Raises:
            ValueError: If a string timestamp is neither an integer literal
                nor an ISO 8601 date/time
        """
        from datetime import datetime, timezone

        def to_epoch_seconds(value):
            """Convert a number, numeric string or ISO 8601 string to int seconds."""
            if not isinstance(value, str):
                return int(value)
            try:
                return int(value)
            except ValueError:
                # Not an integer literal; treat it as an ISO 8601 string.
                parsed = datetime.fromisoformat(value)
            if parsed.tzinfo is None:
                # Values without an offset are taken as UTC so the result
                # does not depend on the local timezone of this machine.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return int(parsed.timestamp())

        prepared_docs = []

        for doc in raw_docs:
            # Minimize document size: only include necessary fields
            optimized_doc = {
                k: v for k, v in doc.items()
                if v is not None and v != "" and k != "_internal"
            }

            # Use epoch time instead of ISO string (smaller)
            if "timestamp" in optimized_doc:
                optimized_doc["timestamp"] = to_epoch_seconds(optimized_doc["timestamp"])

            # NOTE: large "content" fields (over 1000 characters) could be
            # compressed here; no compression is implemented.

            prepared_docs.append(optimized_doc)

        return prepared_docs

    # 4. Memory management
    def memory_efficient_bulk_processing(documents, es_client, batch_size=10000):
        """Process documents with memory efficiency.

        Args:
            documents (sequence of dict): Documents to index; must support
                ``len()`` and slicing
            es_client: Client offering ``index(doc, index, type, bulk=True)``
                and ``flush_bulk(forced=True)``
            batch_size (int): Documents queued between two forced flushes.
                Default: 10000

        Returns:
            int: Number of documents processed
        """
        import gc

        total_processed = 0

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]

            # Process batch
            for doc in batch:
                es_client.index(doc, "optimized_index", "doc", bulk=True)

            # Flush and cleanup
            es_client.flush_bulk(forced=True)
            total_processed += len(batch)

            # Force garbage collection to free memory (every 5 full batches)
            if total_processed % (batch_size * 5) == 0:
                gc.collect()

            print(f"Processed {total_processed}/{len(documents)} documents")

        return total_processed

    # 5. Index optimization for bulk operations
    def optimize_index_for_bulk_operations(es_client, index_name):
        """Optimize index settings for bulk operations.

        Args:
            es_client: Client whose ``indices.update_settings`` is called
            index_name (str): Index to reconfigure

        Returns:
            dict: Settings to apply once the bulk load is finished
        """
        # Temporarily disable refresh for faster indexing
        es_client.indices.update_settings(index_name, {
            "refresh_interval": "-1",  # Disable auto-refresh
            "number_of_replicas": 0,  # Reduce replicas during bulk load
            "translog.flush_threshold_size": "1gb",  # Larger translog
            "merge.policy.max_merged_segment": "5gb"  # Larger segments
        })

        # NOTE(review): these values are fixed here, not read back from the
        # index, so an index that used other values before the bulk load
        # (e.g. more replicas) will not get them back - confirm that this is
        # acceptable for the target index.
        return {
            "refresh_interval": "1s",  # Original settings to restore
            "number_of_replicas": 1,
            "translog.flush_threshold_size": "512mb",
            "merge.policy.max_merged_segment": "5gb"
        }

    # 6. Post-bulk optimization
    def post_bulk_optimization(es_client, index_name, original_settings):
        """Restore optimal settings after bulk operations.

        Args:
            es_client: Client whose ``indices`` API is used
            index_name (str): Index that received the bulk load
            original_settings (dict): Settings to apply, as returned by
                ``optimize_index_for_bulk_operations``
        """
        # Restore original settings
        es_client.indices.update_settings(index_name, original_settings)

        # Force refresh to make documents searchable
        es_client.indices.refresh(index_name)

        # Force merge to optimize segments
        es_client.indices.optimize(index_name, max_num_segments=1)

    return {
        "calculate_bulk_size": calculate_optimal_bulk_size,
        "optimize_connection": optimize_es_connection,
        "prepare_documents": prepare_documents_efficiently,
        "memory_efficient_processing": memory_efficient_bulk_processing,
        "optimize_index": optimize_index_for_bulk_operations,
        "post_optimization": post_bulk_optimization
    }

1288

1289

# Example of comprehensive bulk optimization

1290

def optimized_bulk_pipeline():
    """Complete optimized bulk processing pipeline.

    Sizes the bulk requests, creates the client, switches the index to its
    bulk-load settings, indexes 100000 generated documents, and applies the
    post-load settings even when indexing fails.
    """
    # Toolbox of helper callables
    optimizers = optimize_bulk_performance()
    calculate_bulk_size = optimizers["calculate_bulk_size"]
    prepare_documents = optimizers["prepare_documents"]
    process_documents = optimizers["memory_efficient_processing"]

    # Example environment: 2.5KB documents, 8GB of memory, 50ms latency
    optimal_size = calculate_bulk_size(
        avg_doc_size_kb=2.5,
        available_memory_mb=8192,
        network_latency_ms=50
    )
    print(f"Calculated optimal bulk size: {optimal_size}")

    # Client tuned for bulk traffic, using the size computed above
    optimized_es = optimizers["optimize_connection"]()
    optimized_es.bulk_size = optimal_size

    # Switch the index to its bulk-load settings
    index_name = "optimized_bulk_index"
    original_settings = optimizers["optimize_index"](optimized_es, index_name)

    try:
        # Generate the synthetic input
        raw_documents = []
        for i in range(100000):
            raw_documents.append({
                "id": i,
                "title": f"Optimized Document {i}",
                "content": f"Optimized content for document {i}" * 10,
                "timestamp": time.time() + i
            })

        prepared_docs = prepare_documents(raw_documents)

        # Index in memory-friendly batches
        processed_count = process_documents(prepared_docs, optimized_es)

        print(f"Successfully processed {processed_count} documents")
    finally:
        # Runs on success and on failure alike
        optimizers["post_optimization"](optimized_es, index_name, original_settings)

1333

1334

# Run optimized pipeline.
# This is a module-level call, so the complete pipeline executes as soon as
# the snippet is run.

1335

optimized_bulk_pipeline()

1336

```

1337

1338

PyES bulk operations provide powerful capabilities for high-performance data processing, with comprehensive support for error handling, parallel processing, and performance optimization to handle large-scale data ingestion efficiently.