Apache Airflow provider package enabling Salesforce CRM and Tableau Server integration for data workflows
Comprehensive Salesforce CRM integration capabilities including connection management, SOQL query execution, object metadata retrieval, and data export functionality. Uses the simple-salesforce library for robust API communication.
Establishes and manages authenticated connections to Salesforce using security tokens. Supports both production and sandbox environments through connection configuration.
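One way to supply such a connection is through an environment variable. The sketch below builds an Airflow-style connection URI with the standard library; the connection id `salesforce_conn`, the placeholder credentials, and the `__extra__` query-string convention are assumptions about the surrounding Airflow setup, not part of this package:

```python
import json
import os
from urllib.parse import quote, urlencode

# Placeholder credentials -- substitute your real Salesforce login.
login = "user@example.com"
password = "secret"
extras = {"security_token": "your_token", "domain": "test"}

# Airflow can read a connection from AIRFLOW_CONN_<CONN_ID> as a URI;
# the extras dictionary is carried in the query string as JSON.
uri = (
    f"salesforce://{quote(login)}:{quote(password)}@"
    f"?{urlencode({'__extra__': json.dumps(extras)})}"
)
os.environ["AIRFLOW_CONN_SALESFORCE_CONN"] = uri
```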
```python
class SalesforceHook(BaseHook):
    def __init__(self, conn_id: str) -> None:
        """
        Initialize Salesforce hook with connection ID.

        Parameters:
        - conn_id: Airflow connection ID containing Salesforce credentials
        """

    def get_conn(self) -> api.Salesforce:
        """
        Sign into Salesforce and return authenticated connection.

        The connection uses the security token from the connection extras.
        For a sandbox, add {"domain": "test"} to the connection extras.

        Returns:
        Authenticated Salesforce API connection
        """
```

Connection Configuration (connection extras):

```json
{"security_token": "your_token"}
```

or, for a sandbox:

```json
{"security_token": "your_token", "domain": "test"}
```

Execute SOQL queries against Salesforce objects with support for deleted records and custom query parameters.
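Before the method itself, it helps to know the shape of what comes back: each entry in `records` carries a nested `attributes` metadata dictionary alongside the selected fields. A minimal sketch of stripping it (the sample payload is fabricated for illustration):

```python
# Fabricated make_query()-style result.
results = {
    "totalSize": 2,
    "done": True,
    "records": [
        {"attributes": {"type": "Account"}, "Id": "001A", "Name": "Acme"},
        {"attributes": {"type": "Account"}, "Id": "001B", "Name": "Globex"},
    ],
}

# Keep only the queried fields, dropping the API metadata.
clean = [
    {key: value for key, value in record.items() if key != "attributes"}
    for record in results["records"]
]
print(clean[0])  # {'Id': '001A', 'Name': 'Acme'}
```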
```python
def make_query(self, query: str, include_deleted: bool = False, query_params: Optional[dict] = None) -> dict:
    """
    Execute SOQL query against Salesforce.

    Parameters:
    - query: SOQL query string
    - include_deleted: Include deleted records in results
    - query_params: Additional query parameters

    Returns:
    Query results dictionary with 'records', 'totalSize', 'done' keys
    """
```

Usage Example:

```python
hook = SalesforceHook('salesforce_conn')

# Basic query
results = hook.make_query("SELECT Id, Name FROM Account WHERE Type = 'Customer'")

# Query with deleted records
results = hook.make_query(
    "SELECT Id, Name FROM Contact",
    include_deleted=True
)

# Process results
for record in results['records']:
    print(f"ID: {record['Id']}, Name: {record['Name']}")
```

Retrieve detailed schema information and available fields for Salesforce objects.
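Describe results are dictionaries whose `fields` list carries per-field metadata (name, type, whether the field is updateable, and so on). The fragment below is fabricated, but shows how that metadata is typically filtered:

```python
# Fabricated fragment of a describe result for the Account object.
account_schema = {
    "fields": [
        {"name": "Id", "type": "id", "updateable": False},
        {"name": "Name", "type": "string", "updateable": True},
        {"name": "CreatedDate", "type": "datetime", "updateable": False},
    ]
}

# Names of fields that may be modified, e.g. for building update payloads.
updateable = [f["name"] for f in account_schema["fields"] if f["updateable"]]
print(updateable)  # ['Name']
```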
```python
def describe_object(self, obj: str) -> dict:
    """
    Get complete object description including schema and metadata.

    Parameters:
    - obj: Salesforce object name (e.g., 'Account', 'Contact')

    Returns:
    Object description dictionary with fields, relationships, metadata
    """

def get_available_fields(self, obj: str) -> List[str]:
    """
    Get list of all available field names for an object.

    Parameters:
    - obj: Salesforce object name

    Returns:
    List of field names
    """
```

Usage Example:

```python
hook = SalesforceHook('salesforce_conn')

# Get object schema
account_schema = hook.describe_object('Account')
print(f"Account has {len(account_schema['fields'])} fields")

# Get field names only
field_names = hook.get_available_fields('Account')
print(f"Available fields: {', '.join(field_names[:5])}...")
```

Simplified data retrieval for complete object datasets with field selection.
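This retrieval amounts to assembling a SELECT statement from a field list; building the equivalent SOQL by hand is useful when you need extra clauses (the WHERE filter below is illustrative, not part of the helper):

```python
obj = "Account"
fields = ["Id", "Name", "Type", "Industry", "CreatedDate"]

# SELECT <fields> FROM <obj>
query = f"SELECT {', '.join(fields)} FROM {obj}"
print(query)  # SELECT Id, Name, Type, Industry, CreatedDate FROM Account

# Pass a hand-built variant to make_query() when a filter is needed.
filtered_query = query + " WHERE Industry = 'Technology'"
```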
```python
def get_object_from_salesforce(self, obj: str, fields: Iterable[str]) -> dict:
    """
    Retrieve all records for an object with specified fields.

    Equivalent to: SELECT <fields> FROM <obj>

    Parameters:
    - obj: Salesforce object name
    - fields: List/iterable of field names to retrieve

    Returns:
    Query results dictionary
    """
```

Usage Example:

```python
hook = SalesforceHook('salesforce_conn')

# Get specific fields from all accounts
accounts = hook.get_object_from_salesforce(
    'Account',
    ['Id', 'Name', 'Type', 'Industry', 'CreatedDate']
)
print(f"Retrieved {accounts['totalSize']} accounts")
```

Export query results to files in multiple formats with data processing options.
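The effect of `coerce_to_timestamp` can be pictured in plain pandas: datetime-like string columns become Unix epoch seconds, while non-datetime columns pass through unchanged. This is a sketch of the idea, not the provider's exact implementation:

```python
import pandas as pd

def to_timestamp(column: pd.Series) -> pd.Series:
    """Convert datetime strings to Unix epoch seconds; leave
    non-datetime columns untouched."""
    try:
        converted = pd.to_datetime(column)
    except (ValueError, TypeError):
        # Column is not datetime-like; return it as-is.
        return column
    # Nanoseconds since the epoch, scaled down to seconds.
    return converted.astype("int64") / 1e9

created = pd.Series(["2024-01-01", "2024-01-02"])
names = pd.Series(["Acme", "Globex"])

print(to_timestamp(created).tolist())  # [1704067200.0, 1704153600.0]
print(to_timestamp(names).tolist())    # ['Acme', 'Globex']
```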
```python
def write_object_to_file(
    self,
    query_results: List[dict],
    filename: str,
    fmt: str = "csv",
    coerce_to_timestamp: bool = False,
    record_time_added: bool = False,
) -> pd.DataFrame:
    """
    Write query results to file with format options.

    Parameters:
    - query_results: List of record dictionaries from query
    - filename: Output file path
    - fmt: Output format ('csv', 'json', 'ndjson')
    - coerce_to_timestamp: Convert datetime fields to Unix timestamps
    - record_time_added: Add fetch timestamp field

    Returns:
    DataFrame that was written to file
    """

def object_to_df(
    self,
    query_results: List[dict],
    coerce_to_timestamp: bool = False,
    record_time_added: bool = False
) -> pd.DataFrame:
    """
    Convert query results to pandas DataFrame.

    Parameters:
    - query_results: List of record dictionaries
    - coerce_to_timestamp: Convert datetime fields to Unix timestamps
    - record_time_added: Add fetch timestamp field

    Returns:
    pandas DataFrame with processed data
    """

@classmethod
def _to_timestamp(cls, column: pd.Series) -> pd.Series:
    """
    Convert DataFrame column to UNIX timestamps if applicable.

    Internal method used for datetime field conversion with error handling.
    Attempts to convert string datetime fields to timestamps while preserving
    non-datetime columns unchanged.

    Parameters:
    - column: pandas Series representing a DataFrame column

    Returns:
    Series with timestamps or original values if conversion fails
    """
```

Usage Examples:

```python
hook = SalesforceHook('salesforce_conn')

# Query data
results = hook.make_query("SELECT Id, Name, CreatedDate FROM Account LIMIT 100")

# Export to CSV (default)
df = hook.write_object_to_file(results['records'], 'accounts.csv')

# Export to JSON with timestamps
df = hook.write_object_to_file(
    results['records'],
    'accounts.json',
    fmt='json',
    coerce_to_timestamp=True,
    record_time_added=True
)

# Convert to DataFrame without file export
df = hook.object_to_df(results['records'])
print(f"DataFrame shape: {df.shape}")
```

The hook handles various error conditions.
Always wrap hook operations in a try/except block for production use:

```python
try:
    hook = SalesforceHook('salesforce_conn')
    results = hook.make_query("SELECT Id FROM Account")
except Exception as e:
    print(f"Salesforce operation failed: {e}")
```

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-salesforce