Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.
The Apache Airflow Hive provider package enables seamless integration between Apache Airflow and Apache Hive, providing comprehensive data warehouse connectivity and orchestration capabilities. This provider offers a complete suite of operators, hooks, sensors, and transfer operators for executing Hive queries, monitoring partitions, transferring data between systems, and managing Hive Metastore operations within Airflow workflows.
pip install apache-airflow-providers-apache-hive

# Hook imports for connecting to Hive services
from airflow.providers.apache.hive.hooks.hive import HiveCliHook, HiveMetastoreHook, HiveServer2Hook
# Operator imports for executing tasks
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator
# Sensor imports for monitoring conditions
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
from airflow.providers.apache.hive.sensors.metastore_partition import MetastorePartitionSensor
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
# Transfer operator imports for data movement
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator
from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator
from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator
# Additional transfer operators available
# Macro imports for template functions
from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
# Define DAG
dag = DAG(
    'hive_example',
    default_args={
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Hive data processing pipeline',
    schedule_interval=timedelta(days=1),
    catchup=False,
)
# Wait for partition to be available
wait_for_partition = HivePartitionSensor(
    task_id='wait_for_partition',
    table='warehouse.daily_sales',
    partition="ds='{{ ds }}'",
    metastore_conn_id='hive_metastore_default',
    poke_interval=300,
    timeout=3600,
    dag=dag,
)
# Execute Hive query
process_data = HiveOperator(
    task_id='process_daily_sales',
    hql='''
        INSERT OVERWRITE TABLE warehouse.sales_summary
        PARTITION (ds='{{ ds }}')
        SELECT
            region,
            product_category,
            SUM(amount) as total_sales,
            COUNT(*) as transaction_count
        FROM warehouse.daily_sales
        WHERE ds='{{ ds }}'
        GROUP BY region, product_category;
    ''',
    hive_cli_conn_id='hive_cli_default',
    schema='warehouse',
    dag=dag,
)
# Set task dependencies
wait_for_partition >> process_data

The provider is organized around three main connection types and corresponding hooks:
Key components include:
Core connectivity to Hive services through CLI, HiveServer2, and Metastore interfaces. Provides connection management, query execution, and metadata operations with support for authentication, connection pooling, and configuration management.
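The hooks can also be used directly from Python, for instance inside a task callable. A minimal sketch, assuming connections named 'metastore_default' and 'hiveserver2_default' are configured and pandas is installed:

from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook, HiveServer2Hook

# Look up partition metadata through the Metastore
metastore = HiveMetastoreHook(metastore_conn_id='metastore_default')
if metastore.check_for_partition('warehouse', 'daily_sales', "ds='2024-01-01'"):
    # Run a query over HiveServer2 and fetch the result as a pandas DataFrame
    hs2 = HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')
    df = hs2.get_pandas_df(
        "SELECT region, SUM(amount) AS total FROM warehouse.daily_sales "
        "WHERE ds='2024-01-01' GROUP BY region"
    )
    print(df.head())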
class HiveCliHook:
    def __init__(self, hive_cli_conn_id: str = 'hive_cli_default', **kwargs): ...
    def run_cli(self, hql: str, schema: str = 'default') -> None: ...
    def test_hql(self, hql: str) -> None: ...
    def load_file(self, filepath: str, table: str, **kwargs) -> None: ...

class HiveMetastoreHook:
    def __init__(self, metastore_conn_id: str = 'metastore_default'): ...
    def check_for_partition(self, schema: str, table: str, partition: str) -> bool: ...
    def get_table(self, schema: str, table_name: str) -> Any: ...
    def get_partitions(self, schema: str, table_name: str, **kwargs) -> list: ...

class HiveServer2Hook:
    def __init__(self, hiveserver2_conn_id: str = 'hiveserver2_default', **kwargs): ...
    def get_conn(self, schema: str = None) -> Any: ...
    def get_pandas_df(self, sql: str, parameters: list = None, **kwargs) -> 'pd.DataFrame': ...

Execute HQL scripts and queries with support for templating, parameter substitution, MapReduce configuration, and job monitoring. Includes operators for running ad-hoc queries and collecting table statistics.
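As an illustration, a statistics-collection task can follow the query task from the DAG example above; a minimal sketch reusing that dag object (the dict form of the partition argument is an assumption to adapt to your deployment):

from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator

# Collect column statistics for one partition of the table produced earlier.
# No connection ids are passed, so the operator's defaults apply.
collect_stats = HiveStatsCollectionOperator(
    task_id='collect_stats',
    table='warehouse.sales_summary',
    partition={'ds': '{{ ds }}'},  # assumed dict form: partition key -> value
    dag=dag,
)

process_data >> collect_stats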
class HiveOperator:
    def __init__(self, *, hql: str, hive_cli_conn_id: str = 'hive_cli_default', **kwargs): ...
    def execute(self, context: 'Context') -> None: ...

class HiveStatsCollectionOperator:
    def __init__(self, *, table: str, partition: Any, **kwargs): ...
    def execute(self, context: 'Context') -> None: ...

Monitor Hive table partitions with flexible sensors for waiting on partition availability. Supports general partition filters, named partitions, and direct metastore queries for efficient partition detection.
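Where several partitions must exist before processing starts, NamedHivePartitionSensor accepts a list of fully qualified partition names. A minimal sketch reusing the dag object from the example above (the 'schema.table/partition' name format and the second table are illustrative assumptions):

from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

# Wait until both input partitions for the execution date exist
wait_for_inputs = NamedHivePartitionSensor(
    task_id='wait_for_inputs',
    partition_names=[
        'warehouse.daily_sales/ds={{ ds }}',    # assumed schema.table/partition format
        'warehouse.daily_returns/ds={{ ds }}',
    ],
    poke_interval=300,
    dag=dag,
)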
class HivePartitionSensor:
    def __init__(self, *, table: str, partition: str = "ds='{{ ds }}'", **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...

class NamedHivePartitionSensor:
    def __init__(self, *, partition_names: list[str], **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...

class MetastorePartitionSensor:
    def __init__(self, *, table: str, partition_name: str, **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...

Transfer data between Hive and external systems including MySQL, S3, Samba, Vertica, and Microsoft SQL Server. Provides bidirectional data movement with transformation and format conversion capabilities.
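For example, a Hive result set can be pushed to MySQL at the end of a pipeline; a minimal sketch reusing the dag object from the example above (table names are illustrative, and no connection ids are passed so the operator defaults apply):

from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator

# Export the aggregated summary for the execution date into a MySQL reporting table
export_summary = HiveToMySqlOperator(
    task_id='export_summary',
    sql="SELECT region, product_category, total_sales FROM warehouse.sales_summary WHERE ds='{{ ds }}'",
    mysql_table='reporting.sales_summary',
    dag=dag,
)

process_data >> export_summary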
class MySqlToHiveOperator:
    def __init__(self, *, sql: str, table: str, **kwargs): ...

class S3ToHiveOperator:
    def __init__(self, *, s3_source_key: str, table: str, **kwargs): ...

class HiveToMySqlOperator:
    def __init__(self, *, sql: str, mysql_table: str, **kwargs): ...

# Additional transfer operators: MsSqlToHiveOperator, VerticaToHiveOperator, HiveToSambaOperator

Template functions for partition discovery, date-based partition selection, and metadata queries. Includes utilities for finding maximum partitions and closest date partitions for dynamic task execution.
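A minimal sketch of calling the macros directly (they can also be exposed to Jinja templates via a DAG's user_defined_macros); both helpers query the Hive Metastore when invoked, and the table name and date below are illustrative:

from airflow.providers.apache.hive.macros.hive import closest_ds_partition, max_partition

# Latest value of the table's partition column
latest_ds = max_partition('warehouse.daily_sales')

# Partition date closest to '2024-01-05' (the before flag controls the search direction)
nearest_ds = closest_ds_partition('warehouse.daily_sales', '2024-01-05')

print(latest_ds, nearest_ds)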
def max_partition(table: str, schema: str = 'default', field: str = None, **kwargs) -> str: ...
def closest_ds_partition(table: str, ds: str, before: bool = True, **kwargs) -> str | None: ...

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hive