# Google Workspace Integration

Integration with Google Workspace (formerly G Suite) for Drive, Sheets, and Calendar. It enables document management, spreadsheet automation, and calendar scheduling within data pipelines for collaborative productivity workflows.

## Capabilities

### Google Drive

Google Drive API integration for file and folder management, sharing, and access control.

```python { .api }
class GoogleDriveHook(GoogleBaseHook):
    """Hook for the Google Drive API.

    This document lists it under the Google Drive capability (file and
    folder management, sharing, and access control). Only the signatures
    are given here; every body is elided with ``...``.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v3",
        **kwargs
    ): ...

    def get_conn(self): ...

    def upload_file(
        self,
        local_location: str,
        remote_location: str,
        chunk_size: int = 104857600,  # 100 * 1024 * 1024
        resumable: bool = False
    ): ...

    def download_file(
        self,
        file_id: str,
        file_handle: Any,
        chunk_size: int = 104857600  # 100 * 1024 * 1024
    ): ...

    def create_file(
        self,
        file_metadata: Dict[str, Any],
        media_body: Optional[Any] = None,
        fields: str = "id"
    ): ...

    def delete_file(
        self,
        file_id: str
    ): ...

    def get_file_id(
        self,
        folder_id: str,
        file_name: str,
        drive_id: Optional[str] = None
    ): ...

class GoogleDriveFileExistenceSensor(BaseSensorOperator):
    """Sensor keyed on a Drive folder ID and a file name.

    The usage example in this document uses it to wait for a file in
    Drive. Only the constructor signature is given; its body is elided.
    """

    def __init__(
        self,
        folder_id: str,
        file_name: str,
        drive_id: Optional[str] = None,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
```

### Google Sheets

Google Sheets API integration for spreadsheet creation, data manipulation, and automation.

```python { .api }
class GSheetsHook(GoogleBaseHook):
    """Hook for the Google Sheets API.

    This document lists it under the Google Sheets capability
    (spreadsheet creation, data manipulation, and automation). Only the
    signatures are given here; every body is elided with ``...``.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v4",
        **kwargs
    ): ...

    def get_conn(self): ...

    def get_values(
        self,
        spreadsheet_id: str,
        range_: str,
        major_dimension: str = "ROWS",
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...

    def update_values(
        self,
        spreadsheet_id: str,
        range_: str,
        values: List[List[Any]],
        major_dimension: str = "ROWS",
        value_input_option: str = "RAW",
        include_values_in_response: bool = False,
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...

    def append_values(
        self,
        spreadsheet_id: str,
        range_: str,
        values: List[List[Any]],
        major_dimension: str = "ROWS",
        value_input_option: str = "RAW",
        insert_data_option: str = "OVERWRITE",
        include_values_in_response: bool = False,
        value_render_option: str = "FORMATTED_VALUE",
        date_time_render_option: str = "SERIAL_NUMBER"
    ): ...

    def clear_values(
        self,
        spreadsheet_id: str,
        range_: str
    ): ...

    def batch_update(
        self,
        spreadsheet_id: str,
        body: Dict[str, Any]
    ): ...

    def create(
        self,
        body: Dict[str, Any]
    ): ...

class GoogleSheetsCreateSpreadsheetOperator(BaseOperator):
    """Operator that takes a spreadsheet definition dict.

    The usage example in this document uses it to create a new
    spreadsheet and then pulls an XCom with key ``spreadsheet_id`` from
    the task. Only the constructor signature is given; its body is elided.
    """

    def __init__(
        self,
        spreadsheet: Dict[str, Any],
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
```

### Google Calendar

Google Calendar API integration for event management and scheduling automation.

```python { .api }
class GoogleCalendarHook(GoogleBaseHook):
    """Hook for the Google Calendar API.

    This document lists it under the Google Calendar capability (event
    management and scheduling automation). Only the signatures are given
    here; every body is elided with ``...``.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v3",
        **kwargs
    ): ...

    def get_conn(self): ...

    def get_events(
        self,
        calendar_id: str = "primary",
        time_min: Optional[str] = None,
        time_max: Optional[str] = None,
        max_results: Optional[int] = None,
        single_events: bool = False,
        order_by: Optional[str] = None
    ): ...

    def create_event(
        self,
        event: Dict[str, Any],
        calendar_id: str = "primary",
        send_notifications: bool = False,
        send_updates: str = "none"
    ): ...

    def delete_event(
        self,
        event_id: str,
        calendar_id: str = "primary",
        send_notifications: bool = False,
        send_updates: str = "none"
    ): ...

    def quick_add_event(
        self,
        text: str,
        calendar_id: str = "primary",
        send_notifications: bool = False
    ): ...
```

### Transfer Operations

Specialized operators for transferring data between Google Cloud services and Google Workspace applications.

```python { .api }
class GCSToGoogleDriveOperator(BaseOperator):
    """
    Transfers files from Google Cloud Storage to Google Drive.

    Args:
        source_bucket (str): GCS bucket name
        source_object (str): GCS object path
        destination_object (str): Google Drive file name
        folder_id (str): Google Drive folder ID for destination
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        move_object (bool): Whether to delete source after transfer

    Returns:
        Google Drive file ID of transferred file
    """

    # Signature only; the constructor body is elided in this reference.
    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        destination_object: str,
        folder_id: str,
        gcp_conn_id: str = "google_cloud_default",
        move_object: bool = False,
        **kwargs
    ): ...

class LocalFilesystemToGoogleDriveOperator(BaseOperator):
    """
    Transfers files from local filesystem to Google Drive.

    Args:
        local_paths (List[str]): Local file paths to transfer
        folder_id (str): Google Drive destination folder ID
        ignore_if_missing (bool): Whether to ignore missing local files
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        List of Google Drive file IDs
    """

    # Signature only; the constructor body is elided in this reference.
    def __init__(
        self,
        local_paths: List[str],
        folder_id: str,
        ignore_if_missing: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class GCSToGoogleSheetsOperator(BaseOperator):
    """
    Transfers data from Google Cloud Storage to Google Sheets.

    Args:
        source_bucket (str): GCS bucket name
        source_object (str): GCS object path (CSV format)
        spreadsheet_id (str): Google Sheets spreadsheet ID
        range_name (str): Target range in A1 notation
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        clear_sheet (bool): Whether to clear existing data

    Returns:
        Number of rows updated in the spreadsheet
    """

    # Signature only; the constructor body is elided in this reference.
    # NOTE(review): the upstream apache-airflow-providers-google docs appear
    # to name the range argument ``spreadsheet_range`` rather than
    # ``range_name`` -- confirm against the installed provider version.
    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        spreadsheet_id: str,
        range_name: str = "Sheet1",
        gcp_conn_id: str = "google_cloud_default",
        clear_sheet: bool = False,
        **kwargs
    ): ...

class SQLToGoogleSheetsOperator(BaseOperator):
    """
    Transfers SQL query results to Google Sheets.

    Args:
        sql (str): SQL query to execute
        spreadsheet_id (str): Google Sheets spreadsheet ID
        range_name (str): Target range in A1 notation
        sql_conn_id (str): Connection ID for SQL database
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        parameters (Dict): SQL query parameters

    Returns:
        Number of rows updated in the spreadsheet
    """

    # Signature only; the constructor body is elided in this reference.
    # NOTE(review): the upstream apache-airflow-providers-google docs appear
    # to name the range argument ``spreadsheet_range`` rather than
    # ``range_name`` -- confirm against the installed provider version.
    def __init__(
        self,
        sql: str,
        spreadsheet_id: str,
        range_name: str = "Sheet1",
        sql_conn_id: str = "default",
        gcp_conn_id: str = "google_cloud_default",
        parameters: Optional[Dict] = None,
        **kwargs
    ): ...
```

## Usage Examples

### Drive File Management

```python
from airflow import DAG
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
from airflow.providers.google.suite.transfers.gcs_to_drive import GCSToGoogleDriveOperator
from datetime import datetime

# Daily DAG; catchup is disabled.
dag = DAG(
    'drive_file_management',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Wait for file in Drive
wait_for_file = GoogleDriveFileExistenceSensor(
    task_id='wait_for_file',
    folder_id='1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms',
    file_name='input_data.csv',
    timeout=300,
    poke_interval=60,
    dag=dag
)

# Transfer processed file to Drive
# The source path and destination name embed the templated ``{{ ds }}`` value.
transfer_to_drive = GCSToGoogleDriveOperator(
    task_id='transfer_to_drive',
    source_bucket='processed-data',
    source_object='outputs/{{ ds }}/results.csv',
    destination_object='Daily Results {{ ds }}.csv',
    folder_id='1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms',
    dag=dag
)

# The transfer task is downstream of the sensor task.
wait_for_file >> transfer_to_drive
```

### Sheets Data Pipeline

```python
from airflow import DAG
from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
from datetime import datetime

# Daily DAG; catchup is disabled.
dag = DAG(
    'sheets_reporting',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Create new spreadsheet for daily report
create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
    task_id='create_spreadsheet',
    spreadsheet={
        'properties': {
            'title': 'Daily Sales Report - {{ ds }}',
            'locale': 'en_US',
            'timeZone': 'America/New_York'
        },
        'sheets': [
            {
                'properties': {
                    'title': 'Sales Data',
                    'gridProperties': {
                        'rowCount': 1000,
                        'columnCount': 10
                    }
                }
            }
        ]
    },
    dag=dag
)

# Export SQL data to Sheets
# The spreadsheet ID is pulled from the XCom of the ``create_spreadsheet`` task.
# NOTE(review): the upstream apache-airflow-providers-google docs appear to
# name the range argument ``spreadsheet_range`` rather than ``range_name`` --
# confirm against the installed provider version.
export_sales_data = SQLToGoogleSheetsOperator(
    task_id='export_sales_data',
    sql='''
        SELECT
            product_name,
            category,
            sales_date,
            quantity_sold,
            unit_price,
            total_revenue
        FROM sales_summary
        WHERE sales_date = '{{ ds }}'
        ORDER BY total_revenue DESC
    ''',
    spreadsheet_id='{{ task_instance.xcom_pull(task_ids="create_spreadsheet", key="spreadsheet_id") }}',
    range_name='Sales Data!A1',
    sql_conn_id='postgres_default',
    dag=dag
)

# The export task is downstream of the spreadsheet-creation task.
create_spreadsheet >> export_sales_data
```

## Authentication

Google Workspace integration requires OAuth 2.0 authentication with appropriate scopes:

```python
# Required OAuth scopes for Workspace APIs
# Each constant is a list of scope URLs, grouped by service.
DRIVE_SCOPES = [
    'https://www.googleapis.com/auth/drive',
    'https://www.googleapis.com/auth/drive.file'
]

SHEETS_SCOPES = [
    'https://www.googleapis.com/auth/spreadsheets',
    'https://www.googleapis.com/auth/drive.file'
]

CALENDAR_SCOPES = [
    'https://www.googleapis.com/auth/calendar',
    'https://www.googleapis.com/auth/calendar.events'
]
```

## Types

```python { .api }
from typing import Dict, List, Optional, Any, Union
from airflow.models import BaseOperator
from airflow.sensors.base import BaseSensorOperator

# All names below are plain aliases of built-in or ``typing`` types.

# Drive types
DriveFileId = str
FolderId = str
FileMetadata = Dict[str, Any]
DrivePermissions = Dict[str, Any]

# Sheets types
SpreadsheetId = str
SheetRange = str
CellValues = List[List[Any]]
SpreadsheetData = Dict[str, Any]
BatchUpdateRequest = Dict[str, Any]

# Calendar types
CalendarId = str
EventId = str
CalendarEvent = Dict[str, Any]
EventDateTime = Dict[str, str]
EventAttendee = Dict[str, str]
```