Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.
Execute HQL scripts and queries with comprehensive support for templating, parameter substitution, MapReduce configuration, and job monitoring. Includes operators for running ad-hoc queries and collecting detailed table statistics.
Primary operator for executing HQL code or Hive scripts within Airflow workflows with full templating and configuration support.
# Primary operator: runs inline HQL or a Hive script through the Hive CLI.
# NOTE(review): class-body indentation appears stripped by doc extraction;
# the members below belong inside HiveOperator.
class HiveOperator:
# Attributes rendered by Airflow's Jinja2 engine before execution.
template_fields: tuple[str, ...] = (
'hql', 'schema', 'hive_cli_conn_id', 'mapred_queue',
'hiveconfs', 'mapred_job_name', 'mapred_queue_priority', 'proxy_user'
)
# hql values ending in these extensions are loaded and rendered as template files.
template_ext: tuple[str, ...] = ('.hql', '.sql')
def __init__(
self,
*,
hql: str,  # inline HQL text or a path to a .hql/.sql script
hive_cli_conn_id: str = 'hive_cli_default',  # Airflow connection id for the Hive CLI
schema: str = 'default',  # Hive schema (database) to run against
hiveconfs: dict | None = None,  # runtime Hive configuration key/value pairs
hiveconf_jinja_translate: bool = False,  # translate ${var}/${hiveconf:var} into {{ var }}
script_begin_tag: str | None = None,  # skip script content before this marker
mapred_queue: str | None = None,  # Hadoop scheduler queue name
mapred_queue_priority: str | None = None,  # e.g. VERY_HIGH/HIGH/NORMAL/LOW/VERY_LOW
mapred_job_name: str | None = None,  # custom MapReduce job name for monitoring
hive_cli_params: str = '',  # extra flags appended to the hive CLI invocation
auth: str | None = None,  # presumably an auth mechanism for the connection — verify against provider docs
proxy_user: str | None = None,  # user to run as (see proxy_user example below)
**kwargs
): ...
# Renders templates/configuration and executes the HQL; body omitted in this stub.
def execute(self, context: 'Context') -> None: ...
def on_kill(self) -> None: ...

Usage Examples:
from airflow.providers.apache.hive.operators.hive import HiveOperator
from datetime import datetime, timedelta
# Basic HQL execution
# '{{ ds }}' is resolved by Airflow templating to the task's execution date.
simple_query = HiveOperator(
task_id='simple_hive_query',
hql='''
SELECT COUNT(*) as record_count
FROM warehouse.sales
WHERE ds = '{{ ds }}';
''',
hive_cli_conn_id='hive_production',  # non-default Hive CLI connection
schema='warehouse',
dag=dag
)
# Complex query with configuration
# SET statements inside the HQL enable dynamic partitioning; hiveconfs adds
# further runtime settings, and the job is routed to a named queue.
etl_process = HiveOperator(
task_id='daily_etl',
hql='''
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE warehouse.sales_summary
PARTITION (ds='{{ ds }}', region)
SELECT
product_id,
SUM(amount) as total_sales,
COUNT(*) as transaction_count,
AVG(amount) as avg_sale,
region
FROM warehouse.daily_sales
WHERE ds = '{{ ds }}'
GROUP BY product_id, region;
''',
hiveconfs={
'hive.exec.compress.output': 'true',
'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
},
mapred_queue='analytics',
mapred_queue_priority='HIGH',
mapred_job_name='sales_etl_{{ ds }}',  # templated job name aids monitoring
dag=dag
)
# Execute script from file
# Because 'process_customer_data.hql' ends in .hql it is loaded and rendered
# as a Jinja2 template (see template_ext on HiveOperator).
script_execution = HiveOperator(
task_id='run_hive_script',
hql='process_customer_data.hql', # File path relative to DAG
hiveconf_jinja_translate=True,  # convert ${var}/${hiveconf:var} to {{ var }}
script_begin_tag='-- BEGIN PROCESSING',  # skip everything before this marker
proxy_user='etl_service_account',
dag=dag
)
# Query with custom parameters
parameterized_query = HiveOperator(
task_id='parameterized_analysis',
hql='''
SELECT {{ params.metric_column }}
FROM {{ params.source_table }}
WHERE ds BETWEEN '{{ ds }}' AND '{{ macros.ds_add(ds, 7) }}'
AND region IN ({{ params.regions | join("','") | "'" + _0 + "'" }})
GROUP BY {{ params.group_by_columns | join(', ') }};
''',
params={
'metric_column': 'SUM(revenue) as total_revenue',
'source_table': 'warehouse.sales',
'regions': ['us', 'eu', 'asia'],
'group_by_columns': ['product_category', 'customer_segment']
},
dag=dag
)Specialized operator for gathering partition statistics using Presto queries and storing results in MySQL for monitoring and optimization.
# Collects per-partition table statistics (via Presto) and stores them in MySQL.
# NOTE(review): class-body indentation appears stripped by doc extraction;
# the members below belong inside HiveStatsCollectionOperator.
class HiveStatsCollectionOperator:
# Attributes rendered by Airflow's Jinja2 engine before execution.
template_fields: tuple[str, ...] = ('table', 'partition', 'ds', 'dttm')
def __init__(
self,
*,
table: str,  # Hive table name, e.g. 'warehouse.customer_transactions'
partition: Any,  # partition spec, e.g. {'ds': '{{ ds }}', 'region': 'us'}
extra_exprs: dict[str, Any] | None = None,  # extra metric-name -> SQL aggregate pairs
excluded_columns: list[str] | None = None,  # columns to skip when building stats
assignment_func: callable | None = None,  # per-column (name, type) -> exprs dict; returning None falls back to defaults
metastore_conn_id: str = 'metastore_default',  # Hive metastore connection
presto_conn_id: str = 'presto_default',  # Presto connection used to run the stats query
mysql_conn_id: str = 'airflow_db',  # MySQL connection where results are written
ds: str = '{{ ds }}',
dttm: str = '{{ logical_date.isoformat() }}',
**kwargs
): ...
# Builds the stats expressions, runs them via Presto, stores rows in MySQL.
def execute(self, context: 'Context') -> None: ...
def get_default_exprs(self, col: str, col_type: str) -> dict: ...

Usage Example:
from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator
# Basic statistics collection
# Gathers stats for one partition of the table and writes them to 'stats_db'.
collect_stats = HiveStatsCollectionOperator(
task_id='collect_table_stats',
table='warehouse.customer_transactions',
partition={'ds': '{{ ds }}', 'region': 'us'},
metastore_conn_id='metastore_prod',
presto_conn_id='presto_analytics',
mysql_conn_id='stats_db',
dag=dag
)
# Advanced statistics with custom expressions
# extra_exprs adds named Presto aggregate expressions on top of the defaults;
# excluded_columns keeps large/free-text columns out of the scan.
advanced_stats = HiveStatsCollectionOperator(
task_id='advanced_stats_collection',
table='warehouse.sales_data',
partition={'ds': '{{ ds }}'},
extra_exprs={
'revenue_percentile_95': 'APPROX_PERCENTILE(revenue, 0.95)',
'unique_customers': 'COUNT(DISTINCT customer_id)',
'avg_order_value': 'AVG(order_total)',
'max_transaction_time': 'MAX(transaction_timestamp)'
},
excluded_columns=['raw_data_blob', 'customer_notes'],
dag=dag
)
# Custom column assignment function
def custom_assignment(col_name: str, col_type: str) -> dict | None:
if col_type.lower().startswith('varchar'):
return {
(col_name, 'max_length'): f'MAX(LENGTH({col_name}))',
(col_name, 'avg_length'): f'AVG(LENGTH({col_name}))'
}
elif col_type.lower() in ['int', 'bigint', 'double', 'decimal']:
return {
(col_name, 'min'): f'MIN({col_name})',
(col_name, 'max'): f'MAX({col_name})',
(col_name, 'avg'): f'AVG({col_name})',
(col_name, 'stddev'): f'STDDEV({col_name})'
}
return None
# Attach the function above so stats expressions are chosen per column type.
custom_stats = HiveStatsCollectionOperator(
task_id='custom_stats_collection',
table='warehouse.product_metrics',
partition={'ds': '{{ ds }}'},
assignment_func=custom_assignment,
dag=dag
)

Both operators support extensive templating through Airflow's Jinja2 engine:
HiveOperator template fields: hql, schema, hive_cli_conn_id, mapred_queue, hiveconfs, mapred_job_name, mapred_queue_priority, proxy_user. HiveStatsCollectionOperator template fields: table, partition, ds, dttm.

When hiveconf_jinja_translate=True, the operator automatically translates Hive-style variable substitution to Jinja2 templating:
${var} → {{ var }}
${hiveconf:var} → {{ var }}

HiveOperator automatically processes files with .hql and .sql extensions as templates, enabling external script management with Jinja2 variable substitution.
Configure Hadoop scheduler queues and priorities:
# Queue configuration options
# Valid values for mapred_queue_priority:
HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW']
operator = HiveOperator(
task_id='priority_job',
hql='SELECT * FROM large_table',
mapred_queue='analytics_high',  # route to a specific scheduler queue
mapred_queue_priority='HIGH',
mapred_job_name='{{ dag.dag_id }}_{{ task.task_id }}_{{ ds }}',
dag=dag
)

Customize MapReduce job names for monitoring and debugging:
# Template supports: hostname, dag_id, task_id, execution_date
# A descriptive, templated job name makes the job easy to find in the
# Hadoop job tracker.
operator = HiveOperator(
task_id='named_job',
hql='CREATE TABLE summary AS SELECT * FROM source',
mapred_job_name='ETL_{{ dag.dag_id }}_{{ task.task_id }}_{{ ts_nodash }}',
dag=dag
)

Pass runtime configuration via the hiveconfs parameter:
# Runtime configuration passed via hiveconfs applies to this task's Hive session.
operator = HiveOperator(
task_id='optimized_query',
hql='SELECT * FROM partitioned_table WHERE ds = "{{ ds }}"',
hiveconfs={
'hive.exec.dynamic.partition': 'true',
'hive.exec.dynamic.partition.mode': 'nonstrict',
'hive.exec.max.dynamic.partitions': '10000',
'hive.exec.compress.output': 'true',
'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.SnappyCodec',
'mapred.job.queue.name': 'analytics'
},
dag=dag
)

Both operators support graceful task termination through the on_kill() method, which cleanly stops running Hive jobs when tasks are cancelled or killed.
HiveOperator supports script preprocessing with script_begin_tag to skip setup sections and focus on core processing logic:
# Only the script content after script_begin_tag is executed.
operator = HiveOperator(
task_id='process_script',
hql='full_etl_script.hql',
script_begin_tag='-- MAIN PROCESSING BEGINS',
dag=dag
)

Both operators have full access to Airflow context variables for dynamic execution:
# Available context variables in templates
# Illustrative snapshot only; at runtime Airflow supplies the full context.
template_context = {
'ds': '2024-01-01', # Execution date
'ds_nodash': '20240101',
'ts': '2024-01-01T00:00:00+00:00', # Timestamp
'dag': dag_object,
'task': task_object,
'macros': airflow_macros,
'params': user_defined_params
}

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hive