# Async Operations

Asynchronous client operations using Python's asyncio for high-performance applications requiring concurrent OpenSearch operations. The async client provides the same API as the synchronous client but with async/await support.

## Capabilities

### Async Client

Main asynchronous client class with the same methods as the synchronous client.
```python { .api }
class AsyncOpenSearch:
    def __init__(self, hosts=None, **kwargs):
        """
        Initialize async OpenSearch client.

        Parameters: Same as OpenSearch client
        """

    async def ping(self, **kwargs):
        """Test connection to the cluster (async)."""

    async def info(self, **kwargs):
        """Get basic cluster information (async)."""

    async def search(self, index=None, body=None, **kwargs):
        """Execute search query (async)."""

    async def index(self, index, body, id=None, **kwargs):
        """Index a document (async)."""

    async def get(self, index, id, **kwargs):
        """Retrieve document by ID (async)."""

    async def update(self, index, id, body, **kwargs):
        """Update document (async)."""

    async def delete(self, index, id, **kwargs):
        """Delete document (async)."""

    async def bulk(self, body, index=None, **kwargs):
        """Bulk operations (async)."""

    async def count(self, index=None, body=None, **kwargs):
        """Count documents (async)."""

    async def scroll(self, scroll_id, scroll='5m', **kwargs):
        """Continue scrolling (async)."""

    async def clear_scroll(self, scroll_id=None, **kwargs):
        """Clear scroll context (async)."""

    async def close(self):
        """Close the client and cleanup resources."""
```
### Async Helper Functions

Asynchronous versions of helper functions for bulk operations and scanning.

```python { .api }
async def async_bulk(client, actions, **kwargs):
    """
    Async bulk operations.

    Parameters: Same as sync bulk() function

    Returns:
        Tuple of (success_count, failed_operations)
    """

async def async_streaming_bulk(client, actions, **kwargs):
    """
    Async streaming bulk operations.

    Parameters: Same as sync streaming_bulk() function

    Async yields:
        Tuples of (success, action_result) for each operation
    """

async def async_scan(client, query=None, scroll='5m', **kwargs):
    """
    Async scan for large result sets.

    Parameters: Same as sync scan() function

    Async yields:
        Individual document hits from search results
    """

async def async_reindex(client, source_index, target_index, **kwargs):
    """
    Async reindexing operations.

    Parameters: Same as sync reindex() function

    Returns:
        Tuple of (success_count, failed_operations)
    """
```
## Usage Examples

### Basic Async Client Usage

```python
import asyncio
from opensearchpy import AsyncOpenSearch

async def basic_async_example():
    # Create async client
    client = AsyncOpenSearch(
        hosts=[{'host': 'localhost', 'port': 9200}],
        use_ssl=True,
        verify_certs=False
    )

    try:
        # Test connection
        response = await client.ping()
        print(f"Connection successful: {response}")

        # Get cluster info
        info = await client.info()
        print(f"Cluster: {info['cluster_name']}")

        # Index a document
        doc = {
            'title': 'Async Document',
            'content': 'This document was indexed asynchronously',
            'timestamp': '2024-01-01T00:00:00Z'
        }

        result = await client.index(
            index='async-index',
            id='async-doc-1',
            body=doc
        )
        print(f"Document indexed: {result['result']}")

        # Search for documents
        search_body = {
            'query': {
                'match': {
                    'title': 'Async'
                }
            }
        }

        search_result = await client.search(
            index='async-index',
            body=search_body
        )

        print(f"Found {search_result['hits']['total']['value']} documents")

    finally:
        # Always close the client
        await client.close()

# Run async example
asyncio.run(basic_async_example())
```
### Concurrent Operations

```python
import asyncio
from opensearchpy import AsyncOpenSearch

async def concurrent_operations_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # Create multiple coroutines
        tasks = []

        # Index multiple documents concurrently
        for i in range(10):
            doc = {
                'id': i,
                'title': f'Concurrent Document {i}',
                'content': f'Content for document {i}'
            }

            task = client.index(
                index='concurrent-index',
                id=str(i),
                body=doc
            )
            tasks.append(task)

        # Execute all indexing operations concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful = 0
        failed = 0

        for result in results:
            if isinstance(result, Exception):
                failed += 1
                print(f"Failed: {result}")
            else:
                successful += 1

        print(f"Concurrent indexing: {successful} successful, {failed} failed")

    finally:
        await client.close()

asyncio.run(concurrent_operations_example())
```
### Async Bulk Operations

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

async def async_bulk_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # Prepare bulk actions
        actions = [
            {
                '_op_type': 'index',
                '_index': 'bulk-async-index',
                '_id': str(i),
                '_source': {
                    'title': f'Bulk Document {i}',
                    'value': i * 10,
                    'category': 'async-bulk'
                }
            }
            for i in range(1000)
        ]

        # Execute async bulk operations
        success_count, failed_ops = await async_bulk(
            client,
            actions,
            chunk_size=100,
            max_retries=3
        )

        print(f"Bulk operation: {success_count} successful")
        if failed_ops:
            print(f"Failed operations: {len(failed_ops)}")

    finally:
        await client.close()

asyncio.run(async_bulk_example())
```
### Async Streaming Bulk

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_streaming_bulk

async def async_streaming_bulk_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    async def generate_docs():
        """Async generator for documents."""
        for i in range(500):
            yield {
                '_op_type': 'index',
                '_index': 'streaming-async-index',
                '_id': str(i),
                '_source': {
                    'title': f'Streaming Document {i}',
                    'timestamp': '2024-01-01T00:00:00Z',
                    'value': i
                }
            }

            # Simulate async data processing
            if i % 50 == 0:
                await asyncio.sleep(0.1)

    try:
        processed = 0
        errors = []

        # Stream bulk operations
        async for success, info in async_streaming_bulk(
            client,
            generate_docs(),
            chunk_size=50
        ):
            if success:
                processed += 1
            else:
                errors.append(info)

            if processed % 100 == 0:
                print(f"Processed: {processed}, Errors: {len(errors)}")

        print(f"Streaming bulk completed: {processed} processed, {len(errors)} errors")

    finally:
        await client.close()

asyncio.run(async_streaming_bulk_example())
```
### Async Scanning

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_scan

async def async_scan_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        query = {
            'query': {
                'range': {
                    'value': {
                        'gte': 0,
                        'lt': 1000
                    }
                }
            },
            'sort': ['_doc']
        }

        processed_count = 0

        # Async scan through large result set
        async for doc in async_scan(
            client,
            query=query,
            index='large-async-index',
            size=100,
            scroll='5m'
        ):
            # Process each document
            processed_count += 1

            # Log progress
            if processed_count % 1000 == 0:
                print(f"Scanned {processed_count} documents")

        print(f"Async scan completed: {processed_count} documents processed")

    finally:
        await client.close()

asyncio.run(async_scan_example())
```
### Async Context Manager

```python
import asyncio
from opensearchpy import AsyncOpenSearch

class AsyncOpenSearchManager:
    def __init__(self, **kwargs):
        self.client_kwargs = kwargs
        self.client = None

    async def __aenter__(self):
        self.client = AsyncOpenSearch(**self.client_kwargs)
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.close()

async def context_manager_example():
    # Use async context manager for automatic cleanup
    async with AsyncOpenSearchManager(
        hosts=[{'host': 'localhost', 'port': 9200}]
    ) as client:

        # Perform operations
        info = await client.info()
        print(f"Connected to: {info['cluster_name']}")

        # Index some documents
        tasks = []
        for i in range(5):
            task = client.index(
                index='context-manager-index',
                id=str(i),
                body={'value': i, 'title': f'Document {i}'}
            )
            tasks.append(task)

        results = await asyncio.gather(*tasks)
        print(f"Indexed {len(results)} documents")

        # Search
        search_result = await client.search(
            index='context-manager-index',
            body={'query': {'match_all': {}}}
        )

        print(f"Found {search_result['hits']['total']['value']} documents")

    # Client is automatically closed here
    print("Client closed automatically")

asyncio.run(context_manager_example())
```
### Async with Connection Pool

```python
import asyncio
from opensearchpy import AsyncOpenSearch

async def connection_pool_example():
    # Client with connection pool for high concurrency
    client = AsyncOpenSearch(
        hosts=[
            {'host': 'node1.cluster.com', 'port': 9200},
            {'host': 'node2.cluster.com', 'port': 9200},
            {'host': 'node3.cluster.com', 'port': 9200}
        ],
        # Connection pool settings
        maxsize=20,  # Maximum connections per host
        # Health checking
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniffer_timeout=60,
        # Retry settings
        max_retries=3,
        retry_on_timeout=True
    )

    try:
        # Create many concurrent tasks
        tasks = []

        for i in range(100):
            # Mix of different operations
            if i % 3 == 0:
                task = client.index(
                    index='pool-index',
                    id=str(i),
                    body={'value': i}
                )
            elif i % 3 == 1:
                task = client.search(
                    index='pool-index',
                    body={'query': {'match_all': {}}}
                )
            else:
                task = client.count(index='pool-index')

            tasks.append(task)

        # Execute all tasks concurrently
        start_time = asyncio.get_event_loop().time()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        end_time = asyncio.get_event_loop().time()

        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = len(results) - successful

        print("Concurrent operations with pool:")
        print(f"  Total: {len(results)}")
        print(f"  Successful: {successful}")
        print(f"  Failed: {failed}")
        print(f"  Time: {end_time - start_time:.2f} seconds")

    finally:
        await client.close()

asyncio.run(connection_pool_example())
```
### Async Error Handling

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.exceptions import (
    NotFoundError,
    RequestError,
    ConnectionError,
    TransportError
)

async def error_handling_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # Example of handling different types of errors
        tasks = []

        # This will succeed
        tasks.append(client.index(
            index='error-test',
            id='success',
            body={'status': 'success'}
        ))

        # This might fail with validation error
        tasks.append(client.index(
            index='error-test',
            id='malformed',
            body={'date_field': 'invalid-date-format'}
        ))

        # Try to get non-existent document
        async def get_nonexistent():
            try:
                return await client.get(index='error-test', id='nonexistent')
            except NotFoundError:
                return {'error': 'Document not found'}

        tasks.append(get_nonexistent())

        # Execute with error handling
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {type(result).__name__}: {result}")
            else:
                print(f"Task {i} succeeded")

    except ConnectionError as e:
        print(f"Connection failed: {e}")

    except TransportError as e:
        print(f"Transport error: {e}")

    finally:
        await client.close()

asyncio.run(error_handling_example())
```
### Performance Comparison

```python
import asyncio
import time
from opensearchpy import OpenSearch, AsyncOpenSearch

async def performance_comparison():
    # Synchronous client
    sync_client = OpenSearch([{'host': 'localhost', 'port': 9200}])

    # Asynchronous client
    async_client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    num_operations = 50

    try:
        # Synchronous operations
        print("Running synchronous operations...")
        sync_start = time.time()

        for i in range(num_operations):
            sync_client.index(
                index='perf-test',
                id=f'sync-{i}',
                body={'value': i, 'type': 'sync'}
            )

        sync_end = time.time()
        sync_duration = sync_end - sync_start

        # Asynchronous operations
        print("Running asynchronous operations...")
        async_start = time.time()

        tasks = [
            async_client.index(
                index='perf-test',
                id=f'async-{i}',
                body={'value': i, 'type': 'async'}
            )
            for i in range(num_operations)
        ]

        await asyncio.gather(*tasks)

        async_end = time.time()
        async_duration = async_end - async_start

        # Results
        print(f"\nPerformance comparison ({num_operations} operations):")
        print(f"  Synchronous: {sync_duration:.2f} seconds")
        print(f"  Asynchronous: {async_duration:.2f} seconds")
        print(f"  Speedup: {sync_duration / async_duration:.2f}x")

    finally:
        await async_client.close()

asyncio.run(performance_comparison())
```