Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Google Ads API integration with OAuth authentication, account management, and reporting capabilities. Supports campaign data extraction and automated reporting workflows for digital advertising management.
Core hook for connecting to Google Ads API with support for OAuth Service Account Flow and Application Default Credentials authentication.
class GoogleAdsHook(GoogleBaseHook):
    """
    Core hook for connecting to the Google Ads API.

    Supports the OAuth Service Account Flow and Application Default
    Credentials authentication.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        google_ads_conn_id: str = "google_ads_default",
        api_version: str = "v16",
        **kwargs
    ): ...

    def get_service(self, service_name: str, version: Optional[str] = None): ...

    def search(
        self,
        client_ids: List[str],
        query: str,
        page_size: int = 1000,
        **kwargs
    ): ...

    def search_stream(
        self,
        client_ids: List[str],
        query: str,
        **kwargs
    ): ...

    def list_accessible_customers(self): ...

    def get_customer(self, client_id: str): ...


# Operations for listing and managing Google Ads accounts.
class GoogleAdsListAccountsOperator(BaseOperator):
    """
    Lists all Google Ads accounts accessible by the authenticated user.

    Args:
        gcp_conn_id (str): The connection ID for Google Cloud Platform
        google_ads_conn_id (str): The connection ID for Google Ads API
        api_version (str): Google Ads API version to use

    Returns:
        List of accessible customer accounts with metadata
    """

    # NOTE(review): the upstream Airflow provider documents additional required
    # `bucket` and `object_name` arguments for this operator (it uploads the
    # account list to GCS as a CSV file) - confirm this signature against the
    # installed provider version.
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        google_ads_conn_id: str = "google_ads_default",
        api_version: str = "v16",
        **kwargs
    ): ...


# Transfer Google Ads data to Google Cloud Storage for further processing and analysis.
class GoogleAdsToGcsOperator(BaseOperator):
    """
    Exports Google Ads data to Google Cloud Storage using GAQL queries.

    Args:
        client_ids (List[str]): Google Ads customer IDs to query
        query (str): Google Ads Query Language (GAQL) query
        obj (str): GCS object path for output
        bucket (str): GCS bucket name
        gcp_conn_id (str): The connection ID for Google Cloud Platform
        google_ads_conn_id (str): The connection ID for Google Ads API
        api_version (str): Google Ads API version to use
        gzip (bool): Whether to gzip the output file

    Returns:
        GCS object path of exported data
    """

    # NOTE(review): the upstream Airflow provider documents a required
    # `attributes` argument (the row attributes to extract) and CSV output for
    # this operator - confirm this signature against the installed provider
    # version.
    def __init__(
        self,
        client_ids: List[str],
        query: str,
        obj: str,
        bucket: str,
        gcp_conn_id: str = "google_cloud_default",
        google_ads_conn_id: str = "google_ads_default",
        api_version: str = "v16",
        gzip: bool = False,
        **kwargs
    ): ...


# Google Ads integration requires OAuth authentication setup:
# Connection configuration for Google Ads API
# All values below are placeholders to be replaced with real credentials.
connection_config = {
    "conn_id": "google_ads_default",
    "conn_type": "google_ads",
    "login": "customer_id",  # Your Google Ads customer ID
    "password": "",  # Leave empty
    "extra": {
        "developer_token": "your_developer_token",
        "client_id": "your_oauth_client_id",
        "client_secret": "your_oauth_client_secret",
        "refresh_token": "your_refresh_token",
        "login_customer_id": "manager_account_id",  # Optional: for manager accounts
    },
}

from airflow import DAG
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator
from datetime import datetime

# Example DAG: list the Google Ads accounts accessible to the configured
# connection once per day.
dag = DAG(
    'google_ads_accounts',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# NOTE(review): the upstream operator is documented as also requiring
# `bucket` and `object_name` - confirm against the installed provider version.
list_accounts = GoogleAdsListAccountsOperator(
    task_id='list_accounts',
    google_ads_conn_id='google_ads_default',
    dag=dag
)

from airflow import DAG
from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
from datetime import datetime

# Example DAG: export the last 7 days of campaign metrics to GCS once per day.
dag = DAG(
    'google_ads_export',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# NOTE(review): the upstream operator is documented as also requiring an
# `attributes` list and as writing CSV - confirm against the installed
# provider version before relying on the `.jsonl` object name.
# The GAQL text is kept flush-left so the query string is unchanged.
export_campaigns = GoogleAdsToGcsOperator(
    task_id='export_campaigns',
    client_ids=['1234567890'],
    query='''
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions,
metrics.clicks,
metrics.cost_micros
FROM campaign
WHERE segments.date DURING LAST_7_DAYS
''',
    bucket='my-ads-data-bucket',
    obj='campaigns/{{ ds }}/campaign_data.jsonl',
    google_ads_conn_id='google_ads_default',
    gcp_conn_id='google_cloud_default',
    dag=dag
)

from airflow import DAG
from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

# Example DAG: export one day of keyword-level performance to GCS, then load
# the exported object into BigQuery.
dag = DAG(
    'google_ads_to_bigquery',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Export Google Ads data to GCS
# NOTE(review): the upstream operator is documented as also requiring an
# `attributes` list - confirm against the installed provider version.
# The GAQL text is kept flush-left so the query string is unchanged.
export_ads_data = GoogleAdsToGcsOperator(
    task_id='export_ads_data',
    client_ids=['1234567890', '0987654321'],
    query='''
SELECT
customer.id,
campaign.id,
campaign.name,
ad_group.id,
ad_group.name,
segments.date,
metrics.impressions,
metrics.clicks,
metrics.conversions,
metrics.cost_micros
FROM keyword_view
WHERE segments.date = '{{ ds }}'
''',
    bucket='ads-data-lake',
    obj='raw/ads_performance/{{ ds }}/performance.jsonl',
    google_ads_conn_id='google_ads_default',
    dag=dag
)

# Load data into BigQuery
load_to_bigquery = GCSToBigQueryOperator(
    task_id='load_to_bigquery',
    bucket='ads-data-lake',
    # Must point at the same object the export task writes.
    source_objects=['raw/ads_performance/{{ ds }}/performance.jsonl'],
    destination_project_dataset_table='analytics.ads_performance.daily_performance',
    schema_fields=[
        {'name': 'customer_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'campaign_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'campaign_name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'ad_group_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'ad_group_name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'date', 'type': 'DATE', 'mode': 'REQUIRED'},
        {'name': 'impressions', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'clicks', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'conversions', 'type': 'FLOAT', 'mode': 'NULLABLE'},
        {'name': 'cost_micros', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    ],
    # NOTE(review): source_format has to match what the export task actually
    # writes; the upstream GoogleAdsToGcsOperator is documented as writing
    # CSV - confirm before relying on NEWLINE_DELIMITED_JSON.
    source_format='NEWLINE_DELIMITED_JSON',
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)

# The load runs only after the export has finished.
export_ads_data >> load_to_bigquery

from typing import List, Optional, Dict, Any, Union
from airflow.models import BaseOperator
# Google Ads specific types
# These are plain aliases of built-in/typing types; they add readability to
# signatures but perform no runtime validation.
CustomerInfo = Dict[str, Any]
CampaignInfo = Dict[str, Any]
AdGroupInfo = Dict[str, Any]
GoogleAdsQuery = str
ClientId = str
GoogleAdsApiVersion = str

# Authentication types
DeveloperToken = str
RefreshToken = str
OAuthCredentials = Dict[str, str]

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-google