Apache Airflow provider package enabling Salesforce CRM and Tableau Server integration for data workflows
npx @tessl/cli install tessl/pypi-apache-airflow-providers-salesforce@2.0.0
# Apache Airflow Providers Salesforce
Apache Airflow provider package that integrates Airflow with Salesforce CRM and Tableau Server. It provides hooks for connecting to Salesforce and Tableau, an operator for triggering Tableau workbook refreshes, and a sensor for monitoring Tableau job status. It is part of the official Apache Airflow provider ecosystem.
## Package Information
- **Package Name**: apache-airflow-providers-salesforce
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-salesforce`
- **Requirements**: apache-airflow>=2.0.0, simple-salesforce>=1.0.0, tableauserverclient>=0.12, pandas>=0.17.1

## Core Imports
```python
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
from airflow.providers.salesforce.hooks.tableau import TableauHook, TableauJobFinishCode
from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator
from airflow.providers.salesforce.sensors.tableau_job_status import TableauJobStatusSensor, TableauJobFailedException
```
## Basic Usage
### Salesforce Data Extraction
```python
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook

# Create a hook bound to an Airflow connection
hook = SalesforceHook(conn_id='salesforce_default')

# Run a SOQL query against Salesforce
query_results = hook.make_query("SELECT Id, Name FROM Account LIMIT 10")

# Export the query records to a CSV file
hook.write_object_to_file(
    query_results['records'],
    'accounts.csv',
    fmt='csv',
)
```
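`make_query` returns the parsed `simple-salesforce` response: a dict with `totalSize`, `done`, and a `records` list in which each record carries an `attributes` metadata entry. A minimal pure-Python sketch of stripping that metadata before export (the record IDs and values below are illustrative, not real Salesforce data):

```python
# Illustrative shape of a simple-salesforce query response; values are made up.
query_results = {
    "totalSize": 2,
    "done": True,
    "records": [
        {"attributes": {"type": "Account"}, "Id": "001xx0000001", "Name": "Acme"},
        {"attributes": {"type": "Account"}, "Id": "001xx0000002", "Name": "Globex"},
    ],
}

def strip_attributes(records):
    """Drop the Salesforce 'attributes' metadata key from each record."""
    return [{k: v for k, v in rec.items() if k != "attributes"} for rec in records]

rows = strip_attributes(query_results["records"])
print(rows[0])  # {'Id': '001xx0000001', 'Name': 'Acme'}
```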
### Tableau Workbook Refresh
```python
from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator

# Trigger an extract refresh for a named workbook
refresh_task = TableauRefreshWorkbookOperator(
    task_id='refresh_workbook',
    workbook_name='Sales Dashboard',
    tableau_conn_id='tableau_default',
    blocking=True,  # wait until the refresh job completes
)
```
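With `blocking=False` the operator returns immediately; since `execute` returns the Tableau job id as a string, Airflow pushes that id to XCom as the task's return value. A hedged sketch of pairing it with `TableauJobStatusSensor`, assuming `job_id` is a templated field on the sensor; the `dag_id`, schedule, and task ids are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.salesforce.operators.tableau_refresh_workbook import TableauRefreshWorkbookOperator
from airflow.providers.salesforce.sensors.tableau_job_status import TableauJobStatusSensor

with DAG(dag_id='tableau_refresh', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Fire the refresh without waiting; the returned job id lands in XCom.
    refresh = TableauRefreshWorkbookOperator(
        task_id='refresh_workbook',
        workbook_name='Sales Dashboard',
        tableau_conn_id='tableau_default',
        blocking=False,
    )

    # Poll that job id until the job succeeds (or the sensor raises
    # TableauJobFailedException on ERROR/CANCELED).
    wait = TableauJobStatusSensor(
        task_id='wait_for_refresh',
        job_id="{{ ti.xcom_pull(task_ids='refresh_workbook') }}",
        tableau_conn_id='tableau_default',
    )

    refresh >> wait
```

This keeps the worker slot free while Tableau runs the refresh, at the cost of a second task.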
## Architecture
The package follows Airflow's provider pattern with three main component types:
- **Hooks**: Low-level connection interfaces to external systems (Salesforce, Tableau)
- **Operators**: Higher-level task components that perform specific operations
- **Sensors**: Components that monitor external system states and conditions

All components integrate with Airflow's connection management system and support templating, retry logic, and other standard Airflow features.
## Capabilities
### Salesforce Integration
Connect to Salesforce CRM, execute SOQL queries, retrieve object metadata, and export data to various formats. Supports security token authentication and sandbox environments.
```python { .api }
class SalesforceHook(BaseHook):
    def __init__(self, conn_id: str) -> None: ...
    def get_conn(self) -> api.Salesforce: ...
    def make_query(self, query: str, include_deleted: bool = False, query_params: Optional[dict] = None) -> dict: ...
    def describe_object(self, obj: str) -> dict: ...
    def get_available_fields(self, obj: str) -> List[str]: ...
    def get_object_from_salesforce(self, obj: str, fields: Iterable[str]) -> dict: ...
    def write_object_to_file(self, query_results: List[dict], filename: str, fmt: str = "csv", coerce_to_timestamp: bool = False, record_time_added: bool = False) -> pd.DataFrame: ...
    def object_to_df(self, query_results: List[dict], coerce_to_timestamp: bool = False, record_time_added: bool = False) -> pd.DataFrame: ...
```
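`get_available_fields` builds on `describe_object`: it pulls the `name` of every entry in the object's `fields` metadata. A pure-Python sketch of that step against an illustrative, heavily trimmed describe payload (real payloads carry many more keys per field):

```python
# Illustrative fragment of a Salesforce describe_object() payload.
describe_result = {
    "name": "Account",
    "fields": [
        {"name": "Id", "type": "id"},
        {"name": "Name", "type": "string"},
        {"name": "AnnualRevenue", "type": "currency"},
    ],
}

def available_fields(describe_result):
    """Extract the field names from a describe payload."""
    return [field["name"] for field in describe_result["fields"]]

print(available_fields(describe_result))  # ['Id', 'Name', 'AnnualRevenue']
```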
[Salesforce Integration](./salesforce.md)
### Tableau Server Operations
Connect to Tableau Server, refresh workbooks, and monitor job execution. Supports both username/password and personal access token authentication methods.
```python { .api }
class TableauHook(BaseHook):
    def __init__(self, site_id: Optional[str] = None, tableau_conn_id: str = 'tableau_default') -> None: ...
    def __enter__(self): ...
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
    def get_conn(self) -> Auth.contextmgr: ...
    def get_all(self, resource_name: str) -> Pager: ...

class TableauRefreshWorkbookOperator(BaseOperator):
    def __init__(self, *, workbook_name: str, site_id: Optional[str] = None, blocking: bool = True, tableau_conn_id: str = 'tableau_default', **kwargs) -> None: ...
    def execute(self, context: dict) -> str: ...

class TableauJobStatusSensor(BaseSensorOperator):
    def __init__(self, *, job_id: str, site_id: Optional[str] = None, tableau_conn_id: str = 'tableau_default', **kwargs) -> None: ...
    def poke(self, context: dict) -> bool: ...
```
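`get_all` returns a `tableauserverclient` `Pager`, which transparently walks the server's paginated REST responses so callers can iterate every workbook, job, or datasource in one loop. A conceptual, dependency-free sketch of that iteration pattern; `fetch_page` is a stand-in for a Tableau Server API call and the item names are made up:

```python
def fetch_page(page_number, page_size=2):
    """Stand-in for one paginated REST call: returns (items, total_count)."""
    items = ["wb-1", "wb-2", "wb-3", "wb-4", "wb-5"]
    start = (page_number - 1) * page_size
    return items[start:start + page_size], len(items)

def iterate_all(page_size=2):
    """Yield every item, requesting successive pages until the total is exhausted."""
    page_number = 1
    seen = 0
    while True:
        page, total = fetch_page(page_number, page_size)
        for item in page:
            seen += 1
            yield item
        if seen >= total or not page:
            break
        page_number += 1

print(list(iterate_all()))  # ['wb-1', 'wb-2', 'wb-3', 'wb-4', 'wb-5']
```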
[Tableau Operations](./tableau.md)
## Types
```python { .api }
from enum import Enum
from typing import Optional, List, Iterable, Any

from airflow.exceptions import AirflowException

class TableauJobFinishCode(Enum):
    """Job status enumeration for Tableau operations."""
    PENDING = -1
    SUCCESS = 0
    ERROR = 1
    CANCELED = 2

class TableauJobFailedException(AirflowException):
    """Exception raised when a Tableau job fails."""
    pass
```
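To illustrate how the sensor consumes these finish codes: it keeps waiting while the job is `PENDING`, succeeds on `SUCCESS`, and raises `TableauJobFailedException` on `ERROR` or `CANCELED`. A self-contained sketch of that decision logic, re-declaring the enum locally and using a plain `Exception` stand-in so it runs without Airflow:

```python
from enum import Enum

# Mirrors the TableauJobFinishCode values documented above.
class TableauJobFinishCode(Enum):
    PENDING = -1
    SUCCESS = 0
    ERROR = 1
    CANCELED = 2

class TableauJobFailedException(Exception):  # stand-in for the Airflow exception
    pass

def poke(finish_code: int) -> bool:
    """Sketch of the sensor's poke logic: False means keep waiting,
    True means done, and a failed or canceled job raises."""
    code = TableauJobFinishCode(finish_code)
    if code in (TableauJobFinishCode.ERROR, TableauJobFinishCode.CANCELED):
        raise TableauJobFailedException(f"Tableau job finished with code {code.name}")
    return code == TableauJobFinishCode.SUCCESS

print(poke(-1), poke(0))  # False True
```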