# Helper Functions

Utility functions for common Elasticsearch operations, including bulk indexing, scanning large result sets, and reindexing data. These helpers simplify complex operations and provide optimized implementations for common use cases.

## Capabilities

### Bulk Operations

Efficient bulk indexing with automatic batching, error handling, and progress tracking.

```python { .api }
def bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    thread_count: int = 4,
    queue_size: int = 4,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Perform bulk indexing operations.

    Parameters:
    - client: Elasticsearch client instance
    - actions: Iterable of action dictionaries or documents
    - index: Default index name for actions
    - doc_type: Default document type (deprecated)
    - routing: Default routing value
    - pipeline: Default ingest pipeline
    - refresh: Refresh policy for operations
    - timeout: Request timeout
    - chunk_size: Number of documents per chunk
    - max_chunk_bytes: Maximum chunk size in bytes
    - thread_count: Number of parallel threads
    - queue_size: Thread pool queue size
    - expand_action_callback: Callback to expand actions
    - raise_on_exception: Whether to raise on exceptions
    - raise_on_error: Whether to raise on API errors
    - ignore_status: HTTP status codes to ignore

    Returns:
    Tuple of (success_count, failed_operations)
    """

def streaming_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
):
    """
    Generator that yields bulk operation results as they complete.

    Parameters: Same as bulk()

    Yields:
    Tuples of (success, info) for each chunk processed
    """

def parallel_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    thread_count: int = 4,
    queue_size: int = 4,
    expand_action_callback=None,
    ignore_status=(),
    **kwargs
):
    """
    Parallel bulk indexing using multiple threads.

    Parameters: Same as bulk() with additional thread control

    Yields:
    Tuples of (success, info) for each chunk processed
    """
```

### Scanning Operations

Efficiently iterate through large result sets using the scroll API.

```python { .api }
def scan(
    client,
    query: Optional[Dict[str, Any]] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[Dict] = None,
    **kwargs
):
    """
    Scan and scroll through all documents matching a query.

    Parameters:
    - client: Elasticsearch client instance
    - query: Query to execute (default: match_all)
    - scroll: Scroll context timeout
    - raise_on_error: Whether to raise on errors
    - preserve_order: Whether to preserve result ordering
    - size: Number of documents per shard per batch
    - request_timeout: Request timeout
    - clear_scroll: Whether to clear scroll context when done
    - scroll_kwargs: Additional arguments for scroll requests
    - **kwargs: Additional search parameters

    Yields:
    Individual document hits
    """
```

### Reindexing Operations

Copy documents between indices with optional transformation.

```python { .api }
def reindex(
    client,
    source_index: str,
    target_index: str,
    query: Optional[Dict[str, Any]] = None,
    target_client: Optional[object] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    scan_kwargs: Optional[Dict] = None,
    bulk_kwargs: Optional[Dict] = None,
    transform_doc_callback=None,
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Reindex documents from source to target index.

    Parameters:
    - client: Source Elasticsearch client
    - source_index: Source index name
    - target_index: Target index name
    - query: Query to filter source documents
    - target_client: Target client (if different from source)
    - chunk_size: Bulk operation chunk size
    - scroll: Scroll timeout for scanning
    - scan_kwargs: Additional scan arguments
    - bulk_kwargs: Additional bulk arguments
    - transform_doc_callback: Function to transform documents

    Returns:
    Tuple of (success_count, failed_operations)
    """
```

### Asynchronous Helper Functions

Async versions of helper functions for use with AsyncElasticsearch.

```python { .api }
async def async_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Async version of bulk operation.

    Parameters: Same as bulk()

    Returns:
    Tuple of (success_count, failed_operations)
    """

async def async_streaming_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
):
    """
    Async generator for streaming bulk operations.

    Parameters: Same as streaming_bulk()

    Yields:
    Tuples of (success, info) for each chunk processed
    """

async def async_scan(
    client,
    query: Optional[Dict[str, Any]] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[Dict] = None,
    **kwargs
):
    """
    Async version of scan operation.

    Parameters: Same as scan()

    Yields:
    Individual document hits
    """

async def async_reindex(
    client,
    source_index: str,
    target_index: str,
    query: Optional[Dict[str, Any]] = None,
    target_client: Optional[object] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    scan_kwargs: Optional[Dict] = None,
    bulk_kwargs: Optional[Dict] = None,
    transform_doc_callback=None,
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Async version of reindex operation.

    Parameters: Same as reindex()

    Returns:
    Tuple of (success_count, failed_operations)
    """
```
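
For incremental progress reporting in async code, `async_streaming_bulk` can be consumed with `async for`, mirroring the synchronous `streaming_bulk` generator. Below is a minimal sketch, assuming a local cluster and a hypothetical `async_stream_test` index; it is an illustration of the call pattern from the signatures above, not a definitive implementation.

```python
import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

async def stream_index_example():
    # Hypothetical local cluster; adjust hosts for your environment.
    client = AsyncElasticsearch(hosts=['http://localhost:9200'])

    async def gen_docs():
        for i in range(5000):
            yield {"_index": "async_stream_test", "_source": {"value": i}}

    ok_count = 0
    try:
        # Results are yielded as work completes (see async_streaming_bulk above).
        async for success, info in async_streaming_bulk(
            client,
            gen_docs(),
            chunk_size=500,
            raise_on_error=False  # collect failures instead of raising
        ):
            if success:
                ok_count += 1
            else:
                print(f"Failed: {info}")
    finally:
        await client.close()

    print(f"Successful results: {ok_count}")

asyncio.run(stream_index_example())
```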

### Utility Functions

Additional utility functions for common operations.

```python { .api }
def expand_action(action: Dict[str, Any]) -> Dict[str, Any]:
    """
    Expand a shorthand action dictionary to full format.

    Parameters:
    - action: Action dictionary to expand

    Returns:
    Expanded action dictionary
    """
```
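
A brief sketch of normalizing a shorthand action before further processing, based only on the signature above; the exact expanded shape depends on the implementation, so this shows the call pattern rather than a guaranteed result.

```python
from elasticsearch.helpers import expand_action

# Shorthand action: a plain document carrying optional metadata fields.
shorthand = {"_index": "products", "_id": 42, "name": "Product 42", "price": 9.99}

# Expand into the full bulk action format before handing it to bulk helpers.
expanded = expand_action(shorthand)
print(expanded)
```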

300

301

### Helper Exceptions

302

303

Exception types specific to helper operations.

304

305

```python { .api }
class BulkIndexError(Exception):
    """
    Exception raised when bulk operations encounter errors.

    Attributes:
    - errors: List of individual operation errors
    """
    def __init__(self, message: str, errors: List[Dict]): ...

    @property
    def errors(self) -> List[Dict]: ...

class ScanError(Exception):
    """
    Exception raised during scan operations.

    Attributes:
    - scroll_id: Scroll context ID if available
    - partial_results: Results obtained before error
    """
    def __init__(self, message: str, scroll_id: Optional[str] = None): ...
```
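
A short sketch of catching these exceptions around helper calls; the `errors` attribute follows the class definition above, and the client and index names are placeholders.

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, scan, BulkIndexError, ScanError

client = Elasticsearch(hosts=['http://localhost:9200'])  # placeholder cluster

try:
    for doc in scan(client, index="products", query={"match_all": {}}):
        pass  # process each hit
except ScanError as e:
    # Raised when a scroll request fails partway through the result set.
    print(f"Scan aborted: {e}")

try:
    bulk(client, [{"_index": "products", "_source": {"name": "example"}}])
except BulkIndexError as e:
    # Inspect the individual failed operations collected on the exception.
    for err in e.errors:
        print(f"Bulk item failed: {err}")
```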

327

328

## Usage Examples

329

330

### Bulk Indexing

331

332

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError

client = Elasticsearch(hosts=['http://localhost:9200'])

# Prepare documents for bulk indexing
def generate_docs():
    for i in range(10000):
        yield {
            "_index": "products",
            "_id": i,
            "_source": {
                "name": f"Product {i}",
                "price": i * 10.99,
                "category": f"Category {i % 5}"
            }
        }

try:
    # Bulk index documents
    success_count, failed_ops = bulk(
        client,
        generate_docs(),
        chunk_size=1000,
        timeout='60s',
        refresh='wait_for'
    )

    print(f"Successfully indexed: {success_count} documents")
    if failed_ops:
        print(f"Failed operations: {len(failed_ops)}")

except BulkIndexError as e:
    print(f"Bulk indexing failed: {e}")
    for error in e.errors:
        print(f"Error: {error}")
```
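
If raising on the first failure is undesirable, the `raise_on_error` flag documented above can be set to `False` so failed items are returned instead of raised. A minimal sketch, reusing `client` and `generate_docs()` from the example above:

```python
# Collect failures instead of raising BulkIndexError.
success_count, errors = bulk(
    client,
    generate_docs(),
    chunk_size=1000,
    raise_on_error=False
)

print(f"Indexed: {success_count} documents, failed items: {len(errors)}")
for err in errors:
    print(f"Failed: {err}")
```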

### Streaming Bulk Operations

```python
from elasticsearch.helpers import streaming_bulk

def generate_large_dataset():
    for i in range(100000):
        yield {
            "_index": "logs",
            "_source": {
                "timestamp": f"2024-01-01T{i % 24:02d}:00:00Z",
                "level": "INFO" if i % 2 == 0 else "WARN",
                "message": f"Log message {i}",
                "service": f"service-{i % 10}"
            }
        }

# Stream bulk operations with progress tracking
total_indexed = 0
for success, info in streaming_bulk(
    client,
    generate_large_dataset(),
    chunk_size=5000,
    max_chunk_bytes=10 * 1024 * 1024  # 10MB chunks
):
    if success:
        total_indexed += 1
        if total_indexed % 10 == 0:  # Progress every 10 chunks
            print(f"Indexed {total_indexed * 5000} documents...")
    else:
        print(f"Failed chunk: {info}")

print(f"Total documents indexed: {total_indexed * 5000}")
```

405

406

### Scanning Large Result Sets

407

408

```python
from elasticsearch.helpers import scan

# Scan through all documents in an index
total_docs = 0
for doc in scan(
    client,
    index="products",
    query={"match_all": {}},
    scroll='2m',
    size=1000
):
    # Process each document
    total_docs += 1
    product_name = doc['_source']['name']
    product_price = doc['_source']['price']

    # Example: Update price with 10% discount
    if product_price > 100:
        client.update(
            index="products",
            id=doc['_id'],
            doc={"price": product_price * 0.9}
        )

print(f"Processed {total_docs} documents")
```

435

436

### Advanced Scanning with Query

437

438

```python
# Scan with complex query and field filtering
query = {
    "bool": {
        "must": [
            {"range": {"price": {"gte": 50, "lte": 1000}}},
            {"term": {"status": "active"}}
        ],
        "must_not": [
            {"term": {"category.keyword": "discontinued"}}
        ]
    }
}

for doc in scan(
    client,
    index="products",
    query=query,
    _source=["name", "price", "category"],  # Only retrieve specific fields
    scroll='5m',
    size=2000,
    preserve_order=True
):
    print(f"Product: {doc['_source']['name']}, Price: {doc['_source']['price']}")
```

463

464

### Reindexing with Transformation

465

466

```python
from elasticsearch.helpers import reindex

def transform_document(doc):
    """Transform documents during reindex."""
    source = doc['_source']

    # Add new computed field
    source['price_tier'] = 'high' if source['price'] > 1000 else 'low'

    # Rename field
    if 'desc' in source:
        source['description'] = source.pop('desc')

    # Convert price to integer cents
    source['price_cents'] = int(source['price'] * 100)

    return doc

# Reindex with transformation
success_count, failed_ops = reindex(
    client,
    source_index="products_v1",
    target_index="products_v2",
    query={"term": {"status": "active"}},  # Only reindex active products
    transform_doc_callback=transform_document,
    chunk_size=1000
)

print(f"Reindexed {success_count} documents")
if failed_ops:
    print(f"Failed operations: {len(failed_ops)}")
```
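
The `target_client` parameter documented above also allows the destination to be a different cluster. A sketch of cross-cluster reindexing, assuming a second cluster at a hypothetical `http://remote-host:9200`:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex

source_client = Elasticsearch(hosts=['http://localhost:9200'])
target_client = Elasticsearch(hosts=['http://remote-host:9200'])  # hypothetical remote cluster

# Documents are scanned from the source cluster and bulk-indexed into the target cluster.
success_count, failed_ops = reindex(
    source_client,
    source_index="products_v2",
    target_index="products_v2",
    target_client=target_client,
    chunk_size=500,
    scroll="10m"
)

print(f"Copied {success_count} documents to the remote cluster")
```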

499

500

### Parallel Bulk Processing

501

502

```python
from elasticsearch.helpers import parallel_bulk

def generate_docs_from_database():
    """Generate documents from database query."""
    # Simulate database connection and query
    for i in range(50000):
        yield {
            "_index": "analytics",
            "_source": {
                "user_id": i % 1000,
                "event": f"action_{i % 10}",
                "timestamp": "2024-01-01T00:00:00Z",
                "value": i * 1.5
            }
        }

# Use parallel bulk for high-throughput indexing
results = []
for success, info in parallel_bulk(
    client,
    generate_docs_from_database(),
    chunk_size=2000,
    thread_count=8,  # Use 8 parallel threads
    queue_size=8,
    timeout='120s'
):
    results.append((success, info))
    if len(results) % 100 == 0:
        print(f"Processed {len(results)} chunks")

successful_chunks = sum(1 for success, _ in results if success)
print(f"Successfully processed {successful_chunks} chunks")
```

537

538

### Async Helper Usage

539

540

```python
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk, async_scan

async def async_bulk_example():
    """Example of async bulk operations."""
    async_client = AsyncElasticsearch(hosts=['http://localhost:9200'])

    # Prepare async documents
    async def async_generate_docs():
        for i in range(1000):
            yield {
                "_index": "async_test",
                "_id": i,
                "_source": {"value": i, "squared": i * i}
            }

    try:
        # Async bulk indexing
        success_count, failed_ops = await async_bulk(
            async_client,
            async_generate_docs(),
            chunk_size=100
        )

        print(f"Async indexed: {success_count} documents")

        # Async scanning
        total_scanned = 0
        async for doc in async_scan(
            async_client,
            index="async_test",
            query={"match_all": {}}
        ):
            total_scanned += 1

        print(f"Async scanned: {total_scanned} documents")

    finally:
        await async_client.close()

# Run async example
asyncio.run(async_bulk_example())
```
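
`async_reindex` follows the same pattern with an AsyncElasticsearch client. A minimal sketch, assuming the hypothetical `async_test` index created above and a hypothetical `async_test_v2` destination:

```python
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_reindex

async def async_reindex_example():
    client = AsyncElasticsearch(hosts=['http://localhost:9200'])
    try:
        # Copy all documents from async_test into async_test_v2.
        success_count, failed_ops = await async_reindex(
            client,
            source_index="async_test",
            target_index="async_test_v2",
            chunk_size=250
        )
        print(f"Async reindexed: {success_count} documents")
    finally:
        await client.close()

asyncio.run(async_reindex_example())
```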