Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.
Transfer data between Hive and external systems including MySQL, S3, Samba, Vertica, and Microsoft SQL Server. The provider offers bidirectional data movement with transformation and format conversion capabilities for comprehensive ETL operations.
Transfer data from MySQL tables to Hive with automatic schema mapping and partition support.
class MySqlToHiveOperator:
def __init__(
self,
*,
sql: str,
hive_table: str,
create: bool = True,
recreate: bool = False,
partition: dict | None = None,
delimiter: str = chr(1),
quoting: Literal[0, 1, 2, 3] = csv.QUOTE_MINIMAL,
quotechar: str = '"',
escapechar: str | None = None,
mysql_conn_id: str = "mysql_default",
hive_cli_conn_id: str = "hive_cli_default",
hive_auth: str | None = None,
tblproperties: dict | None = None,
**kwargs
): ...
@classmethod
def type_map(cls, mysql_type: int) -> str: ...
def execute(self, context: 'Context') -> None: ...

Usage Example:
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator
# Basic MySQL to Hive transfer
mysql_to_hive = MySqlToHiveOperator(
task_id='transfer_customer_data',
sql='''
SELECT customer_id, name, email, registration_date, status
FROM customers
WHERE DATE(created_at) = '{{ ds }}'
''',
hive_table='warehouse.customers',
partition={'ds': '{{ ds }}'},
mysql_conn_id='mysql_prod',
hive_cli_conn_id='hive_warehouse',
create=True,
dag=dag
)
# Complex transfer with table properties
advanced_mysql_transfer = MySqlToHiveOperator(
task_id='advanced_mysql_transfer',
sql='''
SELECT o.order_id, o.customer_id, o.total_amount, o.order_date,
GROUP_CONCAT(oi.product_id) as product_ids,
GROUP_CONCAT(oi.quantity) as quantities
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
WHERE DATE(o.order_date) = '{{ ds }}'
GROUP BY o.order_id, o.customer_id, o.total_amount, o.order_date
''',
hive_table='analytics.order_summary',
partition={'processing_date': '{{ ds }}'},
delimiter='\t',
tblproperties={
'hive.exec.compress.output': 'true',
'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
},
dag=dag
)

Transfer data from Amazon S3 to Hive tables with support for various file formats and compression.
class S3ToHiveOperator:
def __init__(
self,
*,
s3_key: str,
field_dict: dict,
hive_table: str,
delimiter: str = ",",
create: bool = True,
recreate: bool = False,
partition: dict | None = None,
headers: bool = False,
check_headers: bool = False,
wildcard_match: bool = False,
aws_conn_id: str | None = "aws_default",
verify: bool | str | None = None,
hive_cli_conn_id: str = "hive_cli_default",
input_compressed: bool = False,
tblproperties: dict | None = None,
select_expression: str | None = None,
hive_auth: str | None = None,
**kwargs
): ...
def execute(self, context: 'Context') -> None: ...

def uncompress_file(input_file_name: str, file_extension: str, dest_dir: str) -> str: ...

Usage Example:
from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator
# Basic S3 to Hive transfer
s3_to_hive = S3ToHiveOperator(
task_id='load_daily_logs',
s3_key='logs/{{ ds }}/application.log.gz',
field_dict={'log_line': 'STRING'},
hive_table='warehouse.application_logs',
input_compressed=True,
partition={'log_date': '{{ ds }}'},
aws_conn_id='aws_data_lake',
hive_cli_conn_id='hive_warehouse',
create=True,
dag=dag
)
# CSV transfer with header validation
csv_s3_transfer = S3ToHiveOperator(
    task_id='transfer_csv_data',
    s3_key='exports/daily_report_{{ ds }}.csv.gz',
    field_dict={'timestamp': 'TIMESTAMP', 'amount': 'DOUBLE', 'count': 'INT'},
    hive_table='reports.daily_metrics',
    partition={'report_date': '{{ ds }}'},
    headers=True,
    check_headers=True,
    input_compressed=True,
    delimiter=',',
dag=dag
)
# Multiple file transfer with templating
multi_file_transfer = S3ToHiveOperator(
task_id='load_regional_data',
s3_key='regional_data/{{ ds }}/{{ params.region }}_data.json.gz',
field_dict={'json_record': 'STRING'},
hive_table='analytics.regional_metrics',
partition={'ds': '{{ ds }}', 'region': '{{ params.region }}'},
params={'region': 'us'},
input_compressed=True,
tblproperties={'serialization.format': '1'},
dag=dag
)

Export data from Hive tables to MySQL with support for incremental loads and data transformations.
class HiveToMySqlOperator:
def __init__(
self,
*,
sql: str,
mysql_table: str,
hiveserver2_conn_id: str = "hiveserver2_default",
mysql_conn_id: str = "mysql_default",
mysql_preoperator: str | None = None,
mysql_postoperator: str | None = None,
bulk_load: bool = False,
hive_conf: dict | None = None,
**kwargs
): ...
def execute(self, context: 'Context') -> None: ...

Usage Example:
from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator
# Basic Hive to MySQL export
hive_to_mysql = HiveToMySqlOperator(
task_id='export_daily_summary',
sql='''
SELECT region, product_category,
SUM(revenue) as total_revenue,
COUNT(*) as transaction_count
FROM warehouse.daily_sales
WHERE ds = '{{ ds }}'
GROUP BY region, product_category
''',
mysql_table='reporting.daily_summary',
mysql_preoperator='DELETE FROM reporting.daily_summary WHERE report_date = "{{ ds }}"',
mysql_postoperator='UPDATE reporting.daily_summary SET last_updated = NOW() WHERE report_date = "{{ ds }}"',
hiveserver2_conn_id='hive_analytics',
mysql_conn_id='mysql_reporting',
dag=dag
)
# Bulk load for large datasets
bulk_export = HiveToMySqlOperator(
task_id='bulk_export_customers',
sql='''
SELECT customer_id, name, email, total_orders, lifetime_value
FROM warehouse.customer_metrics
WHERE ds = '{{ ds }}'
''',
mysql_table='crm.customer_snapshot',
bulk_load=True,
mysql_preoperator='TRUNCATE TABLE crm.customer_snapshot',
dag=dag
)

Transfer data from Microsoft SQL Server to Hive with support for complex data types and bulk operations.
class MsSqlToHiveOperator:
def __init__(
self,
*,
sql: str,
hive_table: str,
create: bool = True,
recreate: bool = False,
partition: dict | None = None,
delimiter: str = chr(1),
mssql_conn_id: str = "mssql_default",
hive_cli_conn_id: str = "hive_cli_default",
hive_auth: str | None = None,
tblproperties: dict | None = None,
**kwargs
): ...
@classmethod
def type_map(cls, mssql_type: int) -> str: ...
def execute(self, context: 'Context') -> None: ...

Usage Example:
from airflow.providers.apache.hive.transfers.mssql_to_hive import MsSqlToHiveOperator
# SQL Server to Hive transfer
mssql_to_hive = MsSqlToHiveOperator(
task_id='transfer_sales_data',
sql='''
SELECT SalesOrderID, CustomerID, OrderDate, TotalDue, Status
FROM Sales.SalesOrderHeader
WHERE CAST(OrderDate AS DATE) = '{{ ds }}'
''',
hive_table='warehouse.sales_orders',
partition={'order_date': '{{ ds }}'},
mssql_conn_id='mssql_prod',
hive_cli_conn_id='hive_warehouse',
dag=dag
)

Transfer data from Vertica to Hive with optimized bulk loading capabilities.
class VerticaToHiveOperator:
def __init__(
self,
*,
sql: str,
hive_table: str,
create: bool = True,
recreate: bool = False,
partition: dict | None = None,
delimiter: str = chr(1),
vertica_conn_id: str = "vertica_default",
hive_cli_conn_id: str = "hive_cli_default",
hive_auth: str | None = None,
**kwargs
): ...
@classmethod
def type_map(cls, vertica_type) -> str: ...
def execute(self, context: 'Context') -> None: ...

Usage Example:
from airflow.providers.apache.hive.transfers.vertica_to_hive import VerticaToHiveOperator
# Vertica to Hive transfer
vertica_to_hive = VerticaToHiveOperator(
task_id='transfer_analytics_data',
sql='''
SELECT customer_id, event_timestamp, event_type, properties
FROM analytics.user_events
WHERE DATE(event_timestamp) = DATE('{{ ds }}')
''',
hive_table='warehouse.user_events',
partition={'event_date': '{{ ds }}'},
vertica_conn_id='vertica_analytics',
hive_cli_conn_id='hive_warehouse',
dag=dag
)
# Large dataset transfer with compression
bulk_vertica_transfer = VerticaToHiveOperator(
task_id='bulk_transfer_transactions',
sql='''
SELECT transaction_id, user_id, amount, merchant_id, transaction_timestamp
FROM transactions.daily_transactions
WHERE transaction_date = '{{ ds }}'
''',
hive_table='warehouse.transactions',
partition={'transaction_date': '{{ ds }}'},
create=True,
dag=dag
)

Export data from Hive to Samba file shares for integration with legacy systems.
class HiveToSambaOperator:
def __init__(
self,
*,
hql: str,
destination_filepath: str,
samba_conn_id: str = 'samba_default',
hiveserver2_conn_id: str = 'hiveserver2_default',
**kwargs
): ...
def execute(self, context: 'Context') -> None: ...

Usage Example:
from airflow.providers.apache.hive.transfers.hive_to_samba import HiveToSambaOperator
# Export to Samba share
hive_to_samba = HiveToSambaOperator(
task_id='export_to_legacy_system',
hql='''
SELECT customer_id, order_date, product_code, quantity, amount
FROM warehouse.orders
WHERE ds = '{{ ds }}'
ORDER BY customer_id, order_date
''',
destination_filepath='/exports/daily_orders_{{ ds }}.csv',
samba_conn_id='legacy_file_share',
hiveserver2_conn_id='hive_reporting',
dag=dag
)

Transfer operators support various file formats and compression:
# CSV with custom formatting
csv_transfer = MySqlToHiveOperator(
task_id='csv_transfer',
sql='SELECT * FROM source_table',
hive_table='target_table',
delimiter=',',
quoting=1, # csv.QUOTE_ALL
quotechar='"',
escapechar='\\',
dag=dag
)
# Tab-separated with compression
tsv_transfer = S3ToHiveOperator(
task_id='tsv_transfer',
s3_key='data.tsv.gz',
field_dict={'col1': 'STRING', 'col2': 'STRING'},
hive_table='target_table',
delimiter='\t',
input_compressed=True,
tblproperties={
'hive.exec.compress.output': 'true',
'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
},
dag=dag
)

All transfer operators support dynamic partitioning:
# Static partitioning
static_partition = MySqlToHiveOperator(
task_id='static_partition',
sql='SELECT * FROM orders WHERE order_date = "{{ ds }}"',
hive_table='warehouse.orders',
partition={'ds': '{{ ds }}', 'region': 'us'},
dag=dag
)
# Dynamic partitioning with recreation
dynamic_partition = S3ToHiveOperator(
task_id='dynamic_partition',
s3_key='multi_region_data_{{ ds }}.csv',
field_dict={'region': 'STRING', 'metric': 'STRING', 'value': 'DOUBLE'},
hive_table='warehouse.regional_data',
partition={'processing_date': '{{ ds }}'},
recreate=True, # Drop and recreate table
dag=dag
)

Implement data quality checks during transfers:
def validate_transfer_quality(**context):
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
hive_hook = HiveServer2Hook('hive_warehouse')
# Check record count
result = hive_hook.get_first(
f"SELECT COUNT(*) FROM warehouse.transferred_data WHERE ds = '{context['ds']}'"
)
record_count = result[0] if result else 0
if record_count < 1000:
raise ValueError(f"Transfer validation failed: only {record_count} records")
context['task_instance'].xcom_push(key='record_count', value=record_count)
transfer_task = MySqlToHiveOperator(
task_id='transfer_data',
sql='SELECT * FROM source_data WHERE date_col = "{{ ds }}"',
hive_table='warehouse.transferred_data',
partition={'ds': '{{ ds }}'},
dag=dag
)
validation_task = PythonOperator(
task_id='validate_transfer',
python_callable=validate_transfer_quality,
dag=dag
)
transfer_task >> validation_task

Implement incremental data loading strategies:
# Incremental load with watermark
incremental_load = MySqlToHiveOperator(
task_id='incremental_customer_load',
sql='''
SELECT customer_id, name, email, updated_at
FROM customers
WHERE updated_at >= (
SELECT COALESCE(MAX(updated_at), '1900-01-01')
FROM warehouse.customers_staging
)
AND DATE(updated_at) <= '{{ ds }}'
''',
hive_table='warehouse.customers_staging',
partition={'load_date': '{{ ds }}'},
dag=dag
)
# Merge staging to production
merge_task = HiveOperator(
task_id='merge_customer_data',
hql='''
INSERT OVERWRITE TABLE warehouse.customers
SELECT DISTINCT
COALESCE(prod.customer_id, stage.customer_id) as customer_id,
COALESCE(stage.name, prod.name) as name,
COALESCE(stage.email, prod.email) as email,
GREATEST(COALESCE(stage.updated_at, '1900-01-01'),
COALESCE(prod.updated_at, '1900-01-01')) as updated_at
FROM warehouse.customers prod
FULL OUTER JOIN (
SELECT * FROM warehouse.customers_staging
WHERE load_date = '{{ ds }}'
) stage ON prod.customer_id = stage.customer_id;
''',
dag=dag
)
incremental_load >> merge_task

Combine data from multiple sources:
# Transfer from multiple databases
mysql_transfer = MySqlToHiveOperator(
task_id='mysql_orders',
sql='SELECT * FROM orders WHERE date_col = "{{ ds }}"',
hive_table='staging.mysql_orders',
partition={'ds': '{{ ds }}', 'source': 'mysql'},
dag=dag
)
mssql_transfer = MsSqlToHiveOperator(
task_id='mssql_orders',
sql="SELECT * FROM Orders WHERE DateCol = '{{ ds }}'",
hive_table='staging.mssql_orders',
partition={'ds': '{{ ds }}', 'source': 'mssql'},
dag=dag
)
# Combine in Hive
combine_sources = HiveOperator(
task_id='combine_order_sources',
hql='''
INSERT OVERWRITE TABLE warehouse.unified_orders
PARTITION (ds='{{ ds }}')
SELECT order_id, customer_id, amount, 'mysql' as source_system
FROM staging.mysql_orders WHERE ds='{{ ds }}'
UNION ALL
SELECT order_id, customer_id, amount, 'mssql' as source_system
FROM staging.mssql_orders WHERE ds='{{ ds }}';
''',
dag=dag
)
[mysql_transfer, mssql_transfer] >> combine_sources

Implement robust error handling:
def handle_transfer_failure(context):
task_instance = context['task_instance']
# Clean up partial data
from airflow.providers.apache.hive.hooks.hive import HiveCliHook
hook = HiveCliHook('hive_warehouse')
hook.run_cli(f'''
ALTER TABLE warehouse.failed_transfers
DROP IF EXISTS PARTITION (ds='{context["ds"]}', attempt='{task_instance.try_number}')
''')
# Send notification
send_failure_notification(f"Transfer failed: {task_instance.task_id}")
resilient_transfer = MySqlToHiveOperator(
task_id='resilient_transfer',
sql='SELECT * FROM large_table WHERE date_col = "{{ ds }}"',
hive_table='warehouse.large_table',
partition={'ds': '{{ ds }}'},
retries=3,
retry_delay=timedelta(minutes=10),
on_failure_callback=handle_transfer_failure,
dag=dag
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hive