
# Bulk API v1.0 Operations

High-performance bulk operations for large-scale data manipulation using Salesforce's original Bulk API. This interface supports insert, update, upsert, delete, and query operations with automatic batching for processing thousands to millions of records efficiently.
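The automatic batching mentioned above can be sketched in plain Python: records are split into consecutive slices of at most `batch_size`, and each slice becomes one batch in the job. `split_into_batches` is an illustrative helper written for this sketch, not part of the library.

```python
def split_into_batches(records, batch_size=10000):
    """Split records into consecutive slices of at most batch_size."""
    return [records[i:i + batch_size]
            for i in range(0, len(records), batch_size)]

records = [{'Name': f'Account {n}'} for n in range(25000)]
batches = split_into_batches(records)

print(len(batches))      # 3 batches (10,000 + 10,000 + 5,000)
print(len(batches[-1]))  # 5000
```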

## SFBulkHandler Class

The main handler class for Bulk API v1.0 operations, providing access to bulk functionality and managing job lifecycles.

```python { .api }
class SFBulkHandler:
    def __init__(
        self,
        session_id,
        bulk_url,
        proxies=None,
        session=None
    ):
        """
        Initialize Bulk API v1.0 handler.

        Parameters:
        - session_id: Authenticated Salesforce session ID
        - bulk_url: Bulk API endpoint URL
        - proxies: HTTP proxy configuration dictionary
        - session: Optional custom requests.Session object
        """
```

### Accessing Bulk Operations

The SFBulkHandler is accessed through the `bulk` property of the main Salesforce client:

```python
from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Access bulk handler
bulk_handler = sf.bulk

# Access specific object types for bulk operations
bulk_accounts = bulk_handler.Account
bulk_contacts = bulk_handler.Contact
bulk_custom = bulk_handler.MyCustomObject__c
```

### Generic DML Operations

High-level method for any bulk DML operation with automatic job and batch management.

```python { .api }
class SFBulkHandler:
    def submit_dml(
        self,
        object_name,
        dml,
        data,
        external_id_field=None,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Submit any DML operation for bulk processing.

        Parameters:
        - object_name: Salesforce SObject API name
        - dml: DML operation ('insert', 'update', 'upsert', 'delete', 'hard_delete')
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field name (required for upsert)
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially vs parallel
        - bypass_results: Skip downloading results for faster processing
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Results from all batches, containing success/error details per record
        """
```

## SFBulkType Class

Interface for Bulk API v1.0 operations on specific SObject types, providing convenient methods for each DML operation type.

```python { .api }
class SFBulkType:
    def __init__(
        self,
        object_name,
        bulk_url,
        headers,
        session
    ):
        """
        Initialize bulk operations for specific SObject type.

        Parameters:
        - object_name: Salesforce SObject API name
        - bulk_url: Bulk API endpoint URL
        - headers: HTTP headers for authentication
        - session: requests.Session object
        """
```

### DML Operations

All standard DML operations with a consistent parameter interface and automatic batching.

```python { .api }
class SFBulkType:
    def insert(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk insert records.

        Parameters:
        - data: List of record dictionaries or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Insert results with record IDs and success status
        """

    def update(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk update records (requires Id field in data).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Update results with success status per record
        """

    def upsert(
        self,
        data,
        external_id_field,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk upsert records using external ID field.

        Parameters:
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field API name for matching
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Upsert results with created/updated status per record
        """

    def delete(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk soft delete records (requires Id field in data).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Delete results with success status per record
        """

    def hard_delete(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk hard delete records (permanently removes records, bypassing the Recycle Bin).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Hard delete results with success status per record
        """
```

### Query Operations

Bulk query capabilities for retrieving large datasets efficiently.

```python { .api }
class SFBulkType:
    def query(self, data, lazy_operation=False, wait=5):
        """
        Execute bulk query to retrieve large datasets.

        Parameters:
        - data: SOQL query string
        - lazy_operation: Return job info instead of waiting for completion
        - wait: Polling interval in seconds for job completion

        Returns:
        list|dict: Query results or job information if lazy_operation=True
        """

    def query_all(self, data, lazy_operation=False, wait=5):
        """
        Execute bulk queryAll to include deleted and archived records.

        Parameters:
        - data: SOQL query string
        - lazy_operation: Return job info instead of waiting for completion
        - wait: Polling interval in seconds for job completion

        Returns:
        list|dict: Query results including deleted records or job info
        """
```

### Generic Operations

Flexible method for any DML operation type.

```python { .api }
class SFBulkType:
    def submit_dml(
        self,
        function_name,
        data,
        external_id_field=None,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Submit generic DML operation for this SObject type.

        Parameters:
        - function_name: DML operation name ('insert', 'update', etc.)
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field (for upsert operations)
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Operation results with success/error details per record
        """
```
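The dispatch idea behind a generic `submit_dml` can be sketched in a few lines: the operation name is resolved to the matching method at runtime. `BulkTypeSketch` is a stand-in written for this illustration, not the library's implementation, and its fake results only mimic the shape of real bulk results.

```python
class BulkTypeSketch:
    """Stand-in illustrating name-based dispatch; not the library's code."""

    def insert(self, data, **kwargs):
        # Fake per-record results mimicking the bulk result shape
        return [{'success': True, 'id': f'001{i:015d}'} for i, _ in enumerate(data)]

    def submit_dml(self, function_name, data, **kwargs):
        # Resolve 'insert', 'update', 'upsert', 'delete', or 'hard_delete'
        # to the matching method and delegate to it
        return getattr(self, function_name)(data, **kwargs)

sketch = BulkTypeSketch()
results = sketch.submit_dml('insert', [{'Name': 'Acme'}, {'Name': 'Globex'}])
print(len(results))           # 2
print(results[0]['success'])  # True
```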

## Usage Examples

### Basic Bulk Insert

```python
from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Prepare data for bulk insert
accounts_data = [
    {'Name': 'Bulk Account 1', 'Type': 'Customer', 'Industry': 'Technology'},
    {'Name': 'Bulk Account 2', 'Type': 'Partner', 'Industry': 'Manufacturing'},
    {'Name': 'Bulk Account 3', 'Type': 'Customer', 'Industry': 'Healthcare'}
    # ... up to 10,000 records per batch
]

# Execute bulk insert
insert_results = sf.bulk.Account.insert(accounts_data)

# Process results
for i, result in enumerate(insert_results):
    if result['success']:
        print(f"Account {i+1} created with ID: {result['id']}")
    else:
        print(f"Account {i+1} failed: {result['error']}")
```

### Bulk Update with Error Handling

```python
# Prepare update data (must include Id field)
update_data = [
    {'Id': '001XX000003DHPr', 'Phone': '555-123-4567'},
    {'Id': '001XX000003DHPs', 'Phone': '555-234-5678'},
    {'Id': '001XX000003DHPt', 'Phone': '555-345-6789'}
]

try:
    update_results = sf.bulk.Account.update(
        update_data,
        batch_size=5000,
        include_detailed_results=True
    )

    success_count = sum(1 for r in update_results if r['success'])
    error_count = len(update_results) - success_count

    print(f"Updated {success_count} records successfully")
    print(f"Failed to update {error_count} records")

    # Handle errors
    for result in update_results:
        if not result['success']:
            print(f"Error updating {result['id']}: {result['error']}")

except Exception as e:
    print(f"Bulk update failed: {e}")
```

### Bulk Upsert with External ID

```python
# Data with external ID field
upsert_data = [
    {'External_ID__c': 'EXT001', 'Name': 'Upsert Account 1', 'Type': 'Customer'},
    {'External_ID__c': 'EXT002', 'Name': 'Upsert Account 2', 'Type': 'Partner'},
    {'External_ID__c': 'EXT003', 'Name': 'Updated Account 3', 'Industry': 'Technology'}
]

# Execute upsert using external ID field
upsert_results = sf.bulk.Account.upsert(
    upsert_data,
    external_id_field='External_ID__c',
    batch_size=1000
)

# Check created vs updated records
for result in upsert_results:
    if result['success']:
        action = 'Created' if result['created'] else 'Updated'
        print(f"{action} record ID: {result['id']}")
```

### Bulk Query for Large Datasets

```python
# Query a large dataset using the bulk API
query = "SELECT Id, Name, Type, Industry FROM Account WHERE CreatedDate = THIS_YEAR"

query_results = sf.bulk.Account.query(query)

print(f"Retrieved {len(query_results)} accounts")
for record in query_results[:10]:  # Show first 10
    print(f"Account: {record['Name']} - {record['Type']}")

# Query including deleted records
deleted_query = "SELECT Id, Name FROM Account WHERE IsDeleted = true"
deleted_results = sf.bulk.Account.query_all(deleted_query)
print(f"Found {len(deleted_results)} deleted accounts")
```

### Performance Optimization

```python
# Large dataset with performance optimizations
large_dataset = generate_large_dataset(50000)  # 50K records

# Use larger batches for better throughput
results = sf.bulk.Contact.insert(
    large_dataset,
    batch_size=10000,      # Maximum batch size
    use_serial=False,      # Parallel processing
    bypass_results=True    # Skip result download for speed
)

# For operations where you need results but want speed
results = sf.bulk.Contact.insert(
    large_dataset,
    batch_size=10000,
    use_serial=False,
    bypass_results=False,
    include_detailed_results=False  # Less detail = faster processing
)
```

### CSV Data Processing

```python
# Work with CSV data directly
csv_data = """Name,Type,Industry
CSV Account 1,Customer,Technology
CSV Account 2,Partner,Manufacturing
CSV Account 3,Customer,Healthcare"""

# Insert CSV data
csv_results = sf.bulk.Account.insert(csv_data)

# Read a CSV file and process its contents
with open('accounts.csv', 'r') as csvfile:
    csv_content = csvfile.read()

bulk_results = sf.bulk.Account.insert(
    csv_content,
    batch_size=5000,
    include_detailed_results=True
)
```

438

439

### Job Management and Monitoring

440

441

```python

442

# For long-running operations, use lazy mode

443

job_info = sf.bulk.Account.insert(

444

large_dataset,

445

lazy_operation=True # Returns job info instead of waiting

446

)

447

448

print(f"Started bulk job: {job_info['jobId']}")

449

450

# Monitor job progress (would need custom polling)

451

# This is handled automatically by default, but lazy mode gives you control

452

```

## Job Management Methods (Advanced)

Lower-level job management methods for advanced use cases and custom workflows.

```python { .api }
class SFBulkType:
    def _create_job(self, operation, use_serial, external_id_field=None):
        """
        Create a new bulk job (internal method).

        Parameters:
        - operation: Bulk operation type
        - use_serial: Sequential vs parallel batch processing
        - external_id_field: External ID field name (for upsert)

        Returns:
        dict: Job creation response with job ID
        """

    def _close_job(self, job_id):
        """
        Close a bulk job to stop accepting new batches.

        Parameters:
        - job_id: Bulk job identifier

        Returns:
        dict: Job status after closing
        """

    def _get_job(self, job_id):
        """
        Get current job status and information.

        Parameters:
        - job_id: Bulk job identifier

        Returns:
        dict: Complete job status and statistics
        """

    def _add_batch(self, job_id, data, operation):
        """
        Add a batch of records to an existing job.

        Parameters:
        - job_id: Bulk job identifier
        - data: Record data for the batch
        - operation: Operation type for data formatting

        Returns:
        dict: Batch creation response with batch ID
        """

    def _get_batch(self, job_id, batch_id):
        """
        Get batch status and processing information.

        Parameters:
        - job_id: Bulk job identifier
        - batch_id: Batch identifier within the job

        Returns:
        dict: Batch status and statistics
        """

    def _get_batch_results(self, job_id, batch_id, operation):
        """
        Retrieve results for a completed batch.

        Parameters:
        - job_id: Bulk job identifier
        - batch_id: Batch identifier
        - operation: Operation type for result parsing

        Returns:
        list: Batch results with success/error details per record
        """
```
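The lifecycle these internal methods enable (create a job, add batches, close the job, poll batch status, then fetch results) can be sketched with a stub that fakes the API calls, so the control flow runs without a Salesforce connection. `StubBulk` and its canned responses are invented for this illustration and do not reflect the library's actual return payloads.

```python
import time

class StubBulk:
    """Fakes the internal job-management calls for illustration only."""

    def _create_job(self, operation, use_serial, external_id_field=None):
        return {'id': 'JOB1', 'state': 'Open'}

    def _add_batch(self, job_id, data, operation):
        return {'id': 'BATCH1', 'state': 'Queued'}

    def _close_job(self, job_id):
        return {'id': job_id, 'state': 'Closed'}

    def _get_batch(self, job_id, batch_id):
        return {'id': batch_id, 'state': 'Completed'}

    def _get_batch_results(self, job_id, batch_id, operation):
        return [{'success': True, 'created': True, 'id': '001000000000001'}]

bulk = StubBulk()

# Manual lifecycle: create -> add batch -> close -> poll -> fetch results
job = bulk._create_job('insert', use_serial=False)
batch = bulk._add_batch(job['id'], [{'Name': 'Acme'}], 'insert')
bulk._close_job(job['id'])

# Poll until the batch reaches a terminal state
while bulk._get_batch(job['id'], batch['id'])['state'] not in ('Completed', 'Failed'):
    time.sleep(5)

results = bulk._get_batch_results(job['id'], batch['id'], 'insert')
print(results[0]['success'])  # True
```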

## Best Practices

### Data Preparation

```python
# Ensure data is properly formatted
def prepare_bulk_data(records):
    """Prepare records for bulk operations."""
    prepared = []
    for record in records:
        # Remove None values
        clean_record = {k: v for k, v in record.items() if v is not None}

        # Ensure required fields are present
        if 'Name' not in clean_record:
            clean_record['Name'] = 'Default Name'

        prepared.append(clean_record)

    return prepared

# Use prepared data
clean_data = prepare_bulk_data(raw_data)
results = sf.bulk.Account.insert(clean_data)
```

### Error Handling and Retry Logic

```python
def bulk_insert_with_retry(bulk_type, data, max_retries=3):
    """Bulk insert with retry logic for failed records."""

    for attempt in range(max_retries):
        try:
            results = bulk_type.insert(
                data,
                include_detailed_results=True
            )

            # Separate successful and failed records
            failed_data = []
            for i, result in enumerate(results):
                if not result['success']:
                    failed_data.append(data[i])
                    print(f"Failed record: {result['error']}")

            if not failed_data:
                print(f"All records processed successfully on attempt {attempt + 1}")
                return results

            # Retry with failed records only
            data = failed_data
            print(f"Retrying {len(failed_data)} failed records...")

        except Exception as e:
            print(f"Attempt {attempt + 1} failed with error: {e}")
            if attempt == max_retries - 1:
                raise

    return results

# Usage
results = bulk_insert_with_retry(sf.bulk.Account, account_data)
```

### Memory Management for Large Datasets

```python
import csv

def process_large_file(filename, bulk_type, chunk_size=10000):
    """Process large CSV files in chunks to manage memory."""

    with open(filename, 'r', newline='') as file:
        # csv.DictReader handles quoted fields and embedded commas,
        # which a naive str.split(',') would mangle
        reader = csv.DictReader(file)
        chunk = []

        for record_num, record in enumerate(reader, 1):
            chunk.append(record)

            if len(chunk) >= chunk_size:
                # Process chunk
                bulk_type.insert(chunk, bypass_results=True)
                print(f"Processed chunk ending at record {record_num}")
                chunk = []

        # Process remaining records
        if chunk:
            bulk_type.insert(chunk, bypass_results=True)
            print(f"Processed final chunk of {len(chunk)} records")

# Usage
process_large_file('massive_accounts.csv', sf.bulk.Account)
```