CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-simple-salesforce

A basic Salesforce.com REST API client for Python applications.

Pending
Overview
Eval results
Files

bulk-operations.mddocs/

Bulk API v1.0 Operations

High-performance bulk operations for large-scale data manipulation using Salesforce's original Bulk API. This interface supports insert, update, upsert, delete, and query operations with automatic batching for processing thousands to millions of records efficiently.

SFBulkHandler Class

The main handler class for Bulk API v1.0 operations, providing access to bulk functionality and managing job lifecycles.

class SFBulkHandler:
    def __init__(
        self,
        session_id,
        bulk_url,
        proxies=None,
        session=None
    ):
        """
        Initialize Bulk API v1.0 handler.

        Parameters:
        - session_id: Authenticated Salesforce session ID
        - bulk_url: Bulk API endpoint URL
        - proxies: HTTP proxy configuration dictionary
        - session: Optional custom requests.Session object
        """

Accessing Bulk Operations

The SFBulkHandler is accessed through the bulk property of the main Salesforce client:

from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Access bulk handler
bulk_handler = sf.bulk

# Access specific object types for bulk operations  
bulk_accounts = bulk_handler.Account
bulk_contacts = bulk_handler.Contact
bulk_custom = bulk_handler.MyCustomObject__c

Generic DML Operations

High-level method for any bulk DML operation with automatic job and batch management.

class SFBulkHandler:
    def submit_dml(
        self,
        object_name,
        dml,
        data,
        external_id_field=None,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Submit any DML operation for bulk processing.

        Parameters:
        - object_name: Salesforce SObject API name
        - dml: DML operation ('insert', 'update', 'upsert', 'delete', 'hard_delete')
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field name (required for upsert)
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially vs parallel
        - bypass_results: Skip downloading results for faster processing
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Results from all batches, containing success/error details per record
        """

SFBulkType Class

Interface for Bulk API v1.0 operations on specific SObject types, providing convenient methods for each DML operation type.

class SFBulkType:
    def __init__(
        self,
        object_name,
        bulk_url,
        headers,
        session
    ):
        """
        Initialize bulk operations for specific SObject type.

        Parameters:
        - object_name: Salesforce SObject API name  
        - bulk_url: Bulk API endpoint URL
        - headers: HTTP headers for authentication
        - session: requests.Session object
        """

DML Operations

All standard DML operations with consistent parameter interface and automatic batching.

class SFBulkType:
    def insert(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk insert records.

        Parameters:
        - data: List of record dictionaries or CSV string  
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Insert results with record IDs and success status
        """

    def update(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk update records (requires Id field in data).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially  
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Update results with success status per record
        """

    def upsert(
        self,
        data,
        external_id_field,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk upsert records using external ID field.

        Parameters:
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field API name for matching
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results  
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Upsert results with created/updated status per record
        """

    def delete(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk soft delete records (requires Id field in data).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Delete results with success status per record
        """

    def hard_delete(
        self,
        data,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Bulk hard delete records (permanently removes from Recycle Bin).

        Parameters:
        - data: List of record dictionaries with Id field or CSV string
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Hard delete results with success status per record
        """

Query Operations

Bulk query capabilities for retrieving large datasets efficiently.

class SFBulkType:
    def query(self, data, lazy_operation=False, wait=5):
        """
        Execute bulk query to retrieve large datasets.

        Parameters:
        - data: SOQL query string
        - lazy_operation: Return job info instead of waiting for completion
        - wait: Polling interval in seconds for job completion

        Returns:
        list|dict: Query results or job information if lazy_operation=True
        """

    def query_all(self, data, lazy_operation=False, wait=5):
        """
        Execute bulk queryAll to include deleted and archived records.

        Parameters:
        - data: SOQL query string  
        - lazy_operation: Return job info instead of waiting for completion
        - wait: Polling interval in seconds for job completion

        Returns:
        list|dict: Query results including deleted records or job info
        """

Generic Operations

Flexible method for any DML operation type.

class SFBulkType:
    def submit_dml(
        self,
        function_name,
        data,
        external_id_field=None,
        batch_size=10000,
        use_serial=False,
        bypass_results=False,
        include_detailed_results=False
    ):
        """
        Submit generic DML operation for this SObject type.

        Parameters:
        - function_name: DML operation name ('insert', 'update', etc.)
        - data: List of record dictionaries or CSV string
        - external_id_field: External ID field (for upsert operations)
        - batch_size: Records per batch (max 10,000)
        - use_serial: Process batches sequentially
        - bypass_results: Skip downloading results
        - include_detailed_results: Include detailed success/error info

        Returns:
        list: Operation results with success/error details per record
        """

Usage Examples

Basic Bulk Insert

from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Prepare data for bulk insert
accounts_data = [
    {'Name': 'Bulk Account 1', 'Type': 'Customer', 'Industry': 'Technology'},
    {'Name': 'Bulk Account 2', 'Type': 'Partner', 'Industry': 'Manufacturing'},
    {'Name': 'Bulk Account 3', 'Type': 'Customer', 'Industry': 'Healthcare'}
    # ... up to 10,000 records per batch
]

# Execute bulk insert
insert_results = sf.bulk.Account.insert(accounts_data)

# Process results
for i, result in enumerate(insert_results):
    if result['success']:
        print(f"Account {i+1} created with ID: {result['id']}")
    else:
        print(f"Account {i+1} failed: {result['error']}")

Bulk Update with Error Handling

# Prepare update data (must include Id field)
update_data = [
    {'Id': '001XX000003DHPr', 'Phone': '555-123-4567'},
    {'Id': '001XX000003DHPs', 'Phone': '555-234-5678'},
    {'Id': '001XX000003DHPt', 'Phone': '555-345-6789'}
]

try:
    update_results = sf.bulk.Account.update(
        update_data,
        batch_size=5000,
        include_detailed_results=True
    )
    
    success_count = sum(1 for r in update_results if r['success'])
    error_count = len(update_results) - success_count
    
    print(f"Updated {success_count} records successfully")
    print(f"Failed to update {error_count} records")
    
    # Handle errors
    for result in update_results:
        if not result['success']:
            print(f"Error updating {result['id']}: {result['error']}")
            
except Exception as e:
    print(f"Bulk update failed: {e}")

Bulk Upsert with External ID

# Data with external ID field
upsert_data = [
    {'External_ID__c': 'EXT001', 'Name': 'Upsert Account 1', 'Type': 'Customer'},
    {'External_ID__c': 'EXT002', 'Name': 'Upsert Account 2', 'Type': 'Partner'},
    {'External_ID__c': 'EXT003', 'Name': 'Updated Account 3', 'Industry': 'Technology'}
]

# Execute upsert using external ID field
upsert_results = sf.bulk.Account.upsert(
    upsert_data,
    external_id_field='External_ID__c',
    batch_size=1000
)

# Check created vs updated records
for result in upsert_results:
    if result['success']:
        action = 'Created' if result['created'] else 'Updated'
        print(f"{action} record ID: {result['id']}")

Bulk Query for Large Datasets

# Query large dataset using bulk API
query = "SELECT Id, Name, Type, Industry FROM Account WHERE CreatedDate = THIS_YEAR"

query_results = sf.bulk.Account.query(query)

print(f"Retrieved {len(query_results)} accounts")
for record in query_results[:10]:  # Show first 10
    print(f"Account: {record['Name']} - {record['Type']}")

# Query including deleted records
deleted_query = "SELECT Id, Name FROM Account WHERE IsDeleted = true"
deleted_results = sf.bulk.Account.query_all(deleted_query)
print(f"Found {len(deleted_results)} deleted accounts")

Performance Optimization

# Large dataset with performance optimizations
large_dataset = generate_large_dataset(50000)  # 50K records

# Use larger batches for better throughput
results = sf.bulk.Contact.insert(
    large_dataset,
    batch_size=10000,        # Maximum batch size
    use_serial=False,        # Parallel processing
    bypass_results=True      # Skip result download for speed
)

# For operations where you need results but want speed
results = sf.bulk.Contact.insert(
    large_dataset,
    batch_size=10000,
    use_serial=False,
    bypass_results=False,
    include_detailed_results=False  # Less detail = faster processing
)

CSV Data Processing

# Work with CSV data directly
csv_data = """Name,Type,Industry
CSV Account 1,Customer,Technology
CSV Account 2,Partner,Manufacturing
CSV Account 3,Customer,Healthcare"""

# Insert CSV data
csv_results = sf.bulk.Account.insert(csv_data)

# Read CSV file and process
with open('accounts.csv', 'r') as csvfile:
    csv_content = csvfile.read()
    
bulk_results = sf.bulk.Account.insert(
    csv_content,
    batch_size=5000,
    include_detailed_results=True
)

Job Management and Monitoring

# For long-running operations, use lazy mode
job_info = sf.bulk.Account.insert(
    large_dataset,
    lazy_operation=True  # Returns job info instead of waiting
)

print(f"Started bulk job: {job_info['jobId']}")

# Monitor job progress (would need custom polling)
# This is handled automatically by default, but lazy mode gives you control

Job Management Methods (Advanced)

Lower-level job management methods for advanced use cases and custom workflows.

class SFBulkType:
    def _create_job(self, operation, use_serial, external_id_field=None):
        """
        Create a new bulk job (internal method).

        Parameters:
        - operation: Bulk operation type
        - use_serial: Sequential vs parallel batch processing
        - external_id_field: External ID field name (for upsert)

        Returns:
        dict: Job creation response with job ID
        """

    def _close_job(self, job_id):
        """
        Close a bulk job to stop accepting new batches.

        Parameters:
        - job_id: Bulk job identifier

        Returns:
        dict: Job status after closing
        """

    def _get_job(self, job_id):
        """
        Get current job status and information.

        Parameters:
        - job_id: Bulk job identifier

        Returns:
        dict: Complete job status and statistics
        """

    def _add_batch(self, job_id, data, operation):
        """
        Add a batch of records to an existing job.

        Parameters:
        - job_id: Bulk job identifier
        - data: Record data for the batch
        - operation: Operation type for data formatting

        Returns:
        dict: Batch creation response with batch ID
        """

    def _get_batch(self, job_id, batch_id):
        """
        Get batch status and processing information.

        Parameters:
        - job_id: Bulk job identifier
        - batch_id: Batch identifier within the job

        Returns:
        dict: Batch status and statistics
        """

    def _get_batch_results(self, job_id, batch_id, operation):
        """
        Retrieve results for a completed batch.

        Parameters:
        - job_id: Bulk job identifier
        - batch_id: Batch identifier  
        - operation: Operation type for result parsing

        Returns:
        list: Batch results with success/error details per record
        """

Best Practices

Data Preparation

# Ensure data is properly formatted
def prepare_bulk_data(records):
    """Prepare records for bulk operations."""
    prepared = []
    for record in records:
        # Remove None values
        clean_record = {k: v for k, v in record.items() if v is not None}
        
        # Ensure required fields are present
        if 'Name' not in clean_record:
            clean_record['Name'] = 'Default Name'
            
        prepared.append(clean_record)
    
    return prepared

# Use prepared data
clean_data = prepare_bulk_data(raw_data)
results = sf.bulk.Account.insert(clean_data)

Error Handling and Retry Logic

def bulk_insert_with_retry(bulk_type, data, max_retries=3):
    """Bulk insert with retry logic for failed records."""
    
    for attempt in range(max_retries):
        try:
            results = bulk_type.insert(
                data,
                include_detailed_results=True
            )
            
            # Separate successful and failed records
            failed_data = []
            for i, result in enumerate(results):
                if not result['success']:
                    failed_data.append(data[i])
                    print(f"Failed record: {result['error']}")
            
            if not failed_data:
                print(f"All records processed successfully on attempt {attempt + 1}")
                return results
                
            # Retry with failed records only
            data = failed_data
            print(f"Retrying {len(failed_data)} failed records...")
            
        except Exception as e:
            print(f"Attempt {attempt + 1} failed with error: {e}")
            if attempt == max_retries - 1:
                raise
    
    return results

# Usage
results = bulk_insert_with_retry(sf.bulk.Account, account_data)

Memory Management for Large Datasets

def process_large_file(filename, bulk_type, chunk_size=10000):
    """Process large CSV files in chunks to manage memory."""
    
    with open(filename, 'r') as file:
        header = file.readline().strip().split(',')
        chunk = []
        
        for line_num, line in enumerate(file, 1):
            values = line.strip().split(',')
            record = dict(zip(header, values))
            chunk.append(record)
            
            if len(chunk) >= chunk_size:
                # Process chunk
                results = bulk_type.insert(chunk, bypass_results=True)
                print(f"Processed chunk ending at line {line_num}")
                chunk = []
        
        # Process remaining records
        if chunk:
            results = bulk_type.insert(chunk, bypass_results=True)
            print(f"Processed final chunk of {len(chunk)} records")

# Usage
process_large_file('massive_accounts.csv', sf.bulk.Account)

Install with Tessl CLI

npx tessl i tessl/pypi-simple-salesforce

docs

authentication.md

bulk-operations.md

bulk2-operations.md

exceptions.md

index.md

metadata-api.md

rest-api.md

utilities.md

tile.json