Backport provider package for Facebook Ads API integration with Apache Airflow workflows
npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-facebook@2020.10.00
# Apache Airflow Facebook Provider (Backport)

A backport provider package that enables Facebook Ads API integration for Apache Airflow 1.10.x installations. It provides a hook, `FacebookAdsReportingHook`, that connects to Facebook's Marketing API and pulls Ads Insights data into Airflow workflows through asynchronous report jobs. Missing connection fields and failed or skipped report jobs are raised as `AirflowException`.

## Package Information

- **Package Name**: apache-airflow-backport-providers-facebook
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-facebook`
- **Python Support**: 3.6+
- **Dependencies**: `apache-airflow~=1.10`, `facebook-business>=6.0.2`

## Core Imports

```python
from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook, JobStatus
```

## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook


def extract_facebook_data(**context):
    # Initialize the hook with connection ID and API version
    hook = FacebookAdsReportingHook(
        facebook_conn_id="facebook_default",
        api_version="v6.0",
    )

    # Define report parameters
    params = {
        "level": "ad",
        "date_preset": "yesterday",
        "time_increment": 1,
    }

    # Define fields to extract
    fields = [
        "campaign_name",
        "campaign_id",
        "ad_id",
        "clicks",
        "impressions",
        "spend",
        "cpc",
        "cpm",
    ]

    # Extract data using bulk reporting
    data = hook.bulk_facebook_report(
        params=params,
        fields=fields,
        sleep_time=5,
    )

    # Process the returned AdsInsights objects
    for insight in data:
        print(f"Campaign: {insight['campaign_name']}, Clicks: {insight['clicks']}")

    return data


# Illustrative DAG; the dag_id, start date, and schedule are placeholders
dag = DAG(
    dag_id="facebook_ads_example",
    start_date=datetime(2020, 1, 1),
    schedule_interval=timedelta(days=1),
)

# Use in an Airflow DAG
facebook_extract_task = PythonOperator(
    task_id="extract_facebook_ads_data",
    python_callable=extract_facebook_data,
    dag=dag,
)
```

## Connection Configuration

The hook requires an Airflow connection with the following configuration:

- **Connection ID**: `facebook_default` (or custom)
- **Connection Type**: Any (e.g., HTTP)
- **Extra**: JSON containing required Facebook API credentials:

```json
{
  "app_id": "your_facebook_app_id",
  "app_secret": "your_facebook_app_secret",
  "access_token": "your_facebook_access_token",
  "account_id": "act_your_facebook_account_id"
}
```

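Before saving the connection, it can help to check the Extra JSON for the four required keys. The following is a minimal standalone sketch of such a check, not the hook's own validation code; the helper name `missing_connection_fields` is hypothetical, and the credential values are placeholders.

```python
import json
from typing import Any, Dict, List

# Same field names the hook lists in client_required_fields.
REQUIRED_FIELDS = ["app_id", "app_secret", "access_token", "account_id"]


def missing_connection_fields(extra_json: str) -> List[str]:
    """Return the required fields absent from a connection's Extra JSON.

    Hypothetical helper for illustration; it is not part of the provider package.
    """
    extra: Dict[str, Any] = json.loads(extra_json)
    return [field for field in REQUIRED_FIELDS if field not in extra]


# Placeholder credentials, for illustration only.
complete_extra = json.dumps({
    "app_id": "your_facebook_app_id",
    "app_secret": "your_facebook_app_secret",
    "access_token": "your_facebook_access_token",
    "account_id": "act_your_facebook_account_id",
})
incomplete_extra = json.dumps({"app_id": "your_facebook_app_id"})

print(missing_connection_fields(complete_extra))    # []
print(missing_connection_fields(incomplete_extra))  # ['app_secret', 'access_token', 'account_id']
```

An empty list means the Extra JSON carries every field the hook expects; any names returned are the ones the hook would report as missing.
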
## Capabilities

### Facebook Ads Reporting Hook

The main integration component for connecting to Facebook's Marketing API and extracting advertising data through asynchronous report generation.

```python { .api }
class FacebookAdsReportingHook(BaseHook):
    """
    Hook for the Facebook Ads API.

    Inherits from BaseHook to integrate with Airflow's connection management system.
    Validates the connection configuration when it is first read and provides a method
    for asynchronous Facebook Ads data extraction.

    Args:
        facebook_conn_id (str): Airflow Facebook Ads connection ID. Defaults to "facebook_default"
        api_version (str): The version of Facebook API. Defaults to "v6.0"

    Attributes:
        client_required_fields (List[str]): Required fields for Facebook API connection
            ["app_id", "app_secret", "access_token", "account_id"]
    """

    def __init__(
        self,
        facebook_conn_id: str = "facebook_default",
        api_version: str = "v6.0",
    ) -> None: ...

    def bulk_facebook_report(
        self,
        params: Dict[str, Any],
        fields: List[str],
        sleep_time: int = 5,
    ) -> List[AdsInsights]:
        """
        Pulls data from the Facebook Ads API using asynchronous reporting.

        Args:
            params (Dict[str, Any]): Parameters that determine the query for Facebook Ads API.
                Must follow Facebook Marketing API Insights Parameters format.
                Common parameters include:
                - level (str): Report level - "account", "campaign", "adset", "ad"
                - date_preset (str): Date range preset - "today", "yesterday", "last_7d",
                  "last_14d", "last_28d", "this_month", "last_month", etc.
                - time_range (Dict): Custom date range with "since" and "until" keys in YYYY-MM-DD format
                  Example: {"since": "2023-01-01", "until": "2023-01-31"}
                - time_increment (int or str): Time increment for date breakdown
                  (1 = daily, 7 = weekly, or the string "monthly")
                - breakdowns (List[str]): Breakdown dimensions like ["age", "gender", "placement", "device_platform"]
                - action_breakdowns (List[str]): Action breakdown dimensions like ["action_type", "action_target_id"]
            fields (List[str]): List of fields to obtain from Facebook Ads Insights API.
                Must be valid Facebook Ads Insights fields; availability varies by API version,
                so check the fields reference for the version in use. Common fields include:
                - Identifiers: "campaign_name", "campaign_id", "adset_name", "adset_id", "ad_name", "ad_id"
                - Metrics: "impressions", "clicks", "spend", "reach", "frequency"
                - Calculated: "cpc", "cpm", "ctr", "cpp", "cost_per_unique_click"
                - Conversions: "conversions", "cost_per_conversion", "conversion_rate"
                - Video: "video_plays", "video_p25_watched_actions", "video_p50_watched_actions"
            sleep_time (int): Seconds to sleep between async job status checks. Defaults to 5.
                Increase for large reports to reduce API calls.

        Returns:
            List[AdsInsights]: Facebook Ads API response as list of AdsInsights objects.
                Each AdsInsights object is a dictionary-like object containing the requested
                fields as key-value pairs (e.g., {"campaign_name": "My Campaign", "clicks": "150"}).

        Raises:
            AirflowException: If any required connection fields (app_id, app_secret, access_token,
                account_id) are missing from the connection's extra configuration, or if
                the async report job fails or is skipped by Facebook's API.
        """

    @cached_property
    def facebook_ads_config(self) -> Dict:
        """
        Gets Facebook ads connection configuration from Airflow connections.

        Retrieves connection details from Airflow's meta database using the configured
        facebook_conn_id and validates that all required fields are present in the
        connection's extra_dejson configuration.

        Required connection fields:
        - app_id (str): Facebook App ID
        - app_secret (str): Facebook App Secret
        - access_token (str): Facebook Access Token
        - account_id (str): Facebook Account ID (format: "act_12345")

        Returns:
            Dict: Configuration dictionary containing app_id, app_secret, access_token, and account_id.

        Raises:
            AirflowException: If any required fields (app_id, app_secret, access_token, account_id)
                are missing from the connection's extra configuration.
        """
```

### Job Status Enumeration

Enumeration of available status options for Facebook async task monitoring.

```python { .api }
class JobStatus(Enum):
    """Available options for facebook async task status"""

    COMPLETED = 'Job Completed'
    STARTED = 'Job Started'
    RUNNING = 'Job Running'
    FAILED = 'Job Failed'
    SKIPPED = 'Job Skipped'
```

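To illustrate how these statuses drive the `sleep_time` polling described for `bulk_facebook_report`, here is a minimal standalone sketch of a polling loop. It is not the hook's source code: the `wait_for_job` function and the `get_status` callable are hypothetical, the enum is redeclared so the snippet runs without Airflow, and `RuntimeError` stands in for `AirflowException`.

```python
import time
from enum import Enum
from typing import Callable


class JobStatus(Enum):
    """Local copy of the enum so the sketch runs without Airflow installed."""

    COMPLETED = "Job Completed"
    STARTED = "Job Started"
    RUNNING = "Job Running"
    FAILED = "Job Failed"
    SKIPPED = "Job Skipped"


def wait_for_job(get_status: Callable[[], str], sleep_time: float = 5) -> None:
    """Poll get_status() until the job completes; raise if it fails or is skipped.

    Hypothetical helper: RuntimeError stands in for AirflowException here.
    """
    while True:
        status = get_status()
        if status == JobStatus.COMPLETED.value:
            return
        if status in (JobStatus.FAILED.value, JobStatus.SKIPPED.value):
            raise RuntimeError(f"{status}. Please retry.")
        # Any other status (started, running) means the job is still in progress.
        time.sleep(sleep_time)


# Simulated job that reports three statuses in order.
statuses = iter(["Job Started", "Job Running", "Job Completed"])
wait_for_job(lambda: next(statuses), sleep_time=0)
```

A larger `sleep_time` trades slower completion detection for fewer status requests, which is why the parameter documentation suggests raising it for large reports.
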
## Types

```python { .api }
from facebook_business.adobjects.adsinsights import AdsInsights

class AdsInsights:
    """
    Facebook Ads Insights object from facebook-business SDK.

    Dictionary-like object containing Facebook Ads data with requested fields
    as key-value pairs. Can be accessed like a dictionary:

    Example:
        insight['campaign_name']  # Returns campaign name string
        insight['clicks']  # Returns clicks count as string
        insight.get('spend', '0')  # Returns spend with default fallback
    """
```

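Because metric values such as `clicks` and `spend` come back as strings, downstream code usually needs to convert them before doing arithmetic. The sketch below shows one way to do that, using plain dictionaries as stand-ins for `AdsInsights` objects. The helper name `to_typed_rows` and the choice of which fields are integers or floats are assumptions made for illustration.

```python
from typing import Any, Dict, Iterable, List, Mapping

# Assumed type mapping for a few common fields; extend as needed.
INT_FIELDS = {"clicks", "impressions"}
FLOAT_FIELDS = {"spend", "cpc", "cpm"}


def to_typed_rows(insights: Iterable[Mapping[str, Any]]) -> List[Dict[str, Any]]:
    """Copy dictionary-like insight rows, converting known metric strings to numbers.

    Hypothetical helper; fields outside the two sets above are copied unchanged.
    """
    rows = []
    for insight in insights:
        row: Dict[str, Any] = {}
        for key, value in insight.items():
            if key in INT_FIELDS:
                row[key] = int(value)
            elif key in FLOAT_FIELDS:
                row[key] = float(value)
            else:
                row[key] = value
        rows.append(row)
    return rows


# Plain dicts standing in for AdsInsights objects.
sample = [{"campaign_name": "My Campaign", "clicks": "150", "spend": "12.50"}]
print(to_typed_rows(sample))
# [{'campaign_name': 'My Campaign', 'clicks': 150, 'spend': 12.5}]
```
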
## Error Handling

The hook raises or propagates errors in three cases:

- **Missing Configuration**: Raises `AirflowException` with the message "{missing_keys} fields are missing" if any required connection field (`app_id`, `app_secret`, `access_token`, `account_id`) is missing.
- **Failed Async Jobs**: Raises `AirflowException` with the message "{async_status}. Please retry." if an async report job fails or is skipped by Facebook's API.
- **API Errors**: Propagates Facebook Business SDK exceptions for API-level errors such as:
  - Invalid authentication credentials
  - Insufficient permissions for requested data
  - Invalid parameter values or field names
  - Rate limiting and quota exceeded errors

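Since a failed or skipped job asks the caller to retry, one option is to wrap the report call in a small retry helper. The following is a generic sketch with exponential backoff, assuming nothing about the hook beyond the fact that it raises an exception. The `call_with_retries` helper is hypothetical, and `RuntimeError` stands in for `AirflowException` so the snippet runs without Airflow. In a real DAG, the task-level `retries` and `retry_delay` arguments may be the simpler choice.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Call func(), retrying on the given exception types with exponential backoff.

    Hypothetical helper, not part of the provider package.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


# Simulated report call that fails twice before succeeding.
attempts = {"count": 0}


def flaky_report() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("Job Failed. Please retry.")
    return "report rows"


result = call_with_retries(flaky_report, retry_on=(RuntimeError,), base_delay=0)
print(result, attempts["count"])  # report rows 3
```
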
## Common Use Cases

### Daily Campaign Performance Reports

```python
from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook


def get_daily_campaign_performance():
    # Uses the default connection ID and API version
    hook = FacebookAdsReportingHook()

    params = {
        "level": "campaign",
        "date_preset": "yesterday",
    }

    fields = [
        "campaign_name",
        "impressions",
        "clicks",
        "spend",
        "cpc",
        "cpm",
    ]

    return hook.bulk_facebook_report(params, fields)
```

### Custom Date Range with Breakdowns

```python
from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook


def get_ad_performance_with_breakdowns():
    # Uses the default connection ID and API version
    hook = FacebookAdsReportingHook()

    params = {
        "level": "ad",
        "time_range": {
            "since": "2023-01-01",
            "until": "2023-01-31",
        },
        "breakdowns": ["age", "gender"],
        "time_increment": 1,
    }

    fields = [
        "ad_name",
        "impressions",
        "clicks",
        "conversions",
        "spend",
    ]

    # Poll less often, since a month of daily rows with breakdowns is a larger report
    return hook.bulk_facebook_report(params, fields, sleep_time=10)
```

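Hard-coded dates rarely suit a scheduled workflow, so the `time_range` dictionary is often computed instead. The sketch below builds one for a trailing window of days in the `YYYY-MM-DD` format shown above. The helper name `trailing_time_range` is hypothetical, and treating `until` as an inclusive end date is an assumption that matches the January example.

```python
from datetime import date, timedelta
from typing import Dict


def trailing_time_range(end: date, days: int) -> Dict[str, str]:
    """Return a time_range dict covering `days` days ending on `end`, inclusive.

    Hypothetical helper for building the "time_range" report parameter.
    """
    if days < 1:
        raise ValueError("days must be at least 1")
    start = end - timedelta(days=days - 1)
    return {"since": start.isoformat(), "until": end.isoformat()}


print(trailing_time_range(date(2023, 1, 31), 31))
# {'since': '2023-01-01', 'until': '2023-01-31'}
```

The result can be placed under the `"time_range"` key of `params` in place of the literal dates.
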
## API References

- **Facebook Marketing APIs**: https://developers.facebook.com/docs/marketing-apis/
- **Insights Parameters**: https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
- **Available Fields**: https://developers.facebook.com/docs/marketing-api/insights/fields/v6.0