tessl/pypi-apache-airflow-providers-salesforce

Apache Airflow provider package enabling Salesforce CRM and Tableau Server integration for data workflows

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-salesforce@2.0.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-salesforce@2.0.0

Apache Airflow Providers Salesforce

Apache Airflow provider package that integrates Airflow with Salesforce CRM and Tableau Server. It provides a hook for querying Salesforce, a hook and an operator for refreshing Tableau workbooks, and a sensor for monitoring Tableau job status. It is distributed as one of the official Apache Airflow provider packages.

Package Information

  • Package Name: apache-airflow-providers-salesforce
  • Language: Python
  • Installation: pip install apache-airflow-providers-salesforce
  • Requirements: apache-airflow>=2.0.0, simple-salesforce>=1.0.0, tableauserverclient>=0.12, pandas>=0.17.1

Core Imports

from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
from airflow.providers.salesforce.hooks.tableau import TableauHook, TableauJobFinishCode
from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator
from airflow.providers.salesforce.sensors.tableau_job_status import TableauJobStatusSensor, TableauJobFailedException

Basic Usage

Salesforce Data Extraction

from airflow.providers.salesforce.hooks.salesforce import SalesforceHook

# Create hook with connection
hook = SalesforceHook(conn_id='salesforce_default')

# Query Salesforce data
query_results = hook.make_query("SELECT Id, Name FROM Account LIMIT 10")

# Export to file
hook.write_object_to_file(
    query_results['records'],
    'accounts.csv',
    fmt='csv',
)
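
The export call above passes only the `records` entry of the query result. As a minimal sketch, assuming the result mirrors the simple-salesforce response shape (a dict with `totalSize`, `done`, and `records`, where each record carries an `attributes` metadata entry), the rows can be cleaned up before further processing. The helper name `extract_records` and the sample data are hypothetical and not part of the provider.

```python
from typing import Any, Dict, List


def extract_records(query_results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the rows of a query result without the per-record metadata.

    Assumes the simple-salesforce response shape: a dict with 'records',
    and an 'attributes' entry on each record. Hypothetical helper.
    """
    records = query_results.get("records", [])
    # Drop the 'attributes' metadata so only the queried fields remain.
    return [
        {field: value for field, value in record.items() if field != "attributes"}
        for record in records
    ]


# Hand-written sample in the assumed response shape (not real Salesforce data).
sample = {
    "totalSize": 1,
    "done": True,
    "records": [
        {"attributes": {"type": "Account"}, "Id": "001xx", "Name": "Acme"},
    ],
}
rows = extract_records(sample)
```

With a live connection, `extract_records(hook.make_query(...))` would be the equivalent call.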

Tableau Workbook Refresh

from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator

# Define refresh operator
refresh_task = TableauRefreshWorkbookOperator(
    task_id='refresh_workbook',
    workbook_name='Sales Dashboard',
    tableau_conn_id='tableau_default',
    blocking=True  # Wait for completion
)
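
With `blocking=False` the operator returns as soon as the refresh is queued, and a `TableauJobStatusSensor` can wait for the job instead. The following is a sketch under two assumptions that this page does not confirm: that the value returned by `execute` is the Tableau job id (which Airflow stores in XCom), and that the sensor's `job_id` argument is a templated field. The DAG id, task ids, and the `build_refresh_dag` function are illustrative names.

```python
from datetime import datetime


def build_refresh_dag():
    """Build a DAG that queues a workbook refresh, then waits for the job.

    The Airflow imports live inside the function so this sketch can be loaded
    without Airflow installed; in a real DAG file they would sit at the top.
    """
    from airflow import DAG
    from airflow.providers.salesforce.operators.tableau_refresh_workbook import (
        TableauRefreshWorkbookOperator,
    )
    from airflow.providers.salesforce.sensors.tableau_job_status import (
        TableauJobStatusSensor,
    )

    with DAG(
        dag_id="tableau_refresh_non_blocking",  # hypothetical DAG id
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        refresh = TableauRefreshWorkbookOperator(
            task_id="refresh_workbook",
            workbook_name="Sales Dashboard",
            tableau_conn_id="tableau_default",
            blocking=False,  # return right after the refresh is queued
        )
        wait = TableauJobStatusSensor(
            task_id="wait_for_refresh",
            # Assumption: the operator's return value is the job id and is
            # available through XCom; job_id is assumed to be templated.
            job_id="{{ ti.xcom_pull(task_ids='refresh_workbook') }}",
            tableau_conn_id="tableau_default",
        )
        refresh >> wait
    return dag
```

In a DAG file, the result would be assigned at module level (for example `dag = build_refresh_dag()`) so that the scheduler can discover it.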

Architecture

The package follows Airflow's provider pattern with three main component types:

  • Hooks: Low-level connection interfaces to external systems (Salesforce, Tableau)
  • Operators: Higher-level task components that perform specific operations
  • Sensors: Components that monitor external system states and conditions

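All components read credentials through Airflow's connection management system. The operator and sensor run as regular Airflow tasks, so they also inherit standard task features such as retry logic and templating of the fields each class declares as templated.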

Capabilities

Salesforce Integration

Connect to Salesforce CRM, execute SOQL queries, retrieve object metadata, and export data to various formats. Supports security token authentication and sandbox environments.

class SalesforceHook(BaseHook):
    def __init__(self, conn_id: str) -> None: ...
    def get_conn(self) -> api.Salesforce: ...
    def make_query(self, query: str, include_deleted: bool = False, query_params: Optional[dict] = None) -> dict: ...
    def describe_object(self, obj: str) -> dict: ...
    def get_available_fields(self, obj: str) -> List[str]: ...
    def get_object_from_salesforce(self, obj: str, fields: Iterable[str]) -> dict: ...
    def write_object_to_file(self, query_results: List[dict], filename: str, fmt: str = "csv", coerce_to_timestamp: bool = False, record_time_added: bool = False) -> pd.DataFrame: ...
    def object_to_df(self, query_results: List[dict], coerce_to_timestamp: bool = False, record_time_added: bool = False) -> pd.DataFrame: ...
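
The methods listed above can be combined into a full-object export. This is a minimal sketch, assuming `get_object_from_salesforce` returns a dict with a `records` entry in the same shape as `make_query`; the helper name `export_object` is hypothetical. The hook is passed in as an argument so the function can be exercised with a stand-in object.

```python
from typing import Any, List


def export_object(hook: Any, obj: str, filename: str, fmt: str = "csv") -> List[str]:
    """Export every available field of a Salesforce object to a file.

    `hook` is expected to be a SalesforceHook (or anything exposing the same
    three methods). Returns the field names that were exported.
    """
    # Discover the object's field names from its metadata.
    fields = hook.get_available_fields(obj)
    # Fetch all of those fields for every record of the object.
    results = hook.get_object_from_salesforce(obj, fields)
    # Assumption: the rows live under the 'records' key, as with make_query.
    hook.write_object_to_file(results["records"], filename, fmt=fmt)
    return fields
```

With a configured connection, this could be called as `export_object(SalesforceHook(conn_id='salesforce_default'), 'Account', 'accounts.csv')`.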

Tableau Server Operations

Connect to Tableau Server, refresh workbooks, and monitor job execution. Supports both username/password and personal access token authentication methods.

class TableauHook(BaseHook):
    def __init__(self, site_id: Optional[str] = None, tableau_conn_id: str = 'tableau_default') -> None: ...
    def __enter__(self): ...
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
    def get_conn(self) -> Auth.contextmgr: ...
    def get_all(self, resource_name: str) -> Pager: ...

class TableauRefreshWorkbookOperator(BaseOperator):
    def __init__(self, *, workbook_name: str, site_id: Optional[str] = None, blocking: bool = True, tableau_conn_id: str = 'tableau_default', **kwargs) -> None: ...
    def execute(self, context: dict) -> str: ...

class TableauJobStatusSensor(BaseSensorOperator):
    def __init__(self, *, job_id: str, site_id: Optional[str] = None, tableau_conn_id: str = 'tableau_default', **kwargs) -> None: ...
    def poke(self, context: dict) -> bool: ...
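
`TableauHook.get_all` can be used directly when a task needs more than a refresh, for example to look up a workbook by name. The sketch below assumes that the items yielded for `'workbooks'` expose `name` and `id` attributes, as tableauserverclient workbook items do; `find_workbook_id` is a hypothetical helper, not part of the provider.

```python
from typing import Any, Optional


def find_workbook_id(hook: Any, workbook_name: str) -> Optional[str]:
    """Return the id of the first workbook whose name matches, else None.

    `hook` is expected to be a TableauHook that has already been entered as a
    context manager, so that it is signed in to the server.
    """
    for workbook in hook.get_all(resource_name="workbooks"):
        if workbook.name == workbook_name:
            return workbook.id
    return None


# Usage, assuming a configured 'tableau_default' connection:
#
#     with TableauHook(tableau_conn_id="tableau_default") as hook:
#         workbook_id = find_workbook_id(hook, "Sales Dashboard")
```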

Types

from enum import Enum
from typing import Optional, List, Iterable, Any

from airflow.exceptions import AirflowException

class TableauJobFinishCode(Enum):
    """Job status enumeration for Tableau operations."""
    PENDING = -1
    SUCCESS = 0
    ERROR = 1
    CANCELED = 2

class TableauJobFailedException(AirflowException):
    """Exception raised when Tableau job fails."""
    pass
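
The finish codes lend themselves to a sensor-style decision: keep waiting while a job is pending, succeed on success, and fail on error or cancellation. The sensor's own implementation is not shown on this page, so the following is only a sketch of how the enum could drive that decision. `JobFailed` is a hypothetical stand-in for `TableauJobFailedException`, and the enum is mirrored locally when the provider is not importable so the snippet runs on its own.

```python
from enum import Enum

try:
    from airflow.providers.salesforce.hooks.tableau import TableauJobFinishCode
except ImportError:
    # Local mirror of the enum above so the sketch runs without Airflow.
    class TableauJobFinishCode(Enum):
        PENDING = -1
        SUCCESS = 0
        ERROR = 1
        CANCELED = 2


class JobFailed(RuntimeError):
    """Hypothetical stand-in for TableauJobFailedException."""


def is_job_finished(raw_finish_code) -> bool:
    """Map a raw finish code to a sensor-style outcome.

    Returns True on SUCCESS, False while PENDING, and raises JobFailed on
    ERROR or CANCELED.
    """
    # Accept ints or numeric strings and convert them to the enum.
    code = TableauJobFinishCode(int(raw_finish_code))
    if code in (TableauJobFinishCode.ERROR, TableauJobFinishCode.CANCELED):
        raise JobFailed(f"Tableau job ended with {code.name}")
    return code == TableauJobFinishCode.SUCCESS
```

Raising on `ERROR` and `CANCELED` rather than returning `False` keeps a failed job from being polled until the sensor times out.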