CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-google

Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

marketing-platform.mddocs/

Google Marketing Platform

Integration with Google Marketing Platform services including Google Analytics Admin, Campaign Manager, Display & Video 360, and Search Ads. Provides comprehensive digital marketing automation and reporting capabilities for advertising campaign management and performance analysis.

Capabilities

Google Analytics Admin

Google Analytics Admin API integration for managing Analytics properties, data streams, and administrative settings.

class GoogleAnalyticsAdminHook(GoogleBaseHook):
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v1beta",
        **kwargs
    ): ...
    
    def list_accounts(
        self,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        show_deleted: bool = False
    ): ...
    def create_property(
        self,
        analytics_property: Dict[str, Any],
        **kwargs
    ): ...
    def delete_property(
        self,
        property_name: str,
        **kwargs
    ): ...
    def create_data_stream(
        self,
        parent: str,
        data_stream: Dict[str, Any],
        **kwargs
    ): ...

class GoogleAnalyticsAdminListAccountsOperator(BaseOperator):
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v1beta",
        **kwargs
    ): ...

class GoogleAnalyticsAdminCreatePropertyOperator(BaseOperator):
    def __init__(
        self,
        analytics_property: Dict[str, Any],
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v1beta",
        **kwargs
    ): ...

class GoogleAnalyticsAdminDeletePropertyOperator(BaseOperator):
    def __init__(
        self,
        property_id: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v1beta",
        **kwargs
    ): ...

class GoogleAnalyticsAdminCreateDataStreamOperator(BaseOperator):
    def __init__(
        self,
        property_id: str,
        data_stream: Dict[str, Any],
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v1beta",
        **kwargs
    ): ...

Campaign Manager

Google Campaign Manager API integration for managing advertising campaigns, reports, and conversions.

class GoogleCampaignManagerHook(GoogleBaseHook):
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ): ...
    
    def insert_report(
        self,
        profile_id: str,
        report: Dict[str, Any],
        **kwargs
    ): ...
    def run_report(
        self,
        profile_id: str,
        report_id: str,
        synchronous: bool = True,
        **kwargs
    ): ...
    def get_report(
        self,
        profile_id: str,
        report_id: str,
        **kwargs
    ): ...
    def get_report_file(
        self,
        profile_id: str,
        report_id: str,
        file_id: str,
        **kwargs
    ): ...
    def batch_insert_conversions(
        self,
        profile_id: str,
        conversions: List[Dict[str, Any]],
        encryption_entity_type: str,
        encryption_entity_id: str,
        encryption_source: str,
        **kwargs
    ): ...

class GoogleCampaignManagerInsertReportOperator(BaseOperator):
    def __init__(
        self,
        profile_id: str,
        report: Dict[str, Any],
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ): ...

class GoogleCampaignManagerRunReportOperator(BaseOperator):
    def __init__(
        self,
        profile_id: str,
        report_id: str,
        synchronous: bool = True,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ): ...

class GoogleCampaignManagerDownloadReportOperator(BaseOperator):
    def __init__(
        self,
        profile_id: str,
        report_id: str,
        file_id: str,
        bucket_name: str,
        report_name: Optional[str] = None,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ): ...

class GoogleCampaignManagerBatchInsertConversionsOperator(BaseOperator):
    def __init__(
        self,
        profile_id: str,
        conversions: List[Dict[str, Any]],
        encryption_entity_type: str,
        encryption_entity_id: str,
        encryption_source: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ): ...

class GoogleCampaignManagerReportSensor(BaseSensorOperator):
    def __init__(
        self,
        profile_id: str,
        report_id: str,
        file_id: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ): ...

Display & Video 360

Google Display & Video 360 API integration for programmatic advertising campaign management and reporting.

class GoogleDisplayVideo360Hook(GoogleBaseHook):
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        **kwargs
    ): ...
    
    def create_query(
        self,
        query: Dict[str, Any],
        **kwargs
    ): ...
    def run_query(
        self,
        query_id: str,
        **kwargs
    ): ...
    def get_query(
        self,
        query_id: str,
        **kwargs
    ): ...
    def download_report(
        self,
        query_id: str,
        **kwargs
    ): ...
    def create_sdf_download_operation(
        self,
        body_request: Dict[str, Any],
        **kwargs
    ): ...
    def get_sdf_download_operation(
        self,
        operation_name: str,
        **kwargs
    ): ...

class GoogleDisplayVideo360CreateQueryOperator(BaseOperator):
    def __init__(
        self,
        query: Dict[str, Any],
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        **kwargs
    ): ...

class GoogleDisplayVideo360RunQueryOperator(BaseOperator):
    def __init__(
        self,
        query_id: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        **kwargs
    ): ...

class GoogleDisplayVideo360DownloadReportV2Operator(BaseOperator):
    def __init__(
        self,
        query_id: str,
        bucket_name: str,
        report_name: Optional[str] = None,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        **kwargs
    ): ...

class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator):
    def __init__(
        self,
        body_request: Dict[str, Any],
        bucket_name: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        **kwargs
    ): ...

class GoogleDisplayVideo360RunQuerySensor(BaseSensorOperator):
    def __init__(
        self,
        query_id: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        **kwargs
    ): ...

class GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator):
    def __init__(
        self,
        operation_name: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        **kwargs
    ): ...

Search Ads

Google Search Ads API integration for search advertising campaign management and reporting.

class GoogleSearchAdsHook(GoogleBaseHook):
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v0",
        **kwargs
    ): ...
    
    def search(
        self,
        customer_id: str,
        query: str,
        **kwargs
    ): ...
    def get_field(
        self,
        field_name: str,
        customer_id: str,
        **kwargs
    ): ...
    def list_custom_columns(
        self,
        customer_id: str,
        **kwargs
    ): ...

class GoogleSearchAdsReportingHook(GoogleBaseHook):
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default", 
        api_version: str = "v0",
        **kwargs
    ): ...
    
    def generate_report(
        self,
        customer_id: str,
        report_request: Dict[str, Any],
        **kwargs
    ): ...

class GoogleSearchAdsSearchOperator(BaseOperator):
    def __init__(
        self,
        customer_id: str,
        query: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v0",
        **kwargs
    ): ...

class GoogleSearchAdsGetFieldOperator(BaseOperator):
    def __init__(
        self,
        field_name: str,
        customer_id: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v0",
        **kwargs
    ): ...

class GoogleSearchAdsListCustomColumnsOperator(BaseOperator):
    def __init__(
        self,
        customer_id: str,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v0",
        **kwargs
    ): ...

Usage Examples

Analytics Property Management

from airflow import DAG
from airflow.providers.google.marketing_platform.operators.analytics_admin import (
    GoogleAnalyticsAdminCreatePropertyOperator,
    GoogleAnalyticsAdminCreateDataStreamOperator
)
from datetime import datetime

dag = DAG(
    'analytics_property_setup',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval=None,
    catchup=False
)

# Create Analytics property
create_property = GoogleAnalyticsAdminCreatePropertyOperator(
    task_id='create_property',
    analytics_property={
        'displayName': 'My Website Analytics',
        'industryCategory': 'TECHNOLOGY',
        'timeZone': 'America/New_York',
        'currencyCode': 'USD'
    },
    dag=dag
)

# Create web data stream
create_stream = GoogleAnalyticsAdminCreateDataStreamOperator(
    task_id='create_stream',
    property_id='{{ task_instance.xcom_pull(task_ids="create_property", key="property_id") }}',
    data_stream={
        'type_': 'WEB_DATA_STREAM',
        'displayName': 'Website Stream',
        'webStreamData': {
            'defaultUri': 'https://example.com'
        }
    },
    dag=dag
)

create_property >> create_stream

Campaign Manager Reporting Pipeline

from airflow import DAG
from airflow.providers.google.marketing_platform.operators.campaign_manager import (
    GoogleCampaignManagerInsertReportOperator,
    GoogleCampaignManagerRunReportOperator,
    GoogleCampaignManagerDownloadReportOperator
)
from airflow.providers.google.marketing_platform.sensors.campaign_manager import (
    GoogleCampaignManagerReportSensor
)
from datetime import datetime, timedelta

dag = DAG(
    'campaign_manager_reporting',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Define report configuration
report_config = {
    'name': 'Daily Campaign Performance',
    'type': 'STANDARD',
    'criteria': {
        'dateRange': {
            'relativeDateRange': 'YESTERDAY'
        },
        'dimensions': [
            {'name': 'date'},
            {'name': 'campaign'},
            {'name': 'placement'}
        ],
        'metricNames': [
            'impressions',
            'clicks',
            'totalConversions'
        ]
    }
}

# Insert report definition
insert_report = GoogleCampaignManagerInsertReportOperator(
    task_id='insert_report',
    profile_id='12345678',
    report=report_config,
    dag=dag
)

# Run the report
run_report = GoogleCampaignManagerRunReportOperator(
    task_id='run_report',
    profile_id='12345678',
    report_id='{{ task_instance.xcom_pull(task_ids="insert_report", key="report_id") }}',
    synchronous=False,
    dag=dag
)

# Wait for report completion
wait_for_report = GoogleCampaignManagerReportSensor(
    task_id='wait_for_report',
    profile_id='12345678',
    report_id='{{ task_instance.xcom_pull(task_ids="insert_report", key="report_id") }}',
    file_id='{{ task_instance.xcom_pull(task_ids="run_report", key="file_id") }}',
    timeout=300,
    poke_interval=30,
    dag=dag
)

# Download report to GCS
download_report = GoogleCampaignManagerDownloadReportOperator(
    task_id='download_report',
    profile_id='12345678',
    report_id='{{ task_instance.xcom_pull(task_ids="insert_report", key="report_id") }}',
    file_id='{{ task_instance.xcom_pull(task_ids="run_report", key="file_id") }}',
    bucket_name='marketing-reports',
    report_name='campaign-performance/{{ ds }}/daily_report.csv',
    dag=dag
)

insert_report >> run_report >> wait_for_report >> download_report

Types

from typing import Dict, List, Optional, Any, Union
from airflow.models import BaseOperator
from airflow.sensors.base import BaseSensorOperator

# Analytics Admin types
AnalyticsProperty = Dict[str, Any]
DataStream = Dict[str, Any]
PropertyId = str

# Campaign Manager types
CampaignManagerReport = Dict[str, Any]
ProfileId = str
ReportId = str
FileId = str
ConversionData = Dict[str, Any]

# Display & Video 360 types
DV360Query = Dict[str, Any]
QueryId = str
SDFRequest = Dict[str, Any]
OperationName = str

# Search Ads types
SearchQuery = str
CustomerId = str
FieldName = str
CustomColumn = Dict[str, Any]

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-google

docs

common-utilities.md

data-transfers.md

firebase.md

gcp-services.md

google-ads.md

google-workspace.md

index.md

leveldb.md

marketing-platform.md

tile.json