A basic Salesforce.com REST API client for Python applications.
—
Next-generation bulk operations with improved performance, simplified job management, and enhanced monitoring capabilities for modern high-volume data processing. Bulk API v2.0 provides streamlined job workflows, better error handling, and more efficient data transfer compared to the original Bulk API.
The main handler class for Bulk API v2.0 operations, providing modern bulk functionality with improved job management and monitoring.
class SFBulk2Handler:
    """
    Handler for Bulk API v2.0 operations.

    Provides modern bulk functionality with improved job management and
    monitoring compared to the original Bulk API.
    """

    # Request/response content types used by the Bulk 2.0 REST endpoints.
    JSON_CONTENT_TYPE = "application/json"
    CSV_CONTENT_TYPE = "text/csv"
    # Polling defaults used while waiting for job completion.
    DEFAULT_WAIT_TIMEOUT_SECONDS = 300
    MAX_CHECK_INTERVAL_SECONDS = 30

    def __init__(self, session_id, bulk2_url, proxies=None, session=None):
        """
        Initialize Bulk API v2.0 handler.

        Parameters:
        - session_id: Authenticated Salesforce session ID
        - bulk2_url: Bulk 2.0 API endpoint URL
        - proxies: HTTP proxy configuration dictionary
        - session: Optional custom requests.Session object
        """
DEFAULT_QUERY_PAGE_SIZE = 50000

The SFBulk2Handler is accessed through the bulk2 property of the main Salesforce client:
from simple_salesforce import Salesforce
sf = Salesforce(username='user@example.com', password='pass', security_token='token')
# Access Bulk 2.0 handler
bulk2_handler = sf.bulk2
# Bulk 2.0 uses direct method calls rather than object attributes

Comprehensive job lifecycle management with improved status monitoring and control.
class SFBulk2Handler:
    def create_job(self, operation, object_name=None, external_id_field=None, query=None):
        """
        Create a new Bulk 2.0 job.

        Parameters:
        - operation: Operation type (insert, upsert, update, delete,
          hard_delete, query, query_all)
        - object_name: Salesforce SObject API name (required for DML operations)
        - external_id_field: External ID field name (required for upsert)
        - query: SOQL query string (required for query operations)

        Returns:
        dict: Job creation response with job ID and status
        """

    def get_job(self, job_id, is_query):
        """
        Get comprehensive job information and status.

        Parameters:
        - job_id: Bulk 2.0 job identifier
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Complete job information including progress and statistics
        """

    def close_job(self, job_id):
        """
        Close ingest job to begin processing uploaded data.

        Parameters:
        - job_id: Ingest job identifier

        Returns:
        dict: Job status after closing
        """

    def abort_job(self, job_id, is_query):
        """
        Abort running job to stop processing.

        Parameters:
        - job_id: Job identifier to abort
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Job status after aborting
        """

    def delete_job(self, job_id, is_query):
        """
        Delete completed job and its data.

        Parameters:
        - job_id: Job identifier to delete
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Deletion confirmation
        """

    def wait_for_job(self, job_id, is_query, wait=0.5):
        """
        Wait for job completion with intelligent polling.

        Parameters:
        - job_id: Job identifier to monitor
        - is_query: True for query jobs, False for ingest jobs
        - wait: Initial polling interval in seconds

        Returns:
        dict: Final job status when completed or failed
        """

# Efficient data upload and download operations with support for large datasets.
class SFBulk2Handler:
    def upload_job_data(self, job_id, data, content_url=None):
        """
        Upload data for ingest job processing.

        Parameters:
        - job_id: Ingest job identifier
        - data: CSV data string or file-like object
        - content_url: Optional alternative upload URL

        Returns:
        dict: Upload confirmation and status
        """

    def get_query_results(self, job_id, locator="", max_records=None):
        """
        Get query results with pagination support.

        Parameters:
        - job_id: Query job identifier
        - locator: Pagination locator for subsequent pages
        - max_records: Maximum records per page (default: DEFAULT_QUERY_PAGE_SIZE)

        Returns:
        dict: Query results with data and pagination info
        """

    def download_job_data(self, path, job_id, locator="", max_records=None, chunk_size=1024):
        """
        Download query results directly to file.

        Parameters:
        - path: Local file path for saving results
        - job_id: Query job identifier
        - locator: Pagination locator for specific page
        - max_records: Maximum records per download
        - chunk_size: File write chunk size in bytes

        Returns:
        dict: Download status and file information
        """

# Bulk API v2.0 operation types and job states for type-safe operations.
class Operation:
    """Enumeration of supported Bulk 2.0 operations (Salesforce API values)."""
    INSERT = "insert"
    UPSERT = "upsert"
    UPDATE = "update"
    DELETE = "delete"
    # Salesforce Bulk API 2.0 expects camelCase values for these operations.
    HARD_DELETE = "hardDelete"
    QUERY = "query"
    QUERY_ALL = "queryAll"


class JobState:
    """Enumeration of Bulk 2.0 job states as returned by the API."""
    # CamelCase values match what the API returns and what the usage
    # examples in this document compare against (e.g. 'JobComplete').
    OPEN = "Open"
    ABORTED = "Aborted"
    FAILED = "Failed"
    UPLOAD_COMPLETE = "UploadComplete"
    IN_PROGRESS = "InProgress"
    JOB_COMPLETE = "JobComplete"


class ColumnDelimiter:
    """CSV column delimiter options."""
    COMMA = "COMMA"
    TAB = "TAB"
    PIPE = "PIPE"
    SEMICOLON = "SEMICOLON"
    CARET = "CARET"


class LineEnding:
    """CSV line ending options."""
    LF = "LF"      # Unix/Linux
    CRLF = "CRLF"  # Windows


class ResultsType:
    """Query result format types."""
    CSV = "CSV"
    JSON = "JSON"

from simple_salesforce import Salesforce
sf = Salesforce(username='user@example.com', password='pass', security_token='token')
# Prepare CSV data
csv_data = """Name,Type,Industry
Bulk2 Account 1,Customer,Technology
Bulk2 Account 2,Partner,Manufacturing
Bulk2 Account 3,Customer,Healthcare"""
# Create ingest job
job_response = sf.bulk2.create_job(
operation='insert',
object_name='Account'
)
job_id = job_response['id']
print(f"Created job: {job_id}")
# Upload data
sf.bulk2.upload_job_data(job_id, csv_data)
# Close job to start processing
sf.bulk2.close_job(job_id)
# Wait for completion
final_status = sf.bulk2.wait_for_job(job_id, is_query=False)
print(f"Job completed with state: {final_status['state']}")
print(f"Records processed: {final_status['numberRecordsProcessed']}")
print(f"Records failed: {final_status['numberRecordsFailed']}")

# Create query job
query_job = sf.bulk2.create_job(
operation='query',
query="SELECT Id, Name, Type, Industry FROM Account WHERE CreatedDate = THIS_YEAR"
)
query_job_id = query_job['id']
# Wait for query completion
final_query_status = sf.bulk2.wait_for_job(query_job_id, is_query=True)
if final_query_status['state'] == 'JobComplete':
# Get query results
results = sf.bulk2.get_query_results(query_job_id)
# Process CSV results
csv_data = results['data']
lines = csv_data.strip().split('\n')
header = lines[0].split(',')
for line in lines[1:]:
values = line.split(',')
record = dict(zip(header, values))
print(f"Account: {record['Name']} - {record['Type']}")
# Download large results to file
sf.bulk2.download_job_data(
'/path/to/results.csv',
query_job_id,
max_records=100000
)

# Prepare data with external ID
upsert_csv = """External_ID__c,Name,Type,Industry
EXT001,Upsert Account 1,Customer,Technology
EXT002,Upsert Account 2,Partner,Manufacturing
EXT003,Updated Account 3,Customer,Finance"""
# Create upsert job with external ID field
upsert_job = sf.bulk2.create_job(
operation='upsert',
object_name='Account',
external_id_field='External_ID__c'
)
upsert_job_id = upsert_job['id']
# Upload and process
sf.bulk2.upload_job_data(upsert_job_id, upsert_csv)
sf.bulk2.close_job(upsert_job_id)
# Monitor with custom polling
import time
while True:
status = sf.bulk2.get_job(upsert_job_id, is_query=False)
print(f"Job state: {status['state']}")
if status['state'] in ['JobComplete', 'Failed', 'Aborted']:
break
time.sleep(2)
# Check final results
final_status = sf.bulk2.get_job(upsert_job_id, is_query=False)
print(f"Created: {final_status['numberRecordsProcessed'] - final_status['numberRecordsFailed']}")
print(f"Failed: {final_status['numberRecordsFailed']}")

def bulk2_insert_with_monitoring(sf, object_name, csv_data):
"""Bulk 2.0 insert with comprehensive error handling."""
try:
# Create job
job_response = sf.bulk2.create_job(
operation='insert',
object_name=object_name
)
job_id = job_response['id']
print(f"Created job {job_id} for {object_name}")
# Upload data
upload_response = sf.bulk2.upload_job_data(job_id, csv_data)
print("Data uploaded successfully")
# Close job
close_response = sf.bulk2.close_job(job_id)
print(f"Job closed, state: {close_response['state']}")
# Wait for completion with timeout
start_time = time.time()
timeout = 300 # 5 minutes
while True:
status = sf.bulk2.get_job(job_id, is_query=False)
elapsed = time.time() - start_time
print(f"Job state: {status['state']} (elapsed: {elapsed:.1f}s)")
if status['state'] == 'JobComplete':
print("Job completed successfully!")
return {
'job_id': job_id,
'success': True,
'processed': status['numberRecordsProcessed'],
'failed': status['numberRecordsFailed']
}
elif status['state'] in ['Failed', 'Aborted']:
print(f"Job failed with state: {status['state']}")
return {
'job_id': job_id,
'success': False,
'error': status.get('stateMessage', 'Unknown error')
}
elif elapsed > timeout:
# Abort timed-out job
sf.bulk2.abort_job(job_id, is_query=False)
raise TimeoutError(f"Job {job_id} timed out after {timeout} seconds")
time.sleep(5)
except Exception as e:
print(f"Error in bulk operation: {e}")
# Clean up job if possible
try:
sf.bulk2.abort_job(job_id, is_query=False)
except:
pass
raise
# Usage
result = bulk2_insert_with_monitoring(sf, 'Contact', contact_csv_data)
if result['success']:
print(f"Processed {result['processed']} records")
else:
print(f"Job failed: {result['error']}")

def bulk2_query_all_pages(sf, query, output_dir):
"""Query large dataset with automatic pagination."""
# Create query job
query_job = sf.bulk2.create_job(operation='query', query=query)
job_id = query_job['id']
# Wait for completion
sf.bulk2.wait_for_job(job_id, is_query=True)
page_num = 1
locator = ""
all_records = []
while True:
# Get page of results
page_results = sf.bulk2.get_query_results(
job_id,
locator=locator,
max_records=50000
)
# Download page to file
page_file = f"{output_dir}/page_{page_num}.csv"
sf.bulk2.download_job_data(
page_file,
job_id,
locator=locator,
max_records=50000
)
print(f"Downloaded page {page_num} to {page_file}")
# Check for more pages
if 'nextRecordsUrl' not in page_results or not page_results['nextRecordsUrl']:
break
# Extract locator for next page
locator = page_results['nextRecordsUrl'].split('locator=')[1]
page_num += 1
print(f"Downloaded {page_num} pages of results")
return page_num
# Usage
pages_downloaded = bulk2_query_all_pages(
sf,
"SELECT Id, Name, Email FROM Contact WHERE CreatedDate = LAST_N_DAYS:30",
"/tmp/bulk_results"
)

def manage_bulk2_jobs(sf):
"""Example of advanced job management operations."""
# Create multiple jobs
jobs = []
# Insert job
insert_job = sf.bulk2.create_job(operation='insert', object_name='Account')
jobs.append(('insert', insert_job['id']))
# Query job
query_job = sf.bulk2.create_job(
operation='query',
query="SELECT Id, Name FROM Account LIMIT 1000"
)
jobs.append(('query', query_job['id']))
# Monitor all jobs
for job_type, job_id in jobs:
is_query = (job_type == 'query')
# Get job status
status = sf.bulk2.get_job(job_id, is_query=is_query)
print(f"{job_type.title()} job {job_id}: {status['state']}")
# Abort if needed
if status['state'] == 'InProgress':
print(f"Aborting {job_type} job {job_id}")
sf.bulk2.abort_job(job_id, is_query=is_query)
# Delete completed jobs
elif status['state'] in ['JobComplete', 'Failed', 'Aborted']:
print(f"Deleting {job_type} job {job_id}")
sf.bulk2.delete_job(job_id, is_query=is_query)
# Usage
manage_bulk2_jobs(sf)

Data processing utilities for Bulk API v2.0 operations.
class SFBulk2Handler:
def filter_null_bytes(self, b):
"""
Filter null bytes from strings or bytes objects.
Parameters:
- b: String or bytes object to filter
Returns:
str|bytes: Filtered content with null bytes removed
"""def prepare_csv_for_bulk2(records, field_mapping=None):
"""Prepare record data as CSV for Bulk 2.0 operations."""
if not records:
return ""
# Apply field mapping if provided
if field_mapping:
records = [
{field_mapping.get(k, k): v for k, v in record.items()}
for record in records
]
# Get headers from first record
headers = list(records[0].keys())
# Build CSV
csv_lines = [','.join(headers)]
for record in records:
values = []
for header in headers:
value = record.get(header, '')
# Handle CSV escaping
if isinstance(value, str):
if ',' in value or '"' in value or '\n' in value:
value = f'"{value.replace('"', '""')}"'
values.append(str(value))
csv_lines.append(','.join(values))
return '\n'.join(csv_lines)
# Usage
csv_data = prepare_csv_for_bulk2(
account_records,
field_mapping={'company_name': 'Name', 'account_type': 'Type'}
)

def analyze_bulk2_results(sf, job_id):
"""Analyze Bulk 2.0 job results for errors and success rates."""
# Get final job status
job_status = sf.bulk2.get_job(job_id, is_query=False)
total_records = job_status['numberRecordsProcessed']
failed_records = job_status['numberRecordsFailed']
success_records = total_records - failed_records
success_rate = (success_records / total_records * 100) if total_records > 0 else 0
analysis = {
'job_id': job_id,
'total_records': total_records,
'successful_records': success_records,
'failed_records': failed_records,
'success_rate': f"{success_rate:.1f}%",
'job_state': job_status['state']
}
# Add error details if available
if 'stateMessage' in job_status:
analysis['error_message'] = job_status['stateMessage']
return analysis
# Usage
results = analyze_bulk2_results(sf, completed_job_id)
print(f"Job {results['job_id']}: {results['success_rate']} success rate")

def optimize_bulk2_performance():
"""Best practices for Bulk 2.0 performance optimization."""
recommendations = {
'batch_sizing': 'Use up to 100MB per job (no artificial batch limits)',
'data_format': 'Use CSV format for better performance vs JSON',
'field_selection': 'Only include necessary fields in queries',
'parallel_jobs': 'Run multiple concurrent jobs for different objects',
'monitoring': 'Use wait_for_job() with appropriate polling intervals',
'cleanup': 'Delete completed jobs to avoid storage limits'
}
return recommendations
# Example of concurrent processing
import concurrent.futures
import threading
def process_object_bulk2(sf, object_name, data):
"""Process single object with Bulk 2.0."""
csv_data = prepare_csv_for_bulk2(data)
job = sf.bulk2.create_job(operation='insert', object_name=object_name)
sf.bulk2.upload_job_data(job['id'], csv_data)
sf.bulk2.close_job(job['id'])
return sf.bulk2.wait_for_job(job['id'], is_query=False)
# Process multiple objects concurrently
objects_data = {
'Account': account_records,
'Contact': contact_records,
'Opportunity': opportunity_records
}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = {
executor.submit(process_object_bulk2, sf, obj_name, data): obj_name
for obj_name, data in objects_data.items()
}
for future in concurrent.futures.as_completed(futures):
obj_name = futures[future]
try:
result = future.result()
print(f"{obj_name}: {result['state']}")
except Exception as e:
print(f"{obj_name} failed: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-simple-salesforce