# Bulk Operations

High-performance operations for processing multiple documents efficiently. Includes bulk indexing, updates, deletions, and specialized helper functions for streaming and parallel processing of large datasets.

## Capabilities

### Bulk Document Operations

Execute multiple document operations (index, create, update, delete) in a single request for improved performance.

```python { .api }
def bulk(body: list, index: str = None, doc_type: str = None, **params) -> dict:
    """
    Execute multiple document operations in a single request.

    Parameters:
    - body: List of operations (action/metadata and optional source)
    - index: Default index for operations without explicit index
    - doc_type: Default document type
    - pipeline: Default ingest pipeline
    - refresh: Control when changes are visible ('true', 'false', 'wait_for')
    - routing: Default routing value
    - timeout: Request timeout
    - wait_for_active_shards: Wait for N shards to be active

    Body format (list of alternating action lines and document lines):
    [
        {"index": {"_index": "my_index", "_type": "_doc", "_id": "1"}},
        {"title": "Document 1", "content": "Content here"},
        {"create": {"_index": "my_index", "_type": "_doc", "_id": "2"}},
        {"title": "Document 2", "content": "More content"},
        {"update": {"_index": "my_index", "_type": "_doc", "_id": "3"}},
        {"doc": {"title": "Updated Document 3"}},
        {"delete": {"_index": "my_index", "_type": "_doc", "_id": "4"}}
    ]

    Returns:
    dict: Bulk response with 'items' array containing results for each operation
    """
```

### Update by Query

Update multiple documents matching a query using scripts or partial updates.

```python { .api }
def update_by_query(index: str, doc_type: str = None, body: dict = None, **params) -> dict:
    """
    Update documents matching a query.

    Parameters:
    - index: Index name(s) to update
    - doc_type: Document type(s)
    - body: Update specification with query and script
    - _source: Fields to include in response
    - _source_excludes: Fields to exclude
    - _source_includes: Fields to include
    - allow_no_indices: Handle missing indices
    - analyzer: Query analyzer
    - analyze_wildcard: Analyze wildcards
    - conflicts: How to handle version conflicts ('abort', 'proceed')
    - default_operator: Default query operator
    - df: Default field
    - expand_wildcards: Wildcard expansion
    - from_: Starting document
    - ignore_unavailable: Ignore unavailable indices
    - lenient: Ignore query failures
    - pipeline: Ingest pipeline for updated documents
    - preference: Node preference
    - q: Query string
    - refresh: Refresh after operation
    - request_cache: Use request cache
    - requests_per_second: Throttling rate (operations per second)
    - routing: Routing values
    - scroll: Scroll timeout for large updates
    - scroll_size: Scroll batch size
    - search_type: Search type
    - search_timeout: Search timeout
    - size: Maximum documents to update
    - slices: Number of slices for parallel processing
    - sort: Sort specification
    - terminate_after: Terminate after N documents
    - timeout: Request timeout
    - version: Include versions in response
    - version_type: Version type
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion or return task

    Body structure:
    {
        "query": {
            "term": {"status": "draft"}
        },
        "script": {
            "source": "ctx._source.status = 'published'; ctx._source.published_at = params.now",
            "params": {"now": "2023-01-01T12:00:00Z"}
        }
    }

    Returns:
    dict: Update results with 'updated', 'version_conflicts', 'took', etc.
    """
```

### Delete by Query

Delete multiple documents matching a query efficiently.

```python { .api }
def delete_by_query(index: str, body: dict, doc_type: str = None, **params) -> dict:
    """
    Delete documents matching a query.

    Parameters:
    - index: Index name(s) to delete from
    - body: Delete specification with query
    - doc_type: Document type(s)
    - _source: Fields to include in response
    - _source_excludes: Fields to exclude
    - _source_includes: Fields to include
    - allow_no_indices: Handle missing indices
    - analyzer: Query analyzer
    - analyze_wildcard: Analyze wildcards
    - conflicts: How to handle conflicts ('abort', 'proceed')
    - default_operator: Default query operator
    - df: Default field
    - expand_wildcards: Wildcard expansion
    - from_: Starting document
    - ignore_unavailable: Ignore unavailable indices
    - lenient: Ignore query failures
    - preference: Node preference
    - q: Query string
    - refresh: Refresh after operation
    - request_cache: Use request cache
    - requests_per_second: Throttling rate
    - routing: Routing values
    - scroll: Scroll timeout
    - scroll_size: Scroll batch size
    - search_type: Search type
    - search_timeout: Search timeout
    - size: Maximum documents to delete
    - slices: Number of slices for parallel processing
    - sort: Sort specification
    - terminate_after: Terminate after N documents
    - timeout: Request timeout
    - version: Include versions
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion

    Body structure:
    {
        "query": {
            "range": {
                "created_at": {
                    "lt": "2022-01-01"
                }
            }
        }
    }

    Returns:
    dict: Deletion results with 'deleted', 'took', 'version_conflicts', etc.
    """
```
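
A minimal usage sketch (assuming the same `es` client and `articles` index used in the Usage Examples below; the date cutoff is illustrative):

```python
# Delete all articles created before 2022, proceeding past version conflicts
response = es.delete_by_query(
    index='articles',
    body={
        "query": {
            "range": {"created_at": {"lt": "2022-01-01"}}
        }
    },
    conflicts='proceed',
    refresh=True
)
print(f"Deleted {response['deleted']} documents in {response['took']} ms")
```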

### Reindex Operations

Copy and transform documents between indices with optional query filtering and script processing.

```python { .api }
def reindex(body: dict, **params) -> dict:
    """
    Copy documents from source to destination index with optional transformation.

    Parameters:
    - body: Reindex specification with source and destination
    - refresh: Refresh destination index after operation
    - requests_per_second: Throttling rate
    - slices: Number of slices for parallel processing
    - timeout: Request timeout
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion or return task

    Body structure:
    {
        "source": {
            "index": "source_index",
            "type": "_doc",                    # Optional
            "query": {                         # Optional filtering
                "term": {"status": "published"}
            },
            "_source": ["title", "content"],   # Optional field filtering
            "size": 1000                       # Batch size
        },
        "dest": {
            "index": "destination_index",
            "type": "_doc",                    # Optional
            "pipeline": "my_pipeline"          # Optional ingest pipeline
        },
        "script": {                            # Optional transformation
            "source": "ctx._source.new_field = ctx._source.old_field + '_transformed'"
        },
        "conflicts": "proceed"                 # Handle version conflicts
    }

    Returns:
    dict: Reindex results with 'created', 'updated', 'took', etc.
    """

def reindex_rethrottle(task_id: str = None, **params) -> dict:
    """
    Change throttling of a running reindex task.

    Parameters:
    - task_id: Task identifier from reindex operation
    - requests_per_second: New throttling rate

    Returns:
    dict: Updated task information
    """
```

## Helper Functions

High-level helper utilities for common bulk operations with automatic batching, error handling, and progress tracking.

```python { .api }
from elasticsearch5.helpers import bulk, streaming_bulk, parallel_bulk, scan, reindex

def bulk(client, actions, stats_only: bool = False, **kwargs) -> tuple:
    """
    Helper for bulk operations with automatic batching and error handling.

    Parameters:
    - client: Elasticsearch client instance
    - actions: Iterable of document actions
    - stats_only: Return only success count (bool)
    - index: Default index name
    - doc_type: Default document type
    - chunk_size: Documents per batch (default 500)
    - max_chunk_bytes: Maximum batch size in bytes
    - thread_count: Number of threads for parallel processing
    - queue_size: Queue size for threading
    - expand_action_callback: Function to transform actions
    - refresh: Refresh after operation
    - request_timeout: Request timeout
    - max_retries: Maximum retry attempts
    - initial_backoff: Initial retry delay
    - max_backoff: Maximum retry delay

    Action format:
    {
        "_op_type": "index",           # or "create", "update", "delete"
        "_index": "my_index",
        "_type": "_doc",
        "_id": "document_id",
        "_source": {"field": "value"}  # For index/create
    }

    Returns:
    If stats_only=False: (success_count, list_of_errors)
    If stats_only=True: success_count

    Raises:
    BulkIndexError: If errors occurred and not ignored
    """

def streaming_bulk(client, actions, chunk_size: int = 500, **kwargs):
    """
    Generator for streaming bulk operations.

    Parameters: Same as bulk()

    Yields:
    (is_success: bool, action_result: dict) for each action
    """

def parallel_bulk(client, actions, thread_count: int = 4, **kwargs):
    """
    Generator for parallel bulk operations using multiple threads.

    Parameters: Same as bulk() plus thread_count

    Yields:
    (is_success: bool, action_result: dict) for each action
    """

def scan(client, query: dict = None, scroll: str = '5m', **kwargs):
    """
    Generator to efficiently scroll through all matching documents.

    Parameters:
    - client: Elasticsearch client
    - query: Search query (default match_all)
    - scroll: Scroll timeout
    - index: Index name(s)
    - doc_type: Document type(s)
    - raise_on_error: Raise on scroll errors
    - preserve_order: Maintain result ordering
    - size: Documents per scroll batch
    - request_timeout: Request timeout
    - clear_scroll: Clear scroll on completion

    Yields:
    Individual documents from search results
    """

def reindex(client, source_index: str, target_index: str, query: dict = None, **kwargs) -> tuple:
    """
    Helper to reindex documents between indices.

    Parameters:
    - client: Elasticsearch client
    - source_index: Source index name
    - target_index: Target index name
    - query: Optional query to filter documents
    - target_client: Different client for target (for cross-cluster)
    - chunk_size: Documents per batch
    - scroll: Scroll timeout
    - scan_kwargs: Additional scan() parameters
    - bulk_kwargs: Additional bulk() parameters

    Returns:
    (success_count, list_of_errors)
    """
```

### Helper Exceptions

```python { .api }
from elasticsearch5.helpers import BulkIndexError, ScanError

class BulkIndexError(Exception):
    """
    Exception for bulk operation failures.

    Attributes:
    - errors: List of failed actions with error details
    """

class ScanError(Exception):
    """
    Exception for scan operation failures.

    Attributes:
    - scroll_id: Scroll ID for potential resume
    """
```
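
When `raise_on_error` is left at its default, a failing scroll surfaces as `ScanError`. A hedged sketch of catching it (the `logs` index and `handle_document` callback are illustrative placeholders):

```python
from elasticsearch5.helpers import scan, ScanError

try:
    for doc in scan(es, index='logs', query={"query": {"match_all": {}}}):
        handle_document(doc['_source'])  # hypothetical per-document callback
except ScanError as e:
    # The scroll id is kept on the exception for inspection
    print(f"Scan aborted, scroll_id={e.scroll_id}")
```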

## Usage Examples

### Basic Bulk Operations

```python
from elasticsearch5 import Elasticsearch

es = Elasticsearch(['localhost:9200'])

# Prepare bulk operations: the raw bulk API expects alternating
# action/metadata lines and (optional) source lines, as documented above
actions = [
    {"index": {"_index": "articles", "_type": "_doc", "_id": "1"}},
    {"title": "Article 1", "content": "Content 1"},
    {"create": {"_index": "articles", "_type": "_doc", "_id": "2"}},
    {"title": "Article 2", "content": "Content 2"},
    {"update": {"_index": "articles", "_type": "_doc", "_id": "3"}},
    {"doc": {"title": "Updated Article 3"}},
    {"delete": {"_index": "articles", "_type": "_doc", "_id": "4"}}
]

# Execute bulk operations
response = es.bulk(body=actions)

# Check for errors
if response['errors']:
    for item in response['items']:
        for operation, result in item.items():
            if 'error' in result:
                print(f"Error in {operation}: {result['error']}")
else:
    print(f"Successfully processed {len(response['items'])} operations")
```

### Using Helper Functions

```python
from elasticsearch5.helpers import bulk, scan, reindex, BulkIndexError

# Generate documents
def generate_docs():
    for i in range(10000):
        yield {
            "_index": "large_index",
            "_type": "_doc",
            "_id": str(i),
            "_source": {
                "title": f"Document {i}",
                "content": f"This is content for document {i}",
                "category": "bulk_test"
            }
        }

# Bulk index with helper
try:
    success, failed = bulk(es, generate_docs(), chunk_size=1000)
    print(f"Successfully indexed {success} documents")
    if failed:
        # failed is only populated when raise_on_error=False;
        # with the defaults, failures raise BulkIndexError instead
        print(f"Failed to index {len(failed)} documents")
except BulkIndexError as e:
    print(f"Bulk indexing failed: {e}")
    for error in e.errors:
        print(f"Error: {error}")
```

432

433

### Streaming Bulk Processing

434

435

```python
from elasticsearch5.helpers import streaming_bulk

# Process large dataset with streaming
def process_large_dataset():
    for is_success, result in streaming_bulk(es, generate_docs(), chunk_size=500):
        if not is_success:
            print(f"Failed to process: {result}")
        else:
            # Process successful result
            pass

process_large_dataset()
```
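
### Parallel Bulk Processing

When a single writer thread becomes the bottleneck, `parallel_bulk` sends chunks from a thread pool. A hedged sketch reusing the `es` client and `generate_docs()` generator above (thread and chunk sizes are illustrative):

```python
from elasticsearch5.helpers import parallel_bulk

# parallel_bulk yields (is_success, result) tuples like streaming_bulk,
# but dispatches chunks from multiple worker threads; the generator must
# be consumed for any work to happen
for is_success, result in parallel_bulk(es, generate_docs(),
                                        thread_count=4, chunk_size=500):
    if not is_success:
        print(f"Failed to process: {result}")
```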

449

450

### Update by Query

451

452

```python
# Update all draft articles to published
update_body = {
    "query": {
        "term": {"status": "draft"}
    },
    "script": {
        "source": """
            ctx._source.status = 'published';
            ctx._source.published_at = params.now;
            ctx._source.publish_count = (ctx._source.publish_count ?: 0) + 1
        """,
        "params": {
            "now": "2023-01-01T12:00:00Z"
        }
    }
}

response = es.update_by_query(
    index='articles',
    body=update_body,
    conflicts='proceed',  # Continue on version conflicts
    refresh=True
)

print(f"Updated {response['updated']} documents")
print(f"Version conflicts: {response.get('version_conflicts', 0)}")
```

480

481

### Reindex with Transformation

482

483

```python
# Reindex with field transformation and filtering
reindex_body = {
    "source": {
        "index": "old_articles",
        "query": {
            "range": {
                "created_at": {
                    "gte": "2022-01-01"
                }
            }
        }
    },
    "dest": {
        "index": "new_articles",
        "pipeline": "article_enrichment"
    },
    "script": {
        "source": """
            // Transform old format to new format
            ctx._source.slug = ctx._source.title.toLowerCase().replaceAll('[^a-z0-9]+', '-');
            ctx._source.word_count = ctx._source.content.split(' ').length;
            ctx._source.migrated_at = '2023-01-01T00:00:00Z';
        """
    }
}

response = es.reindex(
    body=reindex_body,
    wait_for_completion=False,  # Run as a background task
    slices=5                    # Number of slices for parallel processing
)

if 'task' in response:
    task_id = response['task']
    print(f"Reindex started as task: {task_id}")

    # Check task status later
    task_status = es.tasks.get(task_id=task_id)
    print(f"Task status: {task_status}")
```
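
If the background task turns out to be too aggressive (or too slow), its throttle can be adjusted while it runs. A hedged sketch using `reindex_rethrottle` with the `task_id` from above (the rates are illustrative):

```python
# Throttle the running reindex task to ~100 sub-requests per second
es.reindex_rethrottle(task_id=task_id, requests_per_second=100)

# Remove the throttle entirely
es.reindex_rethrottle(task_id=task_id, requests_per_second=-1)
```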

### Scan and Process Large Datasets

```python
from elasticsearch5.helpers import scan

# Process all documents in an index
query = {
    "query": {
        "range": {
            "created_at": {
                "gte": "2023-01-01"
            }
        }
    }
}

# Scan through all matching documents
processed_count = 0
for doc in scan(es, query=query, index='large_index', size=1000):
    # Process each document
    process_document(doc['_source'])
    processed_count += 1

    if processed_count % 10000 == 0:
        print(f"Processed {processed_count} documents")

print(f"Total processed: {processed_count} documents")
```
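
### Reindex with the Helper

The `reindex` helper imported earlier combines `scan()` and `bulk()` on the client side, which also makes it possible to copy documents between clusters. A hedged sketch (index names and the second client are illustrative):

```python
from elasticsearch5 import Elasticsearch
from elasticsearch5.helpers import reindex

source_es = Elasticsearch(['localhost:9200'])
target_es = Elasticsearch(['other-cluster:9200'])  # optional cross-cluster target

# Copy only published articles from 'articles' into 'articles_v2'
success, errors = reindex(
    source_es,
    source_index='articles',
    target_index='articles_v2',
    query={"query": {"term": {"status": "published"}}},
    target_client=target_es,
    chunk_size=500
)
print(f"Copied {success} documents")
```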