tessl/pypi-apache-airflow-providers-apache-hive

Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.

Query Execution

Execute HQL scripts and queries with comprehensive support for templating, parameter substitution, MapReduce configuration, and job monitoring. Includes operators for running ad-hoc queries and collecting detailed table statistics.

Capabilities

Hive Query Operator

Primary operator for executing HQL code or Hive scripts within Airflow workflows with full templating and configuration support.

class HiveOperator:
    template_fields: tuple[str, ...] = (
        'hql', 'schema', 'hive_cli_conn_id', 'mapred_queue', 
        'hiveconfs', 'mapred_job_name', 'mapred_queue_priority', 'proxy_user'
    )
    template_ext: tuple[str, ...] = ('.hql', '.sql')
    
    def __init__(
        self,
        *,
        hql: str,
        hive_cli_conn_id: str = 'hive_cli_default',
        schema: str = 'default',
        hiveconfs: dict | None = None,
        hiveconf_jinja_translate: bool = False,
        script_begin_tag: str | None = None,
        mapred_queue: str | None = None,
        mapred_queue_priority: str | None = None,
        mapred_job_name: str | None = None,
        hive_cli_params: str = '',
        auth: str | None = None,
        proxy_user: str | None = None,
        **kwargs
    ): ...
    
    def execute(self, context: 'Context') -> None: ...
    def on_kill(self) -> None: ...

Usage Examples:

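from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator

# Example DAG that the tasks below attach to (id and schedule are illustrative)
dag = DAG(
    dag_id='hive_query_examples',
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
)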

# Basic HQL execution
simple_query = HiveOperator(
    task_id='simple_hive_query',
    hql='''
        SELECT COUNT(*) as record_count
        FROM warehouse.sales
        WHERE ds = '{{ ds }}';
    ''',
    hive_cli_conn_id='hive_production',
    schema='warehouse',
    dag=dag
)

# Complex query with configuration
etl_process = HiveOperator(
    task_id='daily_etl',
    hql='''
        SET hive.exec.dynamic.partition=true;
        SET hive.exec.dynamic.partition.mode=nonstrict;
        
        INSERT OVERWRITE TABLE warehouse.sales_summary
        PARTITION (ds='{{ ds }}', region)
        SELECT 
            product_id,
            SUM(amount) as total_sales,
            COUNT(*) as transaction_count,
            AVG(amount) as avg_sale,
            region
        FROM warehouse.daily_sales
        WHERE ds = '{{ ds }}'
        GROUP BY product_id, region;
    ''',
    hiveconfs={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    mapred_queue='analytics',
    mapred_queue_priority='HIGH',
    mapred_job_name='sales_etl_{{ ds }}',
    dag=dag
)

# Execute script from file
script_execution = HiveOperator(
    task_id='run_hive_script',
    hql='process_customer_data.hql',  # File path relative to DAG
    hiveconf_jinja_translate=True,
    script_begin_tag='-- BEGIN PROCESSING',
    proxy_user='etl_service_account',
    dag=dag
)

# Query with custom parameters
parameterized_query = HiveOperator(
    task_id='parameterized_analysis',
    hql='''
        SELECT {{ params.metric_column }}
        FROM {{ params.source_table }}
        WHERE ds BETWEEN '{{ ds }}' AND '{{ macros.ds_add(ds, 7) }}'
          AND region IN ('{{ params.regions | join("', '") }}')
        GROUP BY {{ params.group_by_columns | join(', ') }};
    ''',
    params={
        'metric_column': 'SUM(revenue) as total_revenue',
        'source_table': 'warehouse.sales',
        'regions': ['us', 'eu', 'asia'],
        'group_by_columns': ['product_category', 'customer_segment']
    },
    dag=dag
)
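
The region filter in the parameterized example is meant to expand into a quoted, comma-separated list before the query reaches Hive. As a rough illustration of the string such a template expression should produce, the plain-Python sketch below mimics it. `quote_in_list` is a hypothetical helper, not part of the provider, and it performs no escaping, so it is only suitable for trusted values.

```python
def quote_in_list(values: list[str]) -> str:
    """Render values as a quoted, comma-separated list for a SQL IN clause."""
    # Joins the items with "', '" and wraps the result in single quotes.
    # No escaping is done: only use this with trusted, known-safe values.
    return "'" + "', '".join(values) + "'"


regions = ["us", "eu", "asia"]
clause = f"region IN ({quote_in_list(regions)})"
print(clause)
```

Checking the rendered HQL in the task's "Rendered Template" view is a cheap way to confirm that list parameters expand as intended before the job runs.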

Hive Statistics Collection Operator

Specialized operator for gathering partition statistics using Presto queries and storing results in MySQL for monitoring and optimization.

class HiveStatsCollectionOperator:
    template_fields: tuple[str, ...] = ('table', 'partition', 'ds', 'dttm')
    
    def __init__(
        self,
        *,
        table: str,
        partition: Any,
        extra_exprs: dict[str, Any] | None = None,
        excluded_columns: list[str] | None = None,
        assignment_func: Callable[[str, str], dict | None] | None = None,
        metastore_conn_id: str = 'metastore_default',
        presto_conn_id: str = 'presto_default',
        mysql_conn_id: str = 'airflow_db',
        ds: str = '{{ ds }}',
        dttm: str = '{{ logical_date.isoformat() }}',
        **kwargs
    ): ...
    
    def execute(self, context: 'Context') -> None: ...
    def get_default_exprs(self, col: str, col_type: str) -> dict: ...

Usage Example:

from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator

# Basic statistics collection
collect_stats = HiveStatsCollectionOperator(
    task_id='collect_table_stats',
    table='warehouse.customer_transactions',
    partition={'ds': '{{ ds }}', 'region': 'us'},
    metastore_conn_id='metastore_prod',
    presto_conn_id='presto_analytics',
    mysql_conn_id='stats_db',
    dag=dag
)

# Advanced statistics with custom expressions
advanced_stats = HiveStatsCollectionOperator(
    task_id='advanced_stats_collection',
    table='warehouse.sales_data',
    partition={'ds': '{{ ds }}'},
    extra_exprs={
        'revenue_percentile_95': 'APPROX_PERCENTILE(revenue, 0.95)',
        'unique_customers': 'COUNT(DISTINCT customer_id)',
        'avg_order_value': 'AVG(order_total)',
        'max_transaction_time': 'MAX(transaction_timestamp)'
    },
    excluded_columns=['raw_data_blob', 'customer_notes'],
    dag=dag
)

# Custom column assignment function
def custom_assignment(col_name: str, col_type: str) -> dict | None:
    if col_type.lower().startswith('varchar'):
        return {
            (col_name, 'max_length'): f'MAX(LENGTH({col_name}))',
            (col_name, 'avg_length'): f'AVG(LENGTH({col_name}))'
        }
    elif col_type.lower() in ['int', 'bigint', 'double', 'decimal']:
        return {
            (col_name, 'min'): f'MIN({col_name})',
            (col_name, 'max'): f'MAX({col_name})',
            (col_name, 'avg'): f'AVG({col_name})',
            (col_name, 'stddev'): f'STDDEV({col_name})'
        }
    return None

custom_stats = HiveStatsCollectionOperator(
    task_id='custom_stats_collection',
    table='warehouse.product_metrics',
    partition={'ds': '{{ ds }}'},
    assignment_func=custom_assignment,
    dag=dag
)
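
To make the role of `assignment_func` more concrete, the sketch below shows one way per-column expressions could be assembled into a single SELECT list. It assumes that a `None` return value means "use the default expressions for this column", which is what the `dict | None` return type in the example suggests. `default_exprs`, `build_exprs`, and `only_numeric` are hypothetical names for illustration, not the provider's internals.

```python
from typing import Callable, Optional

# Keys are (column, metric) tuples, values are SQL expressions.
Exprs = dict[tuple[str, str], str]


def default_exprs(col: str, col_type: str) -> Exprs:
    # Hypothetical stand-in for the operator's built-in per-column defaults.
    return {(col, "non_null"): f"COUNT({col})"}


def build_exprs(
    columns: dict[str, str],
    assignment_func: Optional[Callable[[str, str], Optional[Exprs]]] = None,
) -> Exprs:
    exprs: Exprs = {("", "count"): "COUNT(*)"}
    for col, col_type in columns.items():
        assigned = assignment_func(col, col_type) if assignment_func else None
        if assigned is None:
            # Assumption: None means "fall back to the defaults".
            assigned = default_exprs(col, col_type)
        exprs.update(assigned)
    return exprs


def only_numeric(col: str, col_type: str) -> Optional[Exprs]:
    if col_type in ("int", "bigint", "double"):
        return {(col, "max"): f"MAX({col})"}
    return None


exprs = build_exprs({"price": "double", "sku": "string"}, only_numeric)
select_list = ", ".join(
    f"{sql} AS {col}__{metric}" for (col, metric), sql in exprs.items()
)
print(select_list)
```

Keying the dictionary by `(column, metric)` keeps each statistic addressable after the query returns, which is why the custom function above returns tuple keys rather than plain strings.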

Template Support

Airflow Template Fields

Both operators render the following fields as Jinja2 templates before execution:

  • HiveOperator: hql, schema, hive_cli_conn_id, mapred_queue, hiveconfs, mapred_job_name, mapred_queue_priority, proxy_user
  • HiveStatsCollectionOperator: table, partition, ds, dttm

Hiveconf Jinja Translation

When hiveconf_jinja_translate=True, the operator automatically translates Hive-style variable substitution to Jinja2 templating:

  • ${var} → {{ var }}
  • ${hiveconf:var} → {{ var }}
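
The two rewrite rules above can be sketched with a single regular expression. This is a minimal illustration of the idea rather than the provider's exact implementation; `translate_hiveconf_vars` is a hypothetical function name, and the set of characters allowed in a variable name is an assumption.

```python
import re

# Matches ${var} and ${hiveconf:var}; the variable name is captured in group 1.
_HIVECONF_VAR = re.compile(r"\$\{(?:hiveconf:)?([A-Za-z0-9_]+)\}")


def translate_hiveconf_vars(hql: str) -> str:
    """Rewrite Hive-style variable references as Jinja2 expressions."""
    return _HIVECONF_VAR.sub(r"{{ \1 }}", hql)


hql = "SELECT * FROM t WHERE ds = '${hiveconf:ds}' AND region = '${region}'"
print(translate_hiveconf_vars(hql))
```

After translation the script is an ordinary Jinja2 template, so the variables are resolved from the Airflow template context rather than by Hive.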

Template File Extensions

HiveOperator automatically processes files with .hql and .sql extensions as templates, enabling external script management with Jinja2 variable substitution.

MapReduce Configuration

Queue Management

Configure Hadoop scheduler queues and priorities:

# Queue configuration options
HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW']

operator = HiveOperator(
    task_id='priority_job',
    hql='SELECT * FROM large_table',
    mapred_queue='analytics_high',
    mapred_queue_priority='HIGH',
    mapred_job_name='{{ dag.dag_id }}_{{ task.task_id }}_{{ ds }}',
    dag=dag
)
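
Because only the listed priority values are meaningful, it can help to validate a priority before building the task. The sketch below is one way to do that in DAG code. `normalize_priority` is a hypothetical helper, and treating the value case-insensitively is an assumption made for convenience.

```python
from typing import Optional

HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]


def normalize_priority(priority: Optional[str]) -> Optional[str]:
    """Return the upper-cased priority, or raise if it is not a known value."""
    if priority is None:
        return None
    normalized = priority.upper()
    if normalized not in HIVE_QUEUE_PRIORITIES:
        allowed = ", ".join(HIVE_QUEUE_PRIORITIES)
        raise ValueError(f"Invalid queue priority {priority!r}; expected one of: {allowed}")
    return normalized


print(normalize_priority("high"))
```

Failing fast at DAG-parse time surfaces a typo such as `'URGENT'` immediately, instead of when the task is already running on a worker.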

Job Naming Templates

Customize MapReduce job names for monitoring and debugging:

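# mapred_job_name is a templated field, so the name below is rendered from Jinja context variables.
# When it is left unset, the default job-name template is built from hostname, dag_id, task_id, and the execution date.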
operator = HiveOperator(
    task_id='named_job',
    hql='CREATE TABLE summary AS SELECT * FROM source',
    mapred_job_name='ETL_{{ dag.dag_id }}_{{ task.task_id }}_{{ ts_nodash }}',
    dag=dag
)
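
As a rough picture of how a job-name template built from the four placeholders named in this section could be filled in, the sketch below uses plain `str.format`. The template string is illustrative only (the exact default used by the provider is an assumption), and `default_job_name` is a hypothetical helper.

```python
# Illustrative template; the provider's actual default string may differ.
JOB_NAME_TEMPLATE = (
    "Airflow HiveOperator task for {hostname}.{dag_id}.{task_id}.{execution_date}"
)


def default_job_name(hostname: str, dag_id: str, task_id: str, execution_date: str) -> str:
    """Fill the job-name template with identifiers for one task run."""
    return JOB_NAME_TEMPLATE.format(
        hostname=hostname,
        dag_id=dag_id,
        task_id=task_id,
        execution_date=execution_date,
    )


print(default_job_name("worker-1", "sales_etl", "named_job", "2024-01-01T00:00:00"))
```

Including the DAG id, task id, and run date in the name makes it straightforward to map a job seen in the Hadoop UI back to the Airflow task that launched it.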

Hive Configuration Parameters

Pass runtime configuration via hiveconfs parameter:

operator = HiveOperator(
    task_id='optimized_query',
    hql='SELECT * FROM partitioned_table WHERE ds = "{{ ds }}"',
    hiveconfs={
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'hive.exec.max.dynamic.partitions': '10000',
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.SnappyCodec',
        'mapred.job.queue.name': 'analytics'
    },
    dag=dag
)
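
Each `hiveconfs` entry ultimately has to reach Hive as a `key=value` setting. A minimal sketch of that flattening, assuming the settings are passed as repeated `-hiveconf` command-line arguments, looks like this. `hiveconf_args` is a hypothetical helper, not a provider function.

```python
def hiveconf_args(hiveconfs: dict[str, str]) -> list[str]:
    """Flatten a settings dict into repeated '-hiveconf key=value' arguments."""
    args: list[str] = []
    for key, value in hiveconfs.items():
        args.extend(["-hiveconf", f"{key}={value}"])
    return args


print(hiveconf_args({
    "hive.exec.dynamic.partition": "true",
    "hive.exec.compress.output": "true",
}))
```

Values are passed as strings, which is why the examples in this section write booleans and numbers as `'true'` and `'10000'` rather than Python literals.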

Error Handling and Monitoring

Task Termination

HiveOperator implements on_kill(), which stops the running Hive job when the task is cancelled or killed. HiveStatsCollectionOperator, as listed above, does not define its own on_kill().

Script Processing

When script_begin_tag is set, HiveOperator discards the part of the script that precedes the first occurrence of the tag, so setup sections can be skipped and only the core processing logic runs:

operator = HiveOperator(
    task_id='process_script',
    hql='full_etl_script.hql',
    script_begin_tag='-- MAIN PROCESSING BEGINS',
    dag=dag
)
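
The effect of `script_begin_tag` can be sketched as a simple string operation. This is an illustration under the assumption that everything up to and including the first occurrence of the tag is dropped and that a script without the tag is left unchanged. `drop_preamble` is a hypothetical helper name.

```python
from typing import Optional


def drop_preamble(hql: str, begin_tag: Optional[str]) -> str:
    """Return the script text that follows the first occurrence of begin_tag."""
    if begin_tag and begin_tag in hql:
        # Split once on the tag and keep only what comes after it.
        return hql.split(begin_tag, 1)[1].lstrip("\n")
    return hql


script = "SET hive.cli.print.header=true;\n-- MAIN PROCESSING BEGINS\nSELECT 1;"
print(drop_preamble(script, "-- MAIN PROCESSING BEGINS"))
```

Using a SQL comment as the tag keeps the full script valid when it is run by hand outside Airflow.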

Context Access

Templated fields on both operators are rendered with the Airflow task context, so they can reference variables such as:

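# A subset of the context variables available in templates (values are illustrative)
template_context = {
    'ds': '2024-01-01',                 # Logical (execution) date as YYYY-MM-DD
    'ds_nodash': '20240101',            # Same date without dashes
    'ts': '2024-01-01T00:00:00+00:00',  # ISO 8601 timestamp
    'dag': '<DAG object>',              # The DAG the task belongs to
    'task': '<task object>',            # The operator instance being run
    'macros': '<airflow.macros>',       # Helper functions such as macros.ds_add
    'params': '<user-defined params>'   # Values passed via the params argument
}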

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-hive@9.1.1
