# Helper Functions

High-level utility functions for common operations like bulk indexing, scanning large result sets, and data reindexing, with built-in error handling and performance optimizations. These functions provide simplified interfaces for complex operations.

## Capabilities

### Bulk Operations

Efficient bulk processing for high-throughput document operations.

```python { .api }
def bulk(client, actions, index=None, doc_type=None, **kwargs):
    """
    Perform bulk indexing, updating, and deleting operations.

    Parameters:
    - client: OpenSearch client instance
    - actions: Iterable of action dictionaries or generator
    - index (str, optional): Default index for actions without _index
    - doc_type (str, optional): Default document type (deprecated)
    - chunk_size (int, optional): Number of docs per chunk (default: 500)
    - max_chunk_bytes (int, optional): Maximum size per chunk in bytes
    - thread_count (int, optional): Number of parallel threads (default: 4)
    - queue_size (int, optional): Size of the task queue (default: 4)
    - refresh (str/bool, optional): Refresh policy for operations
    - timeout (str, optional): Request timeout
    - max_retries (int, optional): Maximum number of retries (default: 0)
    - initial_backoff (int, optional): Initial backoff time in seconds (default: 2)
    - max_backoff (int, optional): Maximum backoff time in seconds (default: 600)
    - yield_ok (bool, optional): Yield successful operations (default: True)

    Action format:
    {
        '_op_type': 'index',  # 'index', 'create', 'update', 'delete'
        '_index': 'my-index',
        '_id': 'doc-id',
        '_source': {'field': 'value'}  # For index/create/update
    }

    Returns:
    Tuple of (success_count, failed_actions)

    Raises:
    BulkIndexError: If there are failed operations and errors are not ignored
    """

def async_bulk(client, actions, **kwargs):
    """
    Asynchronous version of bulk operations.

    Parameters: Same as bulk() function

    Returns:
    Awaitable resolving to a tuple of (success_count, failed_actions)
    """

def streaming_bulk(client, actions, **kwargs):
    """
    Streaming bulk operations that yield results as they complete.

    Parameters: Same as bulk() function

    Yields:
    Tuples of (success, action_result) for each operation
    """

def async_streaming_bulk(client, actions, **kwargs):
    """
    Asynchronous streaming bulk operations.

    Parameters: Same as bulk() function

    Async yields:
    Tuples of (success, action_result) for each operation
    """

def parallel_bulk(client, actions, thread_count=4, **kwargs):
    """
    Parallel bulk operations using threading for improved performance.

    Parameters:
    - client: OpenSearch client instance
    - actions: Iterable of action dictionaries
    - thread_count (int): Number of parallel threads (default: 4)
    - Other parameters same as bulk() function

    Yields:
    Tuples of (success, action_result) for each operation
    """
```
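The `chunk_size` and `max_chunk_bytes` parameters above bound each bulk request by action count and by serialized payload size, whichever limit is hit first. The following is a minimal, hypothetical sketch of that batching logic (`chunk_actions` here is an illustration, not the library's internal `_chunk_actions`):

```python
import json

def chunk_actions(actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024):
    """Group actions into chunks bounded by count and serialized size."""
    chunk, chunk_bytes = [], 0
    for action in actions:
        size = len(json.dumps(action).encode("utf-8"))
        # Flush the current chunk if adding this action would exceed either bound
        if chunk and (len(chunk) >= chunk_size or chunk_bytes + size > max_chunk_bytes):
            yield chunk
            chunk, chunk_bytes = [], 0
        chunk.append(action)
        chunk_bytes += size
    if chunk:
        yield chunk

docs = [{"_op_type": "index", "_index": "demo", "_id": str(i), "_source": {"n": i}}
        for i in range(7)]
chunks = list(chunk_actions(docs, chunk_size=3))
print([len(c) for c in chunks])  # → [3, 3, 1]
```

Note that a chunk always accepts at least one action, so a single oversized document is still sent rather than looping forever.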

### Scanning Operations

Efficient scanning for processing large result sets.

```python { .api }
def scan(client, query=None, scroll='5m', **kwargs):
    """
    Scan search results for large datasets using the scroll API.

    Parameters:
    - client: OpenSearch client instance
    - query (dict, optional): Search query body
    - scroll (str, optional): Scroll timeout (default: '5m')
    - index (str/list, optional): Index name(s)
    - doc_type (str/list, optional): Document type(s) (deprecated)
    - size (int, optional): Number of results per shard (default: 1000)
    - request_timeout (float, optional): Request timeout in seconds
    - clear_scroll (bool, optional): Clear scroll context on completion (default: True)
    - scroll_kwargs (dict, optional): Additional scroll parameters
    - preserve_order (bool, optional): Preserve result order (default: False)

    Query format:
    {
        'query': {
            'match_all': {}
        },
        'sort': ['_doc']  # Recommended for performance
    }

    Yields:
    Individual document hits from search results

    Raises:
    ScanError: If scan operation fails
    """

def async_scan(client, query=None, scroll='5m', **kwargs):
    """
    Asynchronous version of scan operations.

    Parameters: Same as scan() function

    Async yields:
    Individual document hits from search results
    """
```
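Under the hood, `scan()` wraps the scroll API: an initial search opens a scroll context, repeated scroll calls page through it, and the context is cleared when iteration finishes. A simplified sketch with a stub client (`scan_sketch` and `StubClient` are illustrative, not the library's implementation):

```python
def scan_sketch(client, query=None, scroll="5m", size=1000, clear_scroll=True, **kwargs):
    """Simplified scroll loop illustrating what scan() does internally."""
    resp = client.search(body=query, scroll=scroll, size=size, **kwargs)
    scroll_id = resp.get("_scroll_id")
    try:
        while scroll_id and resp["hits"]["hits"]:
            for hit in resp["hits"]["hits"]:
                yield hit
            resp = client.scroll(scroll_id=scroll_id, scroll=scroll)
            scroll_id = resp.get("_scroll_id")
    finally:
        # Free server-side scroll resources even if the consumer stops early
        if clear_scroll and scroll_id:
            client.clear_scroll(scroll_id=scroll_id)

class StubClient:
    """Serves canned pages of hits, then an empty page to end the scroll."""
    def __init__(self, pages):
        self.pages = list(pages)
        self.cleared = False
    def search(self, **kwargs):
        return {"_scroll_id": "s1", "hits": {"hits": self.pages.pop(0)}}
    def scroll(self, **kwargs):
        return {"_scroll_id": "s1", "hits": {"hits": self.pages.pop(0)}}
    def clear_scroll(self, **kwargs):
        self.cleared = True

client = StubClient([[{"_id": "1"}, {"_id": "2"}], [{"_id": "3"}], []])
hits = list(scan_sketch(client))
print([h["_id"] for h in hits], client.cleared)  # → ['1', '2', '3'] True
```

The `finally` block is why the real helper can release the scroll context even when the caller breaks out of the loop early.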

### Reindexing Operations

Copy documents between indices with optional transformations.

```python { .api }
def reindex(client, source_index, target_index, query=None, **kwargs):
    """
    Reindex documents from source to target index.

    Parameters:
    - client: OpenSearch client instance
    - source_index (str): Source index name
    - target_index (str): Target index name
    - query (dict, optional): Query to filter source documents
    - chunk_size (int, optional): Bulk operation chunk size (default: 500)
    - scroll (str, optional): Scroll timeout (default: '5m')
    - op_type (str, optional): Operation type ('index' or 'create', default: 'index')
    - transform (callable, optional): Function to transform documents
    - target_client: Different client for target index

    Transform function signature:
    def transform_doc(doc):
        # Modify doc['_source'], doc['_id'], etc.
        return doc

    Returns:
    Tuple of (success_count, failed_operations)

    Raises:
    ReindexError: If reindexing fails
    """

def async_reindex(client, source_index, target_index, **kwargs):
    """
    Asynchronous version of reindex operations.

    Parameters: Same as reindex() function

    Returns:
    Tuple of (success_count, failed_operations)
    """
```
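Conceptually, reindexing composes the scan and bulk helpers: hits scanned from the source become bulk actions against the target, with the transform applied in between. A hypothetical sketch of that action pipeline (`reindex_actions` is illustrative; the convention that a transform returning `None` drops the document is an assumption here, not part of the documented API):

```python
def reindex_actions(hits, target_index, transform=None, op_type="index"):
    """Turn scanned hits into bulk actions aimed at the target index."""
    for hit in hits:
        doc = {"_op_type": op_type, "_index": target_index,
               "_id": hit["_id"], "_source": hit["_source"]}
        if transform is not None:
            doc = transform(doc)
        if doc is not None:  # assumption: None means "skip this document"
            yield doc

# Example input shaped like scan() hits
hits = [{"_id": "1", "_source": {"old_field": "a"}},
        {"_id": "2", "_source": {"keep": True}}]

def rename(doc):
    src = doc["_source"]
    if "old_field" in src:
        src["new_field"] = src.pop("old_field")
    return doc

actions = list(reindex_actions(hits, "new-index", transform=rename))
print(actions[0]["_source"])  # → {'new_field': 'a'}
```

Feeding this generator to `bulk()` against the target client is essentially what `reindex()` does for you.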

### Utility Functions

Additional helper functions for common operations.

```python { .api }
def expand_action(data):
    """
    Expand a single document into a bulk action format.

    Parameters:
    - data: Document data or action dictionary

    Returns:
    Properly formatted bulk action
    """

def _chunk_actions(actions, chunk_size, max_chunk_bytes):
    """
    Internal function to chunk actions for bulk operations.

    Parameters:
    - actions: Iterable of actions
    - chunk_size: Maximum actions per chunk
    - max_chunk_bytes: Maximum bytes per chunk

    Yields:
    Chunks of actions
    """

def _process_bulk_chunk(client, chunk, **kwargs):
    """
    Internal function to process a single bulk chunk.

    Parameters:
    - client: OpenSearch client instance
    - chunk: List of actions to process
    - kwargs: Additional bulk parameters

    Returns:
    Processed results
    """
```
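`expand_action()` is what lets the bulk helpers accept plain document dicts: underscore-prefixed metadata keys are lifted into the action line, and the remainder becomes the request body. A simplified, hypothetical sketch (`expand_action_sketch`; the real function also handles update payloads and pre-serialized action lines):

```python
def expand_action_sketch(data):
    """Split a document dict into (action_line, source) for the bulk API."""
    if not isinstance(data, dict):  # already-serialized lines pass through
        return data, None
    data = dict(data)  # avoid mutating the caller's dict
    op_type = data.pop("_op_type", "index")
    # Metadata fields move to the action line; everything else is the body
    meta_keys = ("_index", "_id", "_routing", "_version", "_version_type")
    action = {op_type: {k: data.pop(k) for k in meta_keys if k in data}}
    source = data.pop("_source", None)
    if source is None and op_type != "delete":
        source = data  # a plain document dict becomes the body directly
    return action, source

action, source = expand_action_sketch({"_index": "products", "_id": "1",
                                       "title": "Laptop"})
print(action, source)  # → {'index': {'_index': 'products', '_id': '1'}} {'title': 'Laptop'}
```

This is why the usage examples below may pass either fully-specified action dicts or bare documents plus a default `index=`.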

## Usage Examples

### Basic Bulk Operations

```python
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

client = OpenSearch([{'host': 'localhost', 'port': 9200}])

# Prepare bulk actions
actions = [
    {
        '_op_type': 'index',
        '_index': 'products',
        '_id': '1',
        '_source': {
            'title': 'Laptop Computer',
            'price': 999.99,
            'category': 'Electronics'
        }
    },
    {
        '_op_type': 'index',
        '_index': 'products',
        '_id': '2',
        '_source': {
            'title': 'Wireless Mouse',
            'price': 29.99,
            'category': 'Electronics'
        }
    },
    {
        '_op_type': 'update',
        '_index': 'products',
        '_id': '1',
        '_source': {
            'doc': {
                'in_stock': True
            }
        }
    },
    {
        '_op_type': 'delete',
        '_index': 'products',
        '_id': '3'
    }
]

# Execute bulk operations
successes, failures = bulk(
    client,
    actions,
    chunk_size=100,
    thread_count=4,
    timeout='60s'
)

print(f"Successful operations: {successes}")
if failures:
    print(f"Failed operations: {len(failures)}")
    for failure in failures:
        print(f"  Error: {failure}")
```

### Streaming Bulk with Generator

```python
from opensearchpy.helpers import streaming_bulk

def generate_docs():
    """Generate documents from data source."""
    for i in range(10000):
        yield {
            '_op_type': 'index',
            '_index': 'large-dataset',
            '_id': str(i),
            '_source': {
                'id': i,
                'value': f'Document {i}',
                'timestamp': '2024-01-01T00:00:00Z'
            }
        }

# Stream bulk operations
for success, info in streaming_bulk(
    client,
    generate_docs(),
    chunk_size=500,
    max_retries=3
):
    if not success:
        print(f"Failed to index: {info}")
    else:
        print(f"Indexed document: {info['index']['_id']}")
```

### Large Dataset Scanning

```python
from opensearchpy.helpers import scan

# Scan all documents in an index
query = {
    'query': {
        'range': {
            'timestamp': {
                'gte': '2024-01-01',
                'lte': '2024-12-31'
            }
        }
    },
    'sort': ['_doc']  # More efficient than default scoring
}

total_processed = 0
for doc in scan(
    client,
    query=query,
    index='large-index',
    size=1000,  # Documents per shard per request
    scroll='10m'
):
    # Process each document (process_document is user-defined)
    process_document(doc['_source'])
    total_processed += 1

    if total_processed % 10000 == 0:
        print(f"Processed {total_processed} documents")

print(f"Total processed: {total_processed} documents")
```

### Reindexing with Transformation

```python
from opensearchpy.helpers import reindex

def transform_document(doc):
    """Transform document during reindexing."""
    # Add new fields
    doc['_source']['processed_at'] = '2024-01-01T00:00:00Z'

    # Rename fields
    if 'old_field' in doc['_source']:
        doc['_source']['new_field'] = doc['_source'].pop('old_field')

    # Filter out unwanted fields
    doc['_source'].pop('temp_field', None)

    # Change document ID format
    doc['_id'] = f"new_{doc['_id']}"

    return doc

# Reindex with transformation
query = {
    'query': {
        'bool': {
            'must': [
                {'term': {'status': 'active'}}
            ]
        }
    }
}

success_count, failed_ops = reindex(
    client,
    source_index='old-index',
    target_index='new-index',
    query=query,
    transform=transform_document,
    chunk_size=200
)

print(f"Successfully reindexed: {success_count} documents")
if failed_ops:
    print(f"Failed operations: {len(failed_ops)}")
```

### Parallel Bulk Processing

```python
from opensearchpy.helpers import parallel_bulk
import json

def read_json_file(filename):
    """Read documents from a JSON Lines file."""
    with open(filename, 'r') as f:
        for line in f:
            doc = json.loads(line)
            yield {
                '_op_type': 'index',
                '_index': 'imported-data',
                '_source': doc
            }

# Process large file with parallel bulk
processed = 0
errors = []

for success, info in parallel_bulk(
    client,
    read_json_file('large_dataset.jsonl'),
    thread_count=8,
    chunk_size=1000,
    max_retries=3,
    initial_backoff=2,
    max_backoff=600
):
    if success:
        processed += 1
    else:
        errors.append(info)

    if processed % 10000 == 0:
        print(f"Processed: {processed}, Errors: {len(errors)}")

print(f"Final: Processed {processed}, Errors: {len(errors)}")
```

### Async Bulk Operations

```python
import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

async def async_bulk_example():
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    actions = [
        {
            '_op_type': 'index',
            '_index': 'async-index',
            '_id': str(i),
            '_source': {'value': i}
        }
        for i in range(1000)
    ]

    # Async bulk operations
    success_count, failed_ops = await async_bulk(
        client,
        actions,
        chunk_size=100
    )

    print(f"Async bulk: {success_count} successful, {len(failed_ops)} failed")

    await client.close()

# Run async example
asyncio.run(async_bulk_example())
```

### Error Handling and Retry Logic

```python
import time

from opensearchpy.helpers import bulk
from opensearchpy.exceptions import BulkIndexError, ConnectionError

def robust_bulk_index(client, documents, max_attempts=3):
    """Robust bulk indexing with retry logic."""
    actions = [
        {
            '_op_type': 'index',
            '_index': 'robust-index',
            '_source': doc
        }
        for doc in documents
    ]

    success_count, failed_ops = 0, []
    for attempt in range(max_attempts):
        try:
            success_count, failed_ops = bulk(
                client,
                actions,
                max_retries=2,
                initial_backoff=2,
                max_backoff=60
            )

            if not failed_ops:
                print(f"All {success_count} documents indexed successfully")
                return success_count, []

            # Retry only failed operations (note: the failure reports are error
            # summaries; in production, map them back to the original actions)
            actions = failed_ops
            print(f"Attempt {attempt + 1}: {len(failed_ops)} operations failed, retrying...")

        except (BulkIndexError, ConnectionError) as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_attempts - 1:
                raise

            # Wait before retry with exponential backoff
            time.sleep(2 ** attempt)

    return success_count, failed_ops

# Use robust bulk indexing
documents = [{'id': i, 'data': f'value_{i}'} for i in range(1000)]
success, failures = robust_bulk_index(client, documents)
```