Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Integration with Google Marketing Platform services including Google Analytics Admin, Campaign Manager, Display & Video 360, and Search Ads. Provides comprehensive digital marketing automation and reporting capabilities for advertising campaign management and performance analysis.
Google Analytics Admin API integration for managing Analytics properties, data streams, and administrative settings.
class GoogleAnalyticsAdminHook(GoogleBaseHook):
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1beta",
**kwargs
): ...
def list_accounts(
self,
page_size: Optional[int] = None,
page_token: Optional[str] = None,
show_deleted: bool = False
): ...
def create_property(
self,
analytics_property: Dict[str, Any],
**kwargs
): ...
def delete_property(
self,
property_name: str,
**kwargs
): ...
def create_data_stream(
self,
parent: str,
data_stream: Dict[str, Any],
**kwargs
): ...
class GoogleAnalyticsAdminListAccountsOperator(BaseOperator):
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1beta",
**kwargs
): ...
class GoogleAnalyticsAdminCreatePropertyOperator(BaseOperator):
def __init__(
self,
analytics_property: Dict[str, Any],
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1beta",
**kwargs
): ...
class GoogleAnalyticsAdminDeletePropertyOperator(BaseOperator):
def __init__(
self,
property_id: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1beta",
**kwargs
): ...
class GoogleAnalyticsAdminCreateDataStreamOperator(BaseOperator):
def __init__(
self,
property_id: str,
data_stream: Dict[str, Any],
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1beta",
**kwargs
): ...Google Campaign Manager API integration for managing advertising campaigns, reports, and conversions.
class GoogleCampaignManagerHook(GoogleBaseHook):
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v4",
**kwargs
): ...
def insert_report(
self,
profile_id: str,
report: Dict[str, Any],
**kwargs
): ...
def run_report(
self,
profile_id: str,
report_id: str,
synchronous: bool = True,
**kwargs
): ...
def get_report(
self,
profile_id: str,
report_id: str,
**kwargs
): ...
def get_report_file(
self,
profile_id: str,
report_id: str,
file_id: str,
**kwargs
): ...
def batch_insert_conversions(
self,
profile_id: str,
conversions: List[Dict[str, Any]],
encryption_entity_type: str,
encryption_entity_id: str,
encryption_source: str,
**kwargs
): ...
class GoogleCampaignManagerInsertReportOperator(BaseOperator):
def __init__(
self,
profile_id: str,
report: Dict[str, Any],
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v4",
**kwargs
): ...
class GoogleCampaignManagerRunReportOperator(BaseOperator):
def __init__(
self,
profile_id: str,
report_id: str,
synchronous: bool = True,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v4",
**kwargs
): ...
class GoogleCampaignManagerDownloadReportOperator(BaseOperator):
def __init__(
self,
profile_id: str,
report_id: str,
file_id: str,
bucket_name: str,
report_name: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v4",
**kwargs
): ...
class GoogleCampaignManagerBatchInsertConversionsOperator(BaseOperator):
def __init__(
self,
profile_id: str,
conversions: List[Dict[str, Any]],
encryption_entity_type: str,
encryption_entity_id: str,
encryption_source: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v4",
**kwargs
): ...
class GoogleCampaignManagerReportSensor(BaseSensorOperator):
def __init__(
self,
profile_id: str,
report_id: str,
file_id: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v4",
**kwargs
): ...Google Display & Video 360 API integration for programmatic advertising campaign management and reporting.
class GoogleDisplayVideo360Hook(GoogleBaseHook):
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v2",
**kwargs
): ...
def create_query(
self,
query: Dict[str, Any],
**kwargs
): ...
def run_query(
self,
query_id: str,
**kwargs
): ...
def get_query(
self,
query_id: str,
**kwargs
): ...
def download_report(
self,
query_id: str,
**kwargs
): ...
def create_sdf_download_operation(
self,
body_request: Dict[str, Any],
**kwargs
): ...
def get_sdf_download_operation(
self,
operation_name: str,
**kwargs
): ...
class GoogleDisplayVideo360CreateQueryOperator(BaseOperator):
def __init__(
self,
query: Dict[str, Any],
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v2",
**kwargs
): ...
class GoogleDisplayVideo360RunQueryOperator(BaseOperator):
def __init__(
self,
query_id: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v2",
**kwargs
): ...
class GoogleDisplayVideo360DownloadReportV2Operator(BaseOperator):
def __init__(
self,
query_id: str,
bucket_name: str,
report_name: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v2",
**kwargs
): ...
class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator):
def __init__(
self,
body_request: Dict[str, Any],
bucket_name: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v2",
**kwargs
): ...
class GoogleDisplayVideo360RunQuerySensor(BaseSensorOperator):
def __init__(
self,
query_id: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v2",
**kwargs
): ...
class GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator):
def __init__(
self,
operation_name: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v2",
**kwargs
): ...Google Search Ads API integration for search advertising campaign management and reporting.
class GoogleSearchAdsHook(GoogleBaseHook):
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v0",
**kwargs
): ...
def search(
self,
customer_id: str,
query: str,
**kwargs
): ...
def get_field(
self,
field_name: str,
customer_id: str,
**kwargs
): ...
def list_custom_columns(
self,
customer_id: str,
**kwargs
): ...
class GoogleSearchAdsReportingHook(GoogleBaseHook):
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v0",
**kwargs
): ...
def generate_report(
self,
customer_id: str,
report_request: Dict[str, Any],
**kwargs
): ...
class GoogleSearchAdsSearchOperator(BaseOperator):
def __init__(
self,
customer_id: str,
query: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v0",
**kwargs
): ...
class GoogleSearchAdsGetFieldOperator(BaseOperator):
def __init__(
self,
field_name: str,
customer_id: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v0",
**kwargs
): ...
class GoogleSearchAdsListCustomColumnsOperator(BaseOperator):
def __init__(
self,
customer_id: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v0",
**kwargs
): ...from airflow import DAG
from airflow.providers.google.marketing_platform.operators.analytics_admin import (
GoogleAnalyticsAdminCreatePropertyOperator,
GoogleAnalyticsAdminCreateDataStreamOperator
)
from datetime import datetime
dag = DAG(
'analytics_property_setup',
default_args={'start_date': datetime(2023, 1, 1)},
schedule_interval=None,
catchup=False
)
# Create Analytics property
create_property = GoogleAnalyticsAdminCreatePropertyOperator(
task_id='create_property',
analytics_property={
'displayName': 'My Website Analytics',
'industryCategory': 'TECHNOLOGY',
'timeZone': 'America/New_York',
'currencyCode': 'USD'
},
dag=dag
)
# Create web data stream
create_stream = GoogleAnalyticsAdminCreateDataStreamOperator(
task_id='create_stream',
property_id='{{ task_instance.xcom_pull(task_ids="create_property", key="property_id") }}',
data_stream={
'type_': 'WEB_DATA_STREAM',
'displayName': 'Website Stream',
'webStreamData': {
'defaultUri': 'https://example.com'
}
},
dag=dag
)
create_property >> create_streamfrom airflow import DAG
from airflow.providers.google.marketing_platform.operators.campaign_manager import (
GoogleCampaignManagerInsertReportOperator,
GoogleCampaignManagerRunReportOperator,
GoogleCampaignManagerDownloadReportOperator
)
from airflow.providers.google.marketing_platform.sensors.campaign_manager import (
GoogleCampaignManagerReportSensor
)
from datetime import datetime, timedelta
dag = DAG(
'campaign_manager_reporting',
default_args={'start_date': datetime(2023, 1, 1)},
schedule_interval='@daily',
catchup=False
)
# Define report configuration
report_config = {
'name': 'Daily Campaign Performance',
'type': 'STANDARD',
'criteria': {
'dateRange': {
'relativeDateRange': 'YESTERDAY'
},
'dimensions': [
{'name': 'date'},
{'name': 'campaign'},
{'name': 'placement'}
],
'metricNames': [
'impressions',
'clicks',
'totalConversions'
]
}
}
# Insert report definition
insert_report = GoogleCampaignManagerInsertReportOperator(
task_id='insert_report',
profile_id='12345678',
report=report_config,
dag=dag
)
# Run the report
run_report = GoogleCampaignManagerRunReportOperator(
task_id='run_report',
profile_id='12345678',
report_id='{{ task_instance.xcom_pull(task_ids="insert_report", key="report_id") }}',
synchronous=False,
dag=dag
)
# Wait for report completion
wait_for_report = GoogleCampaignManagerReportSensor(
task_id='wait_for_report',
profile_id='12345678',
report_id='{{ task_instance.xcom_pull(task_ids="insert_report", key="report_id") }}',
file_id='{{ task_instance.xcom_pull(task_ids="run_report", key="file_id") }}',
timeout=300,
poke_interval=30,
dag=dag
)
# Download report to GCS
download_report = GoogleCampaignManagerDownloadReportOperator(
task_id='download_report',
profile_id='12345678',
report_id='{{ task_instance.xcom_pull(task_ids="insert_report", key="report_id") }}',
file_id='{{ task_instance.xcom_pull(task_ids="run_report", key="file_id") }}',
bucket_name='marketing-reports',
report_name='campaign-performance/{{ ds }}/daily_report.csv',
dag=dag
)
insert_report >> run_report >> wait_for_report >> download_reportfrom typing import Dict, List, Optional, Any, Union
from airflow.models import BaseOperator
from airflow.sensors.base import BaseSensorOperator
# Analytics Admin types
AnalyticsProperty = Dict[str, Any]
DataStream = Dict[str, Any]
PropertyId = str
# Campaign Manager types
CampaignManagerReport = Dict[str, Any]
ProfileId = str
ReportId = str
FileId = str
ConversionData = Dict[str, Any]
# Display & Video 360 types
DV360Query = Dict[str, Any]
QueryId = str
SDFRequest = Dict[str, Any]
OperationName = str
# Search Ads types
SearchQuery = str
CustomerId = str
FieldName = str
CustomColumn = Dict[str, Any]Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-google