# Bulk API v1.0 Operations

High-performance bulk operations for large-scale data manipulation using Salesforce's original Bulk API. This interface supports insert, update, upsert, delete, and query operations, with automatic batching for processing thousands to millions of records efficiently.
## SFBulkHandler Class

The main handler class for Bulk API v1.0 operations, providing access to bulk functionality and managing job lifecycles.

```python { .api }
class SFBulkHandler:
    def __init__(
        self,
        session_id,
        bulk_url,
        proxies=None,
        session=None
    ):
        """
        Initialize Bulk API v1.0 handler.

        Parameters:
        - session_id: Authenticated Salesforce session ID
        - bulk_url: Bulk API endpoint URL
        - proxies: HTTP proxy configuration dictionary
        - session: Optional custom requests.Session object
        """
```
### Accessing Bulk Operations

The SFBulkHandler is accessed through the `bulk` property of the main Salesforce client:

```python
from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Access bulk handler
bulk_handler = sf.bulk

# Access specific object types for bulk operations
bulk_accounts = bulk_handler.Account
bulk_contacts = bulk_handler.Contact
bulk_custom = bulk_handler.MyCustomObject__c
```
### Generic DML Operations

High-level method for any bulk DML operation, with automatic job and batch management.

```python { .api }
class SFBulkHandler:
    def submit_dml(
        self,
        object_name,
        dml,
        data,
        external_id_field=None,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Submit any DML operation for bulk processing.

        Parameters:
        - object_name: Salesforce SObject API name
        - dml: DML operation ('insert', 'update', 'upsert', 'delete', 'hard_delete')
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field name (required for upsert)
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially vs parallel
        - bypass_results: Skip downloading results for faster processing
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Results from all batches, containing success/error details per record
        """
```
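The `dml` parameter must name one of the five supported operations, and upsert additionally needs an external ID field. A minimal sketch of the validation this implies (the function and constant names here are illustrative, not the library's internals):

```python
# Sketch of the dml-parameter checks submit_dml implies; names are illustrative.
VALID_DML = {'insert', 'update', 'upsert', 'delete', 'hard_delete'}

def validate_dml(dml, external_id_field=None):
    """Raise if the requested DML operation is unsupported or incomplete."""
    if dml not in VALID_DML:
        raise ValueError(f"Unsupported DML operation: {dml!r}")
    if dml == 'upsert' and not external_id_field:
        raise ValueError("upsert requires external_id_field")
```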
## SFBulkType Class

Interface for Bulk API v1.0 operations on a specific SObject type, providing convenient methods for each DML operation.

```python { .api }
class SFBulkType:
    def __init__(
        self,
        object_name,
        bulk_url,
        headers,
        session
    ):
        """
        Initialize bulk operations for a specific SObject type.

        Parameters:
        - object_name: Salesforce SObject API name
        - bulk_url: Bulk API endpoint URL
        - headers: HTTP headers for authentication
        - session: requests.Session object
        """
```
### DML Operations

All standard DML operations share a consistent parameter interface and automatic batching.

```python { .api }
class SFBulkType:
    def insert(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk insert records.

        Parameters:
        - data: List of record dictionaries or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Insert results with record IDs and success status
        """

    def update(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk update records (requires Id field in data).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Update results with success status per record
        """

    def upsert(
        self,
        data,
        external_id_field,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk upsert records using an external ID field.

        Parameters:
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field API name for matching
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Upsert results with created/updated status per record
        """

    def delete(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk soft delete records (requires Id field in data).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Delete results with success status per record
        """

    def hard_delete(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk hard delete records (permanently deletes, bypassing the Recycle Bin).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Hard delete results with success status per record
        """
```
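Each method above splits `data` into batches of at most `batch_size` records before submission. The slicing involved is straightforward; a minimal sketch (the helper name is ours, not the library's):

```python
def split_into_batches(records, batch_size=10000):
    """Split records into chunks of at most batch_size, as automatic batching does."""
    return [records[i:i + batch_size] for i in range(0, len(records), batch_size)]
```

A 25,000-record payload at the default size therefore becomes three batches of 10,000, 10,000, and 5,000 records.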
### Query Operations

Bulk query capabilities for retrieving large datasets efficiently.

```python { .api }
class SFBulkType:
    def query(self, data, lazy_operation=False, wait=5):
        """
        Execute a bulk query to retrieve large datasets.

        Parameters:
        - data: SOQL query string
        - lazy_operation: Return job info instead of waiting for completion
        - wait: Polling interval in seconds for job completion

        Returns:
        list|dict: Query results, or job information if lazy_operation=True
        """

    def query_all(self, data, lazy_operation=False, wait=5):
        """
        Execute a bulk queryAll to include deleted and archived records.

        Parameters:
        - data: SOQL query string
        - lazy_operation: Return job info instead of waiting for completion
        - wait: Polling interval in seconds for job completion

        Returns:
        list|dict: Query results including deleted records, or job info
        """
```
### Generic Operations

Flexible method for any DML operation type.

```python { .api }
class SFBulkType:
    def submit_dml(
        self,
        function_name,
        data,
        external_id_field=None,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Submit a generic DML operation for this SObject type.

        Parameters:
        - function_name: DML operation name ('insert', 'update', etc.)
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field (for upsert operations)
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Operation results with success/error details per record
        """
```
## Usage Examples

### Basic Bulk Insert

```python
from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Prepare data for bulk insert
accounts_data = [
    {'Name': 'Bulk Account 1', 'Type': 'Customer', 'Industry': 'Technology'},
    {'Name': 'Bulk Account 2', 'Type': 'Partner', 'Industry': 'Manufacturing'},
    {'Name': 'Bulk Account 3', 'Type': 'Customer', 'Industry': 'Healthcare'}
    # ... up to 10,000 records per batch
]

# Execute bulk insert
insert_results = sf.bulk.Account.insert(accounts_data)

# Process results
for i, result in enumerate(insert_results):
    if result['success']:
        print(f"Account {i+1} created with ID: {result['id']}")
    else:
        print(f"Account {i+1} failed: {result['errors']}")
```
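Tallying outcomes like this is a common follow-up to any bulk call. A small helper, run here against sample result dictionaries (the sample data is illustrative, shaped like the results processed above):

```python
def summarize_results(results):
    """Count successes and failures in a list of bulk result dictionaries."""
    successes = sum(1 for r in results if r['success'])
    return {'success': successes, 'failed': len(results) - successes}

# Sample results in the shape the bulk methods return (illustrative data)
sample = [
    {'success': True, 'created': True, 'id': '001XX000003DHPr', 'errors': []},
    {'success': False, 'created': False, 'id': None,
     'errors': [{'message': 'Required fields are missing: [Name]'}]},
]
```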
### Bulk Update with Error Handling

```python
# Prepare update data (must include Id field)
update_data = [
    {'Id': '001XX000003DHPr', 'Phone': '555-123-4567'},
    {'Id': '001XX000003DHPs', 'Phone': '555-234-5678'},
    {'Id': '001XX000003DHPt', 'Phone': '555-345-6789'}
]

try:
    update_results = sf.bulk.Account.update(
        update_data,
        batch_size=5000,
        include_detailed_results=True
    )

    success_count = sum(1 for r in update_results if r['success'])
    error_count = len(update_results) - success_count

    print(f"Updated {success_count} records successfully")
    print(f"Failed to update {error_count} records")

    # Handle errors
    for result in update_results:
        if not result['success']:
            print(f"Error updating {result['id']}: {result['errors']}")

except Exception as e:
    print(f"Bulk update failed: {e}")
```
### Bulk Upsert with External ID

```python
# Data with external ID field
upsert_data = [
    {'External_ID__c': 'EXT001', 'Name': 'Upsert Account 1', 'Type': 'Customer'},
    {'External_ID__c': 'EXT002', 'Name': 'Upsert Account 2', 'Type': 'Partner'},
    {'External_ID__c': 'EXT003', 'Name': 'Updated Account 3', 'Industry': 'Technology'}
]

# Execute upsert using external ID field
upsert_results = sf.bulk.Account.upsert(
    upsert_data,
    external_id_field='External_ID__c',
    batch_size=1000
)

# Check created vs updated records
for result in upsert_results:
    if result['success']:
        action = 'Created' if result['created'] else 'Updated'
        print(f"{action} record ID: {result['id']}")
```
### Bulk Query for Large Datasets

```python
# Query a large dataset using the Bulk API
query = "SELECT Id, Name, Type, Industry FROM Account WHERE CreatedDate = THIS_YEAR"

query_results = sf.bulk.Account.query(query)

print(f"Retrieved {len(query_results)} accounts")
for record in query_results[:10]:  # Show first 10
    print(f"Account: {record['Name']} - {record['Type']}")

# Query including deleted records
deleted_query = "SELECT Id, Name FROM Account WHERE IsDeleted = true"
deleted_results = sf.bulk.Account.query_all(deleted_query)
print(f"Found {len(deleted_results)} deleted accounts")
```
### Performance Optimization

```python
# Large dataset with performance optimizations
large_dataset = generate_large_dataset(50000)  # 50K records

# Use larger batches for better throughput
results = sf.bulk.Contact.insert(
    large_dataset,
    batch_size=10000,      # Maximum batch size
    use_serial=False,      # Parallel processing
    bypass_results=True    # Skip result download for speed
)

# For operations where you need results but want speed
results = sf.bulk.Contact.insert(
    large_dataset,
    batch_size=10000,
    use_serial=False,
    bypass_results=False,
    include_detailed_results=False  # Less detail = faster processing
)
```
### CSV Data Processing

```python
# Work with CSV data directly
csv_data = """Name,Type,Industry
CSV Account 1,Customer,Technology
CSV Account 2,Partner,Manufacturing
CSV Account 3,Customer,Healthcare"""

# Insert CSV data
csv_results = sf.bulk.Account.insert(csv_data)

# Read a CSV file and process it
with open('accounts.csv', 'r') as csvfile:
    csv_content = csvfile.read()

bulk_results = sf.bulk.Account.insert(
    csv_content,
    batch_size=5000,
    include_detailed_results=True
)
```
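Whether you pass dictionaries or a CSV string, Bulk API v1.0 payloads ultimately travel as CSV, and the standard library converts between the two cleanly. A sketch of the dict-to-CSV direction (the helper name is ours; the library performs an equivalent conversion internally):

```python
import csv
import io

def records_to_csv(records):
    """Serialize uniform dict records to a CSV string with a header row."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(records[0].keys()))
    writer.writeheader()
    writer.writerows(records)
    return buf.getvalue()
```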
### Job Management and Monitoring

The DML methods poll to completion automatically. For queries, `lazy_operation` hands that control back to you:

```python
# For long-running queries, use lazy mode
job_info = sf.bulk.Account.query(
    "SELECT Id, Name FROM Account",
    lazy_operation=True  # Returns job info instead of waiting for completion
)

print(f"Started bulk job: {job_info['jobId']}")

# Monitor job progress with your own polling loop
# Completion is awaited automatically by default; lazy mode gives you control
```
## Job Management Methods (Advanced)

Lower-level job management methods for advanced use cases and custom workflows.

```python { .api }
class SFBulkType:
    def _create_job(self, operation, use_serial, external_id_field=None):
        """
        Create a new bulk job (internal method).

        Parameters:
        - operation: Bulk operation type
        - use_serial: Sequential vs parallel batch processing
        - external_id_field: External ID field name (for upsert)

        Returns:
        dict: Job creation response with job ID
        """

    def _close_job(self, job_id):
        """
        Close a bulk job to stop accepting new batches.

        Parameters:
        - job_id: Bulk job identifier

        Returns:
        dict: Job status after closing
        """

    def _get_job(self, job_id):
        """
        Get current job status and information.

        Parameters:
        - job_id: Bulk job identifier

        Returns:
        dict: Complete job status and statistics
        """

    def _add_batch(self, job_id, data, operation):
        """
        Add a batch of records to an existing job.

        Parameters:
        - job_id: Bulk job identifier
        - data: Record data for the batch
        - operation: Operation type for data formatting

        Returns:
        dict: Batch creation response with batch ID
        """

    def _get_batch(self, job_id, batch_id):
        """
        Get batch status and processing information.

        Parameters:
        - job_id: Bulk job identifier
        - batch_id: Batch identifier within the job

        Returns:
        dict: Batch status and statistics
        """

    def _get_batch_results(self, job_id, batch_id, operation):
        """
        Retrieve results for a completed batch.

        Parameters:
        - job_id: Bulk job identifier
        - batch_id: Batch identifier
        - operation: Operation type for result parsing

        Returns:
        list: Batch results with success/error details per record
        """
```
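These private methods compose into the lifecycle that `submit_dml` automates: create a job, add batches, close the job, poll each batch, then fetch results. Since the real methods need a live session, the sketch below runs the sequence against a stand-in object; all names, return shapes, and IDs here are illustrative:

```python
class FakeBulkType:
    """Stand-in mimicking SFBulkType's private job methods, for illustration only."""
    def _create_job(self, operation, use_serial, external_id_field=None):
        return {'id': 'JOB1', 'state': 'Open'}
    def _add_batch(self, job_id, data, operation):
        return {'id': 'BATCH1', 'state': 'Queued'}
    def _close_job(self, job_id):
        return {'id': job_id, 'state': 'Closed'}
    def _get_batch(self, job_id, batch_id):
        return {'id': batch_id, 'state': 'Completed'}
    def _get_batch_results(self, job_id, batch_id, operation):
        return [{'success': True, 'created': True, 'id': '001XX0000001', 'errors': []}]

def run_job(bulk_type, data, operation='insert'):
    """Walk the job lifecycle: create, add batch, close, poll, fetch results."""
    job = bulk_type._create_job(operation, use_serial=False)
    batch = bulk_type._add_batch(job['id'], data, operation)
    bulk_type._close_job(job['id'])
    # Poll until the batch completes (a single check here for brevity)
    status = bulk_type._get_batch(job['id'], batch['id'])
    if status['state'] == 'Completed':
        return bulk_type._get_batch_results(job['id'], batch['id'], operation)
    return []
```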
## Best Practices

### Data Preparation

```python
# Ensure data is properly formatted
def prepare_bulk_data(records):
    """Prepare records for bulk operations."""
    prepared = []
    for record in records:
        # Remove None values
        clean_record = {k: v for k, v in record.items() if v is not None}

        # Ensure required fields are present
        if 'Name' not in clean_record:
            clean_record['Name'] = 'Default Name'

        prepared.append(clean_record)

    return prepared

# Use prepared data
clean_data = prepare_bulk_data(raw_data)
results = sf.bulk.Account.insert(clean_data)
```
### Error Handling and Retry Logic

```python
def bulk_insert_with_retry(bulk_type, data, max_retries=3):
    """Bulk insert with retry logic for failed records."""
    results = []

    for attempt in range(max_retries):
        try:
            results = bulk_type.insert(
                data,
                include_detailed_results=True
            )

            # Separate successful and failed records
            failed_data = []
            for i, result in enumerate(results):
                if not result['success']:
                    failed_data.append(data[i])
                    print(f"Failed record: {result['errors']}")

            if not failed_data:
                print(f"All records processed successfully on attempt {attempt + 1}")
                return results

            # Retry with failed records only
            data = failed_data
            print(f"Retrying {len(failed_data)} failed records...")

        except Exception as e:
            print(f"Attempt {attempt + 1} failed with error: {e}")
            if attempt == max_retries - 1:
                raise

    return results

# Usage
results = bulk_insert_with_retry(sf.bulk.Account, account_data)
```
### Memory Management for Large Datasets

```python
import csv

def process_large_file(filename, bulk_type, chunk_size=10000):
    """Process large CSV files in chunks to manage memory."""

    with open(filename, 'r', newline='') as file:
        reader = csv.DictReader(file)  # Handles quoted fields, unlike a naive split(',')
        chunk = []

        for record_num, record in enumerate(reader, 1):
            chunk.append(record)

            if len(chunk) >= chunk_size:
                # Process chunk
                bulk_type.insert(chunk, bypass_results=True)
                print(f"Processed chunk ending at record {record_num}")
                chunk = []

        # Process remaining records
        if chunk:
            bulk_type.insert(chunk, bypass_results=True)
            print(f"Processed final chunk of {len(chunk)} records")

# Usage
process_large_file('massive_accounts.csv', sf.bulk.Account)
```