tessl/pypi-simple-salesforce

A basic Salesforce.com REST API client for Python applications.


Bulk API v2.0 Operations

Bulk API v2.0 handles high-volume data loads and queries through a simpler job workflow than the original Bulk API. An ingest job is created, given its CSV data, closed, and then polled until it finishes. A query job is created and polled, and its results are then fetched page by page or downloaded to a file.

SFBulk2Handler Class

The main handler class for Bulk API v2.0 operations. It exposes the job management, data upload, and result retrieval methods described below.

class SFBulk2Handler:
    def __init__(
        self,
        session_id,
        bulk2_url,
        proxies=None,
        session=None
    ):
        """
        Initialize Bulk API v2.0 handler.

        Parameters:
        - session_id: Authenticated Salesforce session ID
        - bulk2_url: Bulk 2.0 API endpoint URL
        - proxies: HTTP proxy configuration dictionary
        - session: Optional custom requests.Session object
        """

Constants and Configuration

class SFBulk2Handler:
    JSON_CONTENT_TYPE = "application/json"
    CSV_CONTENT_TYPE = "text/csv"
    DEFAULT_WAIT_TIMEOUT_SECONDS = 300
    MAX_CHECK_INTERVAL_SECONDS = 30
    DEFAULT_QUERY_PAGE_SIZE = 50000
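
As a rough illustration of how a wait timeout and a maximum check interval could interact, the sketch below generates a growing polling schedule that is capped per step and in total. The function name `polling_schedule` and the doubling factor are assumptions made for this example, and the library's own polling logic may differ. The defaults simply mirror the constants listed above.

```python
from typing import Iterator


def polling_schedule(
    initial: float = 0.5,
    max_interval: float = 30.0,
    timeout: float = 300.0,
    factor: float = 2.0,
) -> Iterator[float]:
    """Yield sleep intervals that grow by `factor`, never exceed
    `max_interval`, and never add up to more than `timeout`."""
    elapsed = 0.0
    interval = initial
    while elapsed < timeout:
        # Trim the step so the running total never overshoots the timeout
        step = min(interval, max_interval, timeout - elapsed)
        yield step
        elapsed += step
        interval *= factor


schedule = list(polling_schedule())
print(schedule[:7])
print(sum(schedule))
```

A caller would sleep for each yielded interval between status checks and give up once the generator is exhausted.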

Accessing Bulk 2.0 Operations

The SFBulk2Handler is accessed through the bulk2 property of the main Salesforce client:

from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Access Bulk 2.0 handler
bulk2_handler = sf.bulk2

# Bulk 2.0 uses direct method calls rather than object attributes

Job Management

Methods for creating, inspecting, closing, aborting, deleting, and waiting on Bulk 2.0 jobs.

class SFBulk2Handler:
    def create_job(
        self,
        operation,
        object_name=None,
        external_id_field=None,
        query=None
    ):
        """
        Create a new Bulk 2.0 job.

        Parameters:
        - operation: Operation type (insert, upsert, update, delete, hardDelete, query, queryAll)
        - object_name: Salesforce SObject API name (required for DML operations)
        - external_id_field: External ID field name (required for upsert)
        - query: SOQL query string (required for query operations)

        Returns:
        dict: Job creation response with job ID and status
        """

    def get_job(self, job_id, is_query):
        """
        Get comprehensive job information and status.

        Parameters:
        - job_id: Bulk 2.0 job identifier
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Complete job information including progress and statistics
        """

    def close_job(self, job_id):
        """
        Close ingest job to begin processing uploaded data.

        Parameters:
        - job_id: Ingest job identifier

        Returns:
        dict: Job status after closing
        """

    def abort_job(self, job_id, is_query):
        """
        Abort running job to stop processing.

        Parameters:
        - job_id: Job identifier to abort
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Job status after aborting
        """

    def delete_job(self, job_id, is_query):
        """
        Delete completed job and its data.

        Parameters:
        - job_id: Job identifier to delete
        - is_query: True for query jobs, False for ingest jobs

        Returns:
        dict: Deletion confirmation
        """

    def wait_for_job(self, job_id, is_query, wait=0.5):
        """
        Wait for job completion, polling the job status periodically.

        Parameters:
        - job_id: Job identifier to monitor
        - is_query: True for query jobs, False for ingest jobs
        - wait: Initial polling interval in seconds

        Returns:
        dict: Final job status when completed or failed
        """

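The `create_job` docstring above states which arguments each operation type requires. A minimal sketch of checking those rules before calling the API might look like the following. The helper `validate_job_args` is hypothetical and not part of the library; it only encodes the requirements listed in the docstring.

```python
INGEST_OPS = {"insert", "upsert", "update", "delete", "harddelete"}
QUERY_OPS = {"query", "queryall"}


def validate_job_args(operation, object_name=None,
                      external_id_field=None, query=None):
    """Return a list of problems; an empty list means the arguments
    satisfy the rules stated in the create_job docstring."""
    # Normalize so "hard_delete" and "hardDelete" are treated alike
    op = operation.replace("_", "").lower()
    problems = []
    if op in QUERY_OPS:
        if not query:
            problems.append("query is required for query operations")
    elif op in INGEST_OPS:
        if not object_name:
            problems.append("object_name is required for DML operations")
        if op == "upsert" and not external_id_field:
            problems.append("external_id_field is required for upsert")
    else:
        problems.append(f"unknown operation: {operation!r}")
    return problems


print(validate_job_args("upsert", object_name="Account"))
print(validate_job_args("query", query="SELECT Id FROM Account"))
```

Returning a list rather than raising on the first problem lets a caller report every missing argument at once.
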
Data Operations

Methods for uploading ingest data and for retrieving query results, either in memory or written directly to a file.

class SFBulk2Handler:
    def upload_job_data(self, job_id, data, content_url=None):
        """
        Upload data for ingest job processing.

        Parameters:
        - job_id: Ingest job identifier
        - data: CSV data string or file-like object
        - content_url: Optional alternative upload URL

        Returns:
        dict: Upload confirmation and status
        """

    def get_query_results(
        self,
        job_id,
        locator="",
        max_records=None
    ):
        """
        Get query results with pagination support.

        Parameters:
        - job_id: Query job identifier
        - locator: Pagination locator for subsequent pages
        - max_records: Maximum records per page (default: DEFAULT_QUERY_PAGE_SIZE)

        Returns:
        dict: Query results with data and pagination info
        """

    def download_job_data(
        self,
        path,
        job_id,
        locator="",
        max_records=None,
        chunk_size=1024
    ):
        """
        Download query results directly to file.

        Parameters:
        - path: Local file path for saving results
        - job_id: Query job identifier  
        - locator: Pagination locator for specific page
        - max_records: Maximum records per download
        - chunk_size: File write chunk size in bytes

        Returns:
        dict: Download status and file information
        """

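When a dataset is larger than a single upload should be, one option is to split the CSV into several pieces and give each piece to its own ingest job. The sketch below is one way to do that under stated assumptions: `split_csv_by_size` is a hypothetical helper, the size limit is whatever the caller chooses (the Performance Optimization section of this document mentions 100MB per job), and a single row larger than the limit still ends up in a chunk of its own.

```python
import csv
import io
from typing import List


def split_csv_by_size(csv_text: str, max_bytes: int) -> List[str]:
    """Split CSV text into chunks of at most `max_bytes` UTF-8 bytes,
    repeating the header row at the top of every chunk."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    if not rows:
        return []

    def render(row):
        # Re-serialize through csv.writer so quoting stays valid
        buf = io.StringIO()
        csv.writer(buf, lineterminator="\n").writerow(row)
        return buf.getvalue()

    header = render(rows[0])
    header_size = len(header.encode("utf-8"))
    chunks, current, size = [], [header], header_size
    for row in rows[1:]:
        line = render(row)
        line_size = len(line.encode("utf-8"))
        # Start a new chunk when this row would exceed the limit
        if size + line_size > max_bytes and len(current) > 1:
            chunks.append("".join(current))
            current, size = [header], header_size
        current.append(line)
        size += line_size
    if len(current) > 1:
        chunks.append("".join(current))
    return chunks


sample = "Name,Type\nA,Customer\nB,Partner\nC,Customer\n"
chunks = split_csv_by_size(sample, max_bytes=32)
print(len(chunks))
```

Parsing with the `csv` module instead of splitting on newlines keeps quoted fields that contain line breaks in one piece.
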
Enums and Types

Bulk API v2.0 operation types and job states for type-safe operations.

class Operation:
    """Enumeration of supported Bulk 2.0 operations."""
    INSERT = "insert"
    UPSERT = "upsert"
    UPDATE = "update"
    DELETE = "delete"
    HARD_DELETE = "hardDelete"
    QUERY = "query"
    QUERY_ALL = "queryAll"

class JobState:
    """Enumeration of Bulk 2.0 job states."""
    OPEN = "Open"
    ABORTED = "Aborted"
    FAILED = "Failed"
    UPLOAD_COMPLETE = "UploadComplete"
    IN_PROGRESS = "InProgress"
    JOB_COMPLETE = "JobComplete"

class ColumnDelimiter:
    """CSV column delimiter options."""
    COMMA = "COMMA"
    TAB = "TAB"
    PIPE = "PIPE"
    SEMICOLON = "SEMICOLON"
    CARET = "CARET"

class LineEnding:
    """CSV line ending options."""
    LF = "LF"      # Unix/Linux
    CRLF = "CRLF"  # Windows

class ResultsType:
    """Query result format types."""
    CSV = "CSV"
    JSON = "JSON"
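
The usage examples in this document compare `status['state']` against the strings `'JobComplete'`, `'Failed'`, and `'Aborted'` to decide when a job is finished. A small sketch that centralizes those checks is shown here. The names `TERMINAL_STATES`, `is_terminal`, and `fully_succeeded` are hypothetical helpers for illustration, not library API.

```python
TERMINAL_STATES = frozenset({"JobComplete", "Failed", "Aborted"})


def is_terminal(state: str) -> bool:
    """True when the job will not change state any further."""
    return state in TERMINAL_STATES


def fully_succeeded(job_info: dict) -> bool:
    """True when the job completed and reported no failed records."""
    failed = int(job_info.get("numberRecordsFailed", 0))
    return job_info.get("state") == "JobComplete" and failed == 0


print(is_terminal("InProgress"))
print(fully_succeeded({"state": "JobComplete", "numberRecordsFailed": 0}))
```

Keeping the terminal states in one place avoids repeating the same list of string literals in every polling loop.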

Usage Examples

Basic Bulk 2.0 Insert

from simple_salesforce import Salesforce

sf = Salesforce(username='user@example.com', password='pass', security_token='token')

# Prepare CSV data
csv_data = """Name,Type,Industry
Bulk2 Account 1,Customer,Technology
Bulk2 Account 2,Partner,Manufacturing
Bulk2 Account 3,Customer,Healthcare"""

# Create ingest job
job_response = sf.bulk2.create_job(
    operation='insert',
    object_name='Account'
)
job_id = job_response['id']
print(f"Created job: {job_id}")

# Upload data
sf.bulk2.upload_job_data(job_id, csv_data)

# Close job to start processing
sf.bulk2.close_job(job_id)

# Wait for completion
final_status = sf.bulk2.wait_for_job(job_id, is_query=False)

print(f"Job completed with state: {final_status['state']}")
print(f"Records processed: {final_status['numberRecordsProcessed']}")
print(f"Records failed: {final_status['numberRecordsFailed']}")

Bulk 2.0 Query Operations

# Create query job
query_job = sf.bulk2.create_job(
    operation='query',
    query="SELECT Id, Name, Type, Industry FROM Account WHERE CreatedDate = THIS_YEAR"
)
query_job_id = query_job['id']

# Wait for query completion
final_query_status = sf.bulk2.wait_for_job(query_job_id, is_query=True)

import csv
import io

if final_query_status['state'] == 'JobComplete':
    # Get query results
    results = sf.bulk2.get_query_results(query_job_id)

    # Parse the CSV with the csv module so quoted fields that contain
    # commas or line breaks are split correctly
    reader = csv.DictReader(io.StringIO(results['data']))

    for record in reader:
        print(f"Account: {record['Name']} - {record['Type']}")

# Download large results to file
sf.bulk2.download_job_data(
    '/path/to/results.csv',
    query_job_id,
    max_records=100000
)

Advanced Bulk 2.0 Upsert

# Prepare data with external ID
upsert_csv = """External_ID__c,Name,Type,Industry
EXT001,Upsert Account 1,Customer,Technology
EXT002,Upsert Account 2,Partner,Manufacturing
EXT003,Updated Account 3,Customer,Finance"""

# Create upsert job with external ID field
upsert_job = sf.bulk2.create_job(
    operation='upsert',
    object_name='Account',
    external_id_field='External_ID__c'
)
upsert_job_id = upsert_job['id']

# Upload and process
sf.bulk2.upload_job_data(upsert_job_id, upsert_csv)
sf.bulk2.close_job(upsert_job_id)

# Monitor with custom polling
import time

while True:
    status = sf.bulk2.get_job(upsert_job_id, is_query=False)
    print(f"Job state: {status['state']}")
    
    if status['state'] in ['JobComplete', 'Failed', 'Aborted']:
        break
        
    time.sleep(2)

# Check final results
final_status = sf.bulk2.get_job(upsert_job_id, is_query=False)
print(f"Succeeded: {final_status['numberRecordsProcessed'] - final_status['numberRecordsFailed']}")
print(f"Failed: {final_status['numberRecordsFailed']}")

Bulk 2.0 with Error Handling

import time

def bulk2_insert_with_monitoring(sf, object_name, csv_data):
    """Bulk 2.0 insert with comprehensive error handling."""
    
    try:
        # Create job
        job_response = sf.bulk2.create_job(
            operation='insert',
            object_name=object_name
        )
        job_id = job_response['id']
        
        print(f"Created job {job_id} for {object_name}")
        
        # Upload data
        upload_response = sf.bulk2.upload_job_data(job_id, csv_data)
        print("Data uploaded successfully")
        
        # Close job
        close_response = sf.bulk2.close_job(job_id)
        print(f"Job closed, state: {close_response['state']}")
        
        # Wait for completion with timeout
        start_time = time.time()
        timeout = 300  # 5 minutes
        
        while True:
            status = sf.bulk2.get_job(job_id, is_query=False)
            elapsed = time.time() - start_time
            
            print(f"Job state: {status['state']} (elapsed: {elapsed:.1f}s)")
            
            if status['state'] == 'JobComplete':
                print("Job completed successfully!")
                return {
                    'job_id': job_id,
                    'success': True,
                    'processed': status['numberRecordsProcessed'],
                    'failed': status['numberRecordsFailed']
                }
            elif status['state'] in ['Failed', 'Aborted']:
                print(f"Job failed with state: {status['state']}")
                return {
                    'job_id': job_id,
                    'success': False,
                    'error': status.get('stateMessage', 'Unknown error')
                }
            elif elapsed > timeout:
                # Abort timed-out job
                sf.bulk2.abort_job(job_id, is_query=False)
                raise TimeoutError(f"Job {job_id} timed out after {timeout} seconds")
            
            time.sleep(5)
            
    except Exception as e:
        print(f"Error in bulk operation: {e}")
        # Clean up job if possible (best effort; cleanup failures are ignored)
        try:
            sf.bulk2.abort_job(job_id, is_query=False)
        except Exception:
            pass
        raise

# Usage
result = bulk2_insert_with_monitoring(sf, 'Contact', contact_csv_data)
if result['success']:
    print(f"Processed {result['processed']} records")
else:
    print(f"Job failed: {result['error']}")

Large Dataset Query with Pagination

import os

def bulk2_query_all_pages(sf, query, output_dir):
    """Query large dataset with automatic pagination."""

    # Make sure the output directory exists before writing page files
    os.makedirs(output_dir, exist_ok=True)

    # Create query job
    query_job = sf.bulk2.create_job(operation='query', query=query)
    job_id = query_job['id']

    # Wait for completion
    sf.bulk2.wait_for_job(job_id, is_query=True)

    page_num = 1
    locator = ""
    
    while True:
        # Get page of results
        page_results = sf.bulk2.get_query_results(
            job_id, 
            locator=locator,
            max_records=50000
        )
        
        # Download page to file
        page_file = f"{output_dir}/page_{page_num}.csv"
        sf.bulk2.download_job_data(
            page_file,
            job_id,
            locator=locator,
            max_records=50000
        )
        
        print(f"Downloaded page {page_num} to {page_file}")
        
        # Check for more pages
        if 'nextRecordsUrl' not in page_results or not page_results['nextRecordsUrl']:
            break
            
        # Extract locator for next page
        locator = page_results['nextRecordsUrl'].split('locator=')[1]
        page_num += 1
    
    print(f"Downloaded {page_num} pages of results")
    return page_num

# Usage  
pages_downloaded = bulk2_query_all_pages(
    sf,
    "SELECT Id, Name, Email FROM Contact WHERE CreatedDate = LAST_N_DAYS:30",
    "/tmp/bulk_results"
)
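
The pagination example above pulls the locator out of the next-page URL with `split('locator=')[1]`, which also captures any query parameters that follow it. A more defensive sketch using the standard library's URL parser is shown below. The helper name `extract_locator` and the sample URL are hypothetical, and treating the literal value `null` as "no more pages" is an assumption of this example.

```python
from typing import Optional
from urllib.parse import parse_qs, urlparse


def extract_locator(next_records_url: Optional[str]) -> Optional[str]:
    """Return the `locator` query parameter, or None when there is none."""
    if not next_records_url:
        return None
    params = parse_qs(urlparse(next_records_url).query)
    values = params.get("locator")
    # Assumption: a locator of "null" means there are no further pages
    if not values or values[0] == "null":
        return None
    return values[0]


# Hypothetical URL, used only to show the parsing behaviour
url = "/jobs/query/JOB_ID/results?locator=MTAwMDA&maxRecords=50000"
print(extract_locator(url))
```

With this helper the loop can stop when the function returns `None` instead of testing the URL and splitting it in separate steps.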

Bulk 2.0 Job Management

def manage_bulk2_jobs(sf):
    """Example of advanced job management operations."""
    
    # Create multiple jobs
    jobs = []
    
    # Insert job
    insert_job = sf.bulk2.create_job(operation='insert', object_name='Account')
    jobs.append(('insert', insert_job['id']))
    
    # Query job
    query_job = sf.bulk2.create_job(
        operation='query',
        query="SELECT Id, Name FROM Account LIMIT 1000"
    )
    jobs.append(('query', query_job['id']))
    
    # Monitor all jobs
    for job_type, job_id in jobs:
        is_query = (job_type == 'query')
        
        # Get job status
        status = sf.bulk2.get_job(job_id, is_query=is_query)
        print(f"{job_type.title()} job {job_id}: {status['state']}")
        
        # Abort if needed
        if status['state'] == 'InProgress':
            print(f"Aborting {job_type} job {job_id}")
            sf.bulk2.abort_job(job_id, is_query=is_query)
        
        # Delete completed jobs
        elif status['state'] in ['JobComplete', 'Failed', 'Aborted']:
            print(f"Deleting {job_type} job {job_id}")
            sf.bulk2.delete_job(job_id, is_query=is_query)

# Usage
manage_bulk2_jobs(sf)

Utility Functions

Data processing utilities for Bulk API v2.0 operations.

class SFBulk2Handler:
    def filter_null_bytes(self, b):
        """
        Filter null bytes from strings or bytes objects.

        Parameters:
        - b: String or bytes object to filter

        Returns:
        str|bytes: Filtered content with null bytes removed
        """

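To make the described behaviour concrete, here is a minimal sketch of removing null bytes from either a string or a bytes object. It is written from the docstring above and is not the library's implementation. The function name `strip_null_bytes` is chosen for this example.

```python
from typing import Union


def strip_null_bytes(value: Union[str, bytes]) -> Union[str, bytes]:
    """Remove NUL characters, returning the same type that was passed in."""
    if isinstance(value, bytes):
        return value.replace(b"\x00", b"")
    if isinstance(value, str):
        return value.replace("\x00", "")
    raise TypeError(f"expected str or bytes, got {type(value).__name__}")


print(strip_null_bytes("Id,Name\x00\n001,Acme\n"))
print(strip_null_bytes(b"Id,Name\x00\n"))
```

Null bytes are worth removing before parsing because Python's `csv` module has historically rejected input lines that contain them.
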
Best Practices

Data Preparation for Bulk 2.0

def prepare_csv_for_bulk2(records, field_mapping=None):
    """Prepare record data as CSV for Bulk 2.0 operations."""
    
    if not records:
        return ""
    
    # Apply field mapping if provided
    if field_mapping:
        records = [
            {field_mapping.get(k, k): v for k, v in record.items()}
            for record in records
        ]
    
    # Get headers from first record
    headers = list(records[0].keys())
    
    # Build CSV
    csv_lines = [','.join(headers)]
    
    for record in records:
        values = []
        for header in headers:
            value = record.get(header, '')
            
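            # Write missing values as empty fields rather than the text "None"
            if value is None:
                value = ''

            # Handle CSV escaping: quote fields that contain a comma, quote,
            # or line break, and double any embedded quotes
            if isinstance(value, str):
                if any(ch in value for ch in ',"\n\r'):
                    escaped = value.replace('"', '""')
                    value = f'"{escaped}"'

            values.append(str(value))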
        
        csv_lines.append(','.join(values))
    
    return '\n'.join(csv_lines)

# Usage
csv_data = prepare_csv_for_bulk2(
    account_records,
    field_mapping={'company_name': 'Name', 'account_type': 'Type'}
)
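
As an alternative to escaping fields by hand, the standard library's `csv` module can produce the same kind of output and takes care of quoting. The sketch below assumes records are plain dictionaries, as in the function above. The name `records_to_csv` is hypothetical.

```python
import csv
import io


def records_to_csv(records, fieldnames=None):
    """Serialize a list of dicts to CSV text using csv.DictWriter."""
    if not records:
        return ""
    # Default to the keys of the first record, as the manual version does
    fieldnames = fieldnames or list(records[0].keys())
    buf = io.StringIO()
    writer = csv.DictWriter(
        buf,
        fieldnames=fieldnames,
        lineterminator="\n",
        extrasaction="ignore",  # skip keys that are not in fieldnames
        restval="",             # fill keys missing from a record
    )
    writer.writeheader()
    writer.writerows(records)
    return buf.getvalue()


rows = [
    {"Name": 'Acme, "Inc"', "Type": "Customer"},
    {"Name": "Plain", "Type": None},
]
csv_text = records_to_csv(rows)
print(csv_text)
```

`DictWriter` writes `None` as an empty field and quotes only the values that need it, so the output can be read back with `csv.DictReader` without loss.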

Error Analysis

def analyze_bulk2_results(sf, job_id):
    """Analyze Bulk 2.0 job results for errors and success rates."""
    
    # Get final job status
    job_status = sf.bulk2.get_job(job_id, is_query=False)
    
    total_records = job_status['numberRecordsProcessed']
    failed_records = job_status['numberRecordsFailed']
    success_records = total_records - failed_records
    
    success_rate = (success_records / total_records * 100) if total_records > 0 else 0
    
    analysis = {
        'job_id': job_id,
        'total_records': total_records,
        'successful_records': success_records,
        'failed_records': failed_records,
        'success_rate': f"{success_rate:.1f}%",
        'job_state': job_status['state']
    }
    
    # Add error details if available
    if 'stateMessage' in job_status:
        analysis['error_message'] = job_status['stateMessage']
    
    return analysis

# Usage
results = analyze_bulk2_results(sf, completed_job_id)
print(f"Job {results['job_id']}: {results['success_rate']} success rate")
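
The job-level counts above say how many records failed but not why. Salesforce's failed-record results for an ingest job are CSV rows that include an `sf__Error` column, so once that CSV is available as text it can be grouped by error code. How the CSV is retrieved is not covered in this document, so the sketch below takes the text as input. The helper name `summarize_failures` and the sample rows are hypothetical.

```python
import csv
import io
from collections import Counter


def summarize_failures(failed_results_csv: str) -> Counter:
    """Count failed rows by the error code at the start of sf__Error."""
    reader = csv.DictReader(io.StringIO(failed_results_csv))
    counts = Counter()
    for row in reader:
        message = row.get("sf__Error") or "UNKNOWN"
        # Error text typically starts with a code followed by a colon
        counts[message.split(":", 1)[0]] += 1
    return counts


# Hypothetical failed-results content for illustration
sample = (
    "sf__Id,sf__Error,Name\n"
    ",REQUIRED_FIELD_MISSING:Required fields are missing: [Name]:Name --,\n"
    ",DUPLICATE_VALUE:duplicate value found,Acme\n"
    ",REQUIRED_FIELD_MISSING:Required fields are missing: [Name]:Name --,\n"
)
summary = summarize_failures(sample)
print(summary.most_common())
```

Grouping by error code makes it easy to see whether failures share a single cause, such as one missing required field.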

Performance Optimization

def optimize_bulk2_performance():
    """Best practices for Bulk 2.0 performance optimization."""
    
    recommendations = {
        'batch_sizing': 'Use up to 100MB per job (no artificial batch limits)',
        'data_format': 'Upload ingest data as CSV, the format Bulk API 2.0 ingest jobs accept',
        'field_selection': 'Only include necessary fields in queries',
        'parallel_jobs': 'Run multiple concurrent jobs for different objects',
        'monitoring': 'Use wait_for_job() with appropriate polling intervals',
        'cleanup': 'Delete completed jobs to avoid storage limits'
    }
    
    return recommendations

# Example of concurrent processing
import concurrent.futures

def process_object_bulk2(sf, object_name, data):
    """Process single object with Bulk 2.0."""
    csv_data = prepare_csv_for_bulk2(data)
    
    job = sf.bulk2.create_job(operation='insert', object_name=object_name)
    sf.bulk2.upload_job_data(job['id'], csv_data)
    sf.bulk2.close_job(job['id'])
    
    return sf.bulk2.wait_for_job(job['id'], is_query=False)

# Process multiple objects concurrently
objects_data = {
    'Account': account_records,
    'Contact': contact_records,
    'Opportunity': opportunity_records
}

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {
        executor.submit(process_object_bulk2, sf, obj_name, data): obj_name
        for obj_name, data in objects_data.items()
    }
    
    for future in concurrent.futures.as_completed(futures):
        obj_name = futures[future]
        try:
            result = future.result()
            print(f"{obj_name}: {result['state']}")
        except Exception as e:
            print(f"{obj_name} failed: {e}")
