# Data Transfer Operations

Transfer operators for moving data from various source systems into MySQL tables. These operators provide high-level task definitions for Airflow DAGs, supporting bulk loading, transformation options, and integration with multiple data sources.

## Capabilities

### S3 to MySQL Transfer

Transfer data from Amazon S3 files directly into MySQL tables using bulk loading operations.

```python { .api }
class S3ToMySqlOperator(BaseOperator):
    """
    Load a file from S3 into a MySQL table.

    Template fields: s3_source_key, mysql_table
    """

    def __init__(
        self,
        s3_source_key: str,
        mysql_table: str,
        mysql_duplicate_key_handling: str = "IGNORE",
        mysql_extra_options: str | None = None,
        aws_conn_id: str = "aws_default",
        mysql_conn_id: str = "mysql_default",
        mysql_local_infile: bool = False,
        **kwargs,
    ):
        """
        Initialize the S3 to MySQL transfer operator.

        Parameters:
        - s3_source_key: S3 key path to the source file (templated)
        - mysql_table: Target MySQL table name (templated)
        - mysql_duplicate_key_handling: How to handle duplicate keys ("IGNORE" or "REPLACE")
        - mysql_extra_options: Additional MySQL LOAD DATA options
        - aws_conn_id: S3 connection ID for credentials
        - mysql_conn_id: MySQL connection ID
        - mysql_local_infile: Enable the local_infile option on the MySqlHook
        """

    def execute(self, context: Context) -> None:
        """
        Execute the S3 to MySQL data transfer.

        Downloads the file from S3 and loads it into MySQL using bulk operations.
        """
```
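
Internally this maps onto a single MySQL `LOAD DATA` statement. A minimal sketch of how such a statement could be assembled from the parameters above (the `build_load_data_sql` helper is hypothetical, not part of the provider):

```python
from __future__ import annotations


def build_load_data_sql(
    local_path: str,
    mysql_table: str,
    duplicate_key_handling: str = "IGNORE",
    extra_options: str | None = None,
) -> str:
    """Assemble a LOAD DATA LOCAL INFILE statement (illustrative only)."""
    parts = [
        f"LOAD DATA LOCAL INFILE '{local_path}'",
        duplicate_key_handling,      # IGNORE or REPLACE rows on duplicate keys
        f"INTO TABLE {mysql_table}",
    ]
    if extra_options:
        parts.append(extra_options)  # e.g. FIELDS TERMINATED BY ','
    return "\n".join(parts)


sql = build_load_data_sql("/tmp/users.csv", "staging.users", "REPLACE")
```

Note that `LOAD DATA LOCAL INFILE` requires `local_infile` to be enabled on both client and server, which is what `mysql_local_infile=True` controls on the hook side.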

### Vertica to MySQL Transfer

Transfer data from Vertica databases to MySQL, with support for both bulk loading and regular insert operations.

```python { .api }
class VerticaToMySqlOperator(BaseOperator):
    """
    Move data from Vertica to MySQL.

    Template fields: sql, mysql_table, mysql_preoperator, mysql_postoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        vertica_conn_id: str = "vertica_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        mysql_postoperator: str | None = None,
        bulk_load: bool = False,
        **kwargs,
    ):
        """
        Initialize the Vertica to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against the Vertica database (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - vertica_conn_id: Source Vertica connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before the import (templated)
        - mysql_postoperator: SQL statement to run after the import (templated)
        - bulk_load: Use LOAD DATA LOCAL INFILE for bulk operations
        """

    def execute(self, context: Context):
        """
        Execute the Vertica to MySQL data transfer.

        Supports both bulk load (via temporary files) and regular insert operations.
        """
```
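
With `bulk_load=True`, the operator stages the Vertica result set in a temporary file that MySQL then ingests via `LOAD DATA LOCAL INFILE`. A rough, self-contained sketch of that staging step (illustrative of the pattern, not the provider's actual code):

```python
import csv
import tempfile


def rows_to_tmp_csv(rows, delimiter: str = ",") -> str:
    """Write a result set to a temporary CSV file for LOAD DATA (illustrative)."""
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, newline=""
    )
    writer = csv.writer(tmp, delimiter=delimiter)
    writer.writerows(rows)
    tmp.close()
    return tmp.name


path = rows_to_tmp_csv([(1, "alice"), (2, "bob")])
# The file at `path` can then be referenced in a LOAD DATA LOCAL INFILE statement.
```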

### Presto to MySQL Transfer

Transfer data from Presto queries to MySQL tables using in-memory operations for small to medium datasets.

```python { .api }
class PrestoToMySqlOperator(BaseOperator):
    """
    Move data from Presto to MySQL.

    Note: data is loaded into memory, so this operator is suitable only for
    small amounts of data.
    Template fields: sql, mysql_table, mysql_preoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        presto_conn_id: str = "presto_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        **kwargs,
    ):
        """
        Initialize the Presto to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against Presto (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - presto_conn_id: Source Presto connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before the import (templated)
        """

    def execute(self, context: Context) -> None:
        """
        Execute the Presto to MySQL data transfer.

        Loads the query results into memory before inserting them into MySQL.
        """
```

### Trino to MySQL Transfer

Transfer data from Trino queries to MySQL tables using in-memory operations for small to medium datasets.

```python { .api }
class TrinoToMySqlOperator(BaseOperator):
    """
    Move data from Trino to MySQL.

    Note: data is loaded into memory, so this operator is suitable only for
    small amounts of data.
    Template fields: sql, mysql_table, mysql_preoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        trino_conn_id: str = "trino_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        **kwargs,
    ):
        """
        Initialize the Trino to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against Trino (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - trino_conn_id: Source Trino connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before the import (templated)
        """

    def execute(self, context: Context) -> None:
        """
        Execute the Trino to MySQL data transfer.

        Loads the query results into memory before inserting them into MySQL.
        """
```

## Usage Examples

### S3 to MySQL Transfer

```python
from airflow import DAG
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from datetime import datetime

dag = DAG('s3_mysql_transfer', start_date=datetime(2024, 1, 1))

# Basic S3 to MySQL transfer
s3_to_mysql = S3ToMySqlOperator(
    task_id='load_users_from_s3',
    s3_source_key='data/users/{{ ds }}/users.csv',
    mysql_table='staging.users',
    mysql_duplicate_key_handling='REPLACE',
    mysql_extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '\"'",
    aws_conn_id='aws_default',
    mysql_conn_id='mysql_default',
    mysql_local_infile=True,
    dag=dag,
)
```

### Vertica to MySQL Transfer

```python
from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator

# Bulk load transfer from Vertica
vertica_to_mysql_bulk = VerticaToMySqlOperator(
    task_id='transfer_vertica_bulk',
    sql='''
        SELECT user_id, username, email, created_date
        FROM users
        WHERE created_date >= '{{ ds }}'
    ''',
    mysql_table='staging.users',
    mysql_preoperator='TRUNCATE TABLE staging.users',
    mysql_postoperator='CALL update_user_stats()',
    bulk_load=True,
    vertica_conn_id='vertica_default',
    mysql_conn_id='mysql_default',
    dag=dag,
)

# Regular insert transfer
vertica_to_mysql_insert = VerticaToMySqlOperator(
    task_id='transfer_vertica_insert',
    sql="SELECT * FROM daily_metrics WHERE date = '{{ ds }}'",
    mysql_table='analytics.daily_metrics',
    bulk_load=False,
    dag=dag,
)
```
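
The `mysql_preoperator` and `mysql_postoperator` statements bracket the transfer itself. A toy sketch of that execution order (the `run_transfer` function is illustrative, not provider code):

```python
def run_transfer(load_rows, execute_sql, pre_sql=None, post_sql=None):
    """Illustrative ordering of a transfer with pre/post hooks (not provider code)."""
    if pre_sql:
        execute_sql(pre_sql)   # e.g. TRUNCATE TABLE staging.users
    load_rows()                # move the data from source into MySQL
    if post_sql:
        execute_sql(post_sql)  # e.g. CALL update_user_stats()


calls = []
run_transfer(
    load_rows=lambda: calls.append("load"),
    execute_sql=lambda s: calls.append(s),
    pre_sql="TRUNCATE TABLE staging.users",
    post_sql="CALL update_user_stats()",
)
# calls == ["TRUNCATE TABLE staging.users", "load", "CALL update_user_stats()"]
```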

### Presto to MySQL Transfer

```python
from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator

# Transfer aggregated data from Presto
presto_to_mysql = PrestoToMySqlOperator(
    task_id='load_presto_aggregates',
    sql='''
        SELECT
            date_trunc('day', event_time) as event_date,
            event_type,
            count(*) as event_count
        FROM events
        WHERE event_time >= date('{{ ds }}')
        GROUP BY 1, 2
    ''',
    mysql_table='analytics.event_summary',
    mysql_preoperator="DELETE FROM analytics.event_summary WHERE event_date = '{{ ds }}'",
    presto_conn_id='presto_default',
    mysql_conn_id='mysql_default',
    dag=dag,
)
```

### Trino to MySQL Transfer

```python
from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator

# Transfer processed data from Trino
trino_to_mysql = TrinoToMySqlOperator(
    task_id='load_trino_results',
    sql='''
        SELECT
            customer_id,
            product_category,
            sum(purchase_amount) as total_spent
        FROM purchases
        WHERE purchase_date = date('{{ ds }}')
        GROUP BY customer_id, product_category
    ''',
    mysql_table='customer_analytics.daily_spending',
    mysql_preoperator='''
        CREATE TABLE IF NOT EXISTS customer_analytics.daily_spending (
            customer_id INT,
            product_category VARCHAR(100),
            total_spent DECIMAL(10,2),
            load_date DATE DEFAULT (CURDATE())  -- expression default, MySQL 8.0.13+
        )
    ''',
    trino_conn_id='trino_default',
    mysql_conn_id='mysql_default',
    dag=dag,
)
```

## Transfer Operation Patterns

### Template Variables

All transfer operators support Airflow templating for dynamic values:

```python
# Template variables are available in the sql, mysql_table, and preoperator fields
sql_with_templates = '''
    SELECT * FROM events
    WHERE event_date = '{{ ds }}'             -- current DAG run date
      AND event_time >= '{{ ts }}'            -- current DAG run timestamp
      AND user_id IN {{ params.user_ids }}    -- custom parameters
'''

mysql_table_template = 'staging.events_{{ ds_nodash }}'  # Table name with date suffix
```
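
At runtime Airflow renders these fields with the Jinja2 engine before execution. A stdlib-only stand-in that mimics the substitution for simple `{{ name }}` placeholders (a toy illustration; real rendering supports filters, macros, and much more):

```python
import re


def render_simple(template: str, **values) -> str:
    """Replace {{ name }} placeholders with supplied values (toy Jinja stand-in)."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(values.get(m.group(1), m.group(0))),
        template,
    )


table = render_simple("staging.events_{{ ds_nodash }}", ds_nodash="20240101")
# table == "staging.events_20240101"
```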

### Error Handling and Retries

```python
from datetime import timedelta

# Configure retry behavior for transfer operations
transfer_operator = S3ToMySqlOperator(
    task_id='s3_transfer',
    s3_source_key='data/file.csv',
    mysql_table='staging.data',
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag,
)
```

### Data Quality Checks

```python
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

def validate_transfer_results():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    count = hook.get_first('SELECT COUNT(*) FROM staging.users')[0]
    if count == 0:
        raise ValueError("No data transferred")
    return count

# Add validation after the transfer
validate_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_transfer_results,
    dag=dag,
)

s3_to_mysql >> validate_task
```

## Type Definitions

```python { .api }
# Base operator context for all transfer operations
Context = Dict[str, Any]

# Common transfer operation configuration
TransferConfig = {
    "source_conn_id": str,      # Source system connection ID
    "mysql_conn_id": str,       # MySQL connection ID (default: "mysql_default")
    "mysql_table": str,         # Target table (supports database.table notation)
    "mysql_preoperator": str,   # SQL to run before transfer (optional)
    "mysql_postoperator": str,  # SQL to run after transfer (optional)
}

# S3-specific configuration
S3TransferConfig = {
    "s3_source_key": str,                 # S3 object key (templated)
    "aws_conn_id": str,                   # AWS connection (default: "aws_default")
    "mysql_duplicate_key_handling": str,  # "IGNORE" or "REPLACE"
    "mysql_extra_options": str,           # Additional LOAD DATA options
    "mysql_local_infile": bool,           # Enable the local_infile feature
}

# Bulk load configuration for Vertica transfers
BulkLoadConfig = {
    "bulk_load": bool,      # Enable bulk loading via temporary files
    "tmp_file_path": str,   # Temporary file location for bulk operations
}
```
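
If static checking is wanted, the configuration shapes above can also be expressed as `TypedDict`s. A sketch of the common shape (the class name mirrors the dictionary above but is illustrative, not part of the provider):

```python
from typing import TypedDict


class TransferConfigTD(TypedDict, total=False):
    source_conn_id: str      # Source system connection ID
    mysql_conn_id: str       # MySQL connection ID
    mysql_table: str         # Target table (supports database.table notation)
    mysql_preoperator: str   # SQL to run before transfer (optional)
    mysql_postoperator: str  # SQL to run after transfer (optional)


config: TransferConfigTD = {
    "mysql_conn_id": "mysql_default",
    "mysql_table": "staging.users",
}
```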