# Data Transfer Operations

These operators transfer data between Google Cloud services and external systems, including AWS S3, Azure Blob Storage, SFTP servers, local filesystems, and various databases, enabling data movement across cloud platforms and on-premises systems.

## Capabilities

### Cloud Storage Transfers

Transfer operations to and from Google Cloud Storage and other cloud object stores. A usage sketch for the Azure transfer follows the API listing below.

```python { .api }
class GCSToBigQueryOperator(BaseOperator):
    """
    Transfers data from Google Cloud Storage to BigQuery tables.

    Args:
        bucket (str): GCS bucket name
        source_objects (List[str]): List of GCS object paths
        destination_project_dataset_table (str): BigQuery destination in format project.dataset.table
        schema_fields (List[Dict]): Table schema definition
        write_disposition (str): Write mode (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
        source_format (str): Source data format (CSV, JSON, AVRO, PARQUET)
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        BigQuery job result
    """
    def __init__(
        self,
        bucket: str,
        source_objects: List[str],
        destination_project_dataset_table: str,
        schema_fields: Optional[List[Dict]] = None,
        write_disposition: str = "WRITE_EMPTY",
        source_format: str = "CSV",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class BigQueryToGCSOperator(BaseOperator):
    """
    Transfers data from BigQuery to Google Cloud Storage.

    Args:
        source_project_dataset_table (str): BigQuery source table
        destination_cloud_storage_uris (List[str]): GCS destination URIs
        compression (str): Output compression format
        export_format (str): Export format (CSV, JSON, AVRO, PARQUET)
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        Export job result with file locations
    """
    def __init__(
        self,
        source_project_dataset_table: str,
        destination_cloud_storage_uris: List[str],
        compression: str = "NONE",
        export_format: str = "CSV",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class S3ToGCSOperator(BaseOperator):
    """
    Transfers objects from Amazon S3 to Google Cloud Storage.

    Args:
        bucket (str): S3 bucket name
        prefix (str): S3 object key prefix
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        aws_conn_id (str): Connection ID for Amazon Web Services
        dest_gcs_conn_id (str): Destination GCS connection ID
        dest_bucket (str): Destination GCS bucket name
        dest_prefix (str): Destination GCS object prefix
        replace (bool): Whether to replace existing objects

    Returns:
        List of transferred object keys
    """
    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        gcp_conn_id: str = "google_cloud_default",
        aws_conn_id: str = "aws_default",
        dest_gcs_conn_id: Optional[str] = None,
        dest_bucket: Optional[str] = None,
        dest_prefix: str = "",
        replace: bool = True,
        **kwargs
    ): ...

class AzureBlobStorageToGCSOperator(BaseOperator):
    """
    Transfers blobs from Azure Blob Storage to Google Cloud Storage.

    Args:
        blob_name (str): Azure blob name or prefix
        file_path (str): Destination GCS object path
        container_name (str): Azure container name
        bucket_name (str): Destination GCS bucket name
        azure_conn_id (str): Connection ID for Azure Blob Storage
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        GCS object path of transferred data
    """
    def __init__(
        self,
        blob_name: str,
        file_path: str,
        container_name: str,
        bucket_name: str,
        azure_conn_id: str = "azure_blob_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
```
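
The usage examples later in this section exercise the S3 and BigQuery paths. For the Azure path, here is a minimal sketch built on the parameters listed above; the import path, connection IDs, and container/blob/bucket names are assumptions to adapt to your environment (depending on provider versions, this operator may ship from the Google provider package rather than the Microsoft Azure one).

```python
from airflow import DAG
from datetime import datetime
# Assumed import path; some provider versions expose this operator from
# airflow.providers.google.cloud.transfers.azure_blob_to_gcs instead.
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator

dag = DAG(
    'azure_to_gcs_example',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Copy one blob per logical date from Azure Blob Storage into GCS.
# Container, blob, and bucket names below are placeholders.
azure_to_gcs = AzureBlobStorageToGCSOperator(
    task_id='azure_to_gcs',
    container_name='landing-zone',
    blob_name='exports/{{ ds }}/events.json',
    bucket_name='processed-data-lake',
    file_path='azure/{{ ds }}/events.json',
    azure_conn_id='azure_blob_default',
    gcp_conn_id='google_cloud_default',
    dag=dag
)
```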

### Database Transfers

Transfer operations between relational databases and Google Cloud services. A usage sketch for the BigQuery-to-PostgreSQL direction follows the API listing below.

```python { .api }
class MySQLToGCSOperator(BaseOperator):
    """
    Transfers data from MySQL database to Google Cloud Storage.

    Args:
        sql (str): SQL query to execute
        bucket (str): Destination GCS bucket name
        filename (str): Destination GCS object path
        schema_filename (str): Optional schema file path in GCS
        mysql_conn_id (str): Connection ID for MySQL database
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        GCS object path of exported data
    """
    def __init__(
        self,
        sql: str,
        bucket: str,
        filename: str,
        schema_filename: Optional[str] = None,
        mysql_conn_id: str = "mysql_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class PostgresToGCSOperator(BaseOperator):
    """
    Transfers data from PostgreSQL database to Google Cloud Storage.

    Args:
        sql (str): SQL query to execute
        bucket (str): Destination GCS bucket name
        filename (str): Destination GCS object path
        schema_filename (str): Optional schema file path in GCS
        postgres_conn_id (str): Connection ID for PostgreSQL database
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        GCS object path of exported data
    """
    def __init__(
        self,
        sql: str,
        bucket: str,
        filename: str,
        schema_filename: Optional[str] = None,
        postgres_conn_id: str = "postgres_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class BigQueryToPostgresOperator(BaseOperator):
    """
    Transfers data from BigQuery to PostgreSQL database.

    Args:
        dataset_table (str): BigQuery source table
        target_table_name (str): PostgreSQL destination table name
        postgres_conn_id (str): Connection ID for PostgreSQL database
        bigquery_conn_id (str): Connection ID for BigQuery

    Returns:
        Number of rows transferred
    """
    def __init__(
        self,
        dataset_table: str,
        target_table_name: str,
        postgres_conn_id: str = "postgres_default",
        bigquery_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
```
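
The migration pipeline below loads MySQL data into BigQuery; the reverse direction, publishing BigQuery results back to an operational database, is a single task. A minimal sketch using the BigQueryToPostgresOperator parameters listed above, with an assumed `postgres_prod` connection and illustrative table names:

```python
from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator
from datetime import datetime

dag = DAG(
    'bq_to_postgres_example',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Publish aggregated metrics from BigQuery into an operational Postgres table.
# The source dataset.table, target table, and postgres_prod connection are placeholders.
bq_to_postgres = BigQueryToPostgresOperator(
    task_id='bq_to_postgres',
    dataset_table='processed_data.user_metrics',
    target_table_name='user_metrics',
    postgres_conn_id='postgres_prod',
    bigquery_conn_id='google_cloud_default',
    dag=dag
)
```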

### File System Transfers

Transfer operations between GCS, SFTP servers, and the local filesystem. A usage sketch combining these operators follows the API listing below.

```python { .api }
class GCSToLocalFilesystemOperator(BaseOperator):
    """
    Downloads objects from Google Cloud Storage to local filesystem.

    Args:
        bucket (str): GCS bucket name
        object_name (str): GCS object path
        filename (str): Local filesystem destination path
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        Local file path of downloaded object
    """
    def __init__(
        self,
        bucket: str,
        object_name: str,
        filename: str,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class SFTPToGCSOperator(BaseOperator):
    """
    Transfers files from SFTP server to Google Cloud Storage.

    Args:
        source_path (str): SFTP source file path
        destination_bucket (str): GCS destination bucket name
        destination_path (str): GCS destination object path
        sftp_conn_id (str): Connection ID for SFTP server
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        move_object (bool): Whether to delete source after transfer

    Returns:
        GCS object path of transferred file
    """
    def __init__(
        self,
        source_path: str,
        destination_bucket: str,
        destination_path: str,
        sftp_conn_id: str = "sftp_default",
        gcp_conn_id: str = "google_cloud_default",
        move_object: bool = False,
        **kwargs
    ): ...
```
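
A minimal sketch chaining the two operators above: stage a partner upload from an SFTP drop zone into GCS, then download the staged object to the worker's local filesystem for a legacy consumer. Connection IDs, paths, and bucket names are assumptions.

```python
from airflow import DAG
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from datetime import datetime

dag = DAG(
    'sftp_file_sync',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Stage the partner upload in GCS; the SFTP source is kept (move_object=False).
sftp_to_gcs = SFTPToGCSOperator(
    task_id='sftp_to_gcs',
    source_path='/uploads/{{ ds }}/report.csv',
    destination_bucket='partner-drops',
    destination_path='reports/{{ ds }}/report.csv',
    sftp_conn_id='sftp_partner',
    gcp_conn_id='google_cloud_default',
    move_object=False,
    dag=dag
)

# Pull the staged object down to the worker's local filesystem.
gcs_to_local = GCSToLocalFilesystemOperator(
    task_id='gcs_to_local',
    bucket='partner-drops',
    object_name='reports/{{ ds }}/report.csv',
    filename='/tmp/report_{{ ds }}.csv',
    gcp_conn_id='google_cloud_default',
    dag=dag
)

sftp_to_gcs >> gcs_to_local
```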

## Usage Examples

### Multi-Stage ETL Pipeline

```python
from airflow import DAG
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime

dag = DAG(
    'multi_cloud_etl',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Transfer raw data from S3 to GCS
s3_to_gcs = S3ToGCSOperator(
    task_id='s3_to_gcs',
    bucket='source-data-bucket',
    prefix='raw-data/{{ ds }}/',
    dest_bucket='processed-data-lake',
    dest_prefix='staging/{{ ds }}/',
    aws_conn_id='aws_default',
    dag=dag
)

# Load data into BigQuery for processing
gcs_to_bq = GCSToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='processed-data-lake',
    source_objects=['staging/{{ ds }}/*.csv'],
    destination_project_dataset_table='analytics.raw_data.daily_imports',
    schema_fields=[
        {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
        {'name': 'user_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'event_type', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'properties', 'type': 'JSON', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)

# Export processed results
bq_to_gcs = BigQueryToGCSOperator(
    task_id='bq_to_gcs',
    source_project_dataset_table='analytics.processed_data.user_metrics',
    destination_cloud_storage_uris=[
        'gs://processed-data-lake/exports/{{ ds }}/user_metrics.parquet'
    ],
    export_format='PARQUET',
    dag=dag
)

s3_to_gcs >> gcs_to_bq >> bq_to_gcs
```

### Database Migration Pipeline

```python
from airflow import DAG
from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

dag = DAG(
    'database_migration',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Extract data from MySQL
mysql_to_gcs = MySQLToGCSOperator(
    task_id='mysql_to_gcs',
    sql='''
        SELECT
            customer_id,
            order_date,
            total_amount,
            status,
            created_at,
            updated_at
        FROM orders
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    bucket='data-migration',
    filename='orders/{{ ds }}/orders.csv',
    mysql_conn_id='mysql_prod',
    dag=dag
)

# Load into BigQuery
gcs_to_bigquery = GCSToBigQueryOperator(
    task_id='gcs_to_bigquery',
    bucket='data-migration',
    source_objects=['orders/{{ ds }}/orders.csv'],
    destination_project_dataset_table='warehouse.sales.orders',
    schema_fields=[
        {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'order_date', 'type': 'DATE', 'mode': 'REQUIRED'},
        {'name': 'total_amount', 'type': 'NUMERIC', 'mode': 'REQUIRED'},
        {'name': 'status', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
        {'name': 'updated_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_APPEND',
    dag=dag
)

mysql_to_gcs >> gcs_to_bigquery
```

## Types

```python { .api }
from typing import List, Optional, Dict, Any, Union
from airflow.models import BaseOperator

# Transfer operation types
SourcePath = str
DestinationPath = str
BucketName = str
ObjectKey = str
TableReference = str

# Schema and format types
SchemaField = Dict[str, str]
WriteDisposition = str  # WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY
SourceFormat = str  # CSV, JSON, AVRO, PARQUET
ExportFormat = str  # CSV, JSON, AVRO, PARQUET
CompressionType = str  # NONE, GZIP, DEFLATE, SNAPPY

# Connection types
ConnectionId = str
TransferResult = Dict[str, Any]
```