A basic Salesforce.com REST API client for Python applications.
---
High-performance bulk operations for large-scale data manipulation using Salesforce's original Bulk API. This interface supports insert, update, upsert, delete, and query operations with automatic batching for processing thousands to millions of records efficiently.
The main handler class for Bulk API v1.0 operations, providing access to bulk functionality and managing job lifecycles.
class SFBulkHandler:
def __init__(
self,
session_id,
bulk_url,
proxies=None,
session=None
):
"""
Initialize Bulk API v1.0 handler.
Parameters:
- session_id: Authenticated Salesforce session ID
- bulk_url: Bulk API endpoint URL
- proxies: HTTP proxy configuration dictionary
- session: Optional custom requests.Session object
"""

The SFBulkHandler is accessed through the bulk property of the main Salesforce client:
from simple_salesforce import Salesforce
sf = Salesforce(username='user@example.com', password='pass', security_token='token')
# Access bulk handler
bulk_handler = sf.bulk
# Access specific object types for bulk operations
bulk_accounts = bulk_handler.Account
bulk_contacts = bulk_handler.Contact
bulk_custom = bulk_handler.MyCustomObject__c

High-level method for any bulk DML operation with automatic job and batch management.
class SFBulkHandler:
def submit_dml(
self,
object_name,
dml,
data,
external_id_field=None,
batch_size=10000,
use_serial=False,
bypass_results=False,
include_detailed_results=False
):
"""
Submit any DML operation for bulk processing.
Parameters:
- object_name: Salesforce SObject API name
- dml: DML operation ('insert', 'update', 'upsert', 'delete', 'hard_delete')
- data: List of record dictionaries or CSV string
- external_id_field: External ID field name (required for upsert)
- batch_size: Records per batch (max 10,000)
- use_serial: Process batches sequentially vs parallel
- bypass_results: Skip downloading results for faster processing
- include_detailed_results: Include detailed success/error info
Returns:
list: Results from all batches, containing success/error details per record
"""

Interface for Bulk API v1.0 operations on specific SObject types, providing convenient methods for each DML operation type.
class SFBulkType:
def __init__(
self,
object_name,
bulk_url,
headers,
session
):
"""
Initialize bulk operations for specific SObject type.
Parameters:
- object_name: Salesforce SObject API name
- bulk_url: Bulk API endpoint URL
- headers: HTTP headers for authentication
- session: requests.Session object
"""

All standard DML operations with consistent parameter interface and automatic batching.
class SFBulkType:
def insert(
self,
data,
batch_size=10000,
use_serial=False,
bypass_results=False,
include_detailed_results=False
):
"""
Bulk insert records.
Parameters:
- data: List of record dictionaries or CSV string
- batch_size: Records per batch (max 10,000)
- use_serial: Process batches sequentially
- bypass_results: Skip downloading results
- include_detailed_results: Include detailed success/error info
Returns:
list: Insert results with record IDs and success status
"""
def update(
self,
data,
batch_size=10000,
use_serial=False,
bypass_results=False,
include_detailed_results=False
):
"""
Bulk update records (requires Id field in data).
Parameters:
- data: List of record dictionaries with Id field or CSV string
- batch_size: Records per batch (max 10,000)
- use_serial: Process batches sequentially
- bypass_results: Skip downloading results
- include_detailed_results: Include detailed success/error info
Returns:
list: Update results with success status per record
"""
def upsert(
self,
data,
external_id_field,
batch_size=10000,
use_serial=False,
bypass_results=False,
include_detailed_results=False
):
"""
Bulk upsert records using external ID field.
Parameters:
- data: List of record dictionaries or CSV string
- external_id_field: External ID field API name for matching
- batch_size: Records per batch (max 10,000)
- use_serial: Process batches sequentially
- bypass_results: Skip downloading results
- include_detailed_results: Include detailed success/error info
Returns:
list: Upsert results with created/updated status per record
"""
def delete(
self,
data,
batch_size=10000,
use_serial=False,
bypass_results=False,
include_detailed_results=False
):
"""
Bulk soft delete records (requires Id field in data).
Parameters:
- data: List of record dictionaries with Id field or CSV string
- batch_size: Records per batch (max 10,000)
- use_serial: Process batches sequentially
- bypass_results: Skip downloading results
- include_detailed_results: Include detailed success/error info
Returns:
list: Delete results with success status per record
"""
def hard_delete(
self,
data,
batch_size=10000,
use_serial=False,
bypass_results=False,
include_detailed_results=False
):
"""
Bulk hard delete records (permanently removes from Recycle Bin).
Parameters:
- data: List of record dictionaries with Id field or CSV string
- batch_size: Records per batch (max 10,000)
- use_serial: Process batches sequentially
- bypass_results: Skip downloading results
- include_detailed_results: Include detailed success/error info
Returns:
list: Hard delete results with success status per record
"""

Bulk query capabilities for retrieving large datasets efficiently.
class SFBulkType:
def query(self, data, lazy_operation=False, wait=5):
"""
Execute bulk query to retrieve large datasets.
Parameters:
- data: SOQL query string
- lazy_operation: Return job info instead of waiting for completion
- wait: Polling interval in seconds for job completion
Returns:
list|dict: Query results or job information if lazy_operation=True
"""
def query_all(self, data, lazy_operation=False, wait=5):
"""
Execute bulk queryAll to include deleted and archived records.
Parameters:
- data: SOQL query string
- lazy_operation: Return job info instead of waiting for completion
- wait: Polling interval in seconds for job completion
Returns:
list|dict: Query results including deleted records or job info
"""

Flexible method for any DML operation type.
class SFBulkType:
def submit_dml(
self,
function_name,
data,
external_id_field=None,
batch_size=10000,
use_serial=False,
bypass_results=False,
include_detailed_results=False
):
"""
Submit generic DML operation for this SObject type.
Parameters:
- function_name: DML operation name ('insert', 'update', etc.)
- data: List of record dictionaries or CSV string
- external_id_field: External ID field (for upsert operations)
- batch_size: Records per batch (max 10,000)
- use_serial: Process batches sequentially
- bypass_results: Skip downloading results
- include_detailed_results: Include detailed success/error info
Returns:
list: Operation results with success/error details per record
"""

from simple_salesforce import Salesforce
sf = Salesforce(username='user@example.com', password='pass', security_token='token')
# Prepare data for bulk insert
accounts_data = [
{'Name': 'Bulk Account 1', 'Type': 'Customer', 'Industry': 'Technology'},
{'Name': 'Bulk Account 2', 'Type': 'Partner', 'Industry': 'Manufacturing'},
{'Name': 'Bulk Account 3', 'Type': 'Customer', 'Industry': 'Healthcare'}
# ... up to 10,000 records per batch
]
# Execute bulk insert
insert_results = sf.bulk.Account.insert(accounts_data)
# Process results
for i, result in enumerate(insert_results):
if result['success']:
print(f"Account {i+1} created with ID: {result['id']}")
else:
print(f"Account {i+1} failed: {result['error']}")

# Prepare update data (must include Id field)
update_data = [
{'Id': '001XX000003DHPr', 'Phone': '555-123-4567'},
{'Id': '001XX000003DHPs', 'Phone': '555-234-5678'},
{'Id': '001XX000003DHPt', 'Phone': '555-345-6789'}
]
try:
update_results = sf.bulk.Account.update(
update_data,
batch_size=5000,
include_detailed_results=True
)
success_count = sum(1 for r in update_results if r['success'])
error_count = len(update_results) - success_count
print(f"Updated {success_count} records successfully")
print(f"Failed to update {error_count} records")
# Handle errors
for result in update_results:
if not result['success']:
print(f"Error updating {result['id']}: {result['error']}")
except Exception as e:
print(f"Bulk update failed: {e}")

# Data with external ID field
upsert_data = [
{'External_ID__c': 'EXT001', 'Name': 'Upsert Account 1', 'Type': 'Customer'},
{'External_ID__c': 'EXT002', 'Name': 'Upsert Account 2', 'Type': 'Partner'},
{'External_ID__c': 'EXT003', 'Name': 'Updated Account 3', 'Industry': 'Technology'}
]
# Execute upsert using external ID field
upsert_results = sf.bulk.Account.upsert(
upsert_data,
external_id_field='External_ID__c',
batch_size=1000
)
# Check created vs updated records
for result in upsert_results:
if result['success']:
action = 'Created' if result['created'] else 'Updated'
print(f"{action} record ID: {result['id']}")

# Query large dataset using bulk API
query = "SELECT Id, Name, Type, Industry FROM Account WHERE CreatedDate = THIS_YEAR"
query_results = sf.bulk.Account.query(query)
print(f"Retrieved {len(query_results)} accounts")
for record in query_results[:10]: # Show first 10
print(f"Account: {record['Name']} - {record['Type']}")
# Query including deleted records
deleted_query = "SELECT Id, Name FROM Account WHERE IsDeleted = true"
deleted_results = sf.bulk.Account.query_all(deleted_query)
print(f"Found {len(deleted_results)} deleted accounts")

# Large dataset with performance optimizations
large_dataset = generate_large_dataset(50000) # 50K records
# Use larger batches for better throughput
results = sf.bulk.Contact.insert(
large_dataset,
batch_size=10000, # Maximum batch size
use_serial=False, # Parallel processing
bypass_results=True # Skip result download for speed
)
# For operations where you need results but want speed
results = sf.bulk.Contact.insert(
large_dataset,
batch_size=10000,
use_serial=False,
bypass_results=False,
include_detailed_results=False # Less detail = faster processing
)# Work with CSV data directly
csv_data = """Name,Type,Industry
CSV Account 1,Customer,Technology
CSV Account 2,Partner,Manufacturing
CSV Account 3,Customer,Healthcare"""
# Insert CSV data
csv_results = sf.bulk.Account.insert(csv_data)
# Read CSV file and process
with open('accounts.csv', 'r') as csvfile:
csv_content = csvfile.read()
bulk_results = sf.bulk.Account.insert(
csv_content,
batch_size=5000,
include_detailed_results=True
)

# For long-running operations, use lazy mode
# Note: lazy_operation is only available on query/query_all (see signatures above);
# DML operations such as insert always run to completion.
job_info = sf.bulk.Account.query(
"SELECT Id, Name FROM Account WHERE CreatedDate = THIS_YEAR",
lazy_operation=True # Returns job info instead of waiting
)
print(f"Started bulk job: {job_info['jobId']}")
# Monitor job progress (would need custom polling)
# This is handled automatically by default, but lazy mode gives you control

Lower-level job management methods for advanced use cases and custom workflows.
class SFBulkType:
def _create_job(self, operation, use_serial, external_id_field=None):
"""
Create a new bulk job (internal method).
Parameters:
- operation: Bulk operation type
- use_serial: Sequential vs parallel batch processing
- external_id_field: External ID field name (for upsert)
Returns:
dict: Job creation response with job ID
"""
def _close_job(self, job_id):
"""
Close a bulk job to stop accepting new batches.
Parameters:
- job_id: Bulk job identifier
Returns:
dict: Job status after closing
"""
def _get_job(self, job_id):
"""
Get current job status and information.
Parameters:
- job_id: Bulk job identifier
Returns:
dict: Complete job status and statistics
"""
def _add_batch(self, job_id, data, operation):
"""
Add a batch of records to an existing job.
Parameters:
- job_id: Bulk job identifier
- data: Record data for the batch
- operation: Operation type for data formatting
Returns:
dict: Batch creation response with batch ID
"""
def _get_batch(self, job_id, batch_id):
"""
Get batch status and processing information.
Parameters:
- job_id: Bulk job identifier
- batch_id: Batch identifier within the job
Returns:
dict: Batch status and statistics
"""
def _get_batch_results(self, job_id, batch_id, operation):
"""
Retrieve results for a completed batch.
Parameters:
- job_id: Bulk job identifier
- batch_id: Batch identifier
- operation: Operation type for result parsing
Returns:
list: Batch results with success/error details per record
"""

# Ensure data is properly formatted
def prepare_bulk_data(records):
"""Prepare records for bulk operations."""
prepared = []
for record in records:
# Remove None values
clean_record = {k: v for k, v in record.items() if v is not None}
# Ensure required fields are present
if 'Name' not in clean_record:
clean_record['Name'] = 'Default Name'
prepared.append(clean_record)
return prepared
# Use prepared data
clean_data = prepare_bulk_data(raw_data)
results = sf.bulk.Account.insert(clean_data)

def bulk_insert_with_retry(bulk_type, data, max_retries=3):
"""Bulk insert with retry logic for failed records."""
for attempt in range(max_retries):
try:
results = bulk_type.insert(
data,
include_detailed_results=True
)
# Separate successful and failed records
failed_data = []
for i, result in enumerate(results):
if not result['success']:
failed_data.append(data[i])
print(f"Failed record: {result['error']}")
if not failed_data:
print(f"All records processed successfully on attempt {attempt + 1}")
return results
# Retry with failed records only
data = failed_data
print(f"Retrying {len(failed_data)} failed records...")
except Exception as e:
print(f"Attempt {attempt + 1} failed with error: {e}")
if attempt == max_retries - 1:
raise
return results
# Usage
results = bulk_insert_with_retry(sf.bulk.Account, account_data)

def process_large_file(filename, bulk_type, chunk_size=10000):
"""Process large CSV files in chunks to manage memory."""
with open(filename, 'r') as file:
header = file.readline().strip().split(',')
chunk = []
for line_num, line in enumerate(file, 1):
values = line.strip().split(',')
record = dict(zip(header, values))
chunk.append(record)
if len(chunk) >= chunk_size:
# Process chunk
results = bulk_type.insert(chunk, bypass_results=True)
print(f"Processed chunk ending at line {line_num}")
chunk = []
# Process remaining records
if chunk:
results = bulk_type.insert(chunk, bypass_results=True)
print(f"Processed final chunk of {len(chunk)} records")
# Usage
process_large_file('massive_accounts.csv', sf.bulk.Account)

Install with Tessl CLI
npx tessl i tessl/pypi-simple-salesforce