CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-apache-hive

Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.

Overview
Eval results
Files

data-transfers.mddocs/

Data Transfer Operations

Transfer data between Hive and external systems including MySQL, S3, Samba, Vertica, and Microsoft SQL Server. The provider offers bidirectional data movement with transformation and format conversion capabilities for comprehensive ETL operations.

Capabilities

MySQL to Hive Transfer

Transfer data from MySQL tables to Hive with automatic schema mapping and partition support.

class MySqlToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        quoting: Literal[0, 1, 2, 3] = csv.QUOTE_MINIMAL,
        quotechar: str = '"',
        escapechar: str | None = None,
        mysql_conn_id: str = "mysql_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        tblproperties: dict | None = None,
        **kwargs
    ): ...
    
    @classmethod
    def type_map(cls, mysql_type: int) -> str: ...
    
    def execute(self, context: 'Context') -> None: ...

Usage Example:

from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator

# Basic MySQL to Hive transfer
mysql_to_hive = MySqlToHiveOperator(
    task_id='transfer_customer_data',
    sql='''
        SELECT customer_id, name, email, registration_date, status
        FROM customers 
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    hive_table='warehouse.customers',
    partition={'ds': '{{ ds }}'},
    mysql_conn_id='mysql_prod',
    hive_cli_conn_id='hive_warehouse',
    create=True,
    dag=dag
)

# Complex transfer with table properties
advanced_mysql_transfer = MySqlToHiveOperator(
    task_id='advanced_mysql_transfer',
    sql='''
        SELECT o.order_id, o.customer_id, o.total_amount, o.order_date,
               GROUP_CONCAT(oi.product_id) as product_ids,
               GROUP_CONCAT(oi.quantity) as quantities
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        WHERE DATE(o.order_date) = '{{ ds }}'
        GROUP BY o.order_id, o.customer_id, o.total_amount, o.order_date
    ''',
    hive_table='analytics.order_summary',
    partition={'processing_date': '{{ ds }}'},
    delimiter='\t',
    tblproperties={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    dag=dag
)

S3 to Hive Transfer

Transfer data from Amazon S3 to Hive tables with support for various file formats and compression.

class S3ToHiveOperator:
    def __init__(
        self,
        *,
        s3_key: str,
        field_dict: dict,
        hive_table: str,
        delimiter: str = ",",
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        headers: bool = False,
        check_headers: bool = False,
        wildcard_match: bool = False,
        aws_conn_id: str | None = "aws_default",
        verify: bool | str | None = None,
        hive_cli_conn_id: str = "hive_cli_default",
        input_compressed: bool = False,
        tblproperties: dict | None = None,
        select_expression: str | None = None,
        hive_auth: str | None = None,
        **kwargs
    ): ...
    
    def execute(self, context: 'Context') -> None: ...
def uncompress_file(input_file_name: str, file_extension: str, dest_dir: str) -> str: ...

Usage Example:

from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator

# Basic S3 to Hive transfer
s3_to_hive = S3ToHiveOperator(
    task_id='load_daily_logs',
    s3_key='logs/{{ ds }}/application.log.gz',
    field_dict={'timestamp': 'STRING', 'level': 'STRING', 'message': 'STRING'},
    hive_table='warehouse.application_logs',
    input_compressed=True,
    partition={'log_date': '{{ ds }}'},
    aws_conn_id='aws_data_lake',
    hive_cli_conn_id='hive_warehouse',
    create=True,
    dag=dag
)

# CSV transfer with explicit column mapping
csv_s3_transfer = S3ToHiveOperator(
    task_id='transfer_csv_data',
    s3_key='exports/daily_report_{{ ds }}.csv.gz',
    field_dict={'timestamp': 'TIMESTAMP', 'amount': 'DOUBLE', 'count': 'BIGINT'},
    hive_table='reports.daily_metrics',
    partition={'report_date': '{{ ds }}'},
    headers=True,
    input_compressed=True,
    delimiter=',',
    dag=dag
)

# Multiple file transfer with templating
multi_file_transfer = S3ToHiveOperator(
    task_id='load_regional_data',
    s3_key='regional_data/{{ ds }}/{{ params.region }}_data.json.gz',
    field_dict={'metric_name': 'STRING', 'metric_value': 'DOUBLE'},
    hive_table='analytics.regional_metrics',
    input_compressed=True,
    partition={'ds': '{{ ds }}', 'region': '{{ params.region }}'},
    params={'region': 'us'},
    tblproperties={'serialization.format': '1'},
    dag=dag
)

Hive to MySQL Transfer

Export data from Hive tables to MySQL with support for incremental loads and data transformations.

class HiveToMySqlOperator:
    def __init__(
        self,
        *,
        sql: str,
        mysql_table: str,
        hiveserver2_conn_id: str = "hiveserver2_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        mysql_postoperator: str | None = None,
        bulk_load: bool = False,
        hive_conf: dict | None = None,
        **kwargs
    ): ...
    
    def execute(self, context: 'Context') -> None: ...

Usage Example:

from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator

# Basic Hive to MySQL export
hive_to_mysql = HiveToMySqlOperator(
    task_id='export_daily_summary',
    sql='''
        SELECT region, product_category, 
               SUM(revenue) as total_revenue,
               COUNT(*) as transaction_count
        FROM warehouse.daily_sales
        WHERE ds = '{{ ds }}'
        GROUP BY region, product_category
    ''',
    mysql_table='reporting.daily_summary',
    mysql_preoperator='DELETE FROM reporting.daily_summary WHERE report_date = "{{ ds }}"',
    mysql_postoperator='UPDATE reporting.daily_summary SET last_updated = NOW() WHERE report_date = "{{ ds }}"',
    hiveserver2_conn_id='hive_analytics',
    mysql_conn_id='mysql_reporting',
    dag=dag
)

# Bulk load for large datasets
bulk_export = HiveToMySqlOperator(
    task_id='bulk_export_customers',
    sql='''
        SELECT customer_id, name, email, total_orders, lifetime_value
        FROM warehouse.customer_metrics
        WHERE ds = '{{ ds }}'
    ''',
    mysql_table='crm.customer_snapshot',
    bulk_load=True,
    mysql_preoperator='TRUNCATE TABLE crm.customer_snapshot',
    dag=dag
)

Microsoft SQL Server to Hive Transfer

Transfer data from Microsoft SQL Server to Hive with support for complex data types and bulk operations.

class MsSqlToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        mssql_conn_id: str = "mssql_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        tblproperties: dict | None = None,
        **kwargs
    ): ...
    
    @classmethod
    def type_map(cls, mssql_type: int) -> str: ...
    
    def execute(self, context: 'Context') -> None: ...

Usage Example:

from airflow.providers.apache.hive.transfers.mssql_to_hive import MsSqlToHiveOperator

# SQL Server to Hive transfer
mssql_to_hive = MsSqlToHiveOperator(
    task_id='transfer_sales_data',
    sql='''
        SELECT SalesOrderID, CustomerID, OrderDate, TotalDue, Status
        FROM Sales.SalesOrderHeader
        WHERE CAST(OrderDate AS DATE) = '{{ ds }}'
    ''',
    hive_table='warehouse.sales_orders',
    partition={'order_date': '{{ ds }}'},
    mssql_conn_id='mssql_prod',
    hive_cli_conn_id='hive_warehouse',
    dag=dag
)

Vertica to Hive Transfer

Transfer data from Vertica to Hive with optimized bulk loading capabilities.

class VerticaToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        vertica_conn_id: str = "vertica_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        **kwargs
    ): ...
    
    @classmethod
    def type_map(cls, vertica_type) -> str: ...
    
    def execute(self, context: 'Context') -> None: ...

Usage Example:

from airflow.providers.apache.hive.transfers.vertica_to_hive import VerticaToHiveOperator

# Vertica to Hive transfer
vertica_to_hive = VerticaToHiveOperator(
    task_id='transfer_analytics_data',
    sql='''
        SELECT customer_id, event_timestamp, event_type, properties
        FROM analytics.user_events
        WHERE DATE(event_timestamp) = DATE('{{ ds }}')
    ''',
    hive_table='warehouse.user_events',
    partition={'event_date': '{{ ds }}'},
    vertica_conn_id='vertica_analytics',
    hive_cli_conn_id='hive_warehouse',
    dag=dag
)

# Large dataset transfer
bulk_vertica_transfer = VerticaToHiveOperator(
    task_id='bulk_transfer_transactions',
    sql='''
        SELECT transaction_id, user_id, amount, merchant_id, transaction_timestamp
        FROM transactions.daily_transactions
        WHERE transaction_date = '{{ ds }}'
    ''',
    hive_table='warehouse.transactions',
    partition={'transaction_date': '{{ ds }}'},
    create=True,
    dag=dag
)

Hive to Samba Transfer

Export data from Hive to Samba file shares for integration with legacy systems.

class HiveToSambaOperator:
    def __init__(
        self,
        *,
        hql: str,
        destination_filepath: str,
        samba_conn_id: str = 'samba_default',
        hiveserver2_conn_id: str = 'hiveserver2_default',
        **kwargs
    ): ...
    
    def execute(self, context: 'Context') -> None: ...

Usage Example:

from airflow.providers.apache.hive.transfers.hive_to_samba import HiveToSambaOperator

# Export to Samba share
hive_to_samba = HiveToSambaOperator(
    task_id='export_to_legacy_system',
    hql='''
        SELECT customer_id, order_date, product_code, quantity, amount
        FROM warehouse.orders
        WHERE ds = '{{ ds }}'
        ORDER BY customer_id, order_date
    ''',
    destination_filepath='/exports/daily_orders_{{ ds }}.csv',
    samba_conn_id='legacy_file_share',
    hiveserver2_conn_id='hive_reporting',
    dag=dag
)

Transfer Configuration

File Format Handling

Transfer operators support various file formats and compression:

# CSV with custom formatting
csv_transfer = MySqlToHiveOperator(
    task_id='csv_transfer',
    sql='SELECT * FROM source_table',
    hive_table='target_table',
    delimiter=',',
    quoting=1,  # csv.QUOTE_ALL
    quotechar='"',
    escapechar='\\',
    dag=dag
)

# Tab-separated with compression
tsv_transfer = S3ToHiveOperator(
    task_id='tsv_transfer',
    s3_key='data.tsv.gz',
    field_dict={'col1': 'STRING', 'col2': 'STRING'},
    hive_table='target_table',
    input_compressed=True,
    delimiter='\t',
    tblproperties={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    dag=dag
)

Partition Management

All transfer operators support dynamic partitioning:

# Static partitioning
static_partition = MySqlToHiveOperator(
    task_id='static_partition',
    sql='SELECT * FROM orders WHERE order_date = "{{ ds }}"',
    hive_table='warehouse.orders',
    partition={'ds': '{{ ds }}', 'region': 'us'},
    dag=dag
)

# Dynamic partitioning with recreation
dynamic_partition = S3ToHiveOperator(
    task_id='dynamic_partition',
    s3_key='multi_region_data_{{ ds }}.csv',
    field_dict={'region': 'STRING', 'value': 'DOUBLE'},
    hive_table='warehouse.regional_data',
    partition={'processing_date': '{{ ds }}'},
    recreate=True,  # Drop and recreate table
    dag=dag
)

Data Quality and Validation

Implement data quality checks during transfers:

def validate_transfer_quality(**context):
    from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
    
    hive_hook = HiveServer2Hook('hive_warehouse')
    
    # Check record count
    result = hive_hook.get_first(
        f"SELECT COUNT(*) FROM warehouse.transferred_data WHERE ds = '{context['ds']}'"
    )
    
    record_count = result[0] if result else 0
    if record_count < 1000:
        raise ValueError(f"Transfer validation failed: only {record_count} records")
    
    context['task_instance'].xcom_push(key='record_count', value=record_count)

transfer_task = MySqlToHiveOperator(
    task_id='transfer_data',
    sql='SELECT * FROM source_data WHERE date_col = "{{ ds }}"',
    hive_table='warehouse.transferred_data',
    partition={'ds': '{{ ds }}'},
    dag=dag
)

validation_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_transfer_quality,
    dag=dag
)

transfer_task >> validation_task

Advanced Transfer Patterns

Incremental Loading

Implement incremental data loading strategies:

# Incremental load with watermark
incremental_load = MySqlToHiveOperator(
    task_id='incremental_customer_load',
    sql='''
        SELECT customer_id, name, email, updated_at
        FROM customers
        WHERE updated_at >= (
            SELECT COALESCE(MAX(updated_at), '1900-01-01')
            FROM warehouse.customers_staging
        )
        AND DATE(updated_at) <= '{{ ds }}'
    ''',
    hive_table='warehouse.customers_staging',
    partition={'load_date': '{{ ds }}'},
    dag=dag
)

# Merge staging to production
merge_task = HiveOperator(
    task_id='merge_customer_data',
    hql='''
        INSERT OVERWRITE TABLE warehouse.customers
        SELECT DISTINCT
            COALESCE(prod.customer_id, stage.customer_id) as customer_id,
            COALESCE(stage.name, prod.name) as name,
            COALESCE(stage.email, prod.email) as email,
            GREATEST(COALESCE(stage.updated_at, '1900-01-01'), 
                    COALESCE(prod.updated_at, '1900-01-01')) as updated_at
        FROM warehouse.customers prod
        FULL OUTER JOIN (
            SELECT * FROM warehouse.customers_staging 
            WHERE load_date = '{{ ds }}'
        ) stage ON prod.customer_id = stage.customer_id;
    ''',
    dag=dag
)

incremental_load >> merge_task

Multi-Source Data Integration

Combine data from multiple sources:

# Transfer from multiple databases
mysql_transfer = MySqlToHiveOperator(
    task_id='mysql_orders',
    sql='SELECT * FROM orders WHERE date_col = "{{ ds }}"',
    hive_table='staging.mysql_orders',
    partition={'ds': '{{ ds }}', 'source': 'mysql'},
    dag=dag
)

mssql_transfer = MsSqlToHiveOperator(
    task_id='mssql_orders',
    sql="SELECT * FROM Orders WHERE DateCol = '{{ ds }}'",
    hive_table='staging.mssql_orders',
    partition={'ds': '{{ ds }}', 'source': 'mssql'},
    dag=dag
)

# Combine in Hive
combine_sources = HiveOperator(
    task_id='combine_order_sources',
    hql='''
        INSERT OVERWRITE TABLE warehouse.unified_orders
        PARTITION (ds='{{ ds }}')
        SELECT order_id, customer_id, amount, 'mysql' as source_system
        FROM staging.mysql_orders WHERE ds='{{ ds }}'
        UNION ALL
        SELECT order_id, customer_id, amount, 'mssql' as source_system  
        FROM staging.mssql_orders WHERE ds='{{ ds }}';
    ''',
    dag=dag
)

[mysql_transfer, mssql_transfer] >> combine_sources

Error Handling and Recovery

Implement robust error handling:

def handle_transfer_failure(context):
    task_instance = context['task_instance']
    
    # Clean up partial data
    from airflow.providers.apache.hive.hooks.hive import HiveCliHook
    hook = HiveCliHook('hive_warehouse')
    
    hook.run_cli(f'''
        ALTER TABLE warehouse.failed_transfers 
        DROP IF EXISTS PARTITION (ds='{context["ds"]}', attempt='{task_instance.try_number}')
    ''')
    
    # Send notification
    send_failure_notification(f"Transfer failed: {task_instance.task_id}")

resilient_transfer = MySqlToHiveOperator(
    task_id='resilient_transfer',
    sql='SELECT * FROM large_table WHERE date_col = "{{ ds }}"',
    hive_table='warehouse.large_table',
    partition={'ds': '{{ ds }}'},
    retries=3,
    retry_delay=timedelta(minutes=10),
    on_failure_callback=handle_transfer_failure,
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-hive

docs

data-transfers.md

hooks-connections.md

index.md

macros-utilities.md

partition-monitoring.md

query-execution.md

tile.json