CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-google

Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/google-workspace.md

Google Workspace Integration

Google Workspace (formerly G Suite) integration for Drive, Sheets, and Calendar. Enables document management, spreadsheet automation, and calendar scheduling within data pipelines for collaborative productivity workflows.

Capabilities

Google Drive

Google Drive API integration for file and folder management, sharing, and access control.

class GoogleDriveHook(GoogleBaseHook):
    """Hook for Google Drive API integration (file and folder management).

    This is a signature listing only: every method body is elided (``...``),
    so return types and runtime behavior are not shown here.

    NOTE(review): `create_file` and `delete_file` may not exist on the
    upstream hook in every provider release — confirm against the installed
    apache-airflow-providers-google version before relying on them.
    """

    # gcp_conn_id defaults to "google_cloud_default"; api_version defaults to "v3".
    # Extra keyword arguments are presumably forwarded to GoogleBaseHook — confirm.
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v3",
        **kwargs
    ) -> None: ...
    
    # Presumably returns the Drive API client object; return type is not shown here.
    def get_conn(self): ...
    # Upload the file at `local_location` to `remote_location`.
    # chunk_size defaults to 104857600 (100 * 1024 * 1024), presumably bytes — confirm.
    # resumable defaults to False.
    def upload_file(
        self,
        local_location: str,
        remote_location: str,
        chunk_size: int = 104857600,
        resumable: bool = False
    ): ...
    # Download the file identified by `file_id` into `file_handle`
    # (presumably a writable binary file object — confirm).
    # chunk_size uses the same 104857600 default as upload_file.
    def download_file(
        self,
        file_id: str,
        file_handle: Any,
        chunk_size: int = 104857600
    ): ...
    # Create a file from `file_metadata`, optionally with `media_body` content.
    # fields defaults to "id" (presumably the response field selector — confirm).
    def create_file(
        self,
        file_metadata: Dict[str, Any],
        media_body: Optional[Any] = None,
        fields: str = "id"
    ): ...
    # Delete the file identified by `file_id`.
    def delete_file(
        self,
        file_id: str
    ): ...
    # Look up a file named `file_name` inside folder `folder_id`.
    # drive_id is optional (presumably selects a shared drive — confirm).
    def get_file_id(
        self,
        folder_id: str,
        file_name: str,
        drive_id: Optional[str] = None
    ): ...

class GoogleDriveFileExistenceSensor(BaseSensorOperator):
    """Sensor keyed on a file name within a Google Drive folder.

    Args:
        folder_id (str): Drive folder ID to look in
        file_name (str): Name of the file to look for
        drive_id (Optional[str]): Optional drive ID (default: None);
            presumably identifies a shared drive — confirm
        gcp_conn_id (str): Connection ID (default: "google_cloud_default")
        **kwargs: Presumably forwarded to BaseSensorOperator
            (e.g. ``timeout``, ``poke_interval``) — confirm
    """

    def __init__(
        self,
        folder_id: str,
        file_name: str,
        drive_id: Optional[str] = None,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ) -> None: ...

Google Sheets

Google Sheets API integration for spreadsheet creation, data manipulation, and automation.

class GSheetsHook(GoogleBaseHook):
    """Hook for Google Sheets API integration (spreadsheet creation and data manipulation).

    This is a signature listing only: every method body is elided (``...``),
    so return types and runtime behavior are not shown here.

    NOTE(review): upstream provider releases may expose some of these under
    different names (e.g. `clear`, `create_spreadsheet`) — confirm
    `clear_values`, `batch_update` and `create` against the installed version.
    """

    # gcp_conn_id defaults to "google_cloud_default"; api_version defaults to "v4".
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ) -> None: ...
    
    # Presumably returns the Sheets API client object; return type is not shown here.
    def get_conn(self): ...
    # Read values from `range_` of `spreadsheet_id` (range format presumably A1 notation — confirm).
    # Defaults: major_dimension="ROWS", value_render_option="FORMATTED_VALUE",
    # date_time_render_option="SERIAL_NUMBER".
    def get_values(
        self,
        spreadsheet_id: str,
        range_: str,
        major_dimension: str = "ROWS",
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...
    # Write `values` (a list of lists) into `range_` of `spreadsheet_id`.
    # Defaults: major_dimension="ROWS", value_input_option="RAW",
    # include_values_in_response=False; render options as in get_values.
    def update_values(
        self,
        spreadsheet_id: str,
        range_: str,
        values: List[List[Any]],
        major_dimension: str = "ROWS",
        value_input_option: str = "RAW",
        include_values_in_response: bool = False,
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...
    # Append `values` to `range_` of `spreadsheet_id`.
    # Same defaults as update_values, plus insert_data_option="OVERWRITE".
    def append_values(
        self,
        spreadsheet_id: str,
        range_: str,
        values: List[List[Any]],
        major_dimension: str = "ROWS",
        value_input_option: str = "RAW",
        insert_data_option: str = "OVERWRITE",
        include_values_in_response: bool = False,
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...
    # Clear the values in `range_` of `spreadsheet_id`.
    def clear_values(
        self,
        spreadsheet_id: str,
        range_: str
    ): ...
    # Apply a batch update described by `body` to `spreadsheet_id`
    # (body schema is not shown here; presumably the Sheets batchUpdate request — confirm).
    def batch_update(
        self,
        spreadsheet_id: str,
        body: Dict[str, Any]
    ): ...
    # Create a spreadsheet from `body` (schema not shown here).
    def create(
        self,
        body: Dict[str, Any]
    ): ...

class GoogleSheetsCreateSpreadsheetOperator(BaseOperator):
    """Operator that creates a Google Sheets spreadsheet.

    Args:
        spreadsheet (Dict[str, Any]): Spreadsheet definition, e.g. a dict with
            ``properties`` and ``sheets`` keys
        gcp_conn_id (str): Connection ID (default: "google_cloud_default")
        **kwargs: Presumably forwarded to BaseOperator — confirm
    """

    def __init__(
        self,
        spreadsheet: Dict[str, Any],
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ) -> None: ...

Google Calendar

Google Calendar API integration for event management and scheduling automation.

class GoogleCalendarHook(GoogleBaseHook):
    """Hook for Google Calendar API integration (event management and scheduling).

    This is a signature listing only: every method body is elided (``...``),
    so return types and runtime behavior are not shown here.

    NOTE(review): `delete_event` and `quick_add_event` may not exist on the
    upstream hook in every provider release — confirm against the installed
    apache-airflow-providers-google version.
    """

    # gcp_conn_id defaults to "google_cloud_default"; api_version defaults to "v3".
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v3",
        **kwargs
    ) -> None: ...
    
    # Presumably returns the Calendar API client object; return type is not shown here.
    def get_conn(self): ...
    # List events from `calendar_id` (default "primary").
    # time_min / time_max / max_results / order_by are optional filters (default None);
    # timestamp string format is not shown here (presumably RFC 3339 — confirm).
    # single_events defaults to False.
    def get_events(
        self,
        calendar_id: str = "primary",
        time_min: Optional[str] = None,
        time_max: Optional[str] = None,
        max_results: Optional[int] = None,
        single_events: bool = False,
        order_by: Optional[str] = None
    ): ...
    # Create `event` on `calendar_id` (default "primary").
    # send_notifications defaults to False; send_updates defaults to "none".
    def create_event(
        self,
        event: Dict[str, Any],
        calendar_id: str = "primary",
        send_notifications: bool = False,
        send_updates: str = "none"
    ): ...
    # Delete the event `event_id` from `calendar_id`; same notification defaults as create_event.
    def delete_event(
        self,
        event_id: str,
        calendar_id: str = "primary",
        send_notifications: bool = False,
        send_updates: str = "none"
    ): ...
    # Create an event from free-form `text` on `calendar_id` (default "primary").
    def quick_add_event(
        self,
        text: str,
        calendar_id: str = "primary",
        send_notifications: bool = False
    ): ...

Transfer Operations

Specialized operators for transferring data between Google Cloud services and Google Workspace applications.

class GCSToGoogleDriveOperator(BaseOperator):
    """
    Transfers files from Google Cloud Storage to Google Drive.

    Args:
        source_bucket (str): GCS bucket name
        source_object (str): GCS object path
        destination_object (str): Google Drive file name
        folder_id (str): Google Drive folder ID for destination
        gcp_conn_id (str): Connection ID for Google Cloud Platform
            (default: "google_cloud_default")
        move_object (bool): Whether to delete source after transfer
            (default: False)
        **kwargs: Presumably forwarded to BaseOperator — confirm

    Returns:
        Google Drive file ID of transferred file

    NOTE(review): upstream provider releases may name the folder parameter
    `destination_folder_id` — confirm against the installed version.
    """
    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        destination_object: str,
        folder_id: str,
        gcp_conn_id: str = "google_cloud_default",
        move_object: bool = False,
        **kwargs
    ) -> None: ...

class LocalFilesystemToGoogleDriveOperator(BaseOperator):
    """
    Transfers files from local filesystem to Google Drive.

    Args:
        local_paths (List[str]): Local file paths to transfer
        folder_id (str): Google Drive destination folder ID
        ignore_if_missing (bool): Whether to ignore missing local files
            (default: False)
        gcp_conn_id (str): Connection ID for Google Cloud Platform
            (default: "google_cloud_default")
        **kwargs: Presumably forwarded to BaseOperator — confirm

    Returns:
        List of Google Drive file IDs

    NOTE(review): upstream provider releases may also require a
    `drive_folder` argument — confirm against the installed version.
    """
    def __init__(
        self,
        local_paths: List[str],
        folder_id: str,
        ignore_if_missing: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ) -> None: ...

class GCSToGoogleSheetsOperator(BaseOperator):
    """
    Transfers data from Google Cloud Storage to Google Sheets.

    Args:
        source_bucket (str): GCS bucket name
        source_object (str): GCS object path (CSV format)
        spreadsheet_id (str): Google Sheets spreadsheet ID
        range_name (str): Target range in A1 notation (default: "Sheet1")
        gcp_conn_id (str): Connection ID for Google Cloud Platform
            (default: "google_cloud_default")
        clear_sheet (bool): Whether to clear existing data (default: False)
        **kwargs: Presumably forwarded to BaseOperator — confirm

    Returns:
        Number of rows updated in the spreadsheet

    NOTE(review): upstream provider releases may use `bucket_name`,
    `object_name` and `spreadsheet_range` for these parameters — confirm
    against the installed version.
    """
    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        spreadsheet_id: str,
        range_name: str = "Sheet1",
        gcp_conn_id: str = "google_cloud_default",
        clear_sheet: bool = False,
        **kwargs
    ) -> None: ...

class SQLToGoogleSheetsOperator(BaseOperator):
    """
    Transfers SQL query results to Google Sheets.

    Args:
        sql (str): SQL query to execute
        spreadsheet_id (str): Google Sheets spreadsheet ID
        range_name (str): Target range in A1 notation (default: "Sheet1")
        sql_conn_id (str): Connection ID for SQL database (default: "default")
        gcp_conn_id (str): Connection ID for Google Cloud Platform
            (default: "google_cloud_default")
        parameters (Optional[Dict]): SQL query parameters (default: None)
        **kwargs: Presumably forwarded to BaseOperator — confirm

    Returns:
        Number of rows updated in the spreadsheet

    NOTE(review): upstream provider releases may name the range parameter
    `spreadsheet_range` — confirm against the installed version.
    """
    def __init__(
        self,
        sql: str,
        spreadsheet_id: str,
        range_name: str = "Sheet1",
        sql_conn_id: str = "default",
        gcp_conn_id: str = "google_cloud_default",
        parameters: Optional[Dict] = None,
        **kwargs
    ) -> None: ...

Usage Examples

Drive File Management

from airflow import DAG
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
from airflow.providers.google.suite.transfers.gcs_to_drive import GCSToGoogleDriveOperator
from datetime import datetime

# Daily DAG: wait for an input file to appear in a Drive folder, then copy the
# processed result for that run date from GCS into the same Drive folder.
dag = DAG(
    'drive_file_management',
    default_args={'start_date': datetime(2023, 1, 1)},
    # `schedule` replaces `schedule_interval`, which was deprecated in
    # Airflow 2.4 and removed in Airflow 3.
    schedule='@daily',
    # Do not backfill runs between start_date and today.
    catchup=False
)

# Wait for file in Drive
wait_for_file = GoogleDriveFileExistenceSensor(
    task_id='wait_for_file',
    folder_id='1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms',
    file_name='input_data.csv',
    # Standard sensor settings (seconds): check every 60 s, give up after 300 s.
    timeout=300,
    poke_interval=60,
    dag=dag
)

# Transfer processed file to Drive
transfer_to_drive = GCSToGoogleDriveOperator(
    task_id='transfer_to_drive',
    source_bucket='processed-data',
    # `{{ ds }}` is rendered by Airflow templating to the run's logical date (YYYY-MM-DD).
    source_object='outputs/{{ ds }}/results.csv',
    destination_object='Daily Results {{ ds }}.csv',
    folder_id='1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms',
    dag=dag
)

# The transfer runs only after the sensor has succeeded.
wait_for_file >> transfer_to_drive

Sheets Data Pipeline

from airflow import DAG
from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
from datetime import datetime

# Daily DAG: create a fresh spreadsheet for the run date, then export that
# day's sales rows from SQL into it.
dag = DAG(
    'sheets_reporting',
    default_args={'start_date': datetime(2023, 1, 1)},
    # `schedule` replaces `schedule_interval`, which was deprecated in
    # Airflow 2.4 and removed in Airflow 3.
    schedule='@daily',
    # Do not backfill runs between start_date and today.
    catchup=False
)

# Create new spreadsheet for daily report
create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
    task_id='create_spreadsheet',
    spreadsheet={
        'properties': {
            # `{{ ds }}` is rendered by Airflow templating to the run's logical date.
            'title': 'Daily Sales Report - {{ ds }}',
            'locale': 'en_US',
            'timeZone': 'America/New_York'
        },
        'sheets': [
            {
                'properties': {
                    # This sheet title is the one targeted by `range_name` below.
                    'title': 'Sales Data',
                    'gridProperties': {
                        'rowCount': 1000,
                        'columnCount': 10
                    }
                }
            }
        ]
    },
    dag=dag
)

# Export SQL data to Sheets
export_sales_data = SQLToGoogleSheetsOperator(
    task_id='export_sales_data',
    sql='''
        SELECT 
            product_name,
            category,
            sales_date,
            quantity_sold,
            unit_price,
            total_revenue
        FROM sales_summary 
        WHERE sales_date = '{{ ds }}'
        ORDER BY total_revenue DESC
    ''',
    # The ID of the spreadsheet created upstream is read from XCom
    # (key "spreadsheet_id" pushed by the create_spreadsheet task).
    spreadsheet_id='{{ task_instance.xcom_pull(task_ids="create_spreadsheet", key="spreadsheet_id") }}',
    range_name='Sales Data!A1',
    sql_conn_id='postgres_default',
    dag=dag
)

# The export needs the spreadsheet ID, so it must run after creation.
create_spreadsheet >> export_sales_data

Authentication

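Google Workspace integration requires OAuth 2.0 authentication with appropriate scopes. Credentials come from the Airflow connection referenced by `gcp_conn_id`; list the scopes your tasks need in that connection's Scopes field: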

# OAuth scope URLs for each Workspace API, grouped per service.

# Google Drive
DRIVE_SCOPES = [
    "https://www.googleapis.com/auth/drive",
    "https://www.googleapis.com/auth/drive.file",
]

# Google Sheets (also lists the Drive per-file scope)
SHEETS_SCOPES = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/drive.file",
]

# Google Calendar
CALENDAR_SCOPES = [
    "https://www.googleapis.com/auth/calendar",
    "https://www.googleapis.com/auth/calendar.events",
]

Types

from typing import Dict, List, Optional, Any, Union
from airflow.models import BaseOperator
from airflow.sensors.base import BaseSensorOperator

# Drive types
# Plain aliases: they add readability to signatures but no runtime checking.
DriveFileId = str  # string identifier of a Drive file
FolderId = str  # string identifier of a Drive folder
FileMetadata = Dict[str, Any]  # string-keyed mapping; exact schema not defined here
DrivePermissions = Dict[str, Any]  # string-keyed mapping; exact schema not defined here

# Sheets types
SpreadsheetId = str  # string identifier of a spreadsheet
SheetRange = str  # range string (presumably A1 notation — confirm)
CellValues = List[List[Any]]  # list of lists of cell values
SpreadsheetData = Dict[str, Any]  # string-keyed mapping; exact schema not defined here
BatchUpdateRequest = Dict[str, Any]  # string-keyed mapping; exact schema not defined here

# Calendar types
CalendarId = str  # string identifier of a calendar
EventId = str  # string identifier of an event
CalendarEvent = Dict[str, Any]  # string-keyed mapping; exact schema not defined here
EventDateTime = Dict[str, str]  # string-to-string mapping; keys not defined here
EventAttendee = Dict[str, str]  # string-to-string mapping; keys not defined here

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-google

docs

common-utilities.md

data-transfers.md

firebase.md

gcp-services.md

google-ads.md

google-workspace.md

index.md

leveldb.md

marketing-platform.md

tile.json