Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Google Workspace (formerly G Suite) integration for Drive, Sheets, and Calendar. Enables document management, spreadsheet automation, and calendar scheduling within data pipelines for collaborative productivity workflows.
Google Drive API integration for file and folder management, sharing, and access control.
class GoogleDriveHook(GoogleBaseHook):
    """Hook wrapping the Google Drive API (default API version "v3").

    Authenticates through the Airflow GCP connection named by ``gcp_conn_id``.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v3",
        **kwargs
    ): ...

    # Return an authenticated Drive API client/resource object.
    def get_conn(self): ...

    # Upload the file at ``local_location`` to ``remote_location`` on Drive.
    # chunk_size defaults to 104857600 bytes (100 MiB); resumable=True is
    # presumably for interruptible uploads of large files — confirm against
    # the implementation.
    def upload_file(
        self,
        local_location: str,
        remote_location: str,
        chunk_size: int = 104857600,
        resumable: bool = False
    ): ...

    # Stream the Drive file identified by ``file_id`` into ``file_handle``.
    def download_file(
        self,
        file_id: str,
        file_handle: Any,
        chunk_size: int = 104857600
    ): ...

    # Create a Drive file from ``file_metadata`` (optionally with content via
    # ``media_body``); ``fields`` selects which attributes the API returns
    # (default: just the new file's id).
    def create_file(
        self,
        file_metadata: Dict[str, Any],
        media_body: Optional[Any] = None,
        fields: str = "id"
    ): ...

    # Delete the Drive file identified by ``file_id``.
    def delete_file(
        self,
        file_id: str
    ): ...

    # Look up the ID of ``file_name`` inside folder ``folder_id``;
    # ``drive_id`` optionally scopes the lookup to a shared drive.
    def get_file_id(
        self,
        folder_id: str,
        file_name: str,
        drive_id: Optional[str] = None
    ): ...
class GoogleDriveFileExistenceSensor(BaseSensorOperator):
    """Sensor that succeeds once a file named ``file_name`` exists inside
    Drive folder ``folder_id`` (optionally within shared drive ``drive_id``).
    """

    def __init__(
        self,
        folder_id: str,
        file_name: str,
        drive_id: Optional[str] = None,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

Google Sheets API integration for spreadsheet creation, data manipulation, and automation.
class GSheetsHook(GoogleBaseHook):
    """Hook wrapping the Google Sheets API (default API version "v4").

    Authenticates through the Airflow GCP connection named by ``gcp_conn_id``.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ): ...

    # Return an authenticated Sheets API client/resource object.
    def get_conn(self): ...

    # Read cell values from ``range_`` of the given spreadsheet.
    # major_dimension selects row- vs column-major ordering of the result.
    def get_values(
        self,
        spreadsheet_id: str,
        range_: str,
        major_dimension: str = "ROWS",
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...

    # Overwrite the cells in ``range_`` with ``values`` (list of rows by
    # default; value_input_option "RAW" stores values without parsing).
    def update_values(
        self,
        spreadsheet_id: str,
        range_: str,
        values: List[List[Any]],
        major_dimension: str = "ROWS",
        value_input_option: str = "RAW",
        include_values_in_response: bool = False,
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...

    # Append ``values`` after the data found in ``range_``; insert_data_option
    # controls whether existing rows are overwritten or new rows inserted.
    def append_values(
        self,
        spreadsheet_id: str,
        range_: str,
        values: List[List[Any]],
        major_dimension: str = "ROWS",
        value_input_option: str = "RAW",
        insert_data_option: str = "OVERWRITE",
        include_values_in_response: bool = False,
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...

    # Clear all values from ``range_`` of the spreadsheet.
    def clear_values(
        self,
        spreadsheet_id: str,
        range_: str
    ): ...

    # Apply a batchUpdate request ``body`` to the spreadsheet.
    def batch_update(
        self,
        spreadsheet_id: str,
        body: Dict[str, Any]
    ): ...

    # Create a new spreadsheet from the given resource ``body``.
    def create(
        self,
        body: Dict[str, Any]
    ): ...
class GoogleSheetsCreateSpreadsheetOperator(BaseOperator):
    """Operator that creates a new Google Sheets spreadsheet from the given
    ``spreadsheet`` resource body."""

    def __init__(
        self,
        spreadsheet: Dict[str, Any],
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

Google Calendar API integration for event management and scheduling automation.
class GoogleCalendarHook(GoogleBaseHook):
    """Hook wrapping the Google Calendar API (default API version "v3").

    Authenticates through the Airflow GCP connection named by ``gcp_conn_id``.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v3",
        **kwargs
    ): ...

    # Return an authenticated Calendar API client/resource object.
    def get_conn(self): ...

    # List events from ``calendar_id`` ("primary" = the account's default
    # calendar), optionally bounded by time_min/time_max and capped at
    # max_results. single_events presumably expands recurring events into
    # individual instances — TODO confirm against the implementation.
    def get_events(
        self,
        calendar_id: str = "primary",
        time_min: Optional[str] = None,
        time_max: Optional[str] = None,
        max_results: Optional[int] = None,
        single_events: bool = False,
        order_by: Optional[str] = None
    ): ...

    # Insert ``event`` (a Calendar event resource dict) into ``calendar_id``.
    def create_event(
        self,
        event: Dict[str, Any],
        calendar_id: str = "primary",
        send_notifications: bool = False,
        send_updates: str = "none"
    ): ...

    # Remove the event identified by ``event_id`` from ``calendar_id``.
    def delete_event(
        self,
        event_id: str,
        calendar_id: str = "primary",
        send_notifications: bool = False,
        send_updates: str = "none"
    ): ...

    # Create an event from free-form ``text`` — presumably the Calendar
    # "quickAdd" endpoint, parsed server-side; confirm.
    def quick_add_event(
        self,
        text: str,
        calendar_id: str = "primary",
        send_notifications: bool = False
    ): ...

Specialized operators for transferring data between Google Cloud services and Google Workspace applications.
class GCSToGoogleDriveOperator(BaseOperator):
    """Transfers files from Google Cloud Storage to Google Drive.

    Args:
        source_bucket (str): GCS bucket name.
        source_object (str): GCS object path.
        destination_object (str): Google Drive file name.
        folder_id (str): Google Drive folder ID for destination.
        gcp_conn_id (str): Connection ID for Google Cloud Platform.
        move_object (bool): Whether to delete source after transfer.

    Returns:
        Google Drive file ID of transferred file.
    """

    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        destination_object: str,
        folder_id: str,
        gcp_conn_id: str = "google_cloud_default",
        move_object: bool = False,
        **kwargs
    ): ...
class LocalFilesystemToGoogleDriveOperator(BaseOperator):
    """Transfers files from local filesystem to Google Drive.

    Args:
        local_paths (List[str]): Local file paths to transfer.
        folder_id (str): Google Drive destination folder ID.
        ignore_if_missing (bool): Whether to ignore missing local files.
        gcp_conn_id (str): Connection ID for Google Cloud Platform.

    Returns:
        List of Google Drive file IDs.
    """

    def __init__(
        self,
        local_paths: List[str],
        folder_id: str,
        ignore_if_missing: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
class GCSToGoogleSheetsOperator(BaseOperator):
    """Transfers data from Google Cloud Storage to Google Sheets.

    Args:
        source_bucket (str): GCS bucket name.
        source_object (str): GCS object path (CSV format).
        spreadsheet_id (str): Google Sheets spreadsheet ID.
        range_name (str): Target range in A1 notation.
        gcp_conn_id (str): Connection ID for Google Cloud Platform.
        clear_sheet (bool): Whether to clear existing data.

    Returns:
        Number of rows updated in the spreadsheet.
    """

    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        spreadsheet_id: str,
        range_name: str = "Sheet1",
        gcp_conn_id: str = "google_cloud_default",
        clear_sheet: bool = False,
        **kwargs
    ): ...
class SQLToGoogleSheetsOperator(BaseOperator):
    """Transfers SQL query results to Google Sheets.

    Args:
        sql (str): SQL query to execute.
        spreadsheet_id (str): Google Sheets spreadsheet ID.
        range_name (str): Target range in A1 notation.
        sql_conn_id (str): Connection ID for SQL database.
        gcp_conn_id (str): Connection ID for Google Cloud Platform.
        parameters (Dict): SQL query parameters.

    Returns:
        Number of rows updated in the spreadsheet.
    """

    def __init__(
        self,
        sql: str,
        spreadsheet_id: str,
        range_name: str = "Sheet1",
        sql_conn_id: str = "default",
        gcp_conn_id: str = "google_cloud_default",
        parameters: Optional[Dict] = None,
        **kwargs
    ): ...

from airflow import DAG
# Example DAG: block until an input file appears in a Drive folder, then
# push the day's processed results from GCS into that same folder.
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
from airflow.providers.google.suite.transfers.gcs_to_drive import GCSToGoogleDriveOperator
from datetime import datetime

default_args = {'start_date': datetime(2023, 1, 1)}

dag = DAG(
    'drive_file_management',
    catchup=False,
    schedule_interval='@daily',
    default_args=default_args,
)

# Poll Drive every 60 seconds; give up after 300 seconds without the file.
wait_for_file = GoogleDriveFileExistenceSensor(
    task_id='wait_for_file',
    file_name='input_data.csv',
    folder_id='1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms',
    poke_interval=60,
    timeout=300,
    dag=dag,
)

# Copy the date-templated results object out of GCS into the Drive folder.
transfer_to_drive = GCSToGoogleDriveOperator(
    task_id='transfer_to_drive',
    source_bucket='processed-data',
    source_object='outputs/{{ ds }}/results.csv',
    destination_object='Daily Results {{ ds }}.csv',
    folder_id='1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms',
    dag=dag,
)
wait_for_file >> transfer_to_drive
from airflow import DAG
# Example DAG: create a fresh spreadsheet for the day, then export the
# day's sales rows into it from a Postgres connection.
from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
from datetime import datetime

dag = DAG(
    'sheets_reporting',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Create new spreadsheet for daily report; the title is templated with the
# execution date, and the single sheet is pre-sized to 1000x10 cells.
create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
    task_id='create_spreadsheet',
    spreadsheet={
        'properties': {
            'title': 'Daily Sales Report - {{ ds }}',
            'locale': 'en_US',
            'timeZone': 'America/New_York'
        },
        'sheets': [
            {
                'properties': {
                    'title': 'Sales Data',
                    'gridProperties': {
                        'rowCount': 1000,
                        'columnCount': 10
                    }
                }
            }
        ]
    },
    dag=dag
)

# Export SQL data to Sheets. The spreadsheet_id is pulled at runtime from
# the XCom pushed by the create_spreadsheet task.
export_sales_data = SQLToGoogleSheetsOperator(
    task_id='export_sales_data',
    sql='''
SELECT
product_name,
category,
sales_date,
quantity_sold,
unit_price,
total_revenue
FROM sales_summary
WHERE sales_date = '{{ ds }}'
ORDER BY total_revenue DESC
''',
    spreadsheet_id='{{ task_instance.xcom_pull(task_ids="create_spreadsheet", key="spreadsheet_id") }}',
    range_name='Sales Data!A1',
    sql_conn_id='postgres_default',
    dag=dag
)
create_spreadsheet >> export_sales_data
Google Workspace integration requires OAuth 2.0 authentication with appropriate scopes:
# Required OAuth scopes for Workspace APIs.
# NOTE(review): 'auth/drive' is the broad full-access scope while
# 'auth/drive.file' is limited to files the app created or opened —
# confirm which scope set the deployment actually needs before granting both.
DRIVE_SCOPES = [
    'https://www.googleapis.com/auth/drive',
    'https://www.googleapis.com/auth/drive.file'
]
# Sheets access plus per-file Drive access.
SHEETS_SCOPES = [
    'https://www.googleapis.com/auth/spreadsheets',
    'https://www.googleapis.com/auth/drive.file'
]
# Full calendar access plus the events-only scope.
CALENDAR_SCOPES = [
    'https://www.googleapis.com/auth/calendar',
    'https://www.googleapis.com/auth/calendar.events'
]
from typing import Dict, List, Optional, Any, Union
from airflow.models import BaseOperator
from airflow.sensors.base import BaseSensorOperator
# Drive types
DriveFileId = str                   # ID of a file in Google Drive
FolderId = str                      # ID of a Drive folder
FileMetadata = Dict[str, Any]       # Drive file resource metadata
DrivePermissions = Dict[str, Any]   # Drive permissions resource
# Sheets types
SpreadsheetId = str                 # ID of a spreadsheet
SheetRange = str                    # range string in A1 notation
CellValues = List[List[Any]]        # cell values as a list of rows
SpreadsheetData = Dict[str, Any]    # spreadsheet resource body
BatchUpdateRequest = Dict[str, Any] # body for a batchUpdate call
# Calendar types
CalendarId = str                    # ID of a calendar (e.g. "primary")
EventId = str                       # ID of a calendar event
CalendarEvent = Dict[str, Any]      # Calendar event resource
EventDateTime = Dict[str, str]      # event start/end time mapping
EventAttendee = Dict[str, str]      # event attendee mapping
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-google