# Bulk API v2.0 Operations

Next-generation bulk operations with improved performance, simplified job management, and enhanced monitoring for high-volume data processing. Bulk API v2.0 provides streamlined job workflows, better error handling, and more efficient data transfer than the original Bulk API.

## SFBulk2Handler Class

The main handler class for Bulk API v2.0 operations, providing modern bulk functionality with improved job management and monitoring.

```python { .api }
class SFBulk2Handler:
    def __init__(
        self,
        session_id,
        bulk2_url,
        proxies=None,
        session=None
    ):
        """
        Initialize Bulk API v2.0 handler.

        Parameters:
        - session_id: Authenticated Salesforce session ID
        - bulk2_url: Bulk 2.0 API endpoint URL
        - proxies: HTTP proxy configuration dictionary
        - session: Optional custom requests.Session object
        """
```

### Constants and Configuration

```python { .api }
class SFBulk2Handler:
    JSON_CONTENT_TYPE = "application/json"
    CSV_CONTENT_TYPE = "text/csv"
    DEFAULT_WAIT_TIMEOUT_SECONDS = 300
    MAX_CHECK_INTERVAL_SECONDS = 30
    DEFAULT_QUERY_PAGE_SIZE = 50000
```
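The timeout and interval constants above suggest a capped-backoff polling pattern: start with a short interval and grow it up to `MAX_CHECK_INTERVAL_SECONDS`. A standalone sketch of that pattern (an interpretation, not necessarily the library's exact algorithm):

```python
def next_poll_interval(current, max_interval=30):
    """Grow the polling interval geometrically up to a cap, mirroring
    MAX_CHECK_INTERVAL_SECONDS above (sketch, not the library's code)."""
    return min(current * 2, max_interval)

# The first few polls happen quickly; later polls settle at the cap.
interval, schedule = 0.5, []
for _ in range(8):
    schedule.append(interval)
    interval = next_poll_interval(interval)
print(schedule)
```

This keeps short jobs responsive without hammering the API on long-running ones.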

### Accessing Bulk 2.0 Operations

The SFBulk2Handler is accessed through the `bulk2` property of the main Salesforce client:

```python
from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Access Bulk 2.0 handler
bulk2_handler = sf.bulk2

# Bulk 2.0 uses direct method calls rather than object attributes
```

## Job Management

Comprehensive job lifecycle management with improved status monitoring and control.

```python { .api }
class SFBulk2Handler:
    def create_job(
        self,
        operation,
        object_name=None,
        external_id_field=None,
        query=None
    ):
        """
        Create a new Bulk 2.0 job.

        Parameters:
        - operation: Operation type (insert, upsert, update, delete, hard_delete, query, query_all)
        - object_name: Salesforce SObject API name (required for DML operations)
        - external_id_field: External ID field name (required for upsert)
        - query: SOQL query string (required for query operations)

        Returns:
        dict: Job creation response with job ID and status
        """

    def get_job(self, job_id, is_query):
        """
        Get comprehensive job information and status.

        Parameters:
        - job_id: Bulk 2.0 job identifier
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Complete job information including progress and statistics
        """

    def close_job(self, job_id):
        """
        Close ingest job to begin processing uploaded data.

        Parameters:
        - job_id: Ingest job identifier

        Returns:
        dict: Job status after closing
        """

    def abort_job(self, job_id, is_query):
        """
        Abort running job to stop processing.

        Parameters:
        - job_id: Job identifier to abort
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Job status after aborting
        """

    def delete_job(self, job_id, is_query):
        """
        Delete completed job and its data.

        Parameters:
        - job_id: Job identifier to delete
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Deletion confirmation
        """

    def wait_for_job(self, job_id, is_query, wait=0.5):
        """
        Wait for job completion with intelligent polling.

        Parameters:
        - job_id: Job identifier to monitor
        - is_query: True for query jobs, False for ingest jobs
        - wait: Initial polling interval in seconds

        Returns:
        dict: Final job status when completed or failed
        """
```

## Data Operations

Efficient data upload and download operations with support for large datasets.

```python { .api }
class SFBulk2Handler:
    def upload_job_data(self, job_id, data, content_url=None):
        """
        Upload data for ingest job processing.

        Parameters:
        - job_id: Ingest job identifier
        - data: CSV data string or file-like object
        - content_url: Optional alternative upload URL

        Returns:
        dict: Upload confirmation and status
        """

    def get_query_results(
        self,
        job_id,
        locator="",
        max_records=None
    ):
        """
        Get query results with pagination support.

        Parameters:
        - job_id: Query job identifier
        - locator: Pagination locator for subsequent pages
        - max_records: Maximum records per page (default: DEFAULT_QUERY_PAGE_SIZE)

        Returns:
        dict: Query results with data and pagination info
        """

    def download_job_data(
        self,
        path,
        job_id,
        locator="",
        max_records=None,
        chunk_size=1024
    ):
        """
        Download query results directly to file.

        Parameters:
        - path: Local file path for saving results
        - job_id: Query job identifier
        - locator: Pagination locator for specific page
        - max_records: Maximum records per download
        - chunk_size: File write chunk size in bytes

        Returns:
        dict: Download status and file information
        """
```

## Enums and Types

Bulk API v2.0 operation types and job states for type-safe operations.

```python { .api }
class Operation:
    """Enumeration of supported Bulk 2.0 operations."""
    INSERT = "insert"
    UPSERT = "upsert"
    UPDATE = "update"
    DELETE = "delete"
    HARD_DELETE = "hard_delete"
    QUERY = "query"
    QUERY_ALL = "query_all"

class JobState:
    """Enumeration of Bulk 2.0 job states (CamelCase, as reported by the API)."""
    OPEN = "Open"
    ABORTED = "Aborted"
    FAILED = "Failed"
    UPLOAD_COMPLETE = "UploadComplete"
    IN_PROGRESS = "InProgress"
    JOB_COMPLETE = "JobComplete"

class ColumnDelimiter:
    """CSV column delimiter options."""
    COMMA = "COMMA"
    TAB = "TAB"
    PIPE = "PIPE"
    SEMICOLON = "SEMICOLON"
    CARET = "CARET"

class LineEnding:
    """CSV line ending options."""
    LF = "LF"      # Unix/Linux
    CRLF = "CRLF"  # Windows

class ResultsType:
    """Query result format types."""
    CSV = "CSV"
    JSON = "JSON"
```
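Comparing against these constants instead of raw string literals avoids typos in state checks. A self-contained sketch of the pattern (the classes are redeclared here with the CamelCase values the usage examples below compare against; in real code, import them from the library, whose exact import path may vary by version):

```python
# Minimal redeclaration of the state constants for a self-contained sketch.
class JobState:
    IN_PROGRESS = "InProgress"
    JOB_COMPLETE = "JobComplete"
    FAILED = "Failed"
    ABORTED = "Aborted"

# A job in any of these states needs no further polling.
TERMINAL_STATES = {JobState.JOB_COMPLETE, JobState.FAILED, JobState.ABORTED}

def is_terminal(state):
    """True once a job has finished (successfully or not)."""
    return state in TERMINAL_STATES

print(is_terminal(JobState.JOB_COMPLETE))
print(is_terminal(JobState.IN_PROGRESS))
```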

## Usage Examples

### Basic Bulk 2.0 Insert

```python
from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Prepare CSV data
csv_data = """Name,Type,Industry
Bulk2 Account 1,Customer,Technology
Bulk2 Account 2,Partner,Manufacturing
Bulk2 Account 3,Customer,Healthcare"""

# Create ingest job
job_response = sf.bulk2.create_job(
    operation='insert',
    object_name='Account'
)
job_id = job_response['id']
print(f"Created job: {job_id}")

# Upload data
sf.bulk2.upload_job_data(job_id, csv_data)

# Close job to start processing
sf.bulk2.close_job(job_id)

# Wait for completion
final_status = sf.bulk2.wait_for_job(job_id, is_query=False)

print(f"Job completed with state: {final_status['state']}")
print(f"Records processed: {final_status['numberRecordsProcessed']}")
print(f"Records failed: {final_status['numberRecordsFailed']}")
```

### Bulk 2.0 Query Operations

```python
# Create query job
query_job = sf.bulk2.create_job(
    operation='query',
    query="SELECT Id, Name, Type, Industry FROM Account WHERE CreatedDate = THIS_YEAR"
)
query_job_id = query_job['id']

# Wait for query completion
final_query_status = sf.bulk2.wait_for_job(query_job_id, is_query=True)

if final_query_status['state'] == 'JobComplete':
    # Get query results
    results = sf.bulk2.get_query_results(query_job_id)

    # Process CSV results
    csv_data = results['data']
    lines = csv_data.strip().split('\n')
    header = lines[0].split(',')

    for line in lines[1:]:
        values = line.split(',')
        record = dict(zip(header, values))
        print(f"Account: {record['Name']} - {record['Type']}")

# Download large results to file
sf.bulk2.download_job_data(
    '/path/to/results.csv',
    query_job_id,
    max_records=100000
)
```
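The `split(',')` parsing above breaks as soon as a field contains a quoted comma or embedded newline. The standard library's `csv` module handles those cases correctly. A self-contained sketch:

```python
import csv
import io

def parse_bulk2_csv(csv_text):
    """Parse CSV result text into a list of dicts, correctly handling
    quoted fields with embedded commas, quotes, and newlines."""
    return list(csv.DictReader(io.StringIO(csv_text)))

# A quoted field with an embedded comma, which naive splitting would break on.
sample = 'Name,Type\n"Acme, Inc.",Customer\nGlobex,Partner\n'
for record in parse_bulk2_csv(sample):
    print(f"Account: {record['Name']} - {record['Type']}")
```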

### Advanced Bulk 2.0 Upsert

```python
import time

# Prepare data with external ID
upsert_csv = """External_ID__c,Name,Type,Industry
EXT001,Upsert Account 1,Customer,Technology
EXT002,Upsert Account 2,Partner,Manufacturing
EXT003,Updated Account 3,Customer,Finance"""

# Create upsert job with external ID field
upsert_job = sf.bulk2.create_job(
    operation='upsert',
    object_name='Account',
    external_id_field='External_ID__c'
)
upsert_job_id = upsert_job['id']

# Upload and process
sf.bulk2.upload_job_data(upsert_job_id, upsert_csv)
sf.bulk2.close_job(upsert_job_id)

# Monitor with custom polling
while True:
    status = sf.bulk2.get_job(upsert_job_id, is_query=False)
    print(f"Job state: {status['state']}")

    if status['state'] in ['JobComplete', 'Failed', 'Aborted']:
        break

    time.sleep(2)

# Check final results (an upsert both creates and updates, so report successes)
final_status = sf.bulk2.get_job(upsert_job_id, is_query=False)
succeeded = final_status['numberRecordsProcessed'] - final_status['numberRecordsFailed']
print(f"Succeeded: {succeeded}")
print(f"Failed: {final_status['numberRecordsFailed']}")
```

### Bulk 2.0 with Error Handling

```python
import time

def bulk2_insert_with_monitoring(sf, object_name, csv_data):
    """Bulk 2.0 insert with comprehensive error handling."""
    job_id = None
    try:
        # Create job
        job_response = sf.bulk2.create_job(
            operation='insert',
            object_name=object_name
        )
        job_id = job_response['id']

        print(f"Created job {job_id} for {object_name}")

        # Upload data
        sf.bulk2.upload_job_data(job_id, csv_data)
        print("Data uploaded successfully")

        # Close job
        close_response = sf.bulk2.close_job(job_id)
        print(f"Job closed, state: {close_response['state']}")

        # Wait for completion with timeout
        start_time = time.time()
        timeout = 300  # 5 minutes

        while True:
            status = sf.bulk2.get_job(job_id, is_query=False)
            elapsed = time.time() - start_time

            print(f"Job state: {status['state']} (elapsed: {elapsed:.1f}s)")

            if status['state'] == 'JobComplete':
                print("Job completed successfully!")
                return {
                    'job_id': job_id,
                    'success': True,
                    'processed': status['numberRecordsProcessed'],
                    'failed': status['numberRecordsFailed']
                }
            elif status['state'] in ['Failed', 'Aborted']:
                print(f"Job failed with state: {status['state']}")
                return {
                    'job_id': job_id,
                    'success': False,
                    'error': status.get('stateMessage', 'Unknown error')
                }
            elif elapsed > timeout:
                # Abort timed-out job
                sf.bulk2.abort_job(job_id, is_query=False)
                raise TimeoutError(f"Job {job_id} timed out after {timeout} seconds")

            time.sleep(5)

    except Exception as e:
        print(f"Error in bulk operation: {e}")
        # Clean up job if one was created
        if job_id is not None:
            try:
                sf.bulk2.abort_job(job_id, is_query=False)
            except Exception:
                pass
        raise

# Usage
result = bulk2_insert_with_monitoring(sf, 'Contact', contact_csv_data)
if result['success']:
    print(f"Processed {result['processed']} records")
else:
    print(f"Job failed: {result['error']}")
```

### Large Dataset Query with Pagination

```python
def bulk2_query_all_pages(sf, query, output_dir):
    """Query large dataset with automatic pagination."""
    # Create query job
    query_job = sf.bulk2.create_job(operation='query', query=query)
    job_id = query_job['id']

    # Wait for completion
    sf.bulk2.wait_for_job(job_id, is_query=True)

    page_num = 1
    locator = ""

    while True:
        # Get page of results
        page_results = sf.bulk2.get_query_results(
            job_id,
            locator=locator,
            max_records=50000
        )

        # Download page to file
        page_file = f"{output_dir}/page_{page_num}.csv"
        sf.bulk2.download_job_data(
            page_file,
            job_id,
            locator=locator,
            max_records=50000
        )

        print(f"Downloaded page {page_num} to {page_file}")

        # Check for more pages
        if not page_results.get('nextRecordsUrl'):
            break

        # Extract locator for next page
        locator = page_results['nextRecordsUrl'].split('locator=')[1]
        page_num += 1

    print(f"Downloaded {page_num} pages of results")
    return page_num

# Usage
pages_downloaded = bulk2_query_all_pages(
    sf,
    "SELECT Id, Name, Email FROM Contact WHERE CreatedDate = LAST_N_DAYS:30",
    "/tmp/bulk_results"
)
```

### Bulk 2.0 Job Management

```python
def manage_bulk2_jobs(sf):
    """Example of advanced job management operations."""
    # Create multiple jobs
    jobs = []

    # Insert job
    insert_job = sf.bulk2.create_job(operation='insert', object_name='Account')
    jobs.append(('insert', insert_job['id']))

    # Query job
    query_job = sf.bulk2.create_job(
        operation='query',
        query="SELECT Id, Name FROM Account LIMIT 1000"
    )
    jobs.append(('query', query_job['id']))

    # Monitor all jobs
    for job_type, job_id in jobs:
        is_query = (job_type == 'query')

        # Get job status
        status = sf.bulk2.get_job(job_id, is_query=is_query)
        print(f"{job_type.title()} job {job_id}: {status['state']}")

        # Abort if needed
        if status['state'] == 'InProgress':
            print(f"Aborting {job_type} job {job_id}")
            sf.bulk2.abort_job(job_id, is_query=is_query)

        # Delete completed jobs
        elif status['state'] in ['JobComplete', 'Failed', 'Aborted']:
            print(f"Deleting {job_type} job {job_id}")
            sf.bulk2.delete_job(job_id, is_query=is_query)

# Usage
manage_bulk2_jobs(sf)
```

## Utility Functions

Data processing utilities for Bulk API v2.0 operations.

```python { .api }
class SFBulk2Handler:
    def filter_null_bytes(self, b):
        """
        Filter null bytes from strings or bytes objects.

        Parameters:
        - b: String or bytes object to filter

        Returns:
        str|bytes: Filtered content with null bytes removed
        """
```
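A standalone sketch of what such a filter does (my illustration, not the library's exact implementation) — NUL characters are stripped from either `str` or `bytes` input before the data is processed:

```python
def filter_null_bytes_sketch(b):
    """Strip NUL characters from str or bytes input
    (standalone sketch of the utility documented above)."""
    if isinstance(b, bytes):
        return b.replace(b'\x00', b'')
    return b.replace('\x00', '')

print(filter_null_bytes_sketch('Acme\x00 Corp'))
print(filter_null_bytes_sketch(b'\x00data'))
```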

## Best Practices

### Data Preparation for Bulk 2.0

```python
def prepare_csv_for_bulk2(records, field_mapping=None):
    """Prepare record data as CSV for Bulk 2.0 operations."""
    if not records:
        return ""

    # Apply field mapping if provided
    if field_mapping:
        records = [
            {field_mapping.get(k, k): v for k, v in record.items()}
            for record in records
        ]

    # Get headers from first record
    headers = list(records[0].keys())

    # Build CSV
    csv_lines = [','.join(headers)]

    for record in records:
        values = []
        for header in headers:
            value = record.get(header, '')

            # Handle CSV escaping: double embedded quotes, then wrap the field
            if isinstance(value, str) and (',' in value or '"' in value or '\n' in value):
                escaped = value.replace('"', '""')
                value = f'"{escaped}"'

            values.append(str(value))

        csv_lines.append(','.join(values))

    return '\n'.join(csv_lines)

# Usage
csv_data = prepare_csv_for_bulk2(
    account_records,
    field_mapping={'company_name': 'Name', 'account_type': 'Type'}
)
```
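The hand-rolled quoting above can also be delegated to the standard library's `csv` module, which covers the escaping edge cases (embedded quotes, commas, newlines) automatically. A sketch of an equivalent helper:

```python
import csv
import io

def records_to_csv(records):
    """Serialize a list of dicts to CSV text using stdlib quoting rules."""
    if not records:
        return ""
    buf = io.StringIO()
    # lineterminator='\n' keeps output consistent across platforms
    writer = csv.DictWriter(buf, fieldnames=list(records[0].keys()),
                            lineterminator='\n', restval='')
    writer.writeheader()
    writer.writerows(records)
    return buf.getvalue()

print(records_to_csv([{'Name': 'Acme, Inc.', 'Type': 'Customer'}]))
```

`restval=''` fills in missing keys the same way the `record.get(header, '')` call does above.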

### Error Analysis

```python
def analyze_bulk2_results(sf, job_id):
    """Analyze Bulk 2.0 job results for errors and success rates."""
    # Get final job status
    job_status = sf.bulk2.get_job(job_id, is_query=False)

    total_records = job_status['numberRecordsProcessed']
    failed_records = job_status['numberRecordsFailed']
    success_records = total_records - failed_records

    success_rate = (success_records / total_records * 100) if total_records > 0 else 0

    analysis = {
        'job_id': job_id,
        'total_records': total_records,
        'successful_records': success_records,
        'failed_records': failed_records,
        'success_rate': f"{success_rate:.1f}%",
        'job_state': job_status['state']
    }

    # Add error details if available
    if 'stateMessage' in job_status:
        analysis['error_message'] = job_status['stateMessage']

    return analysis

# Usage
results = analyze_bulk2_results(sf, completed_job_id)
print(f"Job {results['job_id']}: {results['success_rate']} success rate")
```

### Performance Optimization

```python
import concurrent.futures

def optimize_bulk2_performance():
    """Best practices for Bulk 2.0 performance optimization."""
    recommendations = {
        'batch_sizing': 'Use up to 100MB per job (no artificial batch limits)',
        'data_format': 'Use CSV format for better performance vs JSON',
        'field_selection': 'Only include necessary fields in queries',
        'parallel_jobs': 'Run multiple concurrent jobs for different objects',
        'monitoring': 'Use wait_for_job() with appropriate polling intervals',
        'cleanup': 'Delete completed jobs to avoid storage limits'
    }
    return recommendations

# Example of concurrent processing
def process_object_bulk2(sf, object_name, data):
    """Process single object with Bulk 2.0."""
    csv_data = prepare_csv_for_bulk2(data)

    job = sf.bulk2.create_job(operation='insert', object_name=object_name)
    sf.bulk2.upload_job_data(job['id'], csv_data)
    sf.bulk2.close_job(job['id'])

    return sf.bulk2.wait_for_job(job['id'], is_query=False)

# Process multiple objects concurrently
objects_data = {
    'Account': account_records,
    'Contact': contact_records,
    'Opportunity': opportunity_records
}

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {
        executor.submit(process_object_bulk2, sf, obj_name, data): obj_name
        for obj_name, data in objects_data.items()
    }

    for future in concurrent.futures.as_completed(futures):
        obj_name = futures[future]
        try:
            result = future.result()
            print(f"{obj_name}: {result['state']}")
        except Exception as e:
            print(f"{obj_name} failed: {e}")
```