
tessl/pypi-apache-airflow-providers-salesforce

Apache Airflow provider package enabling Salesforce CRM and Tableau Server integration for data workflows


Tableau Operations

Tableau Server integration covering workbook refresh operations, job status monitoring, and paginated access to server resources. It is built on the Tableau Server Client library, which handles communication with the Tableau Server API.

Capabilities

Connection Management

Establishes authenticated connections to Tableau Server with support for multiple authentication methods and site isolation.

class TableauHook(BaseHook):
    def __init__(self, site_id: Optional[str] = None, tableau_conn_id: str = 'tableau_default') -> None:
        """
        Initialize Tableau hook with site and connection configuration.
        
        Parameters:
        - site_id: Target site ID (uses default site if None)
        - tableau_conn_id: Airflow connection ID with Tableau credentials
        """

    def __enter__(self): ...
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...

    def get_conn(self) -> Auth.contextmgr:
        """
        Create authenticated Tableau Server connection.
        
        Supports username/password and personal access token authentication.
        Returns context manager that automatically signs out on exit.
        
        Returns:
        Authenticated Tableau Server context manager
        """

    def _auth_via_password(self) -> Auth.contextmgr:
        """
        Authenticate using username and password credentials.
        
        Internal method for password-based authentication flow.
        
        Returns:
        Authenticated Tableau Server context manager
        """

    def _auth_via_token(self) -> Auth.contextmgr:
        """
        Authenticate using personal access token.
        
        Internal method for token-based authentication flow.
        
        Returns:
        Authenticated Tableau Server context manager
        """

Connection Configuration Options:

Username/Password Authentication:

  • Connection Type: http
  • Host: Tableau Server URL
  • Login: Username
  • Password: Password
  • Extras: {"site_id": "site_name"} (optional)

Personal Access Token Authentication:

  • Connection Type: http
  • Host: Tableau Server URL
  • Extras: {"token_name": "token_name", "personal_access_token": "token_value", "site_id": "site_name"}
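The two configurations above differ only in which connection fields are populated. As a minimal sketch of how a caller could tell them apart, assuming the field names listed above, the hypothetical helper `pick_auth_method` inspects a connection's login, password, and extras. It is illustrative only and is not presented as the provider's actual selection logic.

```python
import json
from typing import Optional


def pick_auth_method(login: Optional[str], password: Optional[str], extra_json: str) -> str:
    """Hypothetical helper: report which auth flow a connection describes."""
    extras = json.loads(extra_json) if extra_json else {}
    # Username/password takes effect when both fields are filled in
    if login and password:
        return "password"
    # Token auth needs both the token name and the token value in extras
    if "token_name" in extras and "personal_access_token" in extras:
        return "token"
    raise ValueError("Connection has neither login/password nor token extras")


# Extras string as it would be stored on a token-based connection
token_extras = json.dumps({
    "token_name": "token_name",
    "personal_access_token": "token_value",
    "site_id": "site_name",
})

print(pick_auth_method(None, None, token_extras))
print(pick_auth_method("user", "secret", "{}"))
```

Checking the connection fields up front like this gives a clear error for a half-configured connection instead of a failed sign-in later.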

Context Manager Usage

The hook implements the Python context manager protocol, so the server session is cleaned up automatically when the `with` block exits:

def __enter__(self): ...
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...

Usage Example:

from airflow.providers.salesforce.hooks.tableau import TableauHook

with TableauHook(site_id='production', tableau_conn_id='tableau_conn') as hook:
    # Perform operations - connection automatically closed on exit
    workbooks = list(hook.get_all('workbooks'))
    print(f"Found {len(workbooks)} workbooks")
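To make the cleanup guarantee concrete, here is a minimal sketch of the context manager protocol using a stand-in class. `FakeSession` is hypothetical and does not talk to Tableau; it only records sign-in and sign-out events so the ordering is visible, including when the body raises.

```python
class FakeSession:
    """Stand-in for a server session; not part of the provider."""

    def __init__(self) -> None:
        self.signed_in = False
        self.events = []

    def __enter__(self) -> "FakeSession":
        # Entering the block signs in
        self.signed_in = True
        self.events.append("sign_in")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Always runs, whether or not the block raised
        self.signed_in = False
        self.events.append("sign_out")
        # Returning None lets any exception propagate to the caller


session = FakeSession()
try:
    with session:
        raise RuntimeError("simulated API failure")
except RuntimeError:
    pass

print(session.events)
```

Because `__exit__` runs on both normal and exceptional exit, code inside the block does not need its own `try`/`finally` for sign-out.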

Resource Management

Paginated access to Tableau Server resources with automatic handling of large result sets.

def get_all(self, resource_name: str) -> Pager:
    """
    Get all items for a Tableau Server resource with pagination.
    
    Parameters:
    - resource_name: Resource type ('workbooks', 'jobs', 'users', etc.)
    
    Returns:
    Pager object for iterating through all items
    """

Usage Example:

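from airflow.providers.salesforce.hooks.tableau import TableauHook

# get_all() needs a signed-in session, so use the hook as a context manager
with TableauHook(site_id='production', tableau_conn_id='tableau_conn') as hook:
    # Get all workbooks with pagination
    for workbook in hook.get_all('workbooks'):
        print(f"Workbook: {workbook.name} (ID: {workbook.id})")

    # Get all jobs
    for job in hook.get_all('jobs'):
        print(f"Job: {job.id}, Status: {job.finish_code}")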
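The paginated access described above follows a general pattern: request one page at a time and stop once every item has been yielded. The sketch below illustrates that pattern only. It is not the Tableau Server Client `Pager`; `iterate_pages` and `fake_fetch` are hypothetical names, and the page size of 2 is chosen just to force several requests.

```python
from typing import Callable, Iterator, List, Tuple

# A page fetcher takes (page_number, page_size) and returns (items, total_available)
PageFetcher = Callable[[int, int], Tuple[List[str], int]]


def iterate_pages(fetch_page: PageFetcher, page_size: int = 2) -> Iterator[str]:
    """Yield items lazily, requesting the next page only when needed."""
    page_number = 1
    seen = 0
    while True:
        items, total = fetch_page(page_number, page_size)
        yield from items
        seen += len(items)
        # Stop when the server has nothing more to give
        if not items or seen >= total:
            return
        page_number += 1


WORKBOOKS = ["Sales Dashboard", "Monthly Report", "Inventory", "Churn", "Forecast"]
calls = []  # records which pages were requested


def fake_fetch(page_number: int, page_size: int) -> Tuple[List[str], int]:
    """In-memory stand-in for a server endpoint."""
    calls.append(page_number)
    start = (page_number - 1) * page_size
    return WORKBOOKS[start:start + page_size], len(WORKBOOKS)


names = list(iterate_pages(fake_fetch))
print(names)
print(calls)
```

Since the generator is lazy, a caller that breaks out of the loop early never requests the remaining pages.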

Workbook Refresh Operations

High-level operator for refreshing Tableau workbooks with optional blocking behavior and status monitoring.

class TableauRefreshWorkbookOperator(BaseOperator):
    def __init__(
        self,
        *,
        workbook_name: str,
        site_id: Optional[str] = None,
        blocking: bool = True,
        tableau_conn_id: str = 'tableau_default',
        **kwargs,
    ) -> None:
        """
        Initialize workbook refresh operator.
        
        Parameters:
        - workbook_name: Name of workbook to refresh
        - site_id: Target site ID (optional)
        - blocking: Wait for completion if True
        - tableau_conn_id: Connection ID for Tableau Server
        - **kwargs: Additional BaseOperator parameters
        """

    def execute(self, context: dict) -> str:
        """
        Execute workbook refresh operation.
        
        Finds workbook by name, initiates refresh, and optionally waits
        for completion using TableauJobStatusSensor.
        
        Parameters:
        - context: Airflow task execution context
        
        Returns:
        Job ID of the refresh operation
        """

Usage in DAG:

from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator

# Non-blocking refresh (returns job ID immediately)
refresh_task = TableauRefreshWorkbookOperator(
    task_id='refresh_sales_dashboard',
    workbook_name='Sales Dashboard',
    site_id='production',
    blocking=False,
    tableau_conn_id='tableau_prod'
)

# Blocking refresh (waits for completion)
refresh_and_wait = TableauRefreshWorkbookOperator(
    task_id='refresh_and_wait',
    workbook_name='Monthly Report',
    blocking=True  # Will use sensor internally
)

Job Status Monitoring

Sensor for monitoring Tableau job execution with automatic status polling and error handling.

class TableauJobStatusSensor(BaseSensorOperator):
    template_fields = ('job_id',)
    
    def __init__(
        self,
        *,
        job_id: str,
        site_id: Optional[str] = None,
        tableau_conn_id: str = 'tableau_default',  
        **kwargs,
    ) -> None:
        """
        Initialize job status sensor.
        
        Parameters:
        - job_id: Tableau job ID to monitor  
        - site_id: Target site ID (optional)
        - tableau_conn_id: Connection ID for Tableau Server
        - **kwargs: Additional BaseSensorOperator parameters
        """

    def poke(self, context: dict) -> bool:
        """
        Check job status and return completion state.
        
        Raises TableauJobFailedException for failed/cancelled jobs.
        
        Parameters:
        - context: Airflow task execution context
        
        Returns:
        True if job completed successfully, False if still running
        
        Raises:
        TableauJobFailedException: If job failed or was cancelled
        """

Usage Examples:

from airflow.providers.salesforce.sensors.tableau_job_status import TableauJobStatusSensor

# Monitor specific job ID
monitor_job = TableauJobStatusSensor(
    task_id='wait_for_refresh',
    job_id='{{ ti.xcom_pull(task_ids="refresh_sales_dashboard") }}',  # Templated job ID; xcom_pull takes the upstream task_id, not the Python variable name
    site_id='production',
    poke_interval=30,  # Check every 30 seconds
    timeout=1800       # Timeout after 30 minutes
)

# Run the sensor after the non-blocking refresh task (refresh_task) defined earlier
refresh_task >> monitor_job
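The `poke_interval` and `timeout` arguments used above control how often the sensor checks and how long it keeps trying. As a rough sketch of that behavior, assuming a simple poll-sleep-repeat loop, the code below uses an injected clock so it runs instantly. `wait_until_done`, `SensorTimeout`, and `FakeClock` are hypothetical names, and this is not Airflow's sensor implementation.

```python
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical exception raised when polling exceeds the timeout."""


class FakeClock:
    """Simulated clock so the example does not actually sleep."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def wait_until_done(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float],
    sleep: Callable[[float], None],
) -> int:
    """Call poke() until it returns True; return the number of attempts."""
    started = clock()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        if clock() - started >= timeout:
            raise SensorTimeout(f"gave up after {attempts} attempts")
        sleep(poke_interval)


# Job reports "still running" twice, then "done"
results = iter([False, False, True])
clock = FakeClock()
attempts = wait_until_done(lambda: next(results), 30, 1800, clock.time, clock.sleep)
print(attempts, clock.now)
```

A shorter `poke_interval` detects completion sooner at the cost of more API calls, while `timeout` bounds how long a stuck refresh can hold the task.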

Job Status Types

Enumeration defining possible Tableau job completion states.

class TableauJobFinishCode(Enum):
    """
    Tableau job status enumeration.
    
    Values correspond to Tableau Server API finish codes.
    """
    PENDING = -1    # Job is queued or running
    SUCCESS = 0     # Job completed successfully  
    ERROR = 1       # Job failed with error
    CANCELED = 2    # Job was cancelled

Exception Handling

Custom exception raised when a Tableau job fails or is cancelled.

class TableauJobFailedException(AirflowException):
    """
    Exception raised when Tableau job fails or is cancelled.
    
    Thrown by TableauJobStatusSensor when job finish code
    indicates ERROR or CANCELED state.
    """

Error Handling Example:

from airflow.providers.salesforce.sensors.tableau_job_status import (
    TableauJobStatusSensor, 
    TableauJobFailedException
)

try:
    sensor = TableauJobStatusSensor(
        task_id='monitor_refresh',
        job_id='job-123'
    )
    sensor.execute({})  # Direct call for illustration only; in a DAG, Airflow runs the sensor
except TableauJobFailedException as e:
    print(f"Tableau job failed: {e}")
    # Handle failure (send alert, retry, etc.)
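The finish codes and the exception described above combine into a simple decision: success ends the wait, pending means keep polling, and error or cancellation raises. The sketch below mirrors that decision with local stand-ins so it runs without Airflow installed. `FinishCode`, `JobFailed`, and `is_job_done` are hypothetical names that copy the documented values, not the provider's classes.

```python
from enum import Enum


class FinishCode(Enum):
    """Local mirror of the finish code values listed above."""
    PENDING = -1
    SUCCESS = 0
    ERROR = 1
    CANCELED = 2


class JobFailed(Exception):
    """Stand-in for TableauJobFailedException."""


def is_job_done(raw_finish_code: int) -> bool:
    """Return True on success, False while pending, raise on failure."""
    code = FinishCode(int(raw_finish_code))
    if code in (FinishCode.ERROR, FinishCode.CANCELED):
        raise JobFailed(f"Tableau job finished with code {code.name}")
    return code is FinishCode.SUCCESS


print(is_job_done(0))
print(is_job_done(-1))
try:
    is_job_done(2)
except JobFailed as exc:
    print(exc)
```

Raising on failure rather than returning False keeps a failed job from being polled until the sensor times out.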

Authentication Methods

Username/Password Authentication

Traditional username and password authentication for Tableau Server:

# Connection extras: {}
hook = TableauHook(
    site_id='my_site',
    tableau_conn_id='tableau_userpass'
)

Personal Access Token Authentication

Token-based authentication using a personal access token, which avoids storing the account password in the Airflow connection:

# Connection extras: {
#   "token_name": "my_token", 
#   "personal_access_token": "actual_token_value"
# }
hook = TableauHook(
    site_id='my_site', 
    tableau_conn_id='tableau_token'
)

Complete Workflow Example

from datetime import datetime

from airflow import DAG
from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator
from airflow.providers.salesforce.sensors.tableau_job_status import TableauJobStatusSensor

# dag_id and start_date are placeholder values
with DAG(dag_id='tableau_refresh_workflow', start_date=datetime(2021, 1, 1), catchup=False) as dag:
    # Refresh workbook without blocking; the returned job ID is pushed to XCom
    refresh_workbook = TableauRefreshWorkbookOperator(
        task_id='refresh_workbook',
        workbook_name='Sales Dashboard',
        site_id='production',
        blocking=False,  # Don't block, return job ID
        tableau_conn_id='tableau_prod'
    )

    # Monitor the refresh job using the job ID pulled from XCom
    wait_for_completion = TableauJobStatusSensor(
        task_id='wait_for_completion',
        job_id='{{ ti.xcom_pull(task_ids="refresh_workbook") }}',
        site_id='production',
        tableau_conn_id='tableau_prod',
        poke_interval=60,  # Check every 60 seconds
        timeout=3600       # Give up after 1 hour
    )

    refresh_workbook >> wait_for_completion

Error Scenarios

Common error conditions:

  • Authentication failures: Invalid credentials or tokens
  • Workbook not found: The specified workbook doesn't exist or the user cannot access it
  • Job timeouts: Long-running refreshes exceed the sensor timeout
  • Server connectivity: Network issues or server downtime
  • Permission errors: Insufficient privileges for the requested operation

Always implement proper error handling and monitoring for production workflows.
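One way to cover several of the scenarios above is to combine task retries with a failure callback. The sketch below assumes the standard Airflow operator arguments `retries`, `retry_delay`, and `on_failure_callback`, and the `task_instance` and `exception` keys of the callback context. The `notify_failure` function and the fake context are hypothetical and only format a message; a real callback would send it to an alerting system.

```python
from datetime import timedelta
from types import SimpleNamespace


def notify_failure(context: dict) -> str:
    """Hypothetical callback: build an alert message from the task context."""
    ti = context["task_instance"]
    error = context.get("exception")
    message = f"Task {ti.task_id} failed: {error}"
    print(message)  # Replace with a real alerting call in production
    return message


# Could be passed as default_args to a DAG so every task inherits it
default_args = {
    "retries": 2,                         # Retry transient connectivity errors
    "retry_delay": timedelta(minutes=5),  # Wait between attempts
    "on_failure_callback": notify_failure,
}

# Simulated context, standing in for what Airflow passes to the callback
fake_context = {
    "task_instance": SimpleNamespace(task_id="refresh_workbook"),
    "exception": RuntimeError("connection refused"),
}
message = notify_failure(fake_context)
```

Retries help with transient connectivity problems, while the callback surfaces failures that retrying cannot fix, such as invalid credentials or missing permissions.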

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-salesforce
