CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-google

Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/google-ads.md

Google Ads Integration

Google Ads API integration with OAuth authentication, account management, and reporting capabilities. Supports campaign data extraction and automated reporting workflows for digital advertising management.

Capabilities

Google Ads API Hook

Core hook for connecting to Google Ads API with support for OAuth Service Account Flow and Application Default Credentials authentication.

class GoogleAdsHook(GoogleBaseHook):
    """
    Core hook for connecting to the Google Ads API.

    Supports the OAuth Service Account Flow and Application Default
    Credentials authentication.

    Args:
        gcp_conn_id (str): The connection ID for Google Cloud Platform.
            Defaults to "google_cloud_default".
        google_ads_conn_id (str): The connection ID for Google Ads API.
            Defaults to "google_ads_default".
        api_version (str): Google Ads API version to use. Defaults to "v16".
        **kwargs: Extra keyword arguments; how they are consumed is not
            shown in this listing.
    """
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        google_ads_conn_id: str = "google_ads_default",
        api_version: str = "v16",
        **kwargs
    ): ...

    # Looks up an API service by name. `version` is optional; presumably it
    # falls back to the hook's `api_version` when omitted — TODO confirm.
    def get_service(self, service_name: str, version: Optional[str] = None): ...
    # Runs `query` for the given customer IDs, 1000 results per page by
    # default. Presumably `query` is a GAQL string — verify against the
    # provider API reference.
    def search(
        self,
        client_ids: List[str],
        query: str,
        page_size: int = 1000,
        **kwargs
    ): ...
    # Takes the same inputs as `search` except `page_size`; presumably the
    # streaming variant of the same request — TODO confirm.
    def search_stream(
        self,
        client_ids: List[str],
        query: str,
        **kwargs
    ): ...
    # Takes no arguments; presumably lists the customers reachable with the
    # configured credentials — TODO confirm the return shape.
    def list_accessible_customers(self): ...
    # Looks up a single customer by ID; return shape is not shown here.
    def get_customer(self, client_id: str): ...

Account Management

Operations for listing and managing Google Ads accounts.

class GoogleAdsListAccountsOperator(BaseOperator):
    """
    Lists all Google Ads accounts accessible by the authenticated user.

    Args:
        gcp_conn_id (str): The connection ID for Google Cloud Platform.
            Defaults to "google_cloud_default".
        google_ads_conn_id (str): The connection ID for Google Ads API.
            Defaults to "google_ads_default".
        api_version (str): Google Ads API version to use. Defaults to "v16".
        **kwargs: Extra keyword arguments (for example ``task_id`` and
            ``dag``, as passed in the usage examples).

    Returns:
        List of accessible customer accounts with metadata
    """
    # NOTE(review): signature as listed on this page. The upstream provider
    # operator may require further arguments (e.g. a GCS destination for the
    # account list) — confirm against the provider API reference.
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        google_ads_conn_id: str = "google_ads_default",
        api_version: str = "v16",
        **kwargs
    ): ...

Data Export

Transfer Google Ads data to Google Cloud Storage for further processing and analysis.

class GoogleAdsToGcsOperator(BaseOperator):
    """
    Exports Google Ads data to Google Cloud Storage using GAQL queries.

    Args:
        client_ids (List[str]): Google Ads customer IDs to query. Required.
        query (str): Google Ads Query Language (GAQL) query. Required.
        obj (str): GCS object path for output. Required.
        bucket (str): GCS bucket name. Required.
        gcp_conn_id (str): The connection ID for Google Cloud Platform.
            Defaults to "google_cloud_default".
        google_ads_conn_id (str): The connection ID for Google Ads API.
            Defaults to "google_ads_default".
        api_version (str): Google Ads API version to use. Defaults to "v16".
        gzip (bool): Whether to gzip the output file. Defaults to False.
        **kwargs: Extra keyword arguments (for example ``task_id`` and
            ``dag``, as passed in the usage examples).

    Returns:
        GCS object path of exported data
    """
    # NOTE(review): signature as listed on this page. The upstream provider
    # operator may require further arguments (e.g. an `attributes` list that
    # selects which row fields are written) — confirm against the provider
    # API reference, including the output file format.
    def __init__(
        self,
        client_ids: List[str],
        query: str,
        obj: str,
        bucket: str,
        gcp_conn_id: str = "google_cloud_default",
        google_ads_conn_id: str = "google_ads_default",
        api_version: str = "v16",
        gzip: bool = False,
        **kwargs
    ): ...

Authentication Setup

The Google Ads integration requires an Airflow connection that carries OAuth credentials and a developer token:

# Airflow connection definition for the Google Ads API. The OAuth secrets and
# the developer token live under "extra"; every value here is a placeholder.
connection_config = dict(
    conn_id="google_ads_default",
    conn_type="google_ads",
    login="customer_id",  # Your Google Ads customer ID
    password="",  # Leave empty
    extra=dict(
        developer_token="your_developer_token",
        client_id="your_oauth_client_id",
        client_secret="your_oauth_client_secret",
        refresh_token="your_refresh_token",
        login_customer_id="manager_account_id",  # Optional: for manager accounts
    ),
)

Usage Examples

Basic Account Listing

from airflow import DAG
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator
from datetime import datetime

# Daily DAG with a single task that lists the accessible Google Ads accounts.
dag = DAG(
    'google_ads_accounts',
    default_args={'start_date': datetime(2023, 1, 1)},
    # `schedule` supersedes `schedule_interval`, which was deprecated in
    # Airflow 2.4 and removed in Airflow 3.0.
    schedule='@daily',
    # Do not backfill the runs between start_date and today.
    catchup=False,
)

# Uses the Google Ads connection described under "Authentication Setup".
list_accounts = GoogleAdsListAccountsOperator(
    task_id='list_accounts',
    google_ads_conn_id='google_ads_default',
    dag=dag,
)

Campaign Data Export

from airflow import DAG
from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
from datetime import datetime

# Daily DAG that exports last week's campaign metrics to Cloud Storage.
dag = DAG(
    'google_ads_export',
    default_args={'start_date': datetime(2023, 1, 1)},
    # `schedule` supersedes `schedule_interval`, which was deprecated in
    # Airflow 2.4 and removed in Airflow 3.0.
    schedule='@daily',
    # Do not backfill the runs between start_date and today.
    catchup=False,
)

export_campaigns = GoogleAdsToGcsOperator(
    task_id='export_campaigns',
    client_ids=['1234567890'],
    # GAQL query: one row per campaign, restricted to the last 7 days.
    query='''
        SELECT 
            campaign.id,
            campaign.name,
            campaign.status,
            metrics.impressions,
            metrics.clicks,
            metrics.cost_micros
        FROM campaign 
        WHERE segments.date DURING LAST_7_DAYS
    ''',
    bucket='my-ads-data-bucket',
    # `{{ ds }}` is written as a template placeholder for the run's date, so
    # each run gets its own object path.
    obj='campaigns/{{ ds }}/campaign_data.jsonl',
    google_ads_conn_id='google_ads_default',
    gcp_conn_id='google_cloud_default',
    dag=dag,
)

Advanced Reporting Pipeline

from airflow import DAG
from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

# Daily two-step pipeline: export Google Ads performance rows to Cloud
# Storage, then load that object into BigQuery.
dag = DAG(
    'google_ads_to_bigquery',
    default_args={'start_date': datetime(2023, 1, 1)},
    # `schedule` supersedes `schedule_interval`, which was deprecated in
    # Airflow 2.4 and removed in Airflow 3.0.
    schedule='@daily',
    # Do not backfill the runs between start_date and today.
    catchup=False,
)

# Export Google Ads data to GCS
export_ads_data = GoogleAdsToGcsOperator(
    task_id='export_ads_data',
    client_ids=['1234567890', '0987654321'],
    # GAQL query restricted to the run's date via the `{{ ds }}` template.
    query='''
        SELECT 
            customer.id,
            campaign.id,
            campaign.name,
            ad_group.id,
            ad_group.name,
            segments.date,
            metrics.impressions,
            metrics.clicks,
            metrics.conversions,
            metrics.cost_micros
        FROM keyword_view 
        WHERE segments.date = '{{ ds }}'
    ''',
    bucket='ads-data-lake',
    obj='raw/ads_performance/{{ ds }}/performance.jsonl',
    google_ads_conn_id='google_ads_default',
    dag=dag,
)

# Load data into BigQuery
load_to_bigquery = GCSToBigQueryOperator(
    task_id='load_to_bigquery',
    bucket='ads-data-lake',
    # Must point at the same object the export task wrote.
    source_objects=['raw/ads_performance/{{ ds }}/performance.jsonl'],
    destination_project_dataset_table='analytics.ads_performance.daily_performance',
    schema_fields=[
        {'name': 'customer_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'campaign_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'campaign_name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'ad_group_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'ad_group_name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'date', 'type': 'DATE', 'mode': 'REQUIRED'},
        {'name': 'impressions', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'clicks', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'conversions', 'type': 'FLOAT', 'mode': 'NULLABLE'},
        {'name': 'cost_micros', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    ],
    # NOTE(review): confirm that the file format actually written by the
    # export task matches this source_format and the field names above.
    source_format='NEWLINE_DELIMITED_JSON',
    # WRITE_TRUNCATE replaces the destination table's contents on every run,
    # so the table only ever holds the most recently loaded day.
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)

# The load may only start once the export has finished.
export_ads_data >> load_to_bigquery

Types

from typing import List, Optional, Dict, Any, Union
from airflow.models import BaseOperator

# Google Ads specific types

# Record-like payloads: string keys mapped to arbitrary values.
CustomerInfo = CampaignInfo = AdGroupInfo = Dict[str, Any]

# Plain-string identifiers and query text.
GoogleAdsQuery = ClientId = GoogleAdsApiVersion = str

# Authentication types

# Secrets are carried as plain strings.
DeveloperToken = RefreshToken = str

# String-to-string mapping holding the OAuth credential fields.
OAuthCredentials = Dict[str, str]

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-google

docs

common-utilities.md

data-transfers.md

firebase.md

gcp-services.md

google-ads.md

google-workspace.md

index.md

leveldb.md

marketing-platform.md

tile.json