tessl/pypi-apache-airflow-providers-salesforce

Apache Airflow provider package enabling Salesforce CRM and Tableau Server integration for data workflows


Salesforce Integration

Provides comprehensive Salesforce CRM integration: connection management, SOQL query execution, object metadata retrieval, and data export. Built on the simple-salesforce library for robust API communication.

Capabilities

Connection Management

Establishes and manages authenticated connections to Salesforce using security tokens. Supports both production and sandbox environments through connection configuration.

class SalesforceHook(BaseHook):
    def __init__(self, conn_id: str) -> None:
        """
        Initialize Salesforce hook with connection ID.
        
        Parameters:
        - conn_id: Airflow connection ID containing Salesforce credentials
        """

    def get_conn(self) -> api.Salesforce:
        """
        Sign into Salesforce and return authenticated connection.
        
        Connection uses security token from connection extras.
        For sandbox: add {"domain": "test"} to connection extras.
        
        Returns:
        Authenticated Salesforce API connection
        """

Connection Configuration:

  • Connection Type: http
  • Host: Salesforce instance URL (optional)
  • Login: Salesforce username
  • Password: Salesforce password
  • Extras: {"security_token": "your_token"} or {"security_token": "your_token", "domain": "test"} for sandbox
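Besides the UI, the connection can be supplied through an environment variable: Airflow resolves `AIRFLOW_CONN_<CONN_ID>`, and recent versions (2.3+) accept a JSON payload in that variable. A minimal sketch with placeholder credentials:

```python
import json
import os

# Placeholder credentials -- replace with your own values.
conn = {
    "conn_type": "http",
    "login": "user@example.com",
    "password": "secret",
    "extra": {"security_token": "your_token", "domain": "test"},  # sandbox
}

# SalesforceHook('salesforce_conn') will resolve this variable.
os.environ["AIRFLOW_CONN_SALESFORCE_CONN"] = json.dumps(conn)
```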

Data Querying

Execute SOQL queries against Salesforce objects with support for deleted records and custom query parameters.

def make_query(self, query: str, include_deleted: bool = False, query_params: Optional[dict] = None) -> dict:
    """
    Execute SOQL query against Salesforce.
    
    Parameters:
    - query: SOQL query string
    - include_deleted: Include deleted records in results
    - query_params: Additional query parameters
    
    Returns:
    Query results dictionary with 'records', 'totalSize', 'done' keys
    """

Usage Example:

hook = SalesforceHook('salesforce_conn')

# Basic query
results = hook.make_query("SELECT Id, Name FROM Account WHERE Type = 'Customer'")

# Query with deleted records
results = hook.make_query(
    "SELECT Id, Name FROM Contact", 
    include_deleted=True
)

# Process results
for record in results['records']:
    print(f"ID: {record['Id']}, Name: {record['Name']}")
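When queries are built from user-supplied values, the string literals need escaping; SOQL escapes single quotes and backslashes with a leading backslash. A hypothetical helper (not part of the hook) that quotes a value safely:

```python
def soql_quote(value: str) -> str:
    """Escape a string for use inside a SOQL single-quoted literal.

    Hypothetical helper, not part of SalesforceHook: SOQL escapes
    backslashes and single quotes with a leading backslash.
    """
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

# Build a query from untrusted input without breaking the literal
name = "O'Brien & Sons"
query = f"SELECT Id FROM Account WHERE Name = {soql_quote(name)}"
```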

Object Metadata

Retrieve detailed schema information and available fields for Salesforce objects.

def describe_object(self, obj: str) -> dict:
    """
    Get complete object description including schema and metadata.
    
    Parameters:
    - obj: Salesforce object name (e.g., 'Account', 'Contact')
    
    Returns:
    Object description dictionary with fields, relationships, metadata
    """

def get_available_fields(self, obj: str) -> List[str]:
    """
    Get list of all available field names for an object.
    
    Parameters:
    - obj: Salesforce object name
    
    Returns:
    List of field names
    """

Usage Example:

hook = SalesforceHook('salesforce_conn')

# Get object schema
account_schema = hook.describe_object('Account')
print(f"Account has {len(account_schema['fields'])} fields")

# Get field names only
field_names = hook.get_available_fields('Account')
print(f"Available fields: {', '.join(field_names[:5])}...")
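The describe payload mirrors Salesforce's REST describe result, where each entry in `fields` carries metadata such as `name`, `type`, and `updateable`. A sketch of narrowing to writable custom fields, using a trimmed sample of that structure (real payloads carry many more keys per field):

```python
# Trimmed sample of the structure returned by describe_object('Account')
schema = {
    "fields": [
        {"name": "Name", "type": "string", "updateable": True},
        {"name": "CreatedDate", "type": "datetime", "updateable": False},
        {"name": "Region__c", "type": "picklist", "updateable": True},
    ]
}

# Custom fields end in '__c'; 'updateable' marks writable fields.
custom_writable = [
    f["name"]
    for f in schema["fields"]
    if f["name"].endswith("__c") and f["updateable"]
]
# custom_writable == ["Region__c"]
```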

Data Retrieval

Simplified data retrieval for complete object datasets with field selection.

def get_object_from_salesforce(self, obj: str, fields: Iterable[str]) -> dict:
    """
    Retrieve all records for an object with specified fields.
    
    Equivalent to: SELECT <fields> FROM <obj>
    
    Parameters:
    - obj: Salesforce object name
    - fields: List/iterable of field names to retrieve
    
    Returns:
    Query results dictionary
    """

Usage Example:

hook = SalesforceHook('salesforce_conn')

# Get specific fields from all accounts
accounts = hook.get_object_from_salesforce(
    'Account', 
    ['Id', 'Name', 'Type', 'Industry', 'CreatedDate']
)

print(f"Retrieved {accounts['totalSize']} accounts")
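A common pairing is to feed `get_available_fields` into `get_object_from_salesforce` to dump every field of an object. Sketched here as a hypothetical convenience wrapper (not part of the hook):

```python
def dump_all_fields(hook, obj: str) -> dict:
    """Fetch every available field for `obj`.

    Hypothetical wrapper: combines the two hook methods so callers
    don't need to know the object's schema in advance.
    """
    fields = hook.get_available_fields(obj)
    return hook.get_object_from_salesforce(obj, fields)

# accounts = dump_all_fields(SalesforceHook('salesforce_conn'), 'Account')
```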

Data Export

Export query results to files in multiple formats with data processing options.

def write_object_to_file(
    self,
    query_results: List[dict],
    filename: str,
    fmt: str = "csv",
    coerce_to_timestamp: bool = False,
    record_time_added: bool = False,
) -> pd.DataFrame:
    """
    Write query results to file with format options.
    
    Parameters:
    - query_results: List of record dictionaries from query
    - filename: Output file path
    - fmt: Output format ('csv', 'json', 'ndjson')
    - coerce_to_timestamp: Convert datetime fields to Unix timestamps
    - record_time_added: Add fetch timestamp field
    
    Returns:
    DataFrame that was written to file
    """

def object_to_df(
    self,
    query_results: List[dict],
    coerce_to_timestamp: bool = False,
    record_time_added: bool = False
) -> pd.DataFrame:
    """
    Convert query results to pandas DataFrame.
    
    Parameters:
    - query_results: List of record dictionaries
    - coerce_to_timestamp: Convert datetime fields to Unix timestamps  
    - record_time_added: Add fetch timestamp field
    
    Returns:
    pandas DataFrame with processed data
    """

@classmethod
def _to_timestamp(cls, column: pd.Series) -> pd.Series:
    """
    Convert DataFrame column to UNIX timestamps if applicable.
    
    Internal method used for datetime field conversion with error handling.
    Attempts to convert string datetime fields to timestamps while preserving 
    non-datetime columns unchanged.
    
    Parameters:
    - column: pandas Series representing a DataFrame column
    
    Returns:
    Series with timestamps or original values if conversion fails
    """
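Conceptually, the per-column conversion works like the following plain-Python sketch (the hook itself operates on pandas Series): ISO-8601 strings become Unix timestamps, and anything unparseable passes through unchanged.

```python
from datetime import datetime

def to_epoch(value):
    """Sketch of the per-value conversion: parse ISO-8601 strings to
    Unix timestamps, pass everything else through untouched."""
    try:
        return datetime.fromisoformat(value).timestamp()
    except (TypeError, ValueError):
        return value

to_epoch("2023-01-15T08:30:00+00:00")  # 1673771400.0
to_epoch("Acme Corp")                  # returned unchanged
```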

Usage Examples:

hook = SalesforceHook('salesforce_conn')

# Query data
results = hook.make_query("SELECT Id, Name, CreatedDate FROM Account LIMIT 100")

# Export to CSV (default)
df = hook.write_object_to_file(results['records'], 'accounts.csv')

# Export to JSON with timestamps
df = hook.write_object_to_file(
    results['records'],
    'accounts.json',
    fmt='json',
    coerce_to_timestamp=True,
    record_time_added=True
)

# Convert to DataFrame without file export
df = hook.object_to_df(results['records'])
print(f"DataFrame shape: {df.shape}")

Data Processing Features

  • Automatic column lowercasing: All DataFrame columns converted to lowercase
  • Newline cleaning: Removes newlines from string fields for CSV compatibility
  • Datetime handling: Flexible timestamp conversion with error handling
  • Missing value handling: Proper NaN handling for integer columns with nulls
  • Attribute filtering: Removes Salesforce 'attributes' metadata from records
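The per-record effect of these steps can be illustrated in plain Python (the hook implements them via pandas): drop the Salesforce `attributes` metadata, lowercase the keys, and strip newlines from string values.

```python
def clean_record(record: dict) -> dict:
    """Illustrative reimplementation of the listed processing steps:
    drop 'attributes' metadata, lowercase keys, remove newlines."""
    return {
        key.lower(): value.replace("\n", "") if isinstance(value, str) else value
        for key, value in record.items()
        if key != "attributes"
    }

raw = {"attributes": {"type": "Account"}, "Id": "001xx", "Name": "Acme\nCorp"}
clean_record(raw)  # {'id': '001xx', 'name': 'AcmeCorp'}
```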

Error Handling

The hook handles various error conditions:

  • Authentication failures: Invalid credentials or security tokens
  • Query errors: Malformed SOQL or field access issues
  • Connection timeouts: Network connectivity problems
  • Data conversion errors: Type conversion failures during export

In production, wrap hook operations in try/except blocks:

try:
    hook = SalesforceHook('salesforce_conn')
    results = hook.make_query("SELECT Id FROM Account")
except Exception as e:
    print(f"Salesforce operation failed: {e}")
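Transient failures such as timeouts and rate limits are often worth retrying. A hypothetical retry wrapper with exponential backoff (in real code, narrow the `except` clause to your client's exception types rather than bare `Exception`):

```python
import time

def with_retries(operation, attempts: int = 3, delay: float = 1.0):
    """Hypothetical wrapper: retry a callable on failure, doubling the
    wait between attempts; re-raise once attempts are exhausted."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(delay * 2 ** (attempt - 1))  # exponential backoff

# results = with_retries(lambda: hook.make_query("SELECT Id FROM Account"))
```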

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-salesforce
