
# PyES Client Operations

## Overview

The `ES` class is the main entry point for all ElasticSearch operations in PyES. It manages connections, provides document operations, search functionality, index management, and cluster administration.

## ES Class API Reference

```python { .api }
class ES:
    """
    Main ElasticSearch client class providing connection management
    and all ElasticSearch operations.

    Args:
        server (str|list): ElasticSearch server(s). Default: "localhost:9200"
        timeout (float): Request timeout in seconds. Default: 30.0
        bulk_size (int): Number of operations per bulk request. Default: 400
        encoder (callable): Custom JSON encoder. Default: None
        decoder (callable): Custom JSON decoder. Default: None
        max_retries (int): Maximum retry attempts. Default: 3
        retry_time (int): Retry delay in seconds. Default: 60
        default_indices (list): Default indices for operations. Default: None
        default_types (list): Default document types. Default: None
        log_curl (bool): Log curl commands. Default: False
        dump_curl (bool): Dump curl to file. Default: False
        model (class): Document model class. Default: ElasticSearchModel
        basic_auth (tuple): (username, password) for authentication. Default: None
        raise_on_bulk_item_failure (bool): Raise on bulk item failures. Default: False
        document_object_field (str): Document object field name. Default: None
        bulker_class (class): Bulk operations class. Default: ListBulker
        cert_reqs (str): SSL certificate requirements. Default: 'CERT_OPTIONAL'
    """

    def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400,
                 encoder=None, decoder=None, max_retries=3, retry_time=60,
                 default_indices=None, default_types=None, log_curl=False,
                 dump_curl=False, model=ElasticSearchModel, basic_auth=None,
                 raise_on_bulk_item_failure=False, document_object_field=None,
                 bulker_class=ListBulker, cert_reqs='CERT_OPTIONAL'):
        pass
```

## Connection Management

### Basic Connection

```python { .api }
from pyes import ES

# Single server connection
es = ES('localhost:9200')

# Multiple servers for failover
es = ES(['server1:9200', 'server2:9200', 'server3:9200'])

# With authentication
es = ES('localhost:9200', basic_auth=('username', 'password'))

# With SSL configuration
es = ES('https://secure-es.example.com:9200', cert_reqs='CERT_REQUIRED')
```

### Connection Configuration

```python { .api }
# Advanced connection configuration
es = ES(
    server="localhost:9200",
    timeout=45.0,                       # 45 second timeout
    max_retries=5,                      # Retry failed requests 5 times
    retry_time=30,                      # Wait 30s between retries
    bulk_size=1000,                     # Process 1000 operations per bulk
    log_curl=True,                      # Log equivalent curl commands
    raise_on_bulk_item_failure=True     # Raise exceptions on bulk failures
)
```

## Document Operations

### Index Documents

```python { .api }
def index(self, doc, index, doc_type, id=None, parent=None,
          force_insert=False, bulk=False, **kwargs):
    """
    Index a document in ElasticSearch.

    Args:
        doc (dict): Document to index
        index (str): Index name
        doc_type (str): Document type
        id (str, optional): Document ID. Auto-generated if None
        parent (str, optional): Parent document ID
        force_insert (bool): Use PUT instead of POST. Default: False
        bulk (bool): Add to bulk buffer instead of immediate index. Default: False
        **kwargs: Additional parameters (routing, refresh, etc.)

    Returns:
        str: Document ID if successful

    Raises:
        DocumentAlreadyExistsException: If document exists and force_insert=True
        IndexMissingException: If index doesn't exist
    """
    pass

# Basic document indexing
doc = {
    "title": "Python ElasticSearch Tutorial",
    "content": "Learn how to use PyES library effectively",
    "tags": ["python", "elasticsearch", "tutorial"],
    "published_date": "2023-12-01",
    "author": "Jane Developer",
    "view_count": 0
}

# Index with auto-generated ID
doc_id = es.index(doc, "blog", "post")

# Index with specific ID
es.index(doc, "blog", "post", id="tutorial-001")

# Index with parent-child relationship
child_doc = {"comment": "Great tutorial!", "author": "John Reader"}
es.index(child_doc, "blog", "comment", parent="tutorial-001")

# Index with routing for performance
es.index(doc, "blog", "post", routing="python-category")

# Force refresh immediately
es.index(doc, "blog", "post", refresh=True)
```

### Retrieve Documents

```python { .api }
def get(self, index, doc_type, id, fields=None, model=None, **query_params):
    """
    Retrieve a document by ID.

    Args:
        index (str): Index name
        doc_type (str): Document type
        id (str): Document ID
        fields (list, optional): Specific fields to retrieve
        model (class, optional): Custom model class for result
        **query_params: Additional parameters (routing, preference, etc.)

    Returns:
        Document object with metadata

    Raises:
        DocumentMissingException: If document not found
        IndexMissingException: If index doesn't exist
    """
    pass

# Get complete document
document = es.get("blog", "post", "tutorial-001")
print(f"Title: {document.title}")
print(f"Content: {document.content}")
print(f"Document ID: {document._meta.id}")
print(f"Version: {document._meta.version}")

# Get specific fields only
document = es.get("blog", "post", "tutorial-001", fields=["title", "tags"])

# Get with routing
document = es.get("blog", "post", "tutorial-001", routing="python-category")

# Get from specific node preference
document = es.get("blog", "post", "tutorial-001", preference="_local")
```

174

175

### Multi-Get Operations

176

177

```python { .api }

178

def mget(self, ids, index=None, doc_type=None, **query_params):

179

"""

180

Retrieve multiple documents by IDs.

181

182

Args:

183

ids (list): List of document IDs or dicts with index/type/id

184

index (str, optional): Default index name

185

doc_type (str, optional): Default document type

186

**query_params: Additional parameters

187

188

Returns:

189

List of documents (None for missing documents)

190

"""

191

pass

192

193

# Get multiple documents from same index/type

194

docs = es.mget(["tutorial-001", "tutorial-002", "tutorial-003"],

195

index="blog", doc_type="post")

196

197

# Get documents from different indices/types

198

requests = [

199

{"_index": "blog", "_type": "post", "_id": "tutorial-001"},

200

{"_index": "news", "_type": "article", "_id": "news-001"},

201

{"_index": "blog", "_type": "comment", "_id": "comment-001"}

202

]

203

docs = es.mget(requests)

204

205

# Process results

206

for doc in docs:

207

if doc is not None:

208

print(f"Found: {doc.title}")

209

else:

210

print("Document not found")

211

```

212

213

### Update Documents

214

215

```python { .api }

216

def update(self, index, doc_type, id, script=None, lang="mvel",

217

params=None, document=None, upsert=None, **kwargs):

218

"""

219

Update a document using script or partial document.

220

221

Args:

222

index (str): Index name

223

doc_type (str): Document type

224

id (str): Document ID

225

script (str, optional): Update script

226

lang (str): Script language. Default: "mvel"

227

params (dict, optional): Script parameters

228

document (dict, optional): Partial document for update

229

upsert (dict, optional): Document to create if not exists

230

**kwargs: Additional parameters (routing, refresh, etc.)

231

232

Returns:

233

Update result information

234

235

Raises:

236

DocumentMissingException: If document not found and no upsert

237

VersionConflictEngineException: If version conflict occurs

238

"""

239

pass

240

241

def partial_update(self, index, doc_type, id, doc=None, script=None,

242

params=None, **kwargs):

243

"""

244

Partial update of a document.

245

246

Args:

247

index (str): Index name

248

doc_type (str): Document type

249

id (str): Document ID

250

doc (dict, optional): Partial document fields

251

script (str, optional): Update script

252

params (dict, optional): Script parameters

253

**kwargs: Additional parameters

254

255

Returns:

256

Update result information

257

"""

258

pass

259

260

# Script-based update

261

es.update("blog", "post", "tutorial-001",

262

script="ctx._source.view_count += params.increment",

263

params={"increment": 1})

264

265

# Partial document update

266

es.partial_update("blog", "post", "tutorial-001",

267

doc={"tags": ["python", "elasticsearch", "tutorial", "updated"]})

268

269

# Update with upsert (create if doesn't exist)

270

es.update("blog", "post", "new-tutorial",

271

document={"title": "New Tutorial", "content": "Content here"},

272

upsert={"title": "Default Title", "created_date": "2023-12-01"})

273

274

# Conditional update with version

275

es.update("blog", "post", "tutorial-001",

276

document={"status": "published"},

277

version=2) # Only update if current version is 2

278

```

279

280

### Delete Documents

281

282

```python { .api }

283

def delete(self, index, doc_type, id, bulk=False, **query_params):

284

"""

285

Delete a document by ID.

286

287

Args:

288

index (str): Index name

289

doc_type (str): Document type

290

id (str): Document ID

291

bulk (bool): Add to bulk buffer. Default: False

292

**query_params: Additional parameters (routing, refresh, etc.)

293

294

Returns:

295

Deletion result information

296

297

Raises:

298

DocumentMissingException: If document not found

299

"""

300

pass

301

302

def exists(self, index, doc_type, id, **query_params):

303

"""

304

Check if document exists.

305

306

Args:

307

index (str): Index name

308

doc_type (str): Document type

309

id (str): Document ID

310

**query_params: Additional parameters

311

312

Returns:

313

bool: True if document exists, False otherwise

314

"""

315

pass

316

317

# Delete document

318

es.delete("blog", "post", "tutorial-001")

319

320

# Check if document exists before deletion

321

if es.exists("blog", "post", "tutorial-001"):

322

es.delete("blog", "post", "tutorial-001")

323

print("Document deleted")

324

else:

325

print("Document not found")

326

327

# Delete with routing

328

es.delete("blog", "post", "tutorial-001", routing="python-category")

329

330

# Bulk deletion (added to bulk buffer)

331

es.delete("blog", "post", "tutorial-001", bulk=True)

332

es.delete("blog", "post", "tutorial-002", bulk=True)

333

es.flush_bulk() # Process all deletions

334

```

335

336

## Search Operations

337

338

### Basic Search

339

340

```python { .api }

341

def search(self, query, indices=None, doc_types=None, model=None,

342

scan=False, headers=None, **query_params):

343

"""

344

Execute a search query.

345

346

Args:

347

query (Query|dict): Query object or raw query dict

348

indices (list, optional): Indices to search. Uses default_indices if None

349

doc_types (list, optional): Document types to search

350

model (class, optional): Custom model class for results

351

scan (bool): Use scan search for large result sets. Default: False

352

headers (dict, optional): Custom HTTP headers

353

**query_params: Additional parameters (routing, preference, etc.)

354

355

Returns:

356

Search results with hits, facets, and metadata

357

"""

358

pass

359

360

def search_raw(self, query, indices=None, doc_types=None,

361

headers=None, **query_params):

362

"""

363

Execute search and return raw dictionary result.

364

365

Args:

366

query (Query|dict): Query object or raw query dict

367

indices (list, optional): Indices to search

368

doc_types (list, optional): Document types to search

369

headers (dict, optional): Custom HTTP headers

370

**query_params: Additional parameters

371

372

Returns:

373

dict: Raw ElasticSearch response

374

"""

375

pass

376

377

from pyes import Search, TermQuery, BoolQuery, RangeQuery

378

379

# Simple term search

380

query = Search(TermQuery("tags", "python"))

381

results = es.search(query, indices=["blog"])

382

383

# Process results

384

print(f"Total hits: {results.total}")

385

for hit in results:

386

print(f"Title: {hit.title}")

387

print(f"Score: {hit._meta.score}")

388

print(f"Index: {hit._meta.index}")

389

390

# Complex boolean search

391

complex_query = Search(

392

BoolQuery(

393

must=[TermQuery("status", "published")],

394

should=[

395

TermQuery("tags", "python"),

396

TermQuery("tags", "elasticsearch")

397

],

398

must_not=[TermQuery("category", "draft")],

399

filter=RangeQuery("published_date", gte="2023-01-01")

400

)

401

).size(20).sort("published_date", order="desc")

402

403

results = es.search(complex_query, indices=["blog", "news"])

404

405

# Raw search for custom processing

406

raw_query = {

407

"query": {"match": {"title": "python"}},

408

"highlight": {"fields": {"title": {}, "content": {}}}

409

}

410

raw_results = es.search_raw(raw_query, indices=["blog"])

411

```

412

413

### Multi-Search

414

415

```python { .api }

416

def search_multi(self, queries, indices_list=None, doc_types_list=None, **kwargs):

417

"""

418

Execute multiple search queries in a single request.

419

420

Args:

421

queries (list): List of query objects or dicts

422

indices_list (list, optional): List of indices for each query

423

doc_types_list (list, optional): List of doc types for each query

424

**kwargs: Additional parameters

425

426

Returns:

427

list: List of search results for each query

428

"""

429

pass

430

431

# Multiple searches in single request

432

queries = [

433

Search(TermQuery("tags", "python")),

434

Search(TermQuery("tags", "javascript")),

435

Search(RangeQuery("view_count", gte=1000))

436

]

437

438

indices_list = [["blog"], ["blog"], ["blog", "news"]]

439

440

results = es.search_multi(queries, indices_list)

441

442

for i, result in enumerate(results):

443

print(f"Query {i+1}: {result.total} hits")

444

```

445

446

### Scroll Search for Large Results

447

448

```python { .api }

449

def search_scroll(self, scroll_id, scroll="10m"):

450

"""

451

Continue scrolling through search results.

452

453

Args:

454

scroll_id (str): Scroll ID from previous search

455

scroll (str): Scroll timeout. Default: "10m"

456

457

Returns:

458

Next batch of search results

459

"""

460

pass

461

462

# Initial search with scroll

463

query = Search(TermQuery("status", "published")).size(1000)

464

results = es.search(query, indices=["blog"], scroll="5m")

465

466

all_docs = list(results) # First batch

467

468

# Continue scrolling for remaining results

469

while results.total > len(all_docs):

470

results = es.search_scroll(results._scroll_id, scroll="5m")

471

all_docs.extend(results)

472

473

if len(results) == 0: # No more results

474

break

475

476

print(f"Retrieved {len(all_docs)} total documents")

477

478

# Scan search for memory-efficient large result processing

479

query = Search(TermQuery("category", "products"))

480

results = es.search(query, indices=["catalog"], scan=True, scroll="2m", size=100)

481

482

for batch in results:

483

for doc in batch:

484

process_document(doc) # Process each document

485

```

486

487

### Count and Delete by Query

488

489

```python { .api }

490

def count(self, query=None, indices=None, doc_types=None, **query_params):

491

"""

492

Count documents matching query.

493

494

Args:

495

query (Query|dict, optional): Query to count. Counts all if None

496

indices (list, optional): Indices to search

497

doc_types (list, optional): Document types to search

498

**query_params: Additional parameters

499

500

Returns:

501

int: Number of matching documents

502

"""

503

pass

504

505

def delete_by_query(self, indices, doc_types, query, **query_params):

506

"""

507

Delete documents matching query.

508

509

Args:

510

indices (list): Indices to delete from

511

doc_types (list): Document types to delete from

512

query (Query|dict): Query to match documents for deletion

513

**query_params: Additional parameters

514

515

Returns:

516

Deletion result information

517

"""

518

pass

519

520

# Count all documents

521

total_docs = es.count(indices=["blog"])

522

523

# Count with query

524

python_posts = es.count(TermQuery("tags", "python"), indices=["blog"])

525

526

# Delete old documents

527

old_query = RangeQuery("published_date", lt="2020-01-01")

528

deletion_result = es.delete_by_query(["blog"], ["post"], old_query)

529

print(f"Deleted {deletion_result.total} old posts")

530

```

531

532

## Suggestion Operations

533

534

### Auto-completion and Suggestions

535

536

```python { .api }

537

def suggest(self, name, text, field, type='term', size=None, params=None, **kwargs):

538

"""

539

Get suggestions for text.

540

541

Args:

542

name (str): Suggestion name

543

text (str): Text to get suggestions for

544

field (str): Field to suggest on

545

type (str): Suggestion type ('term', 'phrase', 'completion'). Default: 'term'

546

size (int, optional): Number of suggestions to return

547

params (dict, optional): Additional suggestion parameters

548

**kwargs: Additional parameters (indices, etc.)

549

550

Returns:

551

Suggestion results

552

"""

553

pass

554

555

def suggest_from_object(self, suggest, indices=None, preference=None, **kwargs):

556

"""

557

Get suggestions from Suggest object.

558

559

Args:

560

suggest (Suggest): Suggest object with configured suggestions

561

indices (list, optional): Indices to suggest from

562

preference (str, optional): Node preference

563

**kwargs: Additional parameters

564

565

Returns:

566

Suggestion results

567

"""

568

pass

569

570

from pyes import Suggest

571

572

# Term suggestions for typos

573

suggestions = es.suggest("title_suggest", "pythno", "title", type="term")

574

575

# Phrase suggestions

576

phrase_suggestions = es.suggest("content_suggest", "elsticsearch", "content",

577

type="phrase", size=3)

578

579

# Multiple suggestions using Suggest object

580

suggest = Suggest()

581

suggest.add_term("python tutorial", "title_suggest", "title")

582

suggest.add_phrase("elasticsearch guide", "content_suggest", "content")

583

suggest.add_completion("py", "tag_suggest", "tags.suggest")

584

585

all_suggestions = es.suggest_from_object(suggest, indices=["blog"])

586

587

# Process suggestions

588

for suggestion_name, suggestion_results in all_suggestions.items():

589

print(f"Suggestions for {suggestion_name}:")

590

for option in suggestion_results[0].options:

591

print(f" - {option.text} (score: {option.score})")

592

```

593

594

## File Operations

595

596

### File Indexing and Retrieval

597

598

```python { .api }

599

def put_file(self, filename, index, doc_type, id=None, name=None):

600

"""

601

Index a file as attachment.

602

603

Args:

604

filename (str): Path to file to index

605

index (str): Index name

606

doc_type (str): Document type

607

id (str, optional): Document ID. Auto-generated if None

608

name (str, optional): Name for the attachment

609

610

Returns:

611

str: Document ID

612

"""

613

pass

614

615

def get_file(self, index, doc_type, id=None):

616

"""

617

Retrieve an indexed file.

618

619

Args:

620

index (str): Index name

621

doc_type (str): Document type

622

id (str, optional): Document ID

623

624

Returns:

625

File content and metadata

626

"""

627

pass

628

629

# Index PDF file

630

doc_id = es.put_file("/path/to/document.pdf", "documents", "attachment",

631

name="Important Document")

632

633

# Index with metadata

634

import os

635

from pyes import file_to_attachment

636

637

with open("/path/to/document.pdf", "rb") as f:

638

attachment = file_to_attachment(f.read(), "document.pdf")

639

640

doc = {

641

"title": "Important Document",

642

"uploaded_by": "john.doe",

643

"upload_date": "2023-12-01",

644

"file": attachment

645

}

646

647

es.index(doc, "documents", "attachment", id="doc-001")

648

649

# Retrieve file

650

file_doc = es.get_file("documents", "attachment", "doc-001")

651

print(f"File name: {file_doc.file.title}")

652

print(f"File size: {len(file_doc.file.content)} bytes")

653

```

654

655

## Percolator Operations

656

657

### Query Registration and Matching

658

659

```python { .api }

660

def create_percolator(self, index, name, query, **kwargs):

661

"""

662

Create a percolator query.

663

664

Args:

665

index (str): Index name

666

name (str): Percolator name

667

query (Query|dict): Query to register

668

**kwargs: Additional parameters

669

670

Returns:

671

Creation result

672

"""

673

pass

674

675

def delete_percolator(self, index, name):

676

"""

677

Delete a percolator query.

678

679

Args:

680

index (str): Index name

681

name (str): Percolator name

682

683

Returns:

684

Deletion result

685

"""

686

pass

687

688

def percolate(self, index, doc_types, query):

689

"""

690

Test document against registered percolator queries.

691

692

Args:

693

index (str): Index name

694

doc_types (list): Document types

695

query (dict): Document to test

696

697

Returns:

698

Matching percolator queries

699

"""

700

pass

701

702

# Register percolator queries for content filtering

703

python_query = TermQuery("tags", "python")

704

es.create_percolator("blog", "python_posts", python_query)

705

706

tutorial_query = BoolQuery(

707

must=[TermQuery("category", "tutorial")],

708

should=[TermQuery("difficulty", "beginner")]

709

)

710

es.create_percolator("blog", "beginner_tutorials", tutorial_query)

711

712

# Test document against percolators

713

test_doc = {

714

"title": "Python Basics Tutorial",

715

"tags": ["python", "programming"],

716

"category": "tutorial",

717

"difficulty": "beginner"

718

}

719

720

matches = es.percolate("blog", ["post"], {"doc": test_doc})

721

print(f"Matching queries: {[match.id for match in matches]}")

722

# Output: ['python_posts', 'beginner_tutorials']

723

```

724

725

## More Like This

726

727

### Similar Document Discovery

728

729

```python { .api }

730

def morelikethis(self, index, doc_type, id, fields, **query_params):

731

"""

732

Find documents similar to the specified document.

733

734

Args:

735

index (str): Index name

736

doc_type (str): Document type

737

id (str): Document ID to find similar documents for

738

fields (list): Fields to use for similarity calculation

739

**query_params: Additional MLT parameters (min_term_freq, max_query_terms, etc.)

740

741

Returns:

742

Similar documents

743

"""

744

pass

745

746

# Find similar blog posts

747

similar_posts = es.morelikethis(

748

"blog", "post", "tutorial-001",

749

fields=["title", "content", "tags"],

750

min_term_freq=1,

751

max_query_terms=12,

752

min_doc_freq=1,

753

stop_words=["the", "and", "or", "but"]

754

)

755

756

print(f"Found {similar_posts.total} similar posts:")

757

for post in similar_posts:

758

print(f" - {post.title} (score: {post._meta.score})")

759

```

760

761

## Properties and Configuration

762

763

### Dynamic Properties

764

765

```python { .api }

766

@property

767

def mappings(self):

768

"""

769

Get Mapper instance for mapping management.

770

771

Returns:

772

Mapper: Mapping management instance

773

"""

774

pass

775

776

@property

777

def default_indices(self):

778

"""

779

Get default indices for operations.

780

781

Returns:

782

list: Default indices

783

"""

784

pass

785

786

@default_indices.setter

787

def default_indices(self, indices):

788

"""

789

Set default indices for operations.

790

791

Args:

792

indices (list): Default indices to use

793

"""

794

pass

795

796

@property

797

def bulk_size(self):

798

"""

799

Get current bulk operation size.

800

801

Returns:

802

int: Current bulk size

803

"""

804

pass

805

806

@bulk_size.setter

807

def bulk_size(self, size):

808

"""

809

Set bulk operation size.

810

811

Args:

812

size (int): New bulk size

813

"""

814

pass

815

816

# Configure default behavior

817

es.default_indices = ["blog", "news"] # Default to these indices

818

es.bulk_size = 1000 # Process 1000 operations per bulk

819

820

# Access mapping management

821

mapping = es.mappings

822

mapping.create_index_if_missing("new_index")

823

824

# Search using default indices (no need to specify)

825

results = es.search(TermQuery("status", "published")) # Uses default_indices

826

```

## Error Handling

### Exception Management

```python { .api }
from pyes import (
    ElasticSearchException, IndexMissingException,
    DocumentMissingException, DocumentAlreadyExistsException,
    VersionConflictEngineException, BulkOperationException,
    NoServerAvailable
)

try:
    # Document operations with error handling
    doc_id = es.index(document, "blog", "post", id="existing-id")

except DocumentAlreadyExistsException:
    # Handle duplicate document
    print("Document already exists, updating instead")
    es.update("blog", "post", "existing-id", document=document)

except IndexMissingException:
    # Create index and retry
    print("Index missing, creating index")
    es.indices.create_index("blog")
    doc_id = es.index(document, "blog", "post", id="existing-id")

except VersionConflictEngineException as e:
    # Handle version conflicts
    print(f"Version conflict: {e}")
    current_doc = es.get("blog", "post", "existing-id")
    # Resolve conflict and retry with current version

except NoServerAvailable:
    # Handle connection failures
    print("No ElasticSearch servers available")
    # Implement fallback or retry logic

except ElasticSearchException as e:
    # Handle general ES exceptions
    print(f"ElasticSearch error: {e}")
```

870

871

## Best Practices

872

873

### Performance Optimization

874

875

```python { .api }

876

# Connection pooling for concurrent applications

877

import threading

878

879

class ESConnectionPool:

880

def __init__(self, servers, pool_size=10):

881

self.servers = servers

882

self.pool = []

883

self.lock = threading.Lock()

884

885

for _ in range(pool_size):

886

es = ES(servers, timeout=30, max_retries=3)

887

self.pool.append(es)

888

889

def get_connection(self):

890

with self.lock:

891

if self.pool:

892

return self.pool.pop()

893

else:

894

return ES(self.servers)

895

896

def return_connection(self, es):

897

with self.lock:

898

self.pool.append(es)

899

900

# Use connection pool

901

pool = ESConnectionPool(['server1:9200', 'server2:9200'])

902

903

def worker_function():

904

es = pool.get_connection()

905

try:

906

# Perform operations

907

results = es.search(query, indices=["data"])

908

# Process results

909

finally:

910

pool.return_connection(es)

911

```

912

913

### Bulk Processing Patterns

914

915

```python { .api }

916

# Efficient bulk processing with error handling

917

def bulk_index_documents(es, documents, index, doc_type):

918

"""Efficiently index large numbers of documents."""

919

920

es.bulk_size = 1000 # Optimize batch size

921

failed_docs = []

922

923

try:

924

for i, doc in enumerate(documents):

925

try:

926

es.index(doc, index, doc_type, bulk=True)

927

928

# Force flush every 10,000 documents

929

if i % 10000 == 0:

930

es.flush_bulk()

931

932

except Exception as e:

933

failed_docs.append((i, doc, str(e)))

934

935

# Final flush

936

es.flush_bulk()

937

938

except BulkOperationException as e:

939

# Handle bulk operation failures

940

for failure in e.errors:

941

print(f"Bulk failure: {failure}")

942

943

return failed_docs

944

945

# Usage

946

documents = [{"title": f"Doc {i}", "content": f"Content {i}"} for i in range(50000)]

947

failures = bulk_index_documents(es, documents, "bulk_index", "document")

948

949

if failures:

950

print(f"Failed to index {len(failures)} documents")

951

# Implement retry logic for failures

952

```

953

954

The ES client provides comprehensive functionality for all ElasticSearch operations with robust error handling, flexible configuration options, and performance optimization features.