Facebook Ads provider for Apache Airflow that enables integration with Facebook Marketing API
npx @tessl/cli install tessl/pypi-apache-airflow-providers-facebook@1.0.0A provider package that enables Apache Airflow to integrate with Facebook Ads API for advertising data collection and reporting. The package provides a FacebookAdsReportingHook that wraps the Facebook Business SDK to authenticate with Facebook Ads API, execute asynchronous report generation jobs, and retrieve advertising insights and metrics.
pip install apache-airflow-providers-facebookfacebook-business>=6.0.2from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook, JobStatusAdditional imports for type annotations:
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.api import FacebookAdsApi
from typing import Any, Dict, Listfrom airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook
# Initialize the hook with connection details
hook = FacebookAdsReportingHook(
facebook_conn_id='facebook_default',
api_version='v6.0'
)
# Define report parameters
params = {
'level': 'ad',
'date_preset': 'yesterday',
'time_range': {
'since': '2021-01-01',
'until': '2021-01-31'
}
}
# Define fields to retrieve
fields = [
'campaign_name',
'campaign_id',
'ad_id',
'clicks',
'impressions',
'spend'
]
# Execute bulk Facebook report
insights = hook.bulk_facebook_report(
params=params,
fields=fields,
sleep_time=5
)
# Process the results
for insight in insights:
print(f"Campaign: {insight['campaign_name']}, Clicks: {insight['clicks']}")The Facebook Ads provider is built around the FacebookAdsReportingHook which integrates with the Facebook Business SDK:
Main hook class for integrating with Facebook Ads API, providing authentication and asynchronous report generation capabilities.
class FacebookAdsReportingHook(BaseHook):
"""
Hook for the Facebook Ads API
"""
conn_name_attr = 'facebook_conn_id'
default_conn_name = 'facebook_default'
conn_type = 'facebook_social'
hook_name = 'Facebook Ads'
def __init__(
self,
facebook_conn_id: str = 'facebook_default',
api_version: str = "v6.0"
) -> None:
"""
Initialize the Facebook Ads Reporting Hook.
Args:
facebook_conn_id (str): Airflow Facebook Ads connection ID (default: 'facebook_default')
api_version (str): The version of Facebook API (default: 'v6.0')
"""
client_required_fields = ["app_id", "app_secret", "access_token", "account_id"]Returns Facebook Ads Client using a service account with authenticated configuration.
def _get_service(self) -> FacebookAdsApi:
"""
Returns Facebook Ads Client using a service account.
Returns:
FacebookAdsApi: Initialized Facebook Ads API client
"""Executes asynchronous Facebook Ads reporting queries and returns insights data.
def bulk_facebook_report(
self,
params: Dict[str, Any],
fields: List[str],
sleep_time: int = 5
) -> List[AdsInsights]:
"""
Pulls data from the Facebook Ads API using asynchronous reporting.
Args:
params (Dict[str, Any]): Parameters that determine the query for Facebook.
See https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
fields (List[str]): List of fields obtained from Facebook AdsInsights.Field class.
See https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
sleep_time (int): Time to sleep when async call is happening (default: 5)
Returns:
List[AdsInsights]: Facebook Ads API response converted to Facebook Ads Row objects
Raises:
AirflowException: If async job fails, is skipped, or API communication errors occur
"""Retrieves and validates Facebook Ads connection configuration from Airflow metadata database.
@cached_property
def facebook_ads_config(self) -> Dict:
"""
Gets Facebook ads connection from meta db and sets facebook_ads_config attribute with returned config file.
Returns:
Dict: Facebook Ads configuration containing required fields (app_id, app_secret, access_token, account_id)
Raises:
AirflowException: If required fields are missing from connection configuration
"""Status values for Facebook asynchronous reporting tasks.
class JobStatus(Enum):
"""Available options for facebook async task status"""
COMPLETED = 'Job Completed'
STARTED = 'Job Started'
RUNNING = 'Job Running'
FAILED = 'Job Failed'
SKIPPED = 'Job Skipped'The Facebook Ads connection must include the following fields in the extra configuration:
FacebookAdsConfig = Dict[str, str]
# Required fields:
# - app_id: Facebook App ID
# - app_secret: Facebook App Secret
# - access_token: Facebook Access Token
# - account_id: Facebook Ad Account IDConfiguration for Facebook Ads reporting queries:
ReportParams = Dict[str, Any]
# Common parameters:
# - level: str - Reporting level ('account', 'campaign', 'adset', 'ad')
# - date_preset: str - Predefined date range ('today', 'yesterday', 'last_7_days', etc.)
# - time_range: Dict[str, str] - Custom date range with 'since' and 'until' keys
# - filtering: List[Dict] - Filters to apply to the data
# - breakdowns: List[str] - Dimensions to break down the data byAvailable fields for Facebook Ads insights reporting:
ReportFields = List[str]
# Common fields include:
# - 'campaign_name', 'campaign_id'
# - 'adset_name', 'adset_id'
# - 'ad_name', 'ad_id'
# - 'clicks', 'impressions', 'spend'
# - 'ctr', 'cpm', 'cpp'
# - 'reach', 'frequency'
# See Facebook Marketing API documentation for complete field listThe hook raises AirflowException in the following scenarios:
app_id, app_secret, access_token, account_id) are missing from the connection configurationTo use this provider, configure an Airflow connection with:
facebook_socialfacebook_default (or custom){
"app_id": "your_facebook_app_id",
"app_secret": "your_facebook_app_secret",
"access_token": "your_facebook_access_token",
"account_id": "your_facebook_account_id"
}The hook supports Facebook Marketing API versions, with v6.0 as the default. You can specify a different version during initialization:
hook = FacebookAdsReportingHook(api_version='v12.0')