Apache Airflow provider package enabling Salesforce CRM and Tableau Server integration for data workflows
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive Tableau Server integration providing workbook refresh operations, job status monitoring, and resource management capabilities. Built on the Tableau Server Client library for robust API communication.
Establishes authenticated connections to Tableau Server with support for multiple authentication methods and site isolation.
class TableauHook(BaseHook):
def __init__(self, site_id: Optional[str] = None, tableau_conn_id: str = 'tableau_default') -> None:
"""
Initialize Tableau hook with site and connection configuration.
Parameters:
- site_id: Target site ID (uses default site if None)
- tableau_conn_id: Airflow connection ID with Tableau credentials
"""
def __enter__(self): ...
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
def get_conn(self) -> Auth.contextmgr:
"""
Create authenticated Tableau Server connection.
Supports username/password and personal access token authentication.
Returns context manager that automatically signs out on exit.
Returns:
Authenticated Tableau Server context manager
"""
def _auth_via_password(self) -> Auth.contextmgr:
"""
Authenticate using username and password credentials.
Internal method for password-based authentication flow.
Returns:
Authenticated Tableau Server context manager
"""
def _auth_via_token(self) -> Auth.contextmgr:
"""
Authenticate using personal access token.
Internal method for token-based authentication flow.
Returns:
Authenticated Tableau Server context manager
"""Connection Configuration Options:
Username/Password Authentication:
http{"site_id": "site_name"} (optional)Personal Access Token Authentication:
http{"token_name": "token_name", "personal_access_token": "token_value", "site_id": "site_name"}The hook implements Python context manager protocol for automatic resource cleanup:
def __enter__(self): ...
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...

Usage Example:
with TableauHook(site_id='production', tableau_conn_id='tableau_conn') as hook:
# Perform operations - connection automatically closed on exit
workbooks = list(hook.get_all('workbooks'))
print(f"Found {len(workbooks)} workbooks")Paginated access to Tableau Server resources with automatic handling of large result sets.
def get_all(self, resource_name: str) -> Pager:
"""
Get all items for a Tableau Server resource with pagination.
Parameters:
- resource_name: Resource type ('workbooks', 'jobs', 'users', etc.)
Returns:
Pager object for iterating through all items
"""Usage Example:
hook = TableauHook('production', 'tableau_conn')
# Get all workbooks with pagination
for workbook in hook.get_all('workbooks'):
print(f"Workbook: {workbook.name} (ID: {workbook.id})")
# Get all jobs
for job in hook.get_all('jobs'):
print(f"Job: {job.id}, Status: {job.finish_code}")High-level operator for refreshing Tableau workbooks with optional blocking behavior and status monitoring.
class TableauRefreshWorkbookOperator(BaseOperator):
def __init__(
self,
*,
workbook_name: str,
site_id: Optional[str] = None,
blocking: bool = True,
tableau_conn_id: str = 'tableau_default',
**kwargs,
) -> None:
"""
Initialize workbook refresh operator.
Parameters:
- workbook_name: Name of workbook to refresh
- site_id: Target site ID (optional)
- blocking: Wait for completion if True
- tableau_conn_id: Connection ID for Tableau Server
- **kwargs: Additional BaseOperator parameters
"""
def execute(self, context: dict) -> str:
"""
Execute workbook refresh operation.
Finds workbook by name, initiates refresh, and optionally waits
for completion using TableauJobStatusSensor.
Parameters:
- context: Airflow task execution context
Returns:
Job ID of the refresh operation
"""Usage in DAG:
from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator
# Non-blocking refresh (returns job ID immediately)
refresh_task = TableauRefreshWorkbookOperator(
task_id='refresh_sales_dashboard',
workbook_name='Sales Dashboard',
site_id='production',
blocking=False,
tableau_conn_id='tableau_prod'
)
# Blocking refresh (waits for completion)
refresh_and_wait = TableauRefreshWorkbookOperator(
task_id='refresh_and_wait',
workbook_name='Monthly Report',
blocking=True # Will use sensor internally
)

Sensor for monitoring Tableau job execution with automatic status polling and error handling.
class TableauJobStatusSensor(BaseSensorOperator):
template_fields = ('job_id',)
def __init__(
self,
*,
job_id: str,
site_id: Optional[str] = None,
tableau_conn_id: str = 'tableau_default',
**kwargs,
) -> None:
"""
Initialize job status sensor.
Parameters:
- job_id: Tableau job ID to monitor
- site_id: Target site ID (optional)
- tableau_conn_id: Connection ID for Tableau Server
- **kwargs: Additional BaseSensorOperator parameters
"""
def poke(self, context: dict) -> bool:
"""
Check job status and return completion state.
Raises TableauJobFailedException for failed/cancelled jobs.
Parameters:
- context: Airflow task execution context
Returns:
True if job completed successfully, False if still running
Raises:
TableauJobFailedException: If job failed or was cancelled
"""Usage Examples:
from airflow.providers.salesforce.sensors.tableau_job_status import TableauJobStatusSensor
# Monitor specific job ID
monitor_job = TableauJobStatusSensor(
task_id='wait_for_refresh',
job_id='{{ ti.xcom_pull("refresh_task") }}', # Templated job ID
site_id='production',
poke_interval=30, # Check every 30 seconds
timeout=1800 # Timeout after 30 minutes
)
# Monitor job from previous task
refresh_task >> monitor_job

Enumeration defining possible Tableau job completion states.
class TableauJobFinishCode(Enum):
"""
Tableau job status enumeration.
Values correspond to Tableau Server API finish codes.
"""
PENDING = -1 # Job is queued or running
SUCCESS = 0 # Job completed successfully
ERROR = 1 # Job failed with error
CANCELED = 2 # Job was cancelled

Custom exception for Tableau job failures with detailed error context.
class TableauJobFailedException(AirflowException):
"""
Exception raised when Tableau job fails or is cancelled.
Thrown by TableauJobStatusSensor when job finish code
indicates ERROR or CANCELED state.
"""Error Handling Example:
from airflow.providers.salesforce.sensors.tableau_job_status import (
TableauJobStatusSensor,
TableauJobFailedException
)
try:
sensor = TableauJobStatusSensor(
task_id='monitor_refresh',
job_id='job-123'
)
sensor.execute({})
except TableauJobFailedException as e:
print(f"Tableau job failed: {e}")
# Handle failure (send alert, retry, etc.)

Traditional username and password authentication for Tableau Server:
# Connection extras: {}
hook = TableauHook(
site_id='my_site',
tableau_conn_id='tableau_userpass'
)

More secure token-based authentication:
# Connection extras: {
# "token_name": "my_token",
# "personal_access_token": "actual_token_value"
# }
hook = TableauHook(
site_id='my_site',
tableau_conn_id='tableau_token'
)

Complete DAG example combining the refresh operator and the job status sensor:

from airflow import DAG
from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator
from airflow.providers.salesforce.sensors.tableau_job_status import TableauJobStatusSensor
# Refresh workbook and monitor completion
refresh_workbook = TableauRefreshWorkbookOperator(
task_id='refresh_workbook',
workbook_name='Sales Dashboard',
site_id='production',
blocking=False, # Don't block, return job ID
tableau_conn_id='tableau_prod'
)
# Monitor the refresh job
wait_for_completion = TableauJobStatusSensor(
task_id='wait_for_completion',
job_id='{{ ti.xcom_pull("refresh_workbook") }}',
site_id='production',
tableau_conn_id='tableau_prod',
poke_interval=60,
timeout=3600
)
refresh_workbook >> wait_for_completion

Common error conditions and handling:
Always implement proper error handling and monitoring for production workflows.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-salesforce