Backport provider package for Facebook Ads API integration with Apache Airflow workflows
npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-facebook@2020.10.0

A backport provider package that enables Facebook Ads API integration for Apache Airflow 1.10.x installations. This package provides hooks for connecting to Facebook's Marketing API to extract advertising data, manage campaigns, and integrate Facebook Ads reporting into Airflow workflows with asynchronous job execution and comprehensive error handling.
pip install apache-airflow-backport-providers-facebook

Dependencies: apache-airflow~=1.10, facebook-business>=6.0.2

from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook, JobStatus

from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def extract_facebook_data(**context):
    """Fetch yesterday's ad-level Facebook Ads insights and print a summary.

    Written to be used as a ``python_callable``; it accepts arbitrary keyword
    arguments via ``context`` but does not read them.

    Returns:
        The result of ``FacebookAdsReportingHook.bulk_facebook_report`` for the
        requested fields (a list of dictionary-like AdsInsights objects).
    """
    # Initialize the hook with connection ID and API version
    hook = FacebookAdsReportingHook(
        facebook_conn_id="facebook_default",
        api_version="v6.0",
    )

    # Define report parameters
    params = {
        "level": "ad",
        "date_preset": "yesterday",
        "time_increment": 1,
    }

    # Define fields to extract
    fields = [
        "campaign_name",
        "campaign_id",
        "ad_id",
        "clicks",
        "impressions",
        "spend",
        "cpc",
        "cpm",
    ]

    # Extract data using bulk reporting
    data = hook.bulk_facebook_report(
        params=params,
        fields=fields,
        sleep_time=5,
    )

    # Process the returned AdsInsights objects
    for insight in data:
        print(f"Campaign: {insight['campaign_name']}, Clicks: {insight['clicks']}")

    return data
# Use in an Airflow DAG
facebook_extract_task = PythonOperator(
task_id='extract_facebook_ads_data',
python_callable=extract_facebook_data,
dag=dag
)

The hook requires an Airflow connection with the following configuration:

Connection ID: facebook_default (or custom)

Connection extra (JSON):
{
"app_id": "your_facebook_app_id",
"app_secret": "your_facebook_app_secret",
"access_token": "your_facebook_access_token",
"account_id": "act_your_facebook_account_id"
}

The main integration component for connecting to Facebook's Marketing API and extracting advertising data through asynchronous report generation.
class FacebookAdsReportingHook(BaseHook):
    """
    Hook for the Facebook Ads API.

    Inherits from BaseHook to integrate with Airflow's connection management system.
    Validates connection configuration on initialization and provides methods for
    asynchronous Facebook Ads data extraction.

    Args:
        facebook_conn_id (str): Airflow Facebook Ads connection ID. Defaults to "facebook_default"
        api_version (str): The version of Facebook API. Defaults to "v6.0"

    Attributes:
        client_required_fields (List[str]): Required fields for Facebook API connection
            ["app_id", "app_secret", "access_token", "account_id"]
    """

    def __init__(
        self,
        facebook_conn_id: str = "facebook_default",
        api_version: str = "v6.0",
    ) -> None: ...

    def bulk_facebook_report(
        self,
        params: Dict[str, Any],
        fields: List[str],
        sleep_time: int = 5,
    ) -> List[AdsInsights]:
        """
        Pulls data from the Facebook Ads API using asynchronous reporting.

        Args:
            params (Dict[str, Any]): Parameters that determine the query for Facebook Ads API.
                Must follow Facebook Marketing API Insights Parameters format.
                Common parameters include:

                - level (str): Report level - "account", "campaign", "adset", "ad"
                - date_preset (str): Date range preset - "today", "yesterday", "last_7_days",
                  "last_14_days", "last_28_days", "this_month", "last_month", etc.
                - time_range (Dict): Custom date range with "since" and "until" keys in YYYY-MM-DD format
                  Example: {"since": "2023-01-01", "until": "2023-01-31"}
                - time_increment (int or str): Time increment for date breakdown
                  (1=daily, 7=weekly, or "monthly")
                - breakdowns (List[str]): Breakdown dimensions like ["age", "gender", "placement", "device_platform"]
                - action_breakdowns (List[str]): Action breakdown dimensions like ["action_type", "action_target_id"]
            fields (List[str]): List of fields to obtain from Facebook Ads Insights API.
                Must be valid Facebook Ads Insights fields. Common fields include:

                - Identifiers: "campaign_name", "campaign_id", "adset_name", "adset_id", "ad_name", "ad_id"
                - Metrics: "impressions", "clicks", "spend", "reach", "frequency"
                - Calculated: "cpc", "cpm", "ctr", "cpp", "cost_per_unique_click"
                - Conversions: "conversions", "cost_per_conversion", "conversion_rate"
                - Video: "video_plays", "video_p25_watched_actions", "video_p50_watched_actions"
            sleep_time (int): Time to sleep between async job status checks. Defaults to 5 seconds.
                Increase for large reports to reduce API calls.

        Returns:
            List[AdsInsights]: Facebook Ads API response as list of AdsInsights objects.
                Each AdsInsights object is a dictionary-like object containing the requested
                fields as key-value pairs (e.g., {"campaign_name": "My Campaign", "clicks": "150"}).

        Raises:
            AirflowException: If any required connection fields (app_id, app_secret, access_token,
                account_id) are missing from the connection's extra configuration, or if
                the async report job fails or is skipped by Facebook's API.
        """

    @cached_property
    def facebook_ads_config(self) -> Dict:
        """
        Gets Facebook ads connection configuration from Airflow connections.

        Retrieves connection details from Airflow's meta database using the configured
        facebook_conn_id and validates that all required fields are present in the
        connection's extra_dejson configuration.

        Required connection fields:

        - app_id (str): Facebook App ID
        - app_secret (str): Facebook App Secret
        - access_token (str): Facebook Access Token
        - account_id (str): Facebook Account ID (format: "act_12345")

        Returns:
            Dict: Configuration dictionary containing app_id, app_secret, access_token, and account_id.

        Raises:
            AirflowException: If any required fields (app_id, app_secret, access_token, account_id)
                are missing from the connection's extra configuration.
        """


# Enumeration of available status options for Facebook async task monitoring.
class JobStatus(Enum):
    """Available options for facebook async task status"""

    # Each value is the human-readable status string for an async report job.
    COMPLETED = 'Job Completed'
    STARTED = 'Job Started'
    RUNNING = 'Job Running'
    FAILED = 'Job Failed'
    SKIPPED = 'Job Skipped'


from facebook_business.adobjects.adsinsights import AdsInsights
class AdsInsights:
    """
    Facebook Ads Insights object from facebook-business SDK.

    Dictionary-like object containing Facebook Ads data with requested fields
    as key-value pairs. Can be accessed like a dictionary:

    Example:
        insight['campaign_name']   # Returns campaign name string
        insight['clicks']          # Returns clicks count as string
        insight.get('spend', '0')  # Returns spend with default fallback
    """


# The hook implements comprehensive error handling for Facebook API operations:
AirflowException with message "{missing_keys} fields are missing"
if any required connection fields are missing (app_id, app_secret, access_token, account_id)
AirflowException with message "{async_status}. Please retry."
# if async report jobs fail or are skipped by Facebook's API


def get_daily_campaign_performance():
    """Return yesterday's campaign-level insights.

    Builds the hook with its default arguments and requests a fixed set of
    delivery and cost fields at the "campaign" level.

    Returns:
        The result of ``FacebookAdsReportingHook.bulk_facebook_report``.
    """
    hook = FacebookAdsReportingHook()

    params = {
        "level": "campaign",
        "date_preset": "yesterday",
    }

    fields = [
        "campaign_name",
        "impressions",
        "clicks",
        "spend",
        "cpc",
        "cpm",
    ]

    return hook.bulk_facebook_report(params, fields)


def get_ad_performance_with_breakdowns():
    """Return daily ad-level insights for January 2023, broken down by age and gender.

    Uses an explicit ``time_range`` instead of a date preset and polls the
    async report job with ``sleep_time=10``.

    Returns:
        The result of ``FacebookAdsReportingHook.bulk_facebook_report``.
    """
    hook = FacebookAdsReportingHook()

    params = {
        "level": "ad",
        "time_range": {
            "since": "2023-01-01",
            "until": "2023-01-31",
        },
        "breakdowns": ["age", "gender"],
        "time_increment": 1,
    }

    fields = [
        "ad_name",
        "impressions",
        "clicks",
        "conversions",
        "spend",
    ]

    return hook.bulk_facebook_report(params, fields, sleep_time=10)