tessl/pypi-apache-airflow-providers-google

Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace

docs/data-transfers.md

Data Transfer Operations

Transfer operators move data between Google services and external systems, including AWS S3, Azure Blob Storage, SFTP servers, local filesystems, and relational databases, enabling data movement across cloud platforms and on-premises systems.

Capabilities

Cloud Storage Transfers

Transfer operations to and from Google Cloud Storage with various external systems.

class GCSToBigQueryOperator(BaseOperator):
    """
    Transfers data from Google Cloud Storage to BigQuery tables.
    
    Args:
        bucket (str): GCS bucket name
        source_objects (List[str]): List of GCS object paths
        destination_project_dataset_table (str): BigQuery destination in format project.dataset.table
        schema_fields (List[Dict]): Table schema definition
        write_disposition (str): Write mode (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
        source_format (str): Source data format (CSV, JSON, AVRO, PARQUET)
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        
    Returns:
        BigQuery job result
    """
    def __init__(
        self,
        bucket: str,
        source_objects: List[str],
        destination_project_dataset_table: str,
        schema_fields: Optional[List[Dict]] = None,
        write_disposition: str = "WRITE_EMPTY",
        source_format: str = "CSV",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class BigQueryToGCSOperator(BaseOperator):
    """
    Transfers data from BigQuery to Google Cloud Storage.
    
    Args:
        source_project_dataset_table (str): BigQuery source table
        destination_cloud_storage_uris (List[str]): GCS destination URIs
        compression (str): Output compression format
        export_format (str): Export format (CSV, JSON, AVRO, PARQUET)
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        
    Returns:
        Export job result with file locations
    """
    def __init__(
        self,
        source_project_dataset_table: str,
        destination_cloud_storage_uris: List[str],
        compression: str = "NONE",
        export_format: str = "CSV",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class S3ToGCSOperator(BaseOperator):
    """
    Transfers objects from Amazon S3 to Google Cloud Storage.
    
    Args:
        bucket (str): S3 bucket name
        prefix (str): S3 object key prefix
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        aws_conn_id (str): Connection ID for Amazon Web Services
        dest_gcs_conn_id (str): Destination GCS connection ID
        dest_bucket (str): Destination GCS bucket name
        dest_prefix (str): Destination GCS object prefix
        replace (bool): Whether to replace existing objects
        
    Returns:
        List of transferred object keys
    """
    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        gcp_conn_id: str = "google_cloud_default",
        aws_conn_id: str = "aws_default",
        dest_gcs_conn_id: Optional[str] = None,
        dest_bucket: Optional[str] = None,
        dest_prefix: str = "",
        replace: bool = True,
        **kwargs
    ): ...

class AzureBlobStorageToGCSOperator(BaseOperator):
    """
    Transfers blobs from Azure Blob Storage to Google Cloud Storage.
    
    Args:
        blob_name (str): Azure blob name or prefix
        file_path (str): Destination GCS object path
        container_name (str): Azure container name
        bucket_name (str): Destination GCS bucket name
        azure_conn_id (str): Connection ID for Azure Blob Storage
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        
    Returns:
        GCS object path of transferred data
    """
    def __init__(
        self,
        blob_name: str,
        file_path: str,
        container_name: str,
        bucket_name: str,
        azure_conn_id: str = "azure_blob_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
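
A minimal sketch of copying a blob from Azure Blob Storage into GCS, following the constructor documented above; the DAG ID, container, bucket, blob path, and connection IDs are illustrative placeholders, and the import path is assumed to mirror the other transfer modules.

from airflow import DAG
from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator
from datetime import datetime

with DAG('azure_blob_ingest', start_date=datetime(2023, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    # Copy one daily export blob from an Azure container into a GCS bucket.
    # All names below are placeholders.
    azure_to_gcs = AzureBlobStorageToGCSOperator(
        task_id='azure_to_gcs',
        blob_name='exports/{{ ds }}/events.json',
        container_name='raw-exports',
        bucket_name='processed-data-lake',
        file_path='azure/{{ ds }}/events.json',
        azure_conn_id='azure_blob_default',
        gcp_conn_id='google_cloud_default',
    )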

Database Transfers

Transfer operations between various databases and Google Cloud services.

class MySQLToGCSOperator(BaseOperator):
    """
    Transfers data from MySQL database to Google Cloud Storage.
    
    Args:
        sql (str): SQL query to execute
        bucket (str): Destination GCS bucket name
        filename (str): Destination GCS object path
        schema_filename (str): Optional schema file path in GCS
        mysql_conn_id (str): Connection ID for MySQL database
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        
    Returns:
        GCS object path of exported data
    """
    def __init__(
        self,
        sql: str,
        bucket: str,
        filename: str,
        schema_filename: Optional[str] = None,
        mysql_conn_id: str = "mysql_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class PostgresToGCSOperator(BaseOperator):
    """
    Transfers data from PostgreSQL database to Google Cloud Storage.
    
    Args:
        sql (str): SQL query to execute
        bucket (str): Destination GCS bucket name
        filename (str): Destination GCS object path
        schema_filename (str): Optional schema file path in GCS
        postgres_conn_id (str): Connection ID for PostgreSQL database
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        
    Returns:
        GCS object path of exported data
    """
    def __init__(
        self,
        sql: str,
        bucket: str,
        filename: str,
        schema_filename: Optional[str] = None,
        postgres_conn_id: str = "postgres_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class BigQueryToPostgresOperator(BaseOperator):
    """
    Transfers data from BigQuery to PostgreSQL database.
    
    Args:
        dataset_table (str): BigQuery source table
        target_table_name (str): PostgreSQL destination table name
        postgres_conn_id (str): Connection ID for PostgreSQL database
        bigquery_conn_id (str): Connection ID for BigQuery
        
    Returns:
        Number of rows transferred
    """
    def __init__(
        self,
        dataset_table: str,
        target_table_name: str,
        postgres_conn_id: str = "postgres_default",
        bigquery_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
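
A short sketch of pushing a finished BigQuery table back into PostgreSQL with the constructor documented above; the DAG ID, table names, and connection IDs are placeholders.

from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator
from datetime import datetime

with DAG('bq_to_postgres_export', start_date=datetime(2023, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    # Copy an aggregated BigQuery table into a PostgreSQL reporting table.
    # Table and connection names are placeholders.
    bq_to_postgres = BigQueryToPostgresOperator(
        task_id='bq_to_postgres',
        dataset_table='analytics.user_metrics',
        target_table_name='reporting.user_metrics',
        postgres_conn_id='postgres_default',
        bigquery_conn_id='google_cloud_default',
    )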

File System Transfers

Transfer operations between GCS and various file systems.

class GCSToLocalFilesystemOperator(BaseOperator):
    """
    Downloads objects from Google Cloud Storage to local filesystem.
    
    Args:
        bucket (str): GCS bucket name
        object_name (str): GCS object path
        filename (str): Local filesystem destination path
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        
    Returns:
        Local file path of downloaded object
    """
    def __init__(
        self,
        bucket: str,
        object_name: str,
        filename: str,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
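
A minimal sketch of downloading a GCS object onto the worker's local filesystem using the constructor above; the bucket, object path, and local path are placeholders.

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from datetime import datetime

with DAG('gcs_download', start_date=datetime(2023, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    # Download one object to local disk on the worker that runs the task.
    # Bucket, object, and file paths are placeholders.
    download_report = GCSToLocalFilesystemOperator(
        task_id='download_report',
        bucket='processed-data-lake',
        object_name='exports/{{ ds }}/user_metrics.parquet',
        filename='/tmp/user_metrics_{{ ds }}.parquet',
        gcp_conn_id='google_cloud_default',
    )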

class SFTPToGCSOperator(BaseOperator):
    """
    Transfers files from SFTP server to Google Cloud Storage.
    
    Args:
        source_path (str): SFTP source file path
        destination_bucket (str): GCS destination bucket name
        destination_path (str): GCS destination object path
        sftp_conn_id (str): Connection ID for SFTP server
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        move_object (bool): Whether to delete source after transfer
        
    Returns:
        GCS object path of transferred file
    """
    def __init__(
        self,
        source_path: str,
        destination_bucket: str,
        destination_path: str,
        sftp_conn_id: str = "sftp_default",
        gcp_conn_id: str = "google_cloud_default",
        move_object: bool = False,
        **kwargs
    ): ...
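
A minimal sketch of pulling a partner-delivered file from an SFTP server into GCS with the constructor above; paths, bucket, and connection IDs are placeholders. Setting move_object=True would delete the source file after a successful transfer.

from airflow import DAG
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator
from datetime import datetime

with DAG('sftp_ingest', start_date=datetime(2023, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    # Upload a file from SFTP into GCS, leaving the source file in place.
    # Paths and connection IDs are placeholders.
    sftp_to_gcs = SFTPToGCSOperator(
        task_id='sftp_to_gcs',
        source_path='/uploads/{{ ds }}/partner_feed.csv',
        destination_bucket='processed-data-lake',
        destination_path='partner/{{ ds }}/partner_feed.csv',
        sftp_conn_id='sftp_default',
        gcp_conn_id='google_cloud_default',
        move_object=False,
    )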

Usage Examples

Multi-Stage ETL Pipeline

from airflow import DAG
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime

dag = DAG(
    'multi_cloud_etl',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Transfer raw data from S3 to GCS
s3_to_gcs = S3ToGCSOperator(
    task_id='s3_to_gcs',
    bucket='source-data-bucket',
    prefix='raw-data/{{ ds }}/',
    dest_bucket='processed-data-lake',
    dest_prefix='staging/{{ ds }}/',
    aws_conn_id='aws_default',
    dag=dag
)

# Load data into BigQuery for processing
gcs_to_bq = GCSToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='processed-data-lake',
    source_objects=['staging/{{ ds }}/*.csv'],
    destination_project_dataset_table='analytics.raw_data.daily_imports',
    schema_fields=[
        {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
        {'name': 'user_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'event_type', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'properties', 'type': 'JSON', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)

# Export processed results
bq_to_gcs = BigQueryToGCSOperator(
    task_id='bq_to_gcs',
    source_project_dataset_table='analytics.processed_data.user_metrics',
    destination_cloud_storage_uris=[
        'gs://processed-data-lake/exports/{{ ds }}/user_metrics.parquet'
    ],
    export_format='PARQUET',
    dag=dag
)

s3_to_gcs >> gcs_to_bq >> bq_to_gcs

Database Migration Pipeline

from airflow import DAG
from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

dag = DAG(
    'database_migration',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Extract data from MySQL
mysql_to_gcs = MySQLToGCSOperator(
    task_id='mysql_to_gcs',
    sql='''
        SELECT 
            customer_id,
            order_date,
            total_amount,
            status,
            created_at,
            updated_at
        FROM orders 
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    bucket='data-migration',
    filename='orders/{{ ds }}/orders.csv',
    mysql_conn_id='mysql_prod',
    dag=dag
)

# Load into BigQuery
gcs_to_bigquery = GCSToBigQueryOperator(
    task_id='gcs_to_bigquery',
    bucket='data-migration',
    source_objects=['orders/{{ ds }}/orders.csv'],
    destination_project_dataset_table='warehouse.sales.orders',
    schema_fields=[
        {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'order_date', 'type': 'DATE', 'mode': 'REQUIRED'},
        {'name': 'total_amount', 'type': 'NUMERIC', 'mode': 'REQUIRED'},
        {'name': 'status', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
        {'name': 'updated_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_APPEND',
    dag=dag
)

mysql_to_gcs >> gcs_to_bigquery

Types

from typing import List, Optional, Dict, Any, Union
from airflow.models import BaseOperator

# Transfer operation types
SourcePath = str
DestinationPath = str
BucketName = str
ObjectKey = str
TableReference = str

# Schema and format types
SchemaField = Dict[str, str]
WriteDisposition = str  # WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY
SourceFormat = str      # CSV, JSON, AVRO, PARQUET
ExportFormat = str      # CSV, JSON, AVRO, PARQUET
CompressionType = str   # NONE, GZIP, DEFLATE, SNAPPY

# Connection types
ConnectionId = str
TransferResult = Dict[str, Any]

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-google

docs/
common-utilities.md
data-transfers.md
firebase.md
gcp-services.md
google-ads.md
google-workspace.md
index.md
leveldb.md
marketing-platform.md

tile.json