0
# Bulk API v2.0 Operations
1
2
Next-generation bulk operations with improved performance, simplified job management, and enhanced monitoring capabilities for modern high-volume data processing. Bulk API v2.0 provides streamlined job workflows, better error handling, and more efficient data transfer compared to the original Bulk API.
3
4
## SFBulk2Handler Class
5
6
The main handler class for Bulk API v2.0 operations, providing modern bulk functionality with improved job management and monitoring.
7
8
```python { .api }
9
class SFBulk2Handler:
10
def __init__(
11
self,
12
session_id,
13
bulk2_url,
14
proxies=None,
15
session=None
16
):
17
"""
18
Initialize Bulk API v2.0 handler.
19
20
Parameters:
21
- session_id: Authenticated Salesforce session ID
22
- bulk2_url: Bulk 2.0 API endpoint URL
23
- proxies: HTTP proxy configuration dictionary
24
- session: Optional custom requests.Session object
25
"""
26
```
27
28
### Constants and Configuration
29
30
```python { .api }
31
class SFBulk2Handler:
32
JSON_CONTENT_TYPE = "application/json"
33
CSV_CONTENT_TYPE = "text/csv"
34
DEFAULT_WAIT_TIMEOUT_SECONDS = 300
35
MAX_CHECK_INTERVAL_SECONDS = 30
36
DEFAULT_QUERY_PAGE_SIZE = 50000
37
```
38
39
### Accessing Bulk 2.0 Operations
40
41
The SFBulk2Handler is accessed through the `bulk2` property of the main Salesforce client:
42
43
```python
44
from simple_salesforce import Salesforce
45
46
sf = Salesforce(username='user@example.com', password='pass', security_token='token')
47
48
# Access Bulk 2.0 handler
49
bulk2_handler = sf.bulk2
50
51
# Bulk 2.0 uses direct method calls rather than object attributes
52
```
53
54
## Job Management
55
56
Comprehensive job lifecycle management with improved status monitoring and control.
57
58
```python { .api }
59
class SFBulk2Handler:
60
def create_job(
61
self,
62
operation,
63
object_name=None,
64
external_id_field=None,
65
query=None
66
):
67
"""
68
Create a new Bulk 2.0 job.
69
70
Parameters:
71
- operation: Operation type (insert, upsert, update, delete, hard_delete, query, query_all)
72
- object_name: Salesforce SObject API name (required for DML operations)
73
- external_id_field: External ID field name (required for upsert)
74
- query: SOQL query string (required for query operations)
75
76
Returns:
77
dict: Job creation response with job ID and status
78
"""
79
80
def get_job(self, job_id, is_query):
81
"""
82
Get comprehensive job information and status.
83
84
Parameters:
85
- job_id: Bulk 2.0 job identifier
86
- is_query: True for query jobs, False for ingest jobs
87
88
Returns:
89
dict: Complete job information including progress and statistics
90
"""
91
92
def close_job(self, job_id):
93
"""
94
Close ingest job to begin processing uploaded data.
95
96
Parameters:
97
- job_id: Ingest job identifier
98
99
Returns:
100
dict: Job status after closing
101
"""
102
103
def abort_job(self, job_id, is_query):
104
"""
105
Abort running job to stop processing.
106
107
Parameters:
108
- job_id: Job identifier to abort
109
- is_query: True for query jobs, False for ingest jobs
110
111
Returns:
112
dict: Job status after aborting
113
"""
114
115
def delete_job(self, job_id, is_query):
116
"""
117
Delete completed job and its data.
118
119
Parameters:
120
- job_id: Job identifier to delete
121
- is_query: True for query jobs, False for ingest jobs
122
123
Returns:
124
dict: Deletion confirmation
125
"""
126
127
def wait_for_job(self, job_id, is_query, wait=0.5):
128
"""
129
Wait for job completion with intelligent polling.
130
131
Parameters:
132
- job_id: Job identifier to monitor
133
- is_query: True for query jobs, False for ingest jobs
134
- wait: Initial polling interval in seconds
135
136
Returns:
137
dict: Final job status when completed or failed
138
"""
139
```
140
141
## Data Operations
142
143
Efficient data upload and download operations with support for large datasets.
144
145
```python { .api }
146
class SFBulk2Handler:
147
def upload_job_data(self, job_id, data, content_url=None):
148
"""
149
Upload data for ingest job processing.
150
151
Parameters:
152
- job_id: Ingest job identifier
153
- data: CSV data string or file-like object
154
- content_url: Optional alternative upload URL
155
156
Returns:
157
dict: Upload confirmation and status
158
"""
159
160
def get_query_results(
161
self,
162
job_id,
163
locator="",
164
max_records=None
165
):
166
"""
167
Get query results with pagination support.
168
169
Parameters:
170
- job_id: Query job identifier
171
- locator: Pagination locator for subsequent pages
172
- max_records: Maximum records per page (default: DEFAULT_QUERY_PAGE_SIZE)
173
174
Returns:
175
dict: Query results with data and pagination info
176
"""
177
178
def download_job_data(
179
self,
180
path,
181
job_id,
182
locator="",
183
max_records=None,
184
chunk_size=1024
185
):
186
"""
187
Download query results directly to file.
188
189
Parameters:
190
- path: Local file path for saving results
191
- job_id: Query job identifier
192
- locator: Pagination locator for specific page
193
- max_records: Maximum records per download
194
- chunk_size: File write chunk size in bytes
195
196
Returns:
197
dict: Download status and file information
198
"""
199
```
200
201
## Enums and Types
202
203
Bulk API v2.0 operation types and job states for type-safe operations.
204
205
```python { .api }
206
class Operation:
207
"""Enumeration of supported Bulk 2.0 operations."""
208
    INSERT = "insert"
    UPSERT = "upsert"
    UPDATE = "update"
    DELETE = "delete"
    HARD_DELETE = "hardDelete"
    QUERY = "query"
    QUERY_ALL = "queryAll"
215
216
class JobState:
217
"""Enumeration of Bulk 2.0 job states."""
218
    OPEN = "Open"
    ABORTED = "Aborted"
    FAILED = "Failed"
    UPLOAD_COMPLETE = "UploadComplete"
    IN_PROGRESS = "InProgress"
    JOB_COMPLETE = "JobComplete"
224
225
class ColumnDelimiter:
226
"""CSV column delimiter options."""
227
COMMA = "COMMA"
228
TAB = "TAB"
229
PIPE = "PIPE"
230
SEMICOLON = "SEMICOLON"
231
CARET = "CARET"
232
233
class LineEnding:
234
"""CSV line ending options."""
235
LF = "LF" # Unix/Linux
236
CRLF = "CRLF" # Windows
237
238
class ResultsType:
239
"""Query result format types."""
240
CSV = "CSV"
241
JSON = "JSON"
242
```
243
244
## Usage Examples
245
246
### Basic Bulk 2.0 Insert
247
248
```python
249
from simple_salesforce import Salesforce
250
251
sf = Salesforce(username='user@example.com', password='pass', security_token='token')
252
253
# Prepare CSV data
254
csv_data = """Name,Type,Industry
255
Bulk2 Account 1,Customer,Technology
256
Bulk2 Account 2,Partner,Manufacturing
257
Bulk2 Account 3,Customer,Healthcare"""
258
259
# Create ingest job
260
job_response = sf.bulk2.create_job(
261
operation='insert',
262
object_name='Account'
263
)
264
job_id = job_response['id']
265
print(f"Created job: {job_id}")
266
267
# Upload data
268
sf.bulk2.upload_job_data(job_id, csv_data)
269
270
# Close job to start processing
271
sf.bulk2.close_job(job_id)
272
273
# Wait for completion
274
final_status = sf.bulk2.wait_for_job(job_id, is_query=False)
275
276
print(f"Job completed with state: {final_status['state']}")
277
print(f"Records processed: {final_status['numberRecordsProcessed']}")
278
print(f"Records failed: {final_status['numberRecordsFailed']}")
279
```
280
281
### Bulk 2.0 Query Operations
282
283
```python
284
# Create query job
285
query_job = sf.bulk2.create_job(
286
operation='query',
287
query="SELECT Id, Name, Type, Industry FROM Account WHERE CreatedDate = THIS_YEAR"
288
)
289
query_job_id = query_job['id']
290
291
# Wait for query completion
292
final_query_status = sf.bulk2.wait_for_job(query_job_id, is_query=True)
293
294
if final_query_status['state'] == 'JobComplete':
295
# Get query results
296
results = sf.bulk2.get_query_results(query_job_id)
297
298
# Process CSV results
299
csv_data = results['data']
300
lines = csv_data.strip().split('\n')
301
header = lines[0].split(',')
302
303
for line in lines[1:]:
304
values = line.split(',')
305
record = dict(zip(header, values))
306
print(f"Account: {record['Name']} - {record['Type']}")
307
308
# Download large results to file
309
sf.bulk2.download_job_data(
310
'/path/to/results.csv',
311
query_job_id,
312
max_records=100000
313
)
314
```
315
316
### Advanced Bulk 2.0 Upsert
317
318
```python
319
# Prepare data with external ID
320
upsert_csv = """External_ID__c,Name,Type,Industry
321
EXT001,Upsert Account 1,Customer,Technology
322
EXT002,Upsert Account 2,Partner,Manufacturing
323
EXT003,Updated Account 3,Customer,Finance"""
324
325
# Create upsert job with external ID field
326
upsert_job = sf.bulk2.create_job(
327
operation='upsert',
328
object_name='Account',
329
external_id_field='External_ID__c'
330
)
331
upsert_job_id = upsert_job['id']
332
333
# Upload and process
334
sf.bulk2.upload_job_data(upsert_job_id, upsert_csv)
335
sf.bulk2.close_job(upsert_job_id)
336
337
# Monitor with custom polling
338
import time
339
340
while True:
341
status = sf.bulk2.get_job(upsert_job_id, is_query=False)
342
print(f"Job state: {status['state']}")
343
344
if status['state'] in ['JobComplete', 'Failed', 'Aborted']:
345
break
346
347
time.sleep(2)
348
349
# Check final results
350
final_status = sf.bulk2.get_job(upsert_job_id, is_query=False)
351
print(f"Succeeded: {final_status['numberRecordsProcessed'] - final_status['numberRecordsFailed']}")
352
print(f"Failed: {final_status['numberRecordsFailed']}")
353
```
354
355
### Bulk 2.0 with Error Handling
356
357
```python
358
def bulk2_insert_with_monitoring(sf, object_name, csv_data):
    """Bulk 2.0 insert with comprehensive error handling.

    Parameters:
    - sf: Authenticated simple_salesforce.Salesforce client
    - object_name: Salesforce SObject API name (e.g. 'Contact')
    - csv_data: CSV payload string for the ingest job

    Returns:
        dict: Summary with 'job_id', 'success', and either
        'processed'/'failed' counts or an 'error' message.

    Raises:
        TimeoutError: If the job does not reach a terminal state within
            5 minutes (the job is aborted before raising).
    """
    import time

    job_id = None  # set before try so the cleanup path can check it
    try:
        # Create job
        job_response = sf.bulk2.create_job(
            operation='insert',
            object_name=object_name
        )
        job_id = job_response['id']

        print(f"Created job {job_id} for {object_name}")

        # Upload data
        sf.bulk2.upload_job_data(job_id, csv_data)
        print("Data uploaded successfully")

        # Close job so Salesforce starts processing the uploaded data
        close_response = sf.bulk2.close_job(job_id)
        print(f"Job closed, state: {close_response['state']}")

        # Poll until the job reaches a terminal state or times out
        start_time = time.time()
        timeout = 300  # 5 minutes

        while True:
            status = sf.bulk2.get_job(job_id, is_query=False)
            elapsed = time.time() - start_time

            print(f"Job state: {status['state']} (elapsed: {elapsed:.1f}s)")

            if status['state'] == 'JobComplete':
                print("Job completed successfully!")
                return {
                    'job_id': job_id,
                    'success': True,
                    'processed': status['numberRecordsProcessed'],
                    'failed': status['numberRecordsFailed']
                }
            elif status['state'] in ['Failed', 'Aborted']:
                print(f"Job failed with state: {status['state']}")
                return {
                    'job_id': job_id,
                    'success': False,
                    'error': status.get('stateMessage', 'Unknown error')
                }
            elif elapsed > timeout:
                # Abort timed-out job before surfacing the timeout
                sf.bulk2.abort_job(job_id, is_query=False)
                raise TimeoutError(f"Job {job_id} timed out after {timeout} seconds")

            time.sleep(5)

    except Exception as e:
        print(f"Error in bulk operation: {e}")
        # Best-effort cleanup: abort only if the job was actually created
        if job_id is not None:
            try:
                sf.bulk2.abort_job(job_id, is_query=False)
            except Exception:
                pass  # cleanup failure must not mask the original error
        raise
419
420
# Usage
421
result = bulk2_insert_with_monitoring(sf, 'Contact', contact_csv_data)
422
if result['success']:
423
print(f"Processed {result['processed']} records")
424
else:
425
print(f"Job failed: {result['error']}")
426
```
427
428
### Large Dataset Query with Pagination
429
430
```python
431
def bulk2_query_all_pages(sf, query, output_dir):
    """Query a large dataset with automatic pagination.

    Parameters:
    - sf: Authenticated simple_salesforce.Salesforce client
    - query: SOQL query string
    - output_dir: Directory path where per-page CSV files are written

    Returns:
        int: Number of result pages downloaded.
    """
    # Create query job
    query_job = sf.bulk2.create_job(operation='query', query=query)
    job_id = query_job['id']

    # Block until the query job finishes
    sf.bulk2.wait_for_job(job_id, is_query=True)

    page_num = 1
    locator = ""

    while True:
        # Fetch one page of results; the response also tells us
        # whether more pages remain.
        page_results = sf.bulk2.get_query_results(
            job_id,
            locator=locator,
            max_records=50000
        )

        # Save the same page to its own file
        page_file = f"{output_dir}/page_{page_num}.csv"
        sf.bulk2.download_job_data(
            page_file,
            job_id,
            locator=locator,
            max_records=50000
        )

        print(f"Downloaded page {page_num} to {page_file}")

        # Stop when there is no next page.
        # NOTE(review): assumes get_query_results exposes a
        # 'nextRecordsUrl' containing a 'locator=' parameter — confirm
        # against the client API (the raw REST API signals the next page
        # via the Sf-Locator response header).
        if 'nextRecordsUrl' not in page_results or not page_results['nextRecordsUrl']:
            break

        # Extract locator for next page
        locator = page_results['nextRecordsUrl'].split('locator=')[1]
        page_num += 1

    print(f"Downloaded {page_num} pages of results")
    return page_num
474
475
# Usage
476
pages_downloaded = bulk2_query_all_pages(
477
sf,
478
"SELECT Id, Name, Email FROM Contact WHERE CreatedDate = LAST_N_DAYS:30",
479
"/tmp/bulk_results"
480
)
481
```
482
483
### Bulk 2.0 Job Management
484
485
```python
486
def manage_bulk2_jobs(sf):
    """Example of advanced job management operations."""

    # Spin up one ingest job and one query job, then inspect both.
    created = sf.bulk2.create_job(operation='insert', object_name='Account')
    queried = sf.bulk2.create_job(
        operation='query',
        query="SELECT Id, Name FROM Account LIMIT 1000"
    )
    tracked = [('insert', created['id']), ('query', queried['id'])]

    # Walk every tracked job and either report, abort, or delete it.
    for kind, ident in tracked:
        querying = kind == 'query'

        # Report current state
        info = sf.bulk2.get_job(ident, is_query=querying)
        state = info['state']
        print(f"{kind.title()} job {ident}: {state}")

        if state == 'InProgress':
            # Still running — stop it
            print(f"Aborting {kind} job {ident}")
            sf.bulk2.abort_job(ident, is_query=querying)
        elif state in ['JobComplete', 'Failed', 'Aborted']:
            # Terminal — remove the finished job record
            print(f"Deleting {kind} job {ident}")
            sf.bulk2.delete_job(ident, is_query=querying)
520
521
# Usage
522
manage_bulk2_jobs(sf)
523
```
524
525
## Utility Functions
526
527
Data processing utilities for Bulk API v2.0 operations.
528
529
```python { .api }
530
class SFBulk2Handler:
531
def filter_null_bytes(self, b):
532
"""
533
Filter null bytes from strings or bytes objects.
534
535
Parameters:
536
- b: String or bytes object to filter
537
538
Returns:
539
str|bytes: Filtered content with null bytes removed
540
"""
541
```
542
543
## Best Practices
544
545
### Data Preparation for Bulk 2.0
546
547
```python
548
def prepare_csv_for_bulk2(records, field_mapping=None):
    """Prepare record data as CSV for Bulk 2.0 operations.

    Parameters:
    - records: List of dicts. All records are assumed to share the first
      record's keys; missing keys serialize as ''.
    - field_mapping: Optional dict mapping source keys to Salesforce
      field API names.

    Returns:
        str: CSV text with a header row, or '' when records is empty.
    """

    if not records:
        return ""

    # Rename keys to Salesforce field names when a mapping is provided
    if field_mapping:
        records = [
            {field_mapping.get(k, k): v for k, v in record.items()}
            for record in records
        ]

    # Column order follows the first record
    headers = list(records[0].keys())

    # Build CSV
    csv_lines = [','.join(headers)]

    for record in records:
        values = []
        for header in headers:
            value = record.get(header, '')

            # Handle CSV escaping per RFC 4180: double embedded quotes,
            # then wrap the field in quotes. (The original reused the
            # f-string's own quote character inside the replacement
            # expression — a SyntaxError on Python < 3.12.)
            if isinstance(value, str):
                if ',' in value or '"' in value or '\n' in value:
                    escaped = value.replace('"', '""')
                    value = f'"{escaped}"'

            values.append(str(value))

        csv_lines.append(','.join(values))

    return '\n'.join(csv_lines)
582
583
# Usage
584
csv_data = prepare_csv_for_bulk2(
585
account_records,
586
field_mapping={'company_name': 'Name', 'account_type': 'Type'}
587
)
588
```
589
590
### Error Analysis
591
592
```python
593
def analyze_bulk2_results(sf, job_id):
    """Analyze Bulk 2.0 job results for errors and success rates."""

    # Pull the final ingest-job status from Salesforce
    status = sf.bulk2.get_job(job_id, is_query=False)

    processed = status['numberRecordsProcessed']
    failed = status['numberRecordsFailed']
    succeeded = processed - failed

    # Guard against division by zero on empty jobs
    rate = (succeeded / processed * 100) if processed > 0 else 0

    summary = {
        'job_id': job_id,
        'total_records': processed,
        'successful_records': succeeded,
        'failed_records': failed,
        'success_rate': f"{rate:.1f}%",
        'job_state': status['state'],
    }

    # Surface the server-side error message when one is present
    if 'stateMessage' in status:
        summary['error_message'] = status['stateMessage']

    return summary
619
620
# Usage
621
results = analyze_bulk2_results(sf, completed_job_id)
622
print(f"Job {results['job_id']}: {results['success_rate']} success rate")
623
```
624
625
### Performance Optimization
626
627
```python
628
def optimize_bulk2_performance():
    """Best practices for Bulk 2.0 performance optimization."""

    # Keyed tips, returned as a plain dict for easy display or logging.
    tips = dict(
        batch_sizing='Use up to 100MB per job (no artificial batch limits)',
        data_format='Use CSV format for better performance vs JSON',
        field_selection='Only include necessary fields in queries',
        parallel_jobs='Run multiple concurrent jobs for different objects',
        monitoring='Use wait_for_job() with appropriate polling intervals',
        cleanup='Delete completed jobs to avoid storage limits',
    )
    return tips
641
642
# Example of concurrent processing
643
import concurrent.futures
644
import threading
645
646
def process_object_bulk2(sf, object_name, data):
    """Process single object with Bulk 2.0."""
    # Serialize the records once, up front
    payload = prepare_csv_for_bulk2(data)

    # Full ingest lifecycle: create -> upload -> close -> wait
    job_id = sf.bulk2.create_job(operation='insert', object_name=object_name)['id']
    sf.bulk2.upload_job_data(job_id, payload)
    sf.bulk2.close_job(job_id)

    return sf.bulk2.wait_for_job(job_id, is_query=False)
655
656
# Process multiple objects concurrently
657
objects_data = {
658
'Account': account_records,
659
'Contact': contact_records,
660
'Opportunity': opportunity_records
661
}
662
663
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
664
futures = {
665
executor.submit(process_object_bulk2, sf, obj_name, data): obj_name
666
for obj_name, data in objects_data.items()
667
}
668
669
for future in concurrent.futures.as_completed(futures):
670
obj_name = futures[future]
671
try:
672
result = future.result()
673
print(f"{obj_name}: {result['state']}")
674
except Exception as e:
675
print(f"{obj_name} failed: {e}")
676
```