or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md, authentication.md, core-client.md, document-modeling.md, dsl-queries.md, helper-functions.md, index.md, namespaced-apis.md, plugin-apis.md

docs/async-operations.md

# Async Operations

Asynchronous client operations using Python's asyncio for high-performance applications requiring concurrent OpenSearch operations. The async client provides the same API as the synchronous client but with async/await support.

## Capabilities

### Async Client

Main asynchronous client class with same methods as synchronous client.

```python { .api }
class AsyncOpenSearch:
    def __init__(self, hosts=None, **kwargs):
        """
        Initialize async OpenSearch client.

        Parameters: Same as OpenSearch client
        """

    async def ping(self, **kwargs):
        """Test connection to the cluster (async)."""

    async def info(self, **kwargs):
        """Get basic cluster information (async)."""

    async def search(self, index=None, body=None, **kwargs):
        """Execute search query (async)."""

    async def index(self, index, body, id=None, **kwargs):
        """Index a document (async)."""

    async def get(self, index, id, **kwargs):
        """Retrieve document by ID (async)."""

    async def update(self, index, id, body, **kwargs):
        """Update document (async)."""

    async def delete(self, index, id, **kwargs):
        """Delete document (async)."""

    async def bulk(self, body, index=None, **kwargs):
        """Bulk operations (async)."""

    async def count(self, index=None, body=None, **kwargs):
        """Count documents (async)."""

    async def scroll(self, scroll_id, scroll='5m', **kwargs):
        """Continue scrolling (async)."""

    async def clear_scroll(self, scroll_id=None, **kwargs):
        """Clear scroll context (async)."""

    async def close(self):
        """Close the client and cleanup resources."""
```

### Async Helper Functions

Asynchronous versions of helper functions for bulk operations and scanning.

```python { .api }
async def async_bulk(client, actions, **kwargs):
    """
    Async bulk operations.

    Parameters: Same as sync bulk() function

    Returns:
        Tuple of (success_count, failed_operations)
    """

async def async_streaming_bulk(client, actions, **kwargs):
    """
    Async streaming bulk operations.

    Parameters: Same as sync streaming_bulk() function

    Async yields:
        Tuples of (success, action_result) for each operation
    """

async def async_scan(client, query=None, scroll='5m', **kwargs):
    """
    Async scan for large result sets.

    Parameters: Same as sync scan() function

    Async yields:
        Individual document hits from search results
    """

async def async_reindex(client, source_index, target_index, **kwargs):
    """
    Async reindexing operations.

    Parameters: Same as sync reindex() function

    Returns:
        Tuple of (success_count, failed_operations)
    """
```

## Usage Examples

### Basic Async Client Usage

```python
import asyncio
from opensearchpy import AsyncOpenSearch

async def basic_async_example():
    # Create async client
    client = AsyncOpenSearch(
        hosts=[{'host': 'localhost', 'port': 9200}],
        use_ssl=True,
        verify_certs=False
    )

    try:
        # Test connection
        response = await client.ping()
        print(f"Connection successful: {response}")

        # Get cluster info
        info = await client.info()
        print(f"Cluster: {info['cluster_name']}")

        # Index a document
        doc = {
            'title': 'Async Document',
            'content': 'This document was indexed asynchronously',
            'timestamp': '2024-01-01T00:00:00Z'
        }

        result = await client.index(
            index='async-index',
            id='async-doc-1',
            body=doc
        )
        print(f"Document indexed: {result['result']}")

        # Search for documents
        search_body = {
            'query': {
                'match': {
                    'title': 'Async'
                }
            }
        }

        search_result = await client.search(
            index='async-index',
            body=search_body
        )

        print(f"Found {search_result['hits']['total']['value']} documents")

    finally:
        # Always close the client
        await client.close()

# Run async example
asyncio.run(basic_async_example())
```

### Concurrent Operations

```python
import asyncio
from opensearchpy import AsyncOpenSearch

async def concurrent_operations_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # Create multiple coroutines
        tasks = []

        # Index multiple documents concurrently
        for i in range(10):
            doc = {
                'id': i,
                'title': f'Concurrent Document {i}',
                'content': f'Content for document {i}'
            }

            task = client.index(
                index='concurrent-index',
                id=str(i),
                body=doc
            )
            tasks.append(task)

        # Execute all indexing operations concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful = 0
        failed = 0

        for result in results:
            if isinstance(result, Exception):
                failed += 1
                print(f"Failed: {result}")
            else:
                successful += 1

        print(f"Concurrent indexing: {successful} successful, {failed} failed")

    finally:
        await client.close()

asyncio.run(concurrent_operations_example())
```

### Async Bulk Operations

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

async def async_bulk_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # Prepare bulk actions
        actions = [
            {
                '_op_type': 'index',
                '_index': 'bulk-async-index',
                '_id': str(i),
                '_source': {
                    'title': f'Bulk Document {i}',
                    'value': i * 10,
                    'category': 'async-bulk'
                }
            }
            for i in range(1000)
        ]

        # Execute async bulk operations
        success_count, failed_ops = await async_bulk(
            client,
            actions,
            chunk_size=100,
            max_retries=3
        )

        print(f"Bulk operation: {success_count} successful")
        if failed_ops:
            print(f"Failed operations: {len(failed_ops)}")

    finally:
        await client.close()

asyncio.run(async_bulk_example())
```

### Async Streaming Bulk

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_streaming_bulk

async def async_streaming_bulk_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    async def generate_docs():
        """Async generator for documents."""
        for i in range(500):
            yield {
                '_op_type': 'index',
                '_index': 'streaming-async-index',
                '_id': str(i),
                '_source': {
                    'title': f'Streaming Document {i}',
                    'timestamp': '2024-01-01T00:00:00Z',
                    'value': i
                }
            }

            # Simulate async data processing
            if i % 50 == 0:
                await asyncio.sleep(0.1)

    try:
        processed = 0
        errors = []

        # Stream bulk operations
        async for success, info in async_streaming_bulk(
            client,
            generate_docs(),
            chunk_size=50
        ):
            if success:
                processed += 1
            else:
                errors.append(info)

            if processed % 100 == 0:
                print(f"Processed: {processed}, Errors: {len(errors)}")

        print(f"Streaming bulk completed: {processed} processed, {len(errors)} errors")

    finally:
        await client.close()

asyncio.run(async_streaming_bulk_example())
```

### Async Scanning

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_scan

async def async_scan_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        query = {
            'query': {
                'range': {
                    'value': {
                        'gte': 0,
                        'lt': 1000
                    }
                }
            },
            'sort': ['_doc']
        }

        processed_count = 0

        # Async scan through large result set
        async for doc in async_scan(
            client,
            query=query,
            index='large-async-index',
            size=100,
            scroll='5m'
        ):
            # Process each document
            processed_count += 1

            # Log progress
            if processed_count % 1000 == 0:
                print(f"Scanned {processed_count} documents")

        print(f"Async scan completed: {processed_count} documents processed")

    finally:
        await client.close()

asyncio.run(async_scan_example())
```

### Async Context Manager

```python
import asyncio
from opensearchpy import AsyncOpenSearch

class AsyncOpenSearchManager:
    def __init__(self, **kwargs):
        self.client_kwargs = kwargs
        self.client = None

    async def __aenter__(self):
        self.client = AsyncOpenSearch(**self.client_kwargs)
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.close()

async def context_manager_example():
    # Use async context manager for automatic cleanup
    async with AsyncOpenSearchManager(
        hosts=[{'host': 'localhost', 'port': 9200}]
    ) as client:

        # Perform operations
        info = await client.info()
        print(f"Connected to: {info['cluster_name']}")

        # Index some documents
        tasks = []
        for i in range(5):
            task = client.index(
                index='context-manager-index',
                id=str(i),
                body={'value': i, 'title': f'Document {i}'}
            )
            tasks.append(task)

        results = await asyncio.gather(*tasks)
        print(f"Indexed {len(results)} documents")

        # Search
        search_result = await client.search(
            index='context-manager-index',
            body={'query': {'match_all': {}}}
        )

        print(f"Found {search_result['hits']['total']['value']} documents")

    # Client is automatically closed here
    print("Client closed automatically")

asyncio.run(context_manager_example())
```

### Async with Connection Pool

```python
import asyncio
from opensearchpy import AsyncOpenSearch

async def connection_pool_example():
    # Client with connection pool for high concurrency
    client = AsyncOpenSearch(
        hosts=[
            {'host': 'node1.cluster.com', 'port': 9200},
            {'host': 'node2.cluster.com', 'port': 9200},
            {'host': 'node3.cluster.com', 'port': 9200}
        ],
        # Connection pool settings
        maxsize=20,  # Maximum connections per host
        # Health checking
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniffer_timeout=60,
        # Retry settings
        max_retries=3,
        retry_on_timeout=True
    )

    try:
        # Create many concurrent tasks
        tasks = []

        for i in range(100):
            # Mix of different operations
            if i % 3 == 0:
                task = client.index(
                    index='pool-index',
                    id=str(i),
                    body={'value': i}
                )
            elif i % 3 == 1:
                task = client.search(
                    index='pool-index',
                    body={'query': {'match_all': {}}}
                )
            else:
                task = client.count(index='pool-index')

            tasks.append(task)

        # Execute all tasks concurrently
        start_time = asyncio.get_event_loop().time()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        end_time = asyncio.get_event_loop().time()

        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = len(results) - successful

        print(f"Concurrent operations with pool:")
        print(f"  Total: {len(results)}")
        print(f"  Successful: {successful}")
        print(f"  Failed: {failed}")
        print(f"  Time: {end_time - start_time:.2f} seconds")

    finally:
        await client.close()

asyncio.run(connection_pool_example())
```

### Async Error Handling

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.exceptions import (
    NotFoundError,
    RequestError,
    ConnectionError,
    TransportError
)

async def error_handling_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # Example of handling different types of errors
        tasks = []

        # This will succeed
        tasks.append(client.index(
            index='error-test',
            id='success',
            body={'status': 'success'}
        ))

        # This might fail with validation error
        tasks.append(client.index(
            index='error-test',
            id='malformed',
            body={'date_field': 'invalid-date-format'}
        ))

        # Try to get non-existent document
        async def get_nonexistent():
            try:
                return await client.get(index='error-test', id='nonexistent')
            except NotFoundError:
                return {'error': 'Document not found'}

        tasks.append(get_nonexistent())

        # Execute with error handling
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {type(result).__name__}: {result}")
            else:
                print(f"Task {i} succeeded")

    except ConnectionError as e:
        print(f"Connection failed: {e}")

    except TransportError as e:
        print(f"Transport error: {e}")

    finally:
        await client.close()

asyncio.run(error_handling_example())
```

### Performance Comparison

```python
import asyncio
import time
from opensearchpy import OpenSearch, AsyncOpenSearch

async def performance_comparison():
    # Synchronous client
    sync_client = OpenSearch([{'host': 'localhost', 'port': 9200}])

    # Asynchronous client
    async_client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    num_operations = 50

    try:
        # Synchronous operations
        print("Running synchronous operations...")
        sync_start = time.time()

        for i in range(num_operations):
            sync_client.index(
                index='perf-test',
                id=f'sync-{i}',
                body={'value': i, 'type': 'sync'}
            )

        sync_end = time.time()
        sync_duration = sync_end - sync_start

        # Asynchronous operations
        print("Running asynchronous operations...")
        async_start = time.time()

        tasks = [
            async_client.index(
                index='perf-test',
                id=f'async-{i}',
                body={'value': i, 'type': 'async'}
            )
            for i in range(num_operations)
        ]

        await asyncio.gather(*tasks)

        async_end = time.time()
        async_duration = async_end - async_start

        # Results
        print(f"\nPerformance comparison ({num_operations} operations):")
        print(f"  Synchronous: {sync_duration:.2f} seconds")
        print(f"  Asynchronous: {async_duration:.2f} seconds")
        print(f"  Speedup: {sync_duration / async_duration:.2f}x")

    finally:
        await async_client.close()

asyncio.run(performance_comparison())
```