Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.
The provider offers core connectivity to Hive services through CLI, HiveServer2, and Metastore interfaces. It supports three distinct connection types, each optimized for different use cases and operational requirements.
HiveCliHook is a wrapper around the Hive command-line interface, supporting both the traditional Hive CLI and Beeline (the JDBC-based lightweight CLI).
class HiveCliHook:
    """Stub interface: wrapper around the Hive CLI / Beeline command-line clients.

    Executes HQL through a command-line subprocess; connection details are
    resolved from the Airflow connection named by ``hive_cli_conn_id``.
    """

    # Airflow hook-discovery metadata (drives the connection form in the UI).
    conn_name_attr: str = "hive_cli_conn_id"
    default_conn_name: str = "hive_cli_default"
    conn_type: str = "hive_cli"
    hook_name: str = "Hive Client Wrapper"

    def __init__(
        self,
        hive_cli_conn_id: str = "hive_cli_default",
        mapred_queue: str | None = None,  # MapReduce queue to submit jobs to
        mapred_queue_priority: str | None = None,  # presumably one of HIVE_QUEUE_PRIORITIES — confirm
        mapred_job_name: str | None = None,
        hive_cli_params: str = "",  # extra CLI flags appended to the hive/beeline command
        auth: str | None = None,
        proxy_user: str | None = None,  # run HQL impersonating this user
    ) -> None: ...

    # Widgets for the Airflow connection form (classmethods used by the UI).
    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]: ...

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]: ...

    def run_cli(
        self,
        hql: str,
        schema: str | None = None,  # database to USE before running `hql`
        verbose: bool = True,
        hive_conf: dict[Any, Any] | None = None  # extra key=value settings passed via -hiveconf
    ) -> Any: ...

    # Validate HQL without producing results (see usage example below).
    def test_hql(self, hql: str) -> None: ...

    def load_df(
        self,
        df: 'pd.DataFrame',
        table: str,
        field_dict: dict[Any, Any] | None = None,  # column name -> Hive type mapping; inferred when None
        delimiter: str = ",",
        encoding: str = "utf8",
        pandas_kwargs: Any = None,  # forwarded to the pandas serialization call
        **kwargs: Any  # forwarded to load_file (e.g. create, overwrite)
    ) -> None: ...

    def load_file(
        self,
        filepath: str,
        table: str,
        delimiter: str = ",",
        field_dict: dict[Any, Any] | None = None,
        create: bool = True,  # create the table if it does not exist
        overwrite: bool = True,  # overwrite existing data in the table/partition
        partition: dict[str, Any] | None = None,  # target partition spec, e.g. {"ds": "2024-01-01"}
        recreate: bool = False,  # drop and re-create the table first
        tblproperties: dict[str, Any] | None = None
    ) -> None: ...
    def kill(self) -> None: ...

Usage Example:
from airflow.providers.apache.hive.hooks.hive import HiveCliHook

# Initialize hook with custom configuration
hook = HiveCliHook(
    hive_cli_conn_id='hive_production',
    mapred_queue='high_priority',
    mapred_queue_priority='HIGH',
    mapred_job_name='daily_etl_{{ ds }}'  # Jinja-templated job name
)

# Execute HQL commands
hook.run_cli("""
CREATE TABLE IF NOT EXISTS sales_temp AS
SELECT * FROM sales WHERE ds = '2024-01-01'
""")

# Test HQL syntax before execution (no results produced)
hook.test_hql("SELECT COUNT(*) FROM sales WHERE ds = '{{ ds }}'")

# Load pandas DataFrame to Hive table
import pandas as pd
df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})
hook.load_df(df, table='test_table', create=True)

Interface to Hive Metastore for metadata operations, partition management, and table introspection via Thrift protocol.
class HiveMetastoreHook:
    """Stub interface: Hive Metastore client for metadata operations,
    partition management, and table introspection over Thrift.
    """

    # Upper bound on partitions fetched per metastore call
    # (32767 = max signed 16-bit integer).
    MAX_PART_COUNT: int = 32767

    def __init__(self, metastore_conn_id: str = "metastore_default"): ...

    def get_conn(self) -> Any: ...

    # Returns the underlying Thrift metastore client.
    def get_metastore_client(self) -> Any: ...

    # `partition` is a filter expression, e.g. "ds='2024-01-01' AND region='us'"
    # (see usage example below).
    def check_for_partition(self, schema: str, table: str, partition: str) -> bool: ...

    # `partition_name` — presumably the fully-specified partition name
    # (e.g. "ds=2024-01-01/region=us") rather than a filter — confirm.
    def check_for_named_partition(self, schema: str, table: str, partition_name: str) -> Any: ...

    def get_table(self, table_name: str, db: str = "default") -> Any: ...

    def get_tables(self, db: str, pattern: str = "*") -> Any: ...

    def get_databases(self, pattern: str = "*") -> Any: ...

    def get_partitions(
        self,
        schema: str,
        table_name: str,
        partition_filter: str | None = None  # optional server-side partition filter
    ) -> list[Any]: ...

    def max_partition(
        self,
        schema: str,
        table_name: str,
        field: str | None = None,  # partition key whose maximum value is returned
        filter_map: dict[Any, Any] | None = None  # fix other partition keys, e.g. {"region": "us"}
    ) -> Any: ...

    def table_exists(self, table_name: str, db: str = "default") -> bool: ...
    def drop_partitions(
        self,
        table_name: str,
        part_vals: Any,
        delete_data: bool = False,
        db: str = "default"
    ) -> Any: ...

Usage Example:
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook

# Initialize metastore hook
metastore = HiveMetastoreHook('metastore_production')

# Check if table exists
if metastore.table_exists('sales', 'warehouse'):
    print("Table exists")

# Get table partitions
partitions = metastore.get_partitions('warehouse', 'sales')
print(f"Found {len(partitions)} partitions")

# Check for specific partition (filter-expression syntax)
has_partition = metastore.check_for_partition(
    'warehouse',
    'sales',
    "ds='2024-01-01' AND region='us'"
)

# Find maximum partition value
max_ds = metastore.max_partition(
    schema='warehouse',
    table_name='sales',
    field='ds',
    filter_map={'region': 'us'}
)

Database API-compatible hook for HiveServer2 connections using pyhive library, supporting SQL query execution and result retrieval.
class HiveServer2Hook:
    """Stub interface: DB-API-compatible hook for HiveServer2 (pyhive-based),
    supporting SQL execution and result retrieval.
    """

    def __init__(
        self,
        hiveserver2_conn_id: str = "hiveserver2_default",
        schema: str | None = None  # default database schema for queries
    ) -> None: ...

    def get_conn(self, schema: str | None = None) -> Any: ...

    def get_results(
        self,
        sql: str | list[str],  # single statement or a list executed in order
        schema: str = "default",
        fetch_size: int | None = None,
        hive_conf: Iterable | Mapping | None = None  # session-level Hive settings
    ) -> dict[str, Any]: ...

    # NOTE: takes no `parameters` argument and no **kwargs — parameter values
    # must be inlined into `sql` by the caller.
    def to_csv(
        self,
        sql: str,
        csv_filepath: str,
        schema: str = "default",
        delimiter: str = ",",
        lineterminator: str = "\r\n",
        output_header: bool = True,  # write column names as the first row
        fetch_size: int = 1000,  # rows fetched per round-trip while streaming to file
        hive_conf: dict[Any, Any] | None = None
    ) -> None: ...

    def get_records(
        self,
        sql: str | list[str],
        parameters: Iterable | Mapping[str, Any] | None = None,
        **kwargs
    ) -> Any: ...

    def get_df(
        self,
        sql: str,
        schema: str = "default",
        hive_conf: dict[Any, Any] | None = None,
        *,
        df_type: Literal["pandas", "polars"] = "pandas",  # keyword-only backend selector
        **kwargs
    ) -> 'pd.DataFrame | pl.DataFrame': ...
    def get_pandas_df(
        self,
        sql: str,
        schema: str = "default",
        hive_conf: dict[Any, Any] | None = None,
        **kwargs
    ) -> 'pd.DataFrame': ...

Usage Example:
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
import pandas as pd

# Initialize HiveServer2 hook bound to a default schema
hook = HiveServer2Hook('hiveserver2_analytics', schema='warehouse')

# Execute query and get pandas DataFrame.
# NOTE(review): get_pandas_df only forwards **kwargs; confirm that your
# provider version supports a `parameters` kwarg for client-side binding.
df = hook.get_pandas_df("""
SELECT region, SUM(amount) as total_sales
FROM sales
WHERE ds = %s
GROUP BY region
""", parameters=['2024-01-01'])

# Get all records as list
records = hook.get_records("SELECT region, SUM(amount) FROM sales WHERE ds = %s GROUP BY region", ['2024-01-01'])

# Export query results to CSV.
# to_csv() accepts no `parameters` argument (and no **kwargs), so the
# value is inlined into the SQL instead of being passed separately.
hook.to_csv(
    sql="SELECT * FROM sales WHERE ds = '2024-01-01'",
    csv_filepath='/tmp/sales_export.csv',
    schema='warehouse',
    delimiter=',',
    output_header=True
)

# Get results with custom fetch size and hive configuration
results = hook.get_results(
    sql=["SET hive.exec.dynamic.partition=true", "SELECT * FROM sales WHERE ds = %s"],
    schema='warehouse',
    fetch_size=5000,
    hive_conf={'mapred.job.queue.name': 'analytics'}
)

# Get DataFrame with Polars (if available)
polars_df = hook.get_df(
    sql="SELECT region, COUNT(*) as count FROM sales GROUP BY region",
    schema='warehouse',
    df_type='polars'
)

The provider supports three connection types in Airflow's connection management:
Available configuration options under [hive] section in Airflow configuration:
- default_hive_mapred_queue: Default MapReduce queue for HiveOperator tasks
- mapred_job_name_template: Template for MapReduce job names supporting hostname, dag_id, task_id, execution_date

All hooks support various authentication mechanisms:
- auth parameter in connection extras
- proxy_user parameter for impersonation

Connection extras support additional parameters:
HiveCliHook Connection Extras:
- use_beeline: Enable Beeline instead of traditional Hive CLI (boolean, default: true)
- proxy_user: Run HQL code as this user (string)
- principal: Kerberos principal (string, default: "hive/_HOST@EXAMPLE.COM")
- high_availability: Enable high availability mode (boolean, default: false)
- auth: Authentication mechanism for JDBC connection string (string)
- hive_cli_params: Additional CLI parameters (string)

HiveServer2Hook Connection Extras:
- auth_mechanism: Override default authentication mechanism (string)
- kerberos_service_name: Kerberos service name (string, default: "hive")
- database: Default database schema (string)

HiveMetastoreHook Connection Extras:
- authMechanism: Thrift authentication mechanism (string)
- use_ssl: Enable SSL connection (boolean, default: false)

Available in airflow.providers.apache.hive.hooks.hive:
HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]

def get_context_from_env_var() -> dict[Any, Any]: ...

Extract context from environment variables for use in BashOperator and PythonOperator. This function retrieves Airflow context information (dag_id, task_id, etc.) from environment variables using the AIRFLOW_VAR_NAME_FORMAT_MAPPING configuration.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hive