Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace
Extensive transfer capabilities between Google services and external systems, including AWS S3, Azure Blob Storage, SFTP, local filesystems, and various databases. These operators enable data movement across cloud platforms and on-premises systems.
Transfer operations between Google Cloud Storage and various external systems.
class GCSToBigQueryOperator(BaseOperator):
"""
Transfers data from Google Cloud Storage to BigQuery tables.
Args:
bucket (str): GCS bucket name
source_objects (List[str]): List of GCS object paths
destination_project_dataset_table (str): BigQuery destination in format project.dataset.table
schema_fields (List[Dict]): Table schema definition
write_disposition (str): Write mode (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
source_format (str): Source data format (CSV, JSON, AVRO, PARQUET)
gcp_conn_id (str): Connection ID for Google Cloud Platform
Returns:
BigQuery job result
"""
def __init__(
self,
bucket: str,
source_objects: List[str],
destination_project_dataset_table: str,
schema_fields: Optional[List[Dict]] = None,
write_disposition: str = "WRITE_EMPTY",
source_format: str = "CSV",
gcp_conn_id: str = "google_cloud_default",
**kwargs
): ...
class BigQueryToGCSOperator(BaseOperator):
"""
Transfers data from BigQuery to Google Cloud Storage.
Args:
source_project_dataset_table (str): BigQuery source table
destination_cloud_storage_uris (List[str]): GCS destination URIs
compression (str): Output compression format
export_format (str): Export format (CSV, JSON, AVRO, PARQUET)
gcp_conn_id (str): Connection ID for Google Cloud Platform
Returns:
Export job result with file locations
"""
def __init__(
self,
source_project_dataset_table: str,
destination_cloud_storage_uris: List[str],
compression: str = "NONE",
export_format: str = "CSV",
gcp_conn_id: str = "google_cloud_default",
**kwargs
): ...
class S3ToGCSOperator(BaseOperator):
"""
Transfers objects from Amazon S3 to Google Cloud Storage.
Args:
bucket (str): S3 bucket name
prefix (str): S3 object key prefix
gcp_conn_id (str): Connection ID for Google Cloud Platform
aws_conn_id (str): Connection ID for Amazon Web Services
dest_gcs_conn_id (str): Destination GCS connection ID
dest_bucket (str): Destination GCS bucket name
dest_prefix (str): Destination GCS object prefix
replace (bool): Whether to replace existing objects
Returns:
List of transferred object keys
"""
def __init__(
self,
bucket: str,
prefix: str = "",
gcp_conn_id: str = "google_cloud_default",
aws_conn_id: str = "aws_default",
dest_gcs_conn_id: Optional[str] = None,
dest_bucket: Optional[str] = None,
dest_prefix: str = "",
replace: bool = True,
**kwargs
): ...
class AzureBlobStorageToGCSOperator(BaseOperator):
"""
Transfers blobs from Azure Blob Storage to Google Cloud Storage.
Args:
blob_name (str): Azure blob name or prefix
file_path (str): Destination GCS object path
container_name (str): Azure container name
bucket_name (str): Destination GCS bucket name
azure_conn_id (str): Connection ID for Azure Blob Storage
gcp_conn_id (str): Connection ID for Google Cloud Platform
Returns:
GCS object path of transferred data
"""
def __init__(
self,
blob_name: str,
file_path: str,
container_name: str,
bucket_name: str,
azure_conn_id: str = "azure_blob_default",
gcp_conn_id: str = "google_cloud_default",
**kwargs
): ...
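A minimal usage sketch for AzureBlobStorageToGCSOperator; the container, bucket, object names, and connection IDs below are placeholders, and a surrounding DAG is assumed:
azure_to_gcs = AzureBlobStorageToGCSOperator(
    task_id='azure_to_gcs',
    blob_name='exports/{{ ds }}/events.json',   # hypothetical source blob
    container_name='analytics-exports',         # hypothetical Azure container
    bucket_name='processed-data-lake',          # hypothetical destination bucket
    file_path='azure/{{ ds }}/events.json',
    azure_conn_id='azure_blob_default',
    gcp_conn_id='google_cloud_default',
    dag=dag
)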
Transfer operations between various databases and Google Cloud services.
class MySQLToGCSOperator(BaseOperator):
"""
Transfers data from MySQL database to Google Cloud Storage.
Args:
sql (str): SQL query to execute
bucket (str): Destination GCS bucket name
filename (str): Destination GCS object path
schema_filename (str): Optional schema file path in GCS
mysql_conn_id (str): Connection ID for MySQL database
gcp_conn_id (str): Connection ID for Google Cloud Platform
Returns:
GCS object path of exported data
"""
def __init__(
self,
sql: str,
bucket: str,
filename: str,
schema_filename: Optional[str] = None,
mysql_conn_id: str = "mysql_default",
gcp_conn_id: str = "google_cloud_default",
**kwargs
): ...
class PostgresToGCSOperator(BaseOperator):
"""
Transfers data from PostgreSQL database to Google Cloud Storage.
Args:
sql (str): SQL query to execute
bucket (str): Destination GCS bucket name
filename (str): Destination GCS object path
schema_filename (str): Optional schema file path in GCS
postgres_conn_id (str): Connection ID for PostgreSQL database
gcp_conn_id (str): Connection ID for Google Cloud Platform
Returns:
GCS object path of exported data
"""
def __init__(
self,
sql: str,
bucket: str,
filename: str,
schema_filename: Optional[str] = None,
postgres_conn_id: str = "postgres_default",
gcp_conn_id: str = "google_cloud_default",
**kwargs
): ...
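A usage sketch for PostgresToGCSOperator, analogous to the MySQL example further below; the query, bucket, and connection IDs are placeholders:
postgres_to_gcs = PostgresToGCSOperator(
    task_id='postgres_to_gcs',
    sql="SELECT * FROM orders WHERE order_date = '{{ ds }}'",
    bucket='data-migration',
    filename='orders/{{ ds }}/orders.csv',
    postgres_conn_id='postgres_default',
    gcp_conn_id='google_cloud_default',
    dag=dag
)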
class BigQueryToPostgresOperator(BaseOperator):
"""
Transfers data from BigQuery to PostgreSQL database.
Args:
dataset_table (str): BigQuery source table
target_table_name (str): PostgreSQL destination table name
postgres_conn_id (str): Connection ID for PostgreSQL database
bigquery_conn_id (str): Connection ID for BigQuery
Returns:
Number of rows transferred
"""
def __init__(
self,
dataset_table: str,
target_table_name: str,
postgres_conn_id: str = "postgres_default",
bigquery_conn_id: str = "google_cloud_default",
**kwargs
): ...
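A usage sketch for BigQueryToPostgresOperator; the dataset, table, and connection IDs are placeholders, and the target PostgreSQL table is assumed to exist:
bq_to_postgres = BigQueryToPostgresOperator(
    task_id='bq_to_postgres',
    dataset_table='analytics.user_metrics',
    target_table_name='public.user_metrics',
    postgres_conn_id='postgres_default',
    bigquery_conn_id='google_cloud_default',
    dag=dag
)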
Transfer operations between GCS and various file systems.
class GCSToLocalFilesystemOperator(BaseOperator):
"""
Downloads objects from Google Cloud Storage to local filesystem.
Args:
bucket (str): GCS bucket name
object_name (str): GCS object path
filename (str): Local filesystem destination path
gcp_conn_id (str): Connection ID for Google Cloud Platform
Returns:
Local file path of downloaded object
"""
def __init__(
self,
bucket: str,
object_name: str,
filename: str,
gcp_conn_id: str = "google_cloud_default",
**kwargs
): ...
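A usage sketch for GCSToLocalFilesystemOperator; the bucket, object name, and local path are placeholders:
download_metrics = GCSToLocalFilesystemOperator(
    task_id='download_metrics',
    bucket='processed-data-lake',
    object_name='exports/{{ ds }}/user_metrics.parquet',
    filename='/tmp/user_metrics_{{ ds }}.parquet',
    gcp_conn_id='google_cloud_default',
    dag=dag
)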
class SFTPToGCSOperator(BaseOperator):
"""
Transfers files from SFTP server to Google Cloud Storage.
Args:
source_path (str): SFTP source file path
destination_bucket (str): GCS destination bucket name
destination_path (str): GCS destination object path
sftp_conn_id (str): Connection ID for SFTP server
gcp_conn_id (str): Connection ID for Google Cloud Platform
move_object (bool): Whether to delete source after transfer
Returns:
GCS object path of transferred file
"""
def __init__(
self,
source_path: str,
destination_bucket: str,
destination_path: str,
sftp_conn_id: str = "sftp_default",
gcp_conn_id: str = "google_cloud_default",
move_object: bool = False,
**kwargs
): ...
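A usage sketch for SFTPToGCSOperator; the paths, bucket, and connection IDs are placeholders:
sftp_to_gcs = SFTPToGCSOperator(
    task_id='sftp_to_gcs',
    source_path='/uploads/{{ ds }}/*.csv',
    destination_bucket='processed-data-lake',
    destination_path='sftp/{{ ds }}/',
    sftp_conn_id='sftp_default',
    gcp_conn_id='google_cloud_default',
    move_object=False,
    dag=dag
)
The end-to-end examples below combine several of these operators into daily DAGs.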
from airflow import DAG
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime
dag = DAG(
'multi_cloud_etl',
default_args={'start_date': datetime(2023, 1, 1)},
schedule_interval='@daily',
catchup=False
)
# Transfer raw data from S3 to GCS
s3_to_gcs = S3ToGCSOperator(
task_id='s3_to_gcs',
bucket='source-data-bucket',
prefix='raw-data/{{ ds }}/',
dest_bucket='processed-data-lake',
dest_prefix='staging/{{ ds }}/',
aws_conn_id='aws_default',
dag=dag
)
# Load data into BigQuery for processing
gcs_to_bq = GCSToBigQueryOperator(
task_id='gcs_to_bq',
bucket='processed-data-lake',
source_objects=['staging/{{ ds }}/*.csv'],
destination_project_dataset_table='analytics.raw_data.daily_imports',
schema_fields=[
{'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
{'name': 'user_id', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'event_type', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'properties', 'type': 'JSON', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_TRUNCATE',
dag=dag
)
# Export processed results
bq_to_gcs = BigQueryToGCSOperator(
task_id='bq_to_gcs',
source_project_dataset_table='analytics.processed_data.user_metrics',
destination_cloud_storage_uris=[
'gs://processed-data-lake/exports/{{ ds }}/user_metrics.parquet'
],
export_format='PARQUET',
dag=dag
)
s3_to_gcs >> gcs_to_bq >> bq_to_gcs
from airflow import DAG
from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime
dag = DAG(
'database_migration',
default_args={'start_date': datetime(2023, 1, 1)},
schedule_interval='@daily',
catchup=False
)
# Extract data from MySQL
mysql_to_gcs = MySQLToGCSOperator(
task_id='mysql_to_gcs',
sql='''
SELECT
customer_id,
order_date,
total_amount,
status,
created_at,
updated_at
FROM orders
WHERE DATE(created_at) = '{{ ds }}'
''',
bucket='data-migration',
filename='orders/{{ ds }}/orders.csv',
mysql_conn_id='mysql_prod',
dag=dag
)
# Load into BigQuery
gcs_to_bigquery = GCSToBigQueryOperator(
task_id='gcs_to_bigquery',
bucket='data-migration',
source_objects=['orders/{{ ds }}/orders.csv'],
destination_project_dataset_table='warehouse.sales.orders',
schema_fields=[
{'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
{'name': 'order_date', 'type': 'DATE', 'mode': 'REQUIRED'},
{'name': 'total_amount', 'type': 'NUMERIC', 'mode': 'REQUIRED'},
{'name': 'status', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
{'name': 'updated_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_APPEND',
dag=dag
)
mysql_to_gcs >> gcs_to_bigquery
from typing import List, Optional, Dict, Any, Union
from airflow.models import BaseOperator
# Transfer operation types
SourcePath = str
DestinationPath = str
BucketName = str
ObjectKey = str
TableReference = str
# Schema and format types
SchemaField = Dict[str, str]
WriteDisposition = str # WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY
SourceFormat = str # CSV, JSON, AVRO, PARQUET
ExportFormat = str # CSV, JSON, AVRO, PARQUET
CompressionType = str # NONE, GZIP, DEFLATE, SNAPPY
# Connection types
ConnectionId = str
TransferResult = Dict[str, Any]
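A minimal sketch of how these aliases might annotate a helper that assembles GCSToBigQueryOperator arguments; build_load_config is a hypothetical function, not part of the provider:
def build_load_config(
    bucket: BucketName,
    objects: List[ObjectKey],
    destination: TableReference,
    schema: List[SchemaField],
    write_disposition: WriteDisposition = "WRITE_APPEND",
    source_format: SourceFormat = "CSV",
) -> TransferResult:
    # Collect keyword arguments for GCSToBigQueryOperator in one place
    return {
        "bucket": bucket,
        "source_objects": objects,
        "destination_project_dataset_table": destination,
        "schema_fields": schema,
        "write_disposition": write_disposition,
        "source_format": source_format,
    }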
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-google