# Data Transfer Operations

Comprehensive data movement capabilities between AWS services and external systems. Provides operators for transferring data between S3, Redshift, databases, FTP/SFTP servers, and other data sources with support for various formats and transformation options.

## Capabilities
### S3 to Redshift Transfer

Transfer data from S3 to Redshift tables with support for COPY command options and data transformation.

```python { .api }
class S3ToRedshiftOperator(BaseOperator):
    def __init__(
        self,
        schema: str,
        table: str,
        s3_bucket: str,
        s3_key: str,
        redshift_conn_id: str = 'redshift_default',
        aws_conn_id: str = 'aws_default',
        verify: bool | None = None,
        wildcard_match: bool = False,
        copy_options: list | None = None,
        autocommit: bool = True,
        parameters: dict | None = None,
        **kwargs
    ):
        """
        Transfer data from S3 to Redshift table.

        Parameters:
        - schema: Redshift schema name
        - table: Redshift table name
        - s3_bucket: S3 bucket containing source data
        - s3_key: S3 key path for source data
        - redshift_conn_id: Redshift connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - wildcard_match: Use wildcard matching for S3 keys
        - copy_options: Additional COPY command options
        - autocommit: Auto-commit the transaction
        - parameters: Additional parameters for the operation
        """
```
### Redshift to S3 Transfer

Export data from Redshift tables to S3 with support for UNLOAD command options and parallel processing.

```python { .api }
class RedshiftToS3Operator(BaseOperator):
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        schema: str | None = None,
        table: str | None = None,
        select_query: str | None = None,
        redshift_conn_id: str = 'redshift_default',
        aws_conn_id: str = 'aws_default',
        verify: bool | None = None,
        unload_options: list | None = None,
        autocommit: bool = True,
        include_header: bool = False,
        **kwargs
    ):
        """
        Transfer data from Redshift to S3.

        Parameters:
        - s3_bucket: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - schema: Redshift schema name (if using table)
        - table: Redshift table name (if using table)
        - select_query: Custom SELECT query (alternative to schema/table)
        - redshift_conn_id: Redshift connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - unload_options: Additional UNLOAD command options
        - autocommit: Auto-commit the transaction
        - include_header: Include column headers in output
        """
```
### SQL to S3 Transfer

Transfer SQL query results from any database to S3 with support for various file formats and partitioning.

```python { .api }
class SqlToS3Operator(BaseOperator):
    def __init__(
        self,
        query: str,
        s3_bucket: str,
        s3_key: str,
        sql_conn_id: str,
        aws_conn_id: str = 'aws_default',
        verify: bool | None = None,
        replace: bool = True,
        pd_kwargs: dict | None = None,
        index: bool = True,
        **kwargs
    ):
        """
        Transfer SQL query results to S3.

        Parameters:
        - query: SQL query to execute
        - s3_bucket: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - sql_conn_id: SQL database connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - replace: Replace existing S3 object
        - pd_kwargs: Additional pandas parameters for data processing
        - index: Include DataFrame index in output
        """
```
### S3 to SQL Transfer

Transfer data from S3 to SQL databases with support for data type inference and table creation.

```python { .api }
class S3ToSqlOperator(BaseOperator):
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        table: str,
        sql_conn_id: str,
        aws_conn_id: str = 'aws_default',
        verify: bool | None = None,
        wildcard_match: bool = False,
        copy_options: dict | None = None,
        **kwargs
    ):
        """
        Transfer data from S3 to SQL database table.

        Parameters:
        - s3_bucket: S3 bucket containing source data
        - s3_key: S3 key path for source data
        - table: Destination table name
        - sql_conn_id: SQL database connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - wildcard_match: Use wildcard matching for S3 keys
        - copy_options: Additional copy options
        """
```
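A minimal usage sketch based on the `S3ToSqlOperator` signature documented above; the bucket, key, table, and connection IDs are illustrative placeholders, and `dag` is assumed to be defined as in the usage examples further down.

```python
from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator

# Load a daily CSV extract from S3 into a relational staging table.
# Object names and connection IDs below are placeholders.
load_orders = S3ToSqlOperator(
    task_id='load_orders',
    s3_bucket='integration-staging',
    s3_key='incoming/{{ ds }}/orders.csv',
    table='staging_orders',
    sql_conn_id='postgres_prod',
    aws_conn_id='aws_default',
    dag=dag
)
```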
### DynamoDB to S3 Transfer

Export DynamoDB table data to S3 with support for parallel scans and data filtering.

```python { .api }
class DynamoDbToS3Operator(BaseOperator):
    def __init__(
        self,
        dynamodb_table_name: str,
        s3_bucket_name: str,
        s3_key: str,
        aws_conn_id: str = 'aws_default',
        dynamodb_scan_kwargs: dict | None = None,
        s3_key_type: str = 'table_name',
        **kwargs
    ):
        """
        Export DynamoDB table to S3.

        Parameters:
        - dynamodb_table_name: DynamoDB table name
        - s3_bucket_name: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - aws_conn_id: AWS connection ID
        - dynamodb_scan_kwargs: Additional DynamoDB scan parameters
        - s3_key_type: S3 key generation strategy
        """
```
### File System Transfers

Transfer operations between S3 and local file systems or remote file servers.

```python { .api }
class LocalFilesystemToS3Operator(BaseOperator):
    def __init__(
        self,
        filename: str,
        dest_key: str,
        dest_bucket: str | None = None,
        aws_conn_id: str = 'aws_default',
        verify: bool | None = None,
        replace: bool = True,
        **kwargs
    ):
        """
        Transfer local file to S3.

        Parameters:
        - filename: Local file path
        - dest_key: S3 destination key
        - dest_bucket: S3 destination bucket
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification
        - replace: Replace existing S3 object
        """

class S3ToFtpOperator(BaseOperator):
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        ftp_path: str,
        ftp_conn_id: str = 'ftp_default',
        aws_conn_id: str = 'aws_default',
        **kwargs
    ):
        """
        Transfer S3 object to FTP server.

        Parameters:
        - s3_bucket: S3 source bucket
        - s3_key: S3 source key
        - ftp_path: FTP destination path
        - ftp_conn_id: FTP connection ID
        - aws_conn_id: AWS connection ID
        """

class FtpToS3Operator(BaseOperator):
    def __init__(
        self,
        ftp_path: str,
        s3_bucket: str,
        s3_key: str,
        ftp_conn_id: str = 'ftp_default',
        aws_conn_id: str = 'aws_default',
        **kwargs
    ):
        """
        Transfer file from FTP server to S3.

        Parameters:
        - ftp_path: FTP source path
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - ftp_conn_id: FTP connection ID
        - aws_conn_id: AWS connection ID
        """

class S3ToSftpOperator(BaseOperator):
    def __init__(
        self,
        sftp_path: str,
        s3_bucket: str,
        s3_key: str,
        sftp_conn_id: str = 'sftp_default',
        aws_conn_id: str = 'aws_default',
        **kwargs
    ):
        """
        Transfer S3 object to SFTP server.

        Parameters:
        - sftp_path: SFTP destination path
        - s3_bucket: S3 source bucket
        - s3_key: S3 source key
        - sftp_conn_id: SFTP connection ID
        - aws_conn_id: AWS connection ID
        """

class SftpToS3Operator(BaseOperator):
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        sftp_path: str,
        sftp_conn_id: str = 'sftp_default',
        aws_conn_id: str = 'aws_default',
        **kwargs
    ):
        """
        Transfer file from SFTP server to S3.

        Parameters:
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - sftp_path: SFTP source path
        - sftp_conn_id: SFTP connection ID
        - aws_conn_id: AWS connection ID
        """
```
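`LocalFilesystemToS3Operator` is not covered in the usage examples below, so here is a minimal sketch based on the signature above; the paths, bucket, and connection ID are illustrative placeholders, and `dag` is assumed to come from the surrounding DAG definition.

```python
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

# Upload a locally generated file to S3, replacing any existing object.
# Paths, bucket, and connection ID below are placeholders.
upload_report = LocalFilesystemToS3Operator(
    task_id='upload_report',
    filename='/tmp/reports/{{ ds }}/daily_report.csv',
    dest_key='reports/{{ ds }}/daily_report.csv',
    dest_bucket='analytics-exports',
    aws_conn_id='aws_default',
    replace=True,
    dag=dag
)
```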
### Web and API Transfers

Transfer data from web APIs and HTTP sources to S3.

```python { .api }
class HttpToS3Operator(BaseOperator):
    def __init__(
        self,
        endpoint: str,
        s3_bucket: str,
        s3_key: str,
        http_conn_id: str = 'http_default',
        aws_conn_id: str = 'aws_default',
        method: str = 'GET',
        headers: dict | None = None,
        **kwargs
    ):
        """
        Transfer HTTP response data to S3.

        Parameters:
        - endpoint: HTTP endpoint URL
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - http_conn_id: HTTP connection ID
        - aws_conn_id: AWS connection ID
        - method: HTTP method (GET, POST, etc.)
        - headers: HTTP request headers
        """

class GoogleApiToS3Operator(BaseOperator):
    def __init__(
        self,
        google_api_service_name: str,
        google_api_service_version: str,
        google_api_endpoint_path: str,
        s3_bucket: str,
        s3_key: str,
        google_conn_id: str = 'google_cloud_default',
        aws_conn_id: str = 'aws_default',
        **kwargs
    ):
        """
        Transfer Google API data to S3.

        Parameters:
        - google_api_service_name: Google API service name
        - google_api_service_version: Google API service version
        - google_api_endpoint_path: API endpoint path
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - google_conn_id: Google Cloud connection ID
        - aws_conn_id: AWS connection ID
        """
```
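A minimal sketch of `GoogleApiToS3Operator` based on the signature above; the service name, version, endpoint path, bucket, and connection IDs are illustrative, and most Google API endpoints additionally require endpoint-specific query parameters not shown here.

```python
from airflow.providers.amazon.aws.transfers.google_api_to_s3 import GoogleApiToS3Operator

# Fetch data from a Google API endpoint and land the JSON response in S3.
# Service, endpoint, and connection details below are placeholders.
fetch_channel_data = GoogleApiToS3Operator(
    task_id='fetch_channel_data',
    google_api_service_name='youtube',
    google_api_service_version='v3',
    google_api_endpoint_path='youtube.channels.list',
    s3_bucket='integration-staging',
    s3_key='sources/google/{{ ds }}/channels.json',
    google_conn_id='google_cloud_default',
    aws_conn_id='aws_default',
    dag=dag
)
```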
### Database Transfers

Transfer data between S3 and various database systems.

```python { .api }
class MongoToS3Operator(BaseOperator):
    def __init__(
        self,
        mongo_collection: str,
        s3_bucket: str,
        s3_key: str,
        mongo_conn_id: str = 'mongo_default',
        aws_conn_id: str = 'aws_default',
        mongo_query: dict | None = None,
        **kwargs
    ):
        """
        Transfer MongoDB collection data to S3.

        Parameters:
        - mongo_collection: MongoDB collection name
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - mongo_conn_id: MongoDB connection ID
        - aws_conn_id: AWS connection ID
        - mongo_query: MongoDB query filter
        """
```
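A minimal sketch of `MongoToS3Operator` based on the signature above; the collection name, query filter, bucket, and connection IDs are illustrative placeholders.

```python
from airflow.providers.amazon.aws.transfers.mongo_to_s3 import MongoToS3Operator

# Export matching documents from a MongoDB collection to S3 as JSON.
# Names and connection IDs below are placeholders.
export_events = MongoToS3Operator(
    task_id='export_events',
    mongo_collection='user_events',
    mongo_query={'event_date': '{{ ds }}'},
    s3_bucket='integration-staging',
    s3_key='sources/mongo/{{ ds }}/user_events.json',
    mongo_conn_id='mongo_default',
    aws_conn_id='aws_default',
    dag=dag
)
```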
## Usage Examples
### S3 to Redshift Data Pipeline

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

dag = DAG('s3_to_redshift_pipeline', start_date=datetime(2023, 1, 1))

# Wait for data file in S3
wait_for_data = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='data-lake-bucket',
    bucket_key='raw-data/{{ ds }}/sales_data.csv',
    timeout=3600,
    dag=dag
)

# Load data into Redshift
load_to_redshift = S3ToRedshiftOperator(
    task_id='load_to_redshift',
    schema='analytics',
    table='daily_sales',
    s3_bucket='data-lake-bucket',
    s3_key='raw-data/{{ ds }}/sales_data.csv',
    redshift_conn_id='redshift_prod',
    aws_conn_id='aws_default',
    copy_options=[
        "CSV",
        "IGNOREHEADER 1",
        "DATEFORMAT 'YYYY-MM-DD'",
        "TRUNCATECOLUMNS"
    ],
    dag=dag
)

wait_for_data >> load_to_redshift
```
### Database Export to S3

```python
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

# Export PostgreSQL query results to S3
export_sales_data = SqlToS3Operator(
    task_id='export_sales_data',
    query="""
        SELECT
            date_trunc('day', order_date) as order_day,
            customer_id,
            product_id,
            quantity,
            price,
            total_amount
        FROM orders
        WHERE order_date >= '{{ ds }}'
          AND order_date < '{{ next_ds }}'
    """,
    s3_bucket='analytics-exports',
    s3_key='exports/sales/{{ ds }}/daily_sales.parquet',
    sql_conn_id='postgres_prod',
    aws_conn_id='aws_default',
    pd_kwargs={
        'dtype': {
            'customer_id': 'int64',
            'product_id': 'int64',
            'quantity': 'int32',
            'price': 'float64'
        }
    },
    dag=dag
)
```
### Multi-Source Data Integration

```python
from airflow.providers.amazon.aws.transfers.ftp_to_s3 import FtpToS3Operator
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

# Download files from multiple sources
ftp_download = FtpToS3Operator(
    task_id='download_from_ftp',
    ftp_path='/data/exports/customer_data.csv',
    s3_bucket='integration-staging',
    s3_key='sources/ftp/{{ ds }}/customer_data.csv',
    ftp_conn_id='partner_ftp',
    dag=dag
)

api_download = HttpToS3Operator(
    task_id='download_from_api',
    endpoint='/api/v1/products?date={{ ds }}',
    s3_bucket='integration-staging',
    s3_key='sources/api/{{ ds }}/products.json',
    http_conn_id='partner_api',
    headers={'Authorization': 'Bearer {{ var.value.api_token }}'},
    dag=dag
)

# Load the customer extract into the warehouse staging schema
load_customers = S3ToRedshiftOperator(
    task_id='load_customers',
    schema='staging',
    table='customers',
    s3_bucket='integration-staging',
    s3_key='sources/ftp/{{ ds }}/*',
    wildcard_match=True,
    copy_options=["CSV", "IGNOREHEADER 1"],
    dag=dag
)

# Parallel downloads, then load
[ftp_download, api_download] >> load_customers
```
### Data Export Pipeline

```python
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_sftp import S3ToSftpOperator

# Export aggregated data from Redshift
export_summary = RedshiftToS3Operator(
    task_id='export_summary',
    select_query="""
        SELECT
            region,
            product_category,
            DATE_TRUNC('month', sale_date) as month,
            SUM(revenue) as total_revenue,
            COUNT(*) as transaction_count
        FROM fact_sales
        WHERE sale_date >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')
          AND sale_date < DATE_TRUNC('month', CURRENT_DATE)
        GROUP BY region, product_category, month
    """,
    s3_bucket='analytics-exports',
    s3_key='reports/monthly_summary_{{ ds }}.csv',
    redshift_conn_id='redshift_prod',
    unload_options=[
        "PARALLEL OFF",  # write a single file so the output suffix is predictable
        "DELIMITER ','",
        "NULL AS ''"
    ],
    include_header=True,
    dag=dag
)

# Send report to external partner
# UNLOAD with PARALLEL OFF appends a numeric slice suffix (000) to the key prefix
send_to_partner = S3ToSftpOperator(
    task_id='send_to_partner',
    s3_bucket='analytics-exports',
    s3_key='reports/monthly_summary_{{ ds }}.csv000',
    sftp_path='/uploads/monthly_summary_{{ ds }}.csv',
    sftp_conn_id='partner_sftp',
    dag=dag
)

export_summary >> send_to_partner
```
### DynamoDB Backup to S3

```python
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDbToS3Operator

# Daily backup of DynamoDB table
backup_user_sessions = DynamoDbToS3Operator(
    task_id='backup_user_sessions',
    dynamodb_table_name='user_sessions',
    s3_bucket_name='dynamodb-backups',
    s3_key='backups/user_sessions/{{ ds }}/sessions.json',
    aws_conn_id='aws_default',
    dynamodb_scan_kwargs={
        'FilterExpression': 'attribute_exists(session_id)',
        'ProjectionExpression': 'session_id, user_id, created_at, last_active'
    },
    dag=dag
)
```
## Types

```python { .api }
# Transfer configuration types
class TransferConfig:
    source_conn_id: str
    dest_conn_id: str
    batch_size: int = 1000
    parallel: bool = False
    verify_data: bool = True

# S3 transfer options
class S3TransferOptions:
    multipart_threshold: int = 8 * 1024 * 1024  # 8MB
    max_concurrency: int = 10
    multipart_chunksize: int = 8 * 1024 * 1024
    use_threads: bool = True

# Redshift COPY options
class RedshiftCopyOptions:
    format: str = 'CSV'
    delimiter: str = ','
    quote_character: str = '"'
    escape_character: str | None = None
    null_as: str = ''
    ignore_header: int = 0
    date_format: str = 'auto'
    time_format: str = 'auto'
    ignore_blank_lines: bool = True
    truncate_columns: bool = False
    fill_record: bool = False
    blanks_as_null: bool = True
    empty_as_null: bool = True
    explicit_ids: bool = False
    acceptanydate: bool = False
    acceptinvchars: str | None = None
    max_error: int = 0
    compupdate: bool = True
    statupdate: bool = True

# File format types
class FileFormat:
    CSV = 'csv'
    JSON = 'json'
    PARQUET = 'parquet'
    AVRO = 'avro'
    ORC = 'orc'
    TSV = 'tsv'

# Compression types
class CompressionType:
    NONE = None
    GZIP = 'gzip'
    BZIP2 = 'bz2'
    LZOP = 'lzop'
    ZSTD = 'zstd'

# Transfer status
class TransferStatus:
    PENDING = 'pending'
    RUNNING = 'running'
    SUCCESS = 'success'
    FAILED = 'failed'
    CANCELLED = 'cancelled'
```
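The classes above are plain configuration containers rather than objects the operators consume directly. A small, hypothetical helper such as the sketch below can translate `RedshiftCopyOptions`-style settings into the list of COPY clause strings that `S3ToRedshiftOperator` accepts via `copy_options`; the helper name and the subset of options handled are assumptions, not part of the documented API.

```python
# Hypothetical helper: map RedshiftCopyOptions-style settings onto the COPY
# clause strings passed to S3ToRedshiftOperator(copy_options=...).
def build_copy_options(opts: RedshiftCopyOptions) -> list:
    options = [opts.format]  # e.g. 'CSV'
    options.append(f"DELIMITER '{opts.delimiter}'")
    options.append(f"DATEFORMAT '{opts.date_format}'")
    options.append(f"TIMEFORMAT '{opts.time_format}'")
    if opts.ignore_header:
        options.append(f"IGNOREHEADER {opts.ignore_header}")
    if opts.truncate_columns:
        options.append("TRUNCATECOLUMNS")
    if opts.blanks_as_null:
        options.append("BLANKSASNULL")
    if opts.empty_as_null:
        options.append("EMPTYASNULL")
    if opts.max_error:
        options.append(f"MAXERROR {opts.max_error}")
    return options

# Example: pass the generated list straight to the operator.
# copy_options=build_copy_options(RedshiftCopyOptions())
```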