# Data Transfer Operations

Transfer data between Hive and external systems, including MySQL, Amazon S3, Samba shares, Vertica, and Microsoft SQL Server. The provider moves data in both directions and handles delimiter, quoting, and compression settings along the way, covering the common ETL cases.

## Capabilities

### MySQL to Hive Transfer

Transfer data from MySQL tables to Hive with automatic schema mapping and partition support.

```python { .api }
class MySqlToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        quoting: Literal[0, 1, 2, 3] = csv.QUOTE_MINIMAL,
        quotechar: str = '"',
        escapechar: str | None = None,
        mysql_conn_id: str = "mysql_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        tblproperties: dict | None = None,
        **kwargs
    ): ...

    @classmethod
    def type_map(cls, mysql_type: int) -> str: ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator

# Basic MySQL to Hive transfer
mysql_to_hive = MySqlToHiveOperator(
    task_id='transfer_customer_data',
    sql='''
        SELECT customer_id, name, email, registration_date, status
        FROM customers
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    hive_table='warehouse.customers',
    partition={'ds': '{{ ds }}'},
    mysql_conn_id='mysql_prod',
    hive_cli_conn_id='hive_warehouse',
    create=True,
    dag=dag
)

# Complex transfer with table properties
advanced_mysql_transfer = MySqlToHiveOperator(
    task_id='advanced_mysql_transfer',
    sql='''
        SELECT o.order_id, o.customer_id, o.total_amount, o.order_date,
               GROUP_CONCAT(oi.product_id) as product_ids,
               GROUP_CONCAT(oi.quantity) as quantities
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        WHERE DATE(o.order_date) = '{{ ds }}'
        GROUP BY o.order_id, o.customer_id, o.total_amount, o.order_date
    ''',
    hive_table='analytics.order_summary',
    partition={'processing_date': '{{ ds }}'},
    delimiter='\t',
    tblproperties={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    dag=dag
)
```

### S3 to Hive Transfer

Transfer data from Amazon S3 to Hive tables with support for various file formats and compression.

```python { .api }
class S3ToHiveOperator:
    def __init__(
        self,
        *,
        s3_key: str,
        field_dict: dict,
        hive_table: str,
        delimiter: str = ",",
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        headers: bool = False,
        check_headers: bool = False,
        wildcard_match: bool = False,
        aws_conn_id: str | None = "aws_default",
        verify: bool | str | None = None,
        hive_cli_conn_id: str = "hive_cli_default",
        input_compressed: bool = False,
        tblproperties: dict | None = None,
        select_expression: str | None = None,
        hive_auth: str | None = None,
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
```

```python { .api }
def uncompress_file(input_file_name: str, file_extension: str, dest_dir: str) -> str: ...
```
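
`uncompress_file` is the module-level helper the operator uses to expand compressed downloads before loading them into Hive. A minimal standalone sketch of calling it directly; the input path below is illustrative, any local `.gz` or `.bz2` file works:

```python
from tempfile import mkdtemp

from airflow.providers.apache.hive.transfers.s3_to_hive import uncompress_file

# Illustrative paths: a previously downloaded gzip file and a scratch directory.
work_dir = mkdtemp(prefix='s3_to_hive_')
uncompressed_path = uncompress_file(
    input_file_name='/tmp/application.log.gz',
    file_extension='.gz',
    dest_dir=work_dir,
)
print(uncompressed_path)  # path of the expanded file inside work_dir
```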

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator

# Basic S3 to Hive transfer
s3_to_hive = S3ToHiveOperator(
    task_id='load_daily_logs',
    s3_key='logs/{{ ds }}/application.log.gz',
    field_dict={'log_line': 'STRING'},
    hive_table='warehouse.application_logs',
    partition={'log_date': '{{ ds }}'},
    input_compressed=True,
    aws_conn_id='aws_data_lake',
    hive_cli_conn_id='hive_warehouse',
    create=True,
    dag=dag
)

# Compressed CSV transfer with header validation
csv_s3_transfer = S3ToHiveOperator(
    task_id='transfer_csv_data',
    s3_key='exports/daily_report_{{ ds }}.csv.gz',
    field_dict={
        'metric_name': 'STRING',
        'amount': 'DOUBLE',
        'count': 'BIGINT',
        'timestamp': 'TIMESTAMP'
    },
    hive_table='reports.daily_metrics',
    partition={'report_date': '{{ ds }}'},
    headers=True,
    check_headers=True,
    input_compressed=True,
    delimiter=',',
    dag=dag
)

# Regional file transfer with templating
multi_file_transfer = S3ToHiveOperator(
    task_id='load_regional_data',
    s3_key='regional_data/{{ ds }}/{{ params.region }}_data.json.gz',
    field_dict={'payload': 'STRING'},
    hive_table='analytics.regional_metrics',
    partition={'ds': '{{ ds }}', 'region': '{{ params.region }}'},
    params={'region': 'us'},
    input_compressed=True,
    tblproperties={'serialization.format': '1'},
    dag=dag
)
```
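
The `select_expression` parameter pushes filtering down to S3 Select so that only matching rows are retrieved before the Hive load. A hedged sketch; the object layout, column positions, and table names are illustrative:

```python
# Filter rows server-side with S3 Select before loading into Hive.
# Column references (s._1, s._2, s._3) and names below are illustrative.
filtered_transfer = S3ToHiveOperator(
    task_id='load_error_events_only',
    s3_key='events/{{ ds }}/events.csv',
    select_expression="SELECT s._1, s._2, s._3 FROM S3Object s WHERE s._3 = 'ERROR'",
    field_dict={'event_id': 'STRING', 'event_ts': 'STRING', 'severity': 'STRING'},
    hive_table='warehouse.error_events',
    partition={'ds': '{{ ds }}'},
    delimiter=',',
    dag=dag
)
```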

### Hive to MySQL Transfer

Export the result of a Hive query to a MySQL table, with optional pre- and post-load SQL statements for incremental loads and clean-up.

```python { .api }
class HiveToMySqlOperator:
    def __init__(
        self,
        *,
        sql: str,
        mysql_table: str,
        hiveserver2_conn_id: str = "hiveserver2_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        mysql_postoperator: str | None = None,
        bulk_load: bool = False,
        hive_conf: dict | None = None,
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator

# Basic Hive to MySQL export
hive_to_mysql = HiveToMySqlOperator(
    task_id='export_daily_summary',
    sql='''
        SELECT region, product_category,
               SUM(revenue) as total_revenue,
               COUNT(*) as transaction_count
        FROM warehouse.daily_sales
        WHERE ds = '{{ ds }}'
        GROUP BY region, product_category
    ''',
    mysql_table='reporting.daily_summary',
    mysql_preoperator="DELETE FROM reporting.daily_summary WHERE report_date = '{{ ds }}'",
    mysql_postoperator="UPDATE reporting.daily_summary SET last_updated = NOW() WHERE report_date = '{{ ds }}'",
    hiveserver2_conn_id='hive_analytics',
    mysql_conn_id='mysql_reporting',
    dag=dag
)

# Bulk load for large datasets
bulk_export = HiveToMySqlOperator(
    task_id='bulk_export_customers',
    sql='''
        SELECT customer_id, name, email, total_orders, lifetime_value
        FROM warehouse.customer_metrics
        WHERE ds = '{{ ds }}'
    ''',
    mysql_table='crm.customer_snapshot',
    bulk_load=True,
    mysql_preoperator='TRUNCATE TABLE crm.customer_snapshot',
    dag=dag
)
```
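
The `hive_conf` argument carries Hive session settings alongside the export query. A hedged sketch of using it to pin the query to a YARN queue; the queue name, settings, and table are illustrative:

```python
# Pass Hive session settings with the export query; values are illustrative.
queued_export = HiveToMySqlOperator(
    task_id='export_with_queue',
    sql="SELECT region, SUM(revenue) FROM warehouse.daily_sales WHERE ds = '{{ ds }}' GROUP BY region",
    mysql_table='reporting.revenue_by_region',
    hive_conf={'mapreduce.job.queuename': 'etl', 'hive.exec.parallel': 'true'},
    dag=dag
)
```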

### Microsoft SQL Server to Hive Transfer

Transfer data from Microsoft SQL Server to Hive with automatic type mapping and partition support.

```python { .api }
class MsSqlToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        mssql_conn_id: str = "mssql_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        tblproperties: dict | None = None,
        **kwargs
    ): ...

    @classmethod
    def type_map(cls, mssql_type: int) -> str: ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.mssql_to_hive import MsSqlToHiveOperator

# SQL Server to Hive transfer
mssql_to_hive = MsSqlToHiveOperator(
    task_id='transfer_sales_data',
    sql='''
        SELECT SalesOrderID, CustomerID, OrderDate, TotalDue, Status
        FROM Sales.SalesOrderHeader
        WHERE CAST(OrderDate AS DATE) = '{{ ds }}'
    ''',
    hive_table='warehouse.sales_orders',
    partition={'order_date': '{{ ds }}'},
    mssql_conn_id='mssql_prod',
    hive_cli_conn_id='hive_warehouse',
    dag=dag
)
```

### Vertica to Hive Transfer

Transfer the result of a Vertica query into a Hive table, with automatic type mapping and partition support.

```python { .api }
class VerticaToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        vertica_conn_id: str = "vertica_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        **kwargs
    ): ...

    @classmethod
    def type_map(cls, vertica_type) -> str: ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.vertica_to_hive import VerticaToHiveOperator

# Vertica to Hive transfer
vertica_to_hive = VerticaToHiveOperator(
    task_id='transfer_analytics_data',
    sql='''
        SELECT customer_id, event_timestamp, event_type, properties
        FROM analytics.user_events
        WHERE DATE(event_timestamp) = DATE('{{ ds }}')
    ''',
    hive_table='warehouse.user_events',
    partition={'event_date': '{{ ds }}'},
    vertica_conn_id='vertica_analytics',
    hive_cli_conn_id='hive_warehouse',
    dag=dag
)

# Large dataset transfer into an auto-created partitioned table
bulk_vertica_transfer = VerticaToHiveOperator(
    task_id='bulk_transfer_transactions',
    sql='''
        SELECT transaction_id, user_id, amount, merchant_id, transaction_timestamp
        FROM transactions.daily_transactions
        WHERE transaction_date = '{{ ds }}'
    ''',
    hive_table='warehouse.transactions',
    partition={'transaction_date': '{{ ds }}'},
    create=True,
    dag=dag
)
```

### Hive to Samba Transfer

Export data from Hive to Samba file shares for integration with legacy systems.

```python { .api }
class HiveToSambaOperator:
    def __init__(
        self,
        *,
        hql: str,
        destination_filepath: str,
        samba_conn_id: str = 'samba_default',
        hiveserver2_conn_id: str = 'hiveserver2_default',
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.hive_to_samba import HiveToSambaOperator

# Export to Samba share
hive_to_samba = HiveToSambaOperator(
    task_id='export_to_legacy_system',
    hql='''
        SELECT customer_id, order_date, product_code, quantity, amount
        FROM warehouse.orders
        WHERE ds = '{{ ds }}'
        ORDER BY customer_id, order_date
    ''',
    destination_filepath='/exports/daily_orders_{{ ds }}.csv',
    samba_conn_id='legacy_file_share',
    hiveserver2_conn_id='hive_reporting',
    dag=dag
)
```

## Transfer Configuration

### File Format Handling

Transfer operators support custom delimiters, quoting styles, and compression settings:

```python
import csv

# CSV with custom formatting
csv_transfer = MySqlToHiveOperator(
    task_id='csv_transfer',
    sql='SELECT * FROM source_table',
    hive_table='target_table',
    delimiter=',',
    quoting=csv.QUOTE_ALL,
    quotechar='"',
    escapechar='\\',
    dag=dag
)

# Tab-separated, gzip-compressed input with compressed table properties
tsv_transfer = S3ToHiveOperator(
    task_id='tsv_transfer',
    s3_key='data.tsv.gz',
    field_dict={'col_a': 'STRING', 'col_b': 'STRING'},
    hive_table='target_table',
    delimiter='\t',
    input_compressed=True,
    tblproperties={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    dag=dag
)
```

### Partition Management

The transfer operators that load into Hive accept a `partition` mapping that pins each load to a specific partition; values can be templated:

```python
# Templated partition values
static_partition = MySqlToHiveOperator(
    task_id='static_partition',
    sql="SELECT * FROM orders WHERE order_date = '{{ ds }}'",
    hive_table='warehouse.orders',
    partition={'ds': '{{ ds }}', 'region': 'us'},
    dag=dag
)

# Partitioned load with table recreation
dynamic_partition = S3ToHiveOperator(
    task_id='dynamic_partition',
    s3_key='multi_region_data_{{ ds }}.csv',
    field_dict={'region': 'STRING', 'metric': 'STRING', 'value': 'DOUBLE'},
    hive_table='warehouse.regional_data',
    partition={'processing_date': '{{ ds }}'},
    recreate=True,  # Drop and recreate the table before loading
    dag=dag
)
```
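
After a load, the metastore can be queried to confirm that the expected partition actually landed. A sketch using `HiveMetastoreHook.check_for_partition`; the connection ID, schema, and table names are illustrative and match the example above:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook


def assert_partition_exists(**context):
    # Connection ID, schema, and table below are illustrative.
    metastore = HiveMetastoreHook(metastore_conn_id='metastore_default')
    if not metastore.check_for_partition(
        schema='warehouse',
        table='regional_data',
        partition=f"processing_date='{context['ds']}'",
    ):
        raise ValueError(f"Partition for {context['ds']} was not created")


check_partition = PythonOperator(
    task_id='check_partition',
    python_callable=assert_partition_exists,
    dag=dag,
)

dynamic_partition >> check_partition
```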

### Data Quality and Validation

Validate the loaded data after a transfer completes:

```python
from airflow.operators.python import PythonOperator


def validate_transfer_quality(**context):
    from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook

    hive_hook = HiveServer2Hook(hiveserver2_conn_id='hive_warehouse')

    # Check the record count for the partition that was just loaded
    result = hive_hook.get_first(
        f"SELECT COUNT(*) FROM warehouse.transferred_data WHERE ds = '{context['ds']}'"
    )

    record_count = result[0] if result else 0
    if record_count < 1000:
        raise ValueError(f"Transfer validation failed: only {record_count} records")

    context['task_instance'].xcom_push(key='record_count', value=record_count)


transfer_task = MySqlToHiveOperator(
    task_id='transfer_data',
    sql="SELECT * FROM source_data WHERE date_col = '{{ ds }}'",
    hive_table='warehouse.transferred_data',
    partition={'ds': '{{ ds }}'},
    dag=dag
)

validation_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_transfer_quality,
    dag=dag
)

transfer_task >> validation_task
```
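
The record count pushed to XCom can be consumed downstream, for example in a reporting task. A minimal sketch; the task IDs match the example above:

```python
def report_row_count(**context):
    # Pull the count pushed by validate_transfer_quality above
    count = context['task_instance'].xcom_pull(
        task_ids='validate_transfer', key='record_count'
    )
    print(f"Loaded {count} rows for {context['ds']}")


report_task = PythonOperator(
    task_id='report_row_count',
    python_callable=report_row_count,
    dag=dag
)

validation_task >> report_task
```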

## Advanced Transfer Patterns

### Incremental Loading

Load only the rows that changed in each run, then merge the staging data into the target table:

```python
from airflow.providers.apache.hive.operators.hive import HiveOperator

# Incremental load bounded by the run's date window
incremental_load = MySqlToHiveOperator(
    task_id='incremental_customer_load',
    sql='''
        SELECT customer_id, name, email, updated_at
        FROM customers
        WHERE updated_at >= '{{ ds }}'
          AND updated_at < '{{ macros.ds_add(ds, 1) }}'
    ''',
    hive_table='warehouse.customers_staging',
    partition={'load_date': '{{ ds }}'},
    dag=dag
)

# Merge staging to production
merge_task = HiveOperator(
    task_id='merge_customer_data',
    hql='''
        INSERT OVERWRITE TABLE warehouse.customers
        SELECT DISTINCT
            COALESCE(prod.customer_id, stage.customer_id) as customer_id,
            COALESCE(stage.name, prod.name) as name,
            COALESCE(stage.email, prod.email) as email,
            GREATEST(COALESCE(stage.updated_at, '1900-01-01'),
                     COALESCE(prod.updated_at, '1900-01-01')) as updated_at
        FROM warehouse.customers prod
        FULL OUTER JOIN (
            SELECT * FROM warehouse.customers_staging
            WHERE load_date = '{{ ds }}'
        ) stage ON prod.customer_id = stage.customer_id;
    ''',
    dag=dag
)

incremental_load >> merge_task
```

### Multi-Source Data Integration

Combine data from multiple sources:

```python
# Transfer from multiple databases into staging tables
mysql_transfer = MySqlToHiveOperator(
    task_id='mysql_orders',
    sql="SELECT * FROM orders WHERE date_col = '{{ ds }}'",
    hive_table='staging.mysql_orders',
    partition={'ds': '{{ ds }}', 'source': 'mysql'},
    dag=dag
)

mssql_transfer = MsSqlToHiveOperator(
    task_id='mssql_orders',
    sql="SELECT * FROM Orders WHERE DateCol = '{{ ds }}'",
    hive_table='staging.mssql_orders',
    partition={'ds': '{{ ds }}', 'source': 'mssql'},
    dag=dag
)

# Combine in Hive
combine_sources = HiveOperator(
    task_id='combine_order_sources',
    hql='''
        INSERT OVERWRITE TABLE warehouse.unified_orders
        PARTITION (ds='{{ ds }}')
        SELECT order_id, customer_id, amount, 'mysql' as source_system
        FROM staging.mysql_orders WHERE ds='{{ ds }}'
        UNION ALL
        SELECT order_id, customer_id, amount, 'mssql' as source_system
        FROM staging.mssql_orders WHERE ds='{{ ds }}';
    ''',
    dag=dag
)

[mysql_transfer, mssql_transfer] >> combine_sources
```

### Error Handling and Recovery

Implement robust error handling:

```python
from datetime import timedelta


def handle_transfer_failure(context):
    task_instance = context['task_instance']

    # Clean up the partially loaded partition in the target table
    from airflow.providers.apache.hive.hooks.hive import HiveCliHook
    hook = HiveCliHook(hive_cli_conn_id='hive_warehouse')

    hook.run_cli(f'''
        ALTER TABLE warehouse.large_table
        DROP IF EXISTS PARTITION (ds='{context["ds"]}')
    ''')

    # Send notification (send_failure_notification is a user-provided helper)
    send_failure_notification(f"Transfer failed: {task_instance.task_id}")


resilient_transfer = MySqlToHiveOperator(
    task_id='resilient_transfer',
    sql="SELECT * FROM large_table WHERE date_col = '{{ ds }}'",
    hive_table='warehouse.large_table',
    partition={'ds': '{{ ds }}'},
    retries=3,
    retry_delay=timedelta(minutes=10),
    on_failure_callback=handle_transfer_failure,
    dag=dag
)
```