Apache Airflow provider package for Apache Hive integration, providing data warehouse connectivity and workflow orchestration capabilities.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-hive@9.1.0

The Apache Airflow Hive provider package integrates Apache Airflow with Apache Hive for data warehouse connectivity and orchestration. It offers a suite of operators, hooks, sensors, and transfer operators for executing Hive queries, monitoring partitions, transferring data between systems, and managing Hive Metastore operations within Airflow workflows.
pip install apache-airflow-providers-apache-hive

# Hook imports for connecting to Hive services
from airflow.providers.apache.hive.hooks.hive import HiveCliHook, HiveMetastoreHook, HiveServer2Hook
# Operator imports for executing tasks
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator
# Sensor imports for monitoring conditions
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
from airflow.providers.apache.hive.sensors.metastore_partition import MetastorePartitionSensor
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
# Transfer operator imports for data movement
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator
from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator
from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator
# Additional transfer operators available
# Macro imports for template functions
from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
# Define DAG
dag = DAG(
    'hive_example',
    default_args={
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Hive data processing pipeline',
    schedule=timedelta(days=1),  # replaces the deprecated schedule_interval argument
    catchup=False,
)
# Wait for partition to be available
wait_for_partition = HivePartitionSensor(
    task_id='wait_for_partition',
    table='warehouse.daily_sales',
    partition="ds='{{ ds }}'",
    metastore_conn_id='hive_metastore_default',
    poke_interval=300,
    timeout=3600,
    dag=dag,
)
# Execute Hive query
process_data = HiveOperator(
    task_id='process_daily_sales',
    hql='''
        INSERT OVERWRITE TABLE warehouse.sales_summary
        PARTITION (ds='{{ ds }}')
        SELECT
            region,
            product_category,
            SUM(amount) AS total_sales,
            COUNT(*) AS transaction_count
        FROM warehouse.daily_sales
        WHERE ds='{{ ds }}'
        GROUP BY region, product_category;
    ''',
    hive_cli_conn_id='hive_cli_default',
    schema='warehouse',
    dag=dag,
)
# Set task dependencies
wait_for_partition >> process_data

The provider is organized around three main connection types and their corresponding hooks: the Hive CLI, HiveServer2, and the Hive Metastore.
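Connections for each type are configured like any other Airflow connection. As a minimal, hedged sketch (hosts and ports are illustrative; shown as Python environment variables to stay in the same language as the other examples), the connection IDs used in the DAG above could be defined as:

import json
import os

# Thrift metastore connection used by HivePartitionSensor and HiveMetastoreHook
os.environ['AIRFLOW_CONN_HIVE_METASTORE_DEFAULT'] = json.dumps(
    {'conn_type': 'hive_metastore', 'host': 'metastore.example.com', 'port': 9083}
)
# HiveServer2 connection used by HiveServer2Hook
os.environ['AIRFLOW_CONN_HIVESERVER2_DEFAULT'] = json.dumps(
    {'conn_type': 'hiveserver2', 'host': 'hs2.example.com', 'port': 10000, 'schema': 'default'}
)
# Hive CLI connection used by HiveOperator and HiveCliHook
os.environ['AIRFLOW_CONN_HIVE_CLI_DEFAULT'] = json.dumps(
    {'conn_type': 'hive_cli', 'host': 'hs2.example.com', 'port': 10000, 'schema': 'default'}
)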
Key components include:
Hooks provide core connectivity to Hive services through the CLI, HiveServer2, and Metastore interfaces. They handle connection management, query execution, and metadata operations, with support for authentication, connection pooling, and configuration management.
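As a minimal sketch (not taken from the provider docs), the hooks can also be used directly outside of operators; this assumes the default connection IDs 'metastore_default' and 'hiveserver2_default' exist and reuses the warehouse.daily_sales table from the DAG example:

from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook, HiveServer2Hook

def summarize_sales(ds: str) -> None:
    # Check partition metadata over the metastore Thrift interface
    metastore = HiveMetastoreHook(metastore_conn_id='metastore_default')
    if not metastore.check_for_partition('warehouse', 'daily_sales', f"ds='{ds}'"):
        raise ValueError(f'Partition ds={ds} not found in warehouse.daily_sales')

    # Run a query through HiveServer2 and fetch the result as a pandas DataFrame
    hs2 = HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')
    df = hs2.get_pandas_df(
        f"SELECT region, SUM(amount) AS total FROM warehouse.daily_sales "
        f"WHERE ds='{ds}' GROUP BY region"
    )
    print(df.head())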
class HiveCliHook:
    def __init__(self, hive_cli_conn_id: str = 'hive_cli_default', **kwargs): ...
    def run_cli(self, hql: str, schema: str = 'default', **kwargs) -> Any: ...
    def test_hql(self, hql: str) -> None: ...
    def load_file(self, filepath: str, table: str, **kwargs) -> None: ...

class HiveMetastoreHook:
    def __init__(self, metastore_conn_id: str = 'metastore_default'): ...
    def check_for_partition(self, schema: str, table: str, partition: str) -> bool: ...
    def get_table(self, table_name: str, db: str = 'default') -> Any: ...
    def get_partitions(self, schema: str, table_name: str, **kwargs) -> list: ...

class HiveServer2Hook:
    def __init__(self, hiveserver2_conn_id: str = 'hiveserver2_default', **kwargs): ...
    def get_conn(self, schema: str | None = None) -> Any: ...
    def get_pandas_df(self, sql: str, schema: str = 'default', **kwargs) -> 'pd.DataFrame': ...

Operators execute HQL scripts and queries with support for templating, parameter substitution, MapReduce configuration, and job monitoring. They include operators for running ad-hoc queries and for collecting table statistics.
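Beyond the hql shown in the DAG example, HiveOperator exposes knobs for Hive session settings and job placement. A hedged sketch follows (attached to the dag object defined earlier; the queue name and staging table are illustrative, and parameter names such as hiveconfs and mapred_queue should be checked against the installed provider version):

from airflow.providers.apache.hive.operators.hive import HiveOperator

cleanup_staging = HiveOperator(
    task_id='cleanup_staging',
    hql='DROP TABLE IF EXISTS warehouse.staging_{{ ds_nodash }};',
    hive_cli_conn_id='hive_cli_default',
    schema='warehouse',
    hiveconfs={'hive.exec.dynamic.partition.mode': 'nonstrict'},  # forwarded to the CLI as -hiveconf settings
    mapred_queue='etl',  # run the underlying MapReduce jobs on a named YARN queue
    dag=dag,
)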
class HiveOperator:
    def __init__(self, *, hql: str, hive_cli_conn_id: str = 'hive_cli_default', **kwargs): ...
    def execute(self, context: 'Context') -> None: ...

class HiveStatsCollectionOperator:
    def __init__(self, *, table: str, partition: Any, **kwargs): ...
    def execute(self, context: 'Context') -> None: ...

Sensors monitor Hive table partitions and wait for them to become available. They support general partition filters, named partitions, and direct metastore queries for efficient partition detection.
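A hedged sketch of waiting on several named partitions at once with NamedHivePartitionSensor; partition names use the '<schema>.<table>/<partition spec>' form, the second table is hypothetical, and the task attaches to the dag from the example above:

from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

wait_for_upstream = NamedHivePartitionSensor(
    task_id='wait_for_upstream',
    partition_names=[
        'warehouse.daily_sales/ds={{ ds }}',
        'warehouse.daily_returns/ds={{ ds }}',  # hypothetical second upstream table
    ],
    metastore_conn_id='hive_metastore_default',
    poke_interval=180,
    dag=dag,
)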
class HivePartitionSensor:
    def __init__(self, *, table: str, partition: str = "ds='{{ ds }}'", **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...

class NamedHivePartitionSensor:
    def __init__(self, *, partition_names: list[str], **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...

class MetastorePartitionSensor:
    def __init__(self, *, table: str, partition_name: str, **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...

Transfer operators move data between Hive and external systems, including MySQL, S3, Samba, Vertica, and Microsoft SQL Server, providing bidirectional data movement with transformation and format conversion capabilities.
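A hedged sketch of staging a MySQL extract into a partitioned Hive table with MySqlToHiveOperator; the source table, target table, and connection IDs are illustrative, and the task attaches to the dag from the example above:

from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator

load_orders = MySqlToHiveOperator(
    task_id='load_orders',
    sql="SELECT id, customer_id, amount FROM orders WHERE order_date = '{{ ds }}'",
    hive_table='warehouse.orders_staging',
    partition={'ds': '{{ ds }}'},  # load into a static ds partition
    mysql_conn_id='mysql_default',
    hive_cli_conn_id='hive_cli_default',
    dag=dag,
)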
class MySqlToHiveOperator:
    def __init__(self, *, sql: str, hive_table: str, **kwargs): ...

class S3ToHiveOperator:
    def __init__(self, *, s3_key: str, field_dict: dict, hive_table: str, **kwargs): ...

class HiveToMySqlOperator:
    def __init__(self, *, sql: str, mysql_table: str, **kwargs): ...

# Additional transfer operators: MsSqlToHiveOperator, VerticaToHiveOperator, HiveToSambaOperator

Macros provide template functions for partition discovery, date-based partition selection, and metadata queries, including utilities for finding the maximum partition and the closest date partition for dynamic task execution.
def max_partition(table: str, schema: str = 'default', field: str | None = None, **kwargs) -> str: ...
def closest_ds_partition(table: str, ds: str, before: bool = True, **kwargs) -> str | None: ...
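A hedged sketch of calling the macros as plain functions to pick partitions dynamically; both query the metastore through the connection named by metastore_conn_id (default 'metastore_default'), and the table names reuse the illustrative ones above:

from airflow.providers.apache.hive.macros.hive import closest_ds_partition, max_partition

def pick_partitions(ds: str) -> dict:
    # Highest partition value for the table, e.g. the most recent ds
    latest = max_partition('warehouse.sales_summary')
    # Partition date closest to ds; with before=True only dates not after ds are considered
    nearest = closest_ds_partition('warehouse.daily_sales', ds)
    return {'latest': latest, 'closest': nearest}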