# PyES Client Operations

## Overview

The `ES` class is the main entry point for all ElasticSearch operations in PyES. It manages connections, provides document operations, search functionality, index management, and cluster administration.

## ES Class API Reference

```python { .api }
9
class ES:
10
"""
11
Main ElasticSearch client class providing connection management
12
and all ElasticSearch operations.
13
14
Args:
15
server (str|list): ElasticSearch server(s). Default: "localhost:9200"
16
timeout (float): Request timeout in seconds. Default: 30.0
17
bulk_size (int): Number of operations per bulk request. Default: 400
18
encoder (callable): Custom JSON encoder. Default: None
19
decoder (callable): Custom JSON decoder. Default: None
20
max_retries (int): Maximum retry attempts. Default: 3
21
retry_time (int): Retry delay in seconds. Default: 60
22
default_indices (list): Default indices for operations. Default: None
23
default_types (list): Default document types. Default: None
24
log_curl (bool): Log curl commands. Default: False
25
dump_curl (bool): Dump curl to file. Default: False
26
model (class): Document model class. Default: ElasticSearchModel
27
basic_auth (tuple): (username, password) for authentication. Default: None
28
raise_on_bulk_item_failure (bool): Raise on bulk item failures. Default: False
29
document_object_field (str): Document object field name. Default: None
30
bulker_class (class): Bulk operations class. Default: ListBulker
31
cert_reqs (str): SSL certificate requirements. Default: 'CERT_OPTIONAL'
32
"""
33
34
def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400,
35
encoder=None, decoder=None, max_retries=3, retry_time=60,
36
default_indices=None, default_types=None, log_curl=False,
37
dump_curl=False, model=ElasticSearchModel, basic_auth=None,
38
raise_on_bulk_item_failure=False, document_object_field=None,
39
bulker_class=ListBulker, cert_reqs='CERT_OPTIONAL'):
40
pass
41
```
42
43
## Connection Management

### Basic Connection

```python { .api }
from pyes import ES

# Single server connection
es = ES('localhost:9200')

# Multiple servers for failover
es = ES(['server1:9200', 'server2:9200', 'server3:9200'])

# With authentication
es = ES('localhost:9200', basic_auth=('username', 'password'))

# With SSL configuration
es = ES('https://secure-es.example.com:9200', cert_reqs='CERT_REQUIRED')
```

63
### Connection Configuration
64
65
```python { .api }
66
# Advanced connection configuration
67
es = ES(
68
server="localhost:9200",
69
timeout=45.0, # 45 second timeout
70
max_retries=5, # Retry failed requests 5 times
71
retry_time=30, # Wait 30s between retries
72
bulk_size=1000, # Process 1000 operations per bulk
73
log_curl=True, # Log equivalent curl commands
74
raise_on_bulk_item_failure=True # Raise exceptions on bulk failures
75
)
76
```
77
78
## Document Operations
79
80
### Index Documents
81
82
```python { .api }
83
def index(self, doc, index, doc_type, id=None, parent=None,
84
force_insert=False, bulk=False, **kwargs):
85
"""
86
Index a document in ElasticSearch.
87
88
Args:
89
doc (dict): Document to index
90
index (str): Index name
91
doc_type (str): Document type
92
id (str, optional): Document ID. Auto-generated if None
93
parent (str, optional): Parent document ID
94
force_insert (bool): Use PUT instead of POST. Default: False
95
bulk (bool): Add to bulk buffer instead of immediate index. Default: False
96
**kwargs: Additional parameters (routing, refresh, etc.)
97
98
Returns:
99
str: Document ID if successful
100
101
Raises:
102
DocumentAlreadyExistsException: If document exists and force_insert=True
103
IndexMissingException: If index doesn't exist
104
"""
105
pass
106
107
# Basic document indexing
108
doc = {
109
"title": "Python ElasticSearch Tutorial",
110
"content": "Learn how to use PyES library effectively",
111
"tags": ["python", "elasticsearch", "tutorial"],
112
"published_date": "2023-12-01",
113
"author": "Jane Developer",
114
"view_count": 0
115
}
116
117
# Index with auto-generated ID
118
doc_id = es.index(doc, "blog", "post")
119
120
# Index with specific ID
121
es.index(doc, "blog", "post", id="tutorial-001")
122
123
# Index with parent-child relationship
124
child_doc = {"comment": "Great tutorial!", "author": "John Reader"}
125
es.index(child_doc, "blog", "comment", parent="tutorial-001")
126
127
# Index with routing for performance
128
es.index(doc, "blog", "post", routing="python-category")
129
130
# Force refresh immediately
131
es.index(doc, "blog", "post", refresh=True)
132
```
133
134
### Retrieve Documents
135
136
```python { .api }
137
def get(self, index, doc_type, id, fields=None, model=None, **query_params):
138
"""
139
Retrieve a document by ID.
140
141
Args:
142
index (str): Index name
143
doc_type (str): Document type
144
id (str): Document ID
145
fields (list, optional): Specific fields to retrieve
146
model (class, optional): Custom model class for result
147
**query_params: Additional parameters (routing, preference, etc.)
148
149
Returns:
150
Document object with metadata
151
152
Raises:
153
DocumentMissingException: If document not found
154
IndexMissingException: If index doesn't exist
155
"""
156
pass
157
158
# Get complete document
159
document = es.get("blog", "post", "tutorial-001")
160
print(f"Title: {document.title}")
161
print(f"Content: {document.content}")
162
print(f"Document ID: {document._meta.id}")
163
print(f"Version: {document._meta.version}")
164
165
# Get specific fields only
166
document = es.get("blog", "post", "tutorial-001", fields=["title", "tags"])
167
168
# Get with routing
169
document = es.get("blog", "post", "tutorial-001", routing="python-category")
170
171
# Get from specific node preference
172
document = es.get("blog", "post", "tutorial-001", preference="_local")
173
```
174
175
### Multi-Get Operations
176
177
```python { .api }
178
def mget(self, ids, index=None, doc_type=None, **query_params):
179
"""
180
Retrieve multiple documents by IDs.
181
182
Args:
183
ids (list): List of document IDs or dicts with index/type/id
184
index (str, optional): Default index name
185
doc_type (str, optional): Default document type
186
**query_params: Additional parameters
187
188
Returns:
189
List of documents (None for missing documents)
190
"""
191
pass
192
193
# Get multiple documents from same index/type
194
docs = es.mget(["tutorial-001", "tutorial-002", "tutorial-003"],
195
index="blog", doc_type="post")
196
197
# Get documents from different indices/types
198
requests = [
199
{"_index": "blog", "_type": "post", "_id": "tutorial-001"},
200
{"_index": "news", "_type": "article", "_id": "news-001"},
201
{"_index": "blog", "_type": "comment", "_id": "comment-001"}
202
]
203
docs = es.mget(requests)
204
205
# Process results
206
for doc in docs:
207
if doc is not None:
208
print(f"Found: {doc.title}")
209
else:
210
print("Document not found")
211
```
212
213
### Update Documents
214
215
```python { .api }
216
def update(self, index, doc_type, id, script=None, lang="mvel",
217
params=None, document=None, upsert=None, **kwargs):
218
"""
219
Update a document using script or partial document.
220
221
Args:
222
index (str): Index name
223
doc_type (str): Document type
224
id (str): Document ID
225
script (str, optional): Update script
226
lang (str): Script language. Default: "mvel"
227
params (dict, optional): Script parameters
228
document (dict, optional): Partial document for update
229
upsert (dict, optional): Document to create if not exists
230
**kwargs: Additional parameters (routing, refresh, etc.)
231
232
Returns:
233
Update result information
234
235
Raises:
236
DocumentMissingException: If document not found and no upsert
237
VersionConflictEngineException: If version conflict occurs
238
"""
239
pass
240
241
def partial_update(self, index, doc_type, id, doc=None, script=None,
242
params=None, **kwargs):
243
"""
244
Partial update of a document.
245
246
Args:
247
index (str): Index name
248
doc_type (str): Document type
249
id (str): Document ID
250
doc (dict, optional): Partial document fields
251
script (str, optional): Update script
252
params (dict, optional): Script parameters
253
**kwargs: Additional parameters
254
255
Returns:
256
Update result information
257
"""
258
pass
259
260
# Script-based update ("mvel" is the default script language; mvel-era
# scripts receive parameters as plain variables, not via `params.`)
es.update("blog", "post", "tutorial-001",
          script="ctx._source.view_count += increment",
          params={"increment": 1})

265
# Partial document update
266
es.partial_update("blog", "post", "tutorial-001",
267
doc={"tags": ["python", "elasticsearch", "tutorial", "updated"]})
268
269
# Update with upsert (create if doesn't exist)
270
es.update("blog", "post", "new-tutorial",
271
document={"title": "New Tutorial", "content": "Content here"},
272
upsert={"title": "Default Title", "created_date": "2023-12-01"})
273
274
# Conditional update with version
275
es.update("blog", "post", "tutorial-001",
276
document={"status": "published"},
277
version=2) # Only update if current version is 2
278
```
279
280
### Delete Documents
281
282
```python { .api }
283
def delete(self, index, doc_type, id, bulk=False, **query_params):
284
"""
285
Delete a document by ID.
286
287
Args:
288
index (str): Index name
289
doc_type (str): Document type
290
id (str): Document ID
291
bulk (bool): Add to bulk buffer. Default: False
292
**query_params: Additional parameters (routing, refresh, etc.)
293
294
Returns:
295
Deletion result information
296
297
Raises:
298
DocumentMissingException: If document not found
299
"""
300
pass
301
302
def exists(self, index, doc_type, id, **query_params):
303
"""
304
Check if document exists.
305
306
Args:
307
index (str): Index name
308
doc_type (str): Document type
309
id (str): Document ID
310
**query_params: Additional parameters
311
312
Returns:
313
bool: True if document exists, False otherwise
314
"""
315
pass
316
317
# Delete document
318
es.delete("blog", "post", "tutorial-001")
319
320
# Check if document exists before deletion
321
if es.exists("blog", "post", "tutorial-001"):
322
es.delete("blog", "post", "tutorial-001")
323
print("Document deleted")
324
else:
325
print("Document not found")
326
327
# Delete with routing
328
es.delete("blog", "post", "tutorial-001", routing="python-category")
329
330
# Bulk deletion (added to bulk buffer)
331
es.delete("blog", "post", "tutorial-001", bulk=True)
332
es.delete("blog", "post", "tutorial-002", bulk=True)
333
es.flush_bulk() # Process all deletions
334
```
335
336
## Search Operations
337
338
### Basic Search
339
340
```python { .api }
341
def search(self, query, indices=None, doc_types=None, model=None,
342
scan=False, headers=None, **query_params):
343
"""
344
Execute a search query.
345
346
Args:
347
query (Query|dict): Query object or raw query dict
348
indices (list, optional): Indices to search. Uses default_indices if None
349
doc_types (list, optional): Document types to search
350
model (class, optional): Custom model class for results
351
scan (bool): Use scan search for large result sets. Default: False
352
headers (dict, optional): Custom HTTP headers
353
**query_params: Additional parameters (routing, preference, etc.)
354
355
Returns:
356
Search results with hits, facets, and metadata
357
"""
358
pass
359
360
def search_raw(self, query, indices=None, doc_types=None,
361
headers=None, **query_params):
362
"""
363
Execute search and return raw dictionary result.
364
365
Args:
366
query (Query|dict): Query object or raw query dict
367
indices (list, optional): Indices to search
368
doc_types (list, optional): Document types to search
369
headers (dict, optional): Custom HTTP headers
370
**query_params: Additional parameters
371
372
Returns:
373
dict: Raw ElasticSearch response
374
"""
375
pass
376
377
from pyes import Search, TermQuery, BoolQuery, RangeQuery
378
379
# Simple term search
380
query = Search(TermQuery("tags", "python"))
381
results = es.search(query, indices=["blog"])
382
383
# Process results
384
print(f"Total hits: {results.total}")
385
for hit in results:
386
print(f"Title: {hit.title}")
387
print(f"Score: {hit._meta.score}")
388
print(f"Index: {hit._meta.index}")
389
390
# Complex boolean search
391
complex_query = Search(
392
BoolQuery(
393
must=[TermQuery("status", "published")],
394
should=[
395
TermQuery("tags", "python"),
396
TermQuery("tags", "elasticsearch")
397
],
398
must_not=[TermQuery("category", "draft")],
399
filter=RangeQuery("published_date", gte="2023-01-01")
400
)
401
).size(20).sort("published_date", order="desc")
402
403
results = es.search(complex_query, indices=["blog", "news"])
404
405
# Raw search for custom processing
406
raw_query = {
407
"query": {"match": {"title": "python"}},
408
"highlight": {"fields": {"title": {}, "content": {}}}
409
}
410
raw_results = es.search_raw(raw_query, indices=["blog"])
411
```
412
413
### Multi-Search
414
415
```python { .api }
416
def search_multi(self, queries, indices_list=None, doc_types_list=None, **kwargs):
417
"""
418
Execute multiple search queries in a single request.
419
420
Args:
421
queries (list): List of query objects or dicts
422
indices_list (list, optional): List of indices for each query
423
doc_types_list (list, optional): List of doc types for each query
424
**kwargs: Additional parameters
425
426
Returns:
427
list: List of search results for each query
428
"""
429
pass
430
431
# Multiple searches in single request
432
queries = [
433
Search(TermQuery("tags", "python")),
434
Search(TermQuery("tags", "javascript")),
435
Search(RangeQuery("view_count", gte=1000))
436
]
437
438
indices_list = [["blog"], ["blog"], ["blog", "news"]]
439
440
results = es.search_multi(queries, indices_list)
441
442
for i, result in enumerate(results):
443
print(f"Query {i+1}: {result.total} hits")
444
```
445
446
### Scroll Search for Large Results
447
448
```python { .api }
449
def search_scroll(self, scroll_id, scroll="10m"):
450
"""
451
Continue scrolling through search results.
452
453
Args:
454
scroll_id (str): Scroll ID from previous search
455
scroll (str): Scroll timeout. Default: "10m"
456
457
Returns:
458
Next batch of search results
459
"""
460
pass
461
462
# Initial search with scroll
463
query = Search(TermQuery("status", "published")).size(1000)
464
results = es.search(query, indices=["blog"], scroll="5m")
465
466
all_docs = list(results) # First batch
467
468
# Continue scrolling for remaining results
469
while results.total > len(all_docs):
470
results = es.search_scroll(results._scroll_id, scroll="5m")
471
all_docs.extend(results)
472
473
if len(results) == 0: # No more results
474
break
475
476
print(f"Retrieved {len(all_docs)} total documents")
477
478
# Scan search for memory-efficient large result processing
479
query = Search(TermQuery("category", "products"))
480
results = es.search(query, indices=["catalog"], scan=True, scroll="2m", size=100)
481
482
for batch in results:
483
for doc in batch:
484
process_document(doc) # Process each document
485
```
486
487
### Count and Delete by Query
488
489
```python { .api }
490
def count(self, query=None, indices=None, doc_types=None, **query_params):
491
"""
492
Count documents matching query.
493
494
Args:
495
query (Query|dict, optional): Query to count. Counts all if None
496
indices (list, optional): Indices to search
497
doc_types (list, optional): Document types to search
498
**query_params: Additional parameters
499
500
Returns:
501
int: Number of matching documents
502
"""
503
pass
504
505
def delete_by_query(self, indices, doc_types, query, **query_params):
506
"""
507
Delete documents matching query.
508
509
Args:
510
indices (list): Indices to delete from
511
doc_types (list): Document types to delete from
512
query (Query|dict): Query to match documents for deletion
513
**query_params: Additional parameters
514
515
Returns:
516
Deletion result information
517
"""
518
pass
519
520
# Count all documents
521
total_docs = es.count(indices=["blog"])
522
523
# Count with query
524
python_posts = es.count(TermQuery("tags", "python"), indices=["blog"])
525
526
# Delete old documents
527
old_query = RangeQuery("published_date", lt="2020-01-01")
528
deletion_result = es.delete_by_query(["blog"], ["post"], old_query)
529
print(f"Deleted {deletion_result.total} old posts")
530
```
531
532
## Suggestion Operations
533
534
### Auto-completion and Suggestions
535
536
```python { .api }
537
def suggest(self, name, text, field, type='term', size=None, params=None, **kwargs):
538
"""
539
Get suggestions for text.
540
541
Args:
542
name (str): Suggestion name
543
text (str): Text to get suggestions for
544
field (str): Field to suggest on
545
type (str): Suggestion type ('term', 'phrase', 'completion'). Default: 'term'
546
size (int, optional): Number of suggestions to return
547
params (dict, optional): Additional suggestion parameters
548
**kwargs: Additional parameters (indices, etc.)
549
550
Returns:
551
Suggestion results
552
"""
553
pass
554
555
def suggest_from_object(self, suggest, indices=None, preference=None, **kwargs):
556
"""
557
Get suggestions from Suggest object.
558
559
Args:
560
suggest (Suggest): Suggest object with configured suggestions
561
indices (list, optional): Indices to suggest from
562
preference (str, optional): Node preference
563
**kwargs: Additional parameters
564
565
Returns:
566
Suggestion results
567
"""
568
pass
569
570
from pyes import Suggest
571
572
# Term suggestions for typos
573
suggestions = es.suggest("title_suggest", "pythno", "title", type="term")
574
575
# Phrase suggestions
576
phrase_suggestions = es.suggest("content_suggest", "elsticsearch", "content",
577
type="phrase", size=3)
578
579
# Multiple suggestions using Suggest object
580
suggest = Suggest()
581
suggest.add_term("python tutorial", "title_suggest", "title")
582
suggest.add_phrase("elasticsearch guide", "content_suggest", "content")
583
suggest.add_completion("py", "tag_suggest", "tags.suggest")
584
585
all_suggestions = es.suggest_from_object(suggest, indices=["blog"])
586
587
# Process suggestions
588
for suggestion_name, suggestion_results in all_suggestions.items():
589
print(f"Suggestions for {suggestion_name}:")
590
for option in suggestion_results[0].options:
591
print(f" - {option.text} (score: {option.score})")
592
```
593
594
## File Operations
595
596
### File Indexing and Retrieval
597
598
```python { .api }
599
def put_file(self, filename, index, doc_type, id=None, name=None):
600
"""
601
Index a file as attachment.
602
603
Args:
604
filename (str): Path to file to index
605
index (str): Index name
606
doc_type (str): Document type
607
id (str, optional): Document ID. Auto-generated if None
608
name (str, optional): Name for the attachment
609
610
Returns:
611
str: Document ID
612
"""
613
pass
614
615
def get_file(self, index, doc_type, id=None):
616
"""
617
Retrieve an indexed file.
618
619
Args:
620
index (str): Index name
621
doc_type (str): Document type
622
id (str, optional): Document ID
623
624
Returns:
625
File content and metadata
626
"""
627
pass
628
629
# Index PDF file
630
doc_id = es.put_file("/path/to/document.pdf", "documents", "attachment",
631
name="Important Document")
632
633
# Index with metadata
634
import os
635
from pyes import file_to_attachment
636
637
with open("/path/to/document.pdf", "rb") as f:
638
attachment = file_to_attachment(f.read(), "document.pdf")
639
640
doc = {
641
"title": "Important Document",
642
"uploaded_by": "john.doe",
643
"upload_date": "2023-12-01",
644
"file": attachment
645
}
646
647
es.index(doc, "documents", "attachment", id="doc-001")
648
649
# Retrieve file
650
file_doc = es.get_file("documents", "attachment", "doc-001")
651
print(f"File name: {file_doc.file.title}")
652
print(f"File size: {len(file_doc.file.content)} bytes")
653
```
654
655
## Percolator Operations
656
657
### Query Registration and Matching
658
659
```python { .api }
660
def create_percolator(self, index, name, query, **kwargs):
661
"""
662
Create a percolator query.
663
664
Args:
665
index (str): Index name
666
name (str): Percolator name
667
query (Query|dict): Query to register
668
**kwargs: Additional parameters
669
670
Returns:
671
Creation result
672
"""
673
pass
674
675
def delete_percolator(self, index, name):
676
"""
677
Delete a percolator query.
678
679
Args:
680
index (str): Index name
681
name (str): Percolator name
682
683
Returns:
684
Deletion result
685
"""
686
pass
687
688
def percolate(self, index, doc_types, query):
689
"""
690
Test document against registered percolator queries.
691
692
Args:
693
index (str): Index name
694
doc_types (list): Document types
695
query (dict): Document to test
696
697
Returns:
698
Matching percolator queries
699
"""
700
pass
701
702
# Register percolator queries for content filtering
703
python_query = TermQuery("tags", "python")
704
es.create_percolator("blog", "python_posts", python_query)
705
706
tutorial_query = BoolQuery(
707
must=[TermQuery("category", "tutorial")],
708
should=[TermQuery("difficulty", "beginner")]
709
)
710
es.create_percolator("blog", "beginner_tutorials", tutorial_query)
711
712
# Test document against percolators
713
test_doc = {
714
"title": "Python Basics Tutorial",
715
"tags": ["python", "programming"],
716
"category": "tutorial",
717
"difficulty": "beginner"
718
}
719
720
matches = es.percolate("blog", ["post"], {"doc": test_doc})
721
print(f"Matching queries: {[match.id for match in matches]}")
722
# Output: ['python_posts', 'beginner_tutorials']
723
```
724
725
## More Like This
726
727
### Similar Document Discovery
728
729
```python { .api }
730
def morelikethis(self, index, doc_type, id, fields, **query_params):
731
"""
732
Find documents similar to the specified document.
733
734
Args:
735
index (str): Index name
736
doc_type (str): Document type
737
id (str): Document ID to find similar documents for
738
fields (list): Fields to use for similarity calculation
739
**query_params: Additional MLT parameters (min_term_freq, max_query_terms, etc.)
740
741
Returns:
742
Similar documents
743
"""
744
pass
745
746
# Find similar blog posts
747
similar_posts = es.morelikethis(
748
"blog", "post", "tutorial-001",
749
fields=["title", "content", "tags"],
750
min_term_freq=1,
751
max_query_terms=12,
752
min_doc_freq=1,
753
stop_words=["the", "and", "or", "but"]
754
)
755
756
print(f"Found {similar_posts.total} similar posts:")
757
for post in similar_posts:
758
print(f" - {post.title} (score: {post._meta.score})")
759
```
760
761
## Properties and Configuration
762
763
### Dynamic Properties
764
765
```python { .api }
766
@property
767
def mappings(self):
768
"""
769
Get Mapper instance for mapping management.
770
771
Returns:
772
Mapper: Mapping management instance
773
"""
774
pass
775
776
@property
777
def default_indices(self):
778
"""
779
Get default indices for operations.
780
781
Returns:
782
list: Default indices
783
"""
784
pass
785
786
@default_indices.setter
787
def default_indices(self, indices):
788
"""
789
Set default indices for operations.
790
791
Args:
792
indices (list): Default indices to use
793
"""
794
pass
795
796
@property
797
def bulk_size(self):
798
"""
799
Get current bulk operation size.
800
801
Returns:
802
int: Current bulk size
803
"""
804
pass
805
806
@bulk_size.setter
807
def bulk_size(self, size):
808
"""
809
Set bulk operation size.
810
811
Args:
812
size (int): New bulk size
813
"""
814
pass
815
816
# Configure default behavior
817
es.default_indices = ["blog", "news"] # Default to these indices
818
es.bulk_size = 1000 # Process 1000 operations per bulk
819
820
# Access mapping management
821
mapping = es.mappings
822
mapping.create_index_if_missing("new_index")
823
824
# Search using default indices (no need to specify)
825
results = es.search(TermQuery("status", "published")) # Uses default_indices
826
```
827
828
## Error Handling

### Exception Management

```python { .api }
from pyes import (
    ElasticSearchException, IndexMissingException,
    DocumentMissingException, DocumentAlreadyExistsException,
    VersionConflictEngineException, BulkOperationException,
    NoServerAvailable
)

try:
    # Document operations with error handling
    doc_id = es.index(document, "blog", "post", id="existing-id")

except DocumentAlreadyExistsException:
    # Handle duplicate document
    print("Document already exists, updating instead")
    es.update("blog", "post", "existing-id", document=document)

except IndexMissingException:
    # Create index and retry
    print("Index missing, creating index")
    es.indices.create_index("blog")
    doc_id = es.index(document, "blog", "post", id="existing-id")

except VersionConflictEngineException as e:
    # Handle version conflicts
    print(f"Version conflict: {e}")
    current_doc = es.get("blog", "post", "existing-id")
    # Resolve conflict and retry with current version

except NoServerAvailable:
    # Handle connection failures
    print("No ElasticSearch servers available")
    # Implement fallback or retry logic

except ElasticSearchException as e:
    # Handle general ES exceptions
    print(f"ElasticSearch error: {e}")
```
870
871
## Best Practices
872
873
### Performance Optimization
874
875
```python { .api }
876
# Connection pooling for concurrent applications
877
import threading
878
879
class ESConnectionPool:
880
def __init__(self, servers, pool_size=10):
881
self.servers = servers
882
self.pool = []
883
self.lock = threading.Lock()
884
885
for _ in range(pool_size):
886
es = ES(servers, timeout=30, max_retries=3)
887
self.pool.append(es)
888
889
def get_connection(self):
890
with self.lock:
891
if self.pool:
892
return self.pool.pop()
893
else:
894
return ES(self.servers)
895
896
def return_connection(self, es):
897
with self.lock:
898
self.pool.append(es)
899
900
# Use connection pool
901
pool = ESConnectionPool(['server1:9200', 'server2:9200'])
902
903
def worker_function():
904
es = pool.get_connection()
905
try:
906
# Perform operations
907
results = es.search(query, indices=["data"])
908
# Process results
909
finally:
910
pool.return_connection(es)
911
```
912
913
### Bulk Processing Patterns
914
915
```python { .api }
916
# Efficient bulk processing with error handling
917
def bulk_index_documents(es, documents, index, doc_type):
918
"""Efficiently index large numbers of documents."""
919
920
es.bulk_size = 1000 # Optimize batch size
921
failed_docs = []
922
923
try:
924
for i, doc in enumerate(documents):
925
try:
926
es.index(doc, index, doc_type, bulk=True)
927
928
# Force flush every 10,000 documents
929
if i % 10000 == 0:
930
es.flush_bulk()
931
932
except Exception as e:
933
failed_docs.append((i, doc, str(e)))
934
935
# Final flush
936
es.flush_bulk()
937
938
except BulkOperationException as e:
939
# Handle bulk operation failures
940
for failure in e.errors:
941
print(f"Bulk failure: {failure}")
942
943
return failed_docs
944
945
# Usage
946
documents = [{"title": f"Doc {i}", "content": f"Content {i}"} for i in range(50000)]
947
failures = bulk_index_documents(es, documents, "bulk_index", "document")
948
949
if failures:
950
print(f"Failed to index {len(failures)} documents")
951
# Implement retry logic for failures
952
```
953
954
The ES client provides comprehensive functionality for all ElasticSearch operations with robust error handling, flexible configuration options, and performance optimization features.