tessl/pypi-apache-airflow-providers-apache-hive

Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.


docs/hooks-connections.md

Hooks and Connections

Core connectivity to Hive services through CLI, HiveServer2, and Metastore interfaces. The provider supports three distinct connection types, each optimized for different use cases and operational requirements.

Capabilities

HiveCLI Hook

Wrapper around the Hive command-line interface, supporting both traditional CLI and Beeline (JDBC-based lightweight CLI).

class HiveCliHook:
    conn_name_attr: str = "hive_cli_conn_id"
    default_conn_name: str = "hive_cli_default"
    conn_type: str = "hive_cli"
    hook_name: str = "Hive Client Wrapper"
    
    def __init__(
        self,
        hive_cli_conn_id: str = "hive_cli_default",
        mapred_queue: str | None = None,
        mapred_queue_priority: str | None = None,
        mapred_job_name: str | None = None,
        hive_cli_params: str = "",
        auth: str | None = None,
        proxy_user: str | None = None,
    ) -> None: ...
    
    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]: ...
    
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]: ...
    
    def run_cli(
        self, 
        hql: str, 
        schema: str | None = None, 
        verbose: bool = True, 
        hive_conf: dict[Any, Any] | None = None
    ) -> Any: ...
    
    def test_hql(self, hql: str) -> None: ...
    
    def load_df(
        self,
        df: 'pd.DataFrame',
        table: str,
        field_dict: dict[Any, Any] | None = None,
        delimiter: str = ",",
        encoding: str = "utf8",
        pandas_kwargs: Any = None,
        **kwargs: Any
    ) -> None: ...
    
    def load_file(
        self,
        filepath: str,
        table: str,
        delimiter: str = ",",
        field_dict: dict[Any, Any] | None = None,
        create: bool = True,
        overwrite: bool = True,
        partition: dict[str, Any] | None = None,
        recreate: bool = False,
        tblproperties: dict[str, Any] | None = None
    ) -> None: ...
    
    def kill(self) -> None: ...

Usage Example:

from airflow.providers.apache.hive.hooks.hive import HiveCliHook

# Initialize hook with custom configuration
hook = HiveCliHook(
    hive_cli_conn_id='hive_production',
    mapred_queue='high_priority',
    mapred_queue_priority='HIGH',
    mapred_job_name='daily_etl'  # hook arguments are not Jinja-templated
)

# Execute HQL commands
hook.run_cli("""
    CREATE TABLE IF NOT EXISTS sales_temp AS 
    SELECT * FROM sales WHERE ds = '2024-01-01'
""")

# Test HQL syntax before execution
hook.test_hql("SELECT COUNT(*) FROM sales WHERE ds = '2024-01-01'")

# Load pandas DataFrame to Hive table
import pandas as pd
df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})
hook.load_df(df, table='test_table', create=True)

Hive Metastore Hook

Interface to Hive Metastore for metadata operations, partition management, and table introspection via Thrift protocol.

class HiveMetastoreHook:
    MAX_PART_COUNT: int = 32767
    
    def __init__(self, metastore_conn_id: str = "metastore_default"): ...
    
    def get_conn(self) -> Any: ...
    def get_metastore_client(self) -> Any: ...
    def check_for_partition(self, schema: str, table: str, partition: str) -> bool: ...
    def check_for_named_partition(self, schema: str, table: str, partition_name: str) -> Any: ...
    def get_table(self, table_name: str, db: str = "default") -> Any: ...
    def get_tables(self, db: str, pattern: str = "*") -> Any: ...
    def get_databases(self, pattern: str = "*") -> Any: ...
    def get_partitions(
        self,
        schema: str,
        table_name: str,
        partition_filter: str | None = None
    ) -> list[Any]: ...
    def max_partition(
        self,
        schema: str,
        table_name: str,
        field: str | None = None,
        filter_map: dict[Any, Any] | None = None
    ) -> Any: ...
    def table_exists(self, table_name: str, db: str = "default") -> bool: ...
    def drop_partitions(
        self,
        table_name: str,
        part_vals: Any,
        delete_data: bool = False,
        db: str = "default"
    ) -> Any: ...

Usage Example:

from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook

# Initialize metastore hook
metastore = HiveMetastoreHook('metastore_production')

# Check if table exists
if metastore.table_exists('sales', 'warehouse'):
    print("Table exists")

# Get table partitions
partitions = metastore.get_partitions('warehouse', 'sales')
print(f"Found {len(partitions)} partitions")

# Check for specific partition
has_partition = metastore.check_for_partition(
    'warehouse', 
    'sales', 
    "ds='2024-01-01' AND region='us'"
)

# Find maximum partition value
max_ds = metastore.max_partition(
    schema='warehouse',
    table_name='sales',
    field='ds',
    filter_map={'region': 'us'}
)

HiveServer2 Hook

DB-API 2.0-compatible hook for HiveServer2 connections, built on the PyHive library, supporting SQL query execution and result retrieval.

class HiveServer2Hook:
    def __init__(
        self,
        hiveserver2_conn_id: str = "hiveserver2_default",
        schema: str | None = None
    ) -> None: ...
    
    def get_conn(self, schema: str | None = None) -> Any: ...
    
    def get_results(
        self,
        sql: str | list[str],
        schema: str = "default",
        fetch_size: int | None = None,
        hive_conf: Iterable | Mapping | None = None
    ) -> dict[str, Any]: ...
    
    def to_csv(
        self,
        sql: str,
        csv_filepath: str,
        schema: str = "default",
        delimiter: str = ",",
        lineterminator: str = "\r\n",
        output_header: bool = True,
        fetch_size: int = 1000,
        hive_conf: dict[Any, Any] | None = None
    ) -> None: ...
    
    def get_records(
        self,
        sql: str | list[str],
        parameters: Iterable | Mapping[str, Any] | None = None,
        **kwargs
    ) -> Any: ...
    
    def get_df(
        self,
        sql: str,
        schema: str = "default",
        hive_conf: dict[Any, Any] | None = None,
        *,
        df_type: Literal["pandas", "polars"] = "pandas",
        **kwargs
    ) -> 'pd.DataFrame | pl.DataFrame': ...
    
    def get_pandas_df(
        self,
        sql: str,
        schema: str = "default",
        hive_conf: dict[Any, Any] | None = None,
        **kwargs
    ) -> 'pd.DataFrame': ...

Usage Example:

from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
import pandas as pd

# Initialize HiveServer2 hook
hook = HiveServer2Hook('hiveserver2_analytics', schema='warehouse')

# Execute query and get pandas DataFrame
df = hook.get_pandas_df("""
    SELECT region, SUM(amount) AS total_sales
    FROM sales
    WHERE ds = '2024-01-01'
    GROUP BY region
""")

# Get all records as list
records = hook.get_records("SELECT region, SUM(amount) FROM sales WHERE ds = '2024-01-01' GROUP BY region")

# Export query results to CSV
hook.to_csv(
    sql="SELECT * FROM sales WHERE ds = '2024-01-01'",
    csv_filepath='/tmp/sales_export.csv',
    schema='warehouse',
    delimiter=',',
    output_header=True
)

# Get results with custom fetch size and hive configuration
results = hook.get_results(
    sql=["SET hive.exec.dynamic.partition=true", "SELECT * FROM sales WHERE ds = '2024-01-01'"],
    schema='warehouse',
    fetch_size=5000,
    hive_conf={'mapred.job.queue.name': 'analytics'}
)

# Get DataFrame with Polars (if available)
polars_df = hook.get_df(
    sql="SELECT region, COUNT(*) as count FROM sales GROUP BY region",
    schema='warehouse',
    df_type='polars'
)

Connection Configuration

Connection Types

The provider supports three connection types in Airflow's connection management:

  1. hive_cli - For HiveCliHook connections
  2. hiveserver2 - For HiveServer2Hook connections
  3. hive_metastore - For HiveMetastoreHook connections
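Connections can also be supplied through environment variables instead of the Airflow metadata database. The sketch below, with hypothetical connection IDs and hosts, shows the `AIRFLOW_CONN_<CONN_ID>` naming convention Airflow uses to resolve them (note the connection type appears as the URI scheme, with underscores written as hyphens):

```python
import os

# Hypothetical connection URIs; Airflow checks AIRFLOW_CONN_<CONN_ID>
# environment variables before falling back to the metadata database.
os.environ["AIRFLOW_CONN_HIVE_CLI_PRODUCTION"] = "hive-cli://hive@hs2-host:10000/default"
os.environ["AIRFLOW_CONN_METASTORE_PRODUCTION"] = "hive-metastore://metastore-host:9083"
os.environ["AIRFLOW_CONN_HIVESERVER2_ANALYTICS"] = "hiveserver2://analyst@hs2-host:10000/warehouse"

def conn_env_name(conn_id: str) -> str:
    """Environment variable name Airflow checks for a given connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"

print(conn_env_name("hive_cli_production"))  # AIRFLOW_CONN_HIVE_CLI_PRODUCTION
```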

Configuration Options

Available configuration options under [hive] section in Airflow configuration:

  • default_hive_mapred_queue: Default MapReduce queue for HiveOperator tasks
  • mapred_job_name_template: Template for MapReduce job names supporting hostname, dag_id, task_id, execution_date
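A sketch of how these options could appear in airflow.cfg (the queue name is illustrative; the template variables are the four documented above):

```ini
[hive]
# Queue used when a task does not set mapred_queue explicitly
default_hive_mapred_queue = etl_queue
# Rendered with the hostname, dag_id, task_id and execution_date variables
mapred_job_name_template = Airflow HiveOperator task for {hostname}.{dag_id}.{task_id}.{execution_date}
```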

Authentication and Security

All hooks support various authentication mechanisms:

  • Basic authentication - Username/password via connection configuration
  • Kerberos authentication - Via connection extras and system configuration
  • Custom authentication - Via auth parameter in connection extras
  • Proxy user execution - Via proxy_user parameter for impersonation

Connection Extras

Connection extras support additional parameters:

HiveCliHook Connection Extras:

  • use_beeline: Enable Beeline instead of traditional Hive CLI (boolean, default: true)
  • proxy_user: Run HQL code as this user (string)
  • principal: Kerberos principal (string, default: "hive/_HOST@EXAMPLE.COM")
  • high_availability: Enable high availability mode (boolean, default: false)
  • auth: Authentication mechanism for JDBC connection string (string)
  • hive_cli_params: Additional CLI parameters (string)

HiveServer2Hook Connection Extras:

  • auth_mechanism: Override default authentication mechanism (string)
  • kerberos_service_name: Kerberos service name (string, default: "hive")
  • database: Default database schema (string)

HiveMetastoreHook Connection Extras:

  • authMechanism: Thrift authentication mechanism (string)
  • use_ssl: Enable SSL connection (boolean, default: false)
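The Extra field of a connection stores these parameters as a JSON object. A hedged sketch of payloads for each connection type, using the keys documented above (the values are illustrative, not defaults):

```python
import json

# Illustrative Extra-field payloads, one per connection type
hive_cli_extras = {
    "use_beeline": True,
    "principal": "hive/_HOST@EXAMPLE.COM",
    "proxy_user": "etl_user",
    "hive_cli_params": "--hiveconf mapred.job.queue.name=etl",
}
hiveserver2_extras = {"auth_mechanism": "KERBEROS", "kerberos_service_name": "hive"}
metastore_extras = {"authMechanism": "GSSAPI", "use_ssl": True}

# The Extra field is stored as a JSON string
payload = json.dumps(hive_cli_extras)
print(payload)
```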

Configuration Constants

Available in airflow.providers.apache.hive.hooks.hive:

HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]
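This constant can be used to validate a `mapred_queue_priority` value before constructing a HiveCliHook. A minimal sketch (the list is copied here to keep the example self-contained; `validate_priority` is a hypothetical helper, not part of the provider):

```python
# Mirrors airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES
HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]

def validate_priority(priority: str) -> str:
    """Normalize and validate a mapred_queue_priority value."""
    normalized = priority.strip().upper()
    if normalized not in HIVE_QUEUE_PRIORITIES:
        raise ValueError(
            f"Invalid priority {priority!r}; expected one of {HIVE_QUEUE_PRIORITIES}"
        )
    return normalized

print(validate_priority("high"))  # HIGH
```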

Helper Functions

def get_context_from_env_var() -> dict[Any, Any]: ...

Extract context from environment variables for use in BashOperator and PythonOperator. This function retrieves Airflow context information (dag_id, task_id, etc.) from environment variables using the AIRFLOW_VAR_NAME_FORMAT_MAPPING configuration.
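Airflow exports task context to operators as `AIRFLOW_CTX_*` environment variables. A minimal sketch of reading such variables back (the provider function's exact return keys follow `AIRFLOW_VAR_NAME_FORMAT_MAPPING` and may differ from this simplified version):

```python
import os

# Simulate the variables Airflow would export for a running task
os.environ.setdefault("AIRFLOW_CTX_DAG_ID", "daily_etl")
os.environ.setdefault("AIRFLOW_CTX_TASK_ID", "load_sales")

def context_from_env() -> dict[str, str]:
    """Collect AIRFLOW_CTX_* variables into a plain dict (hypothetical helper)."""
    prefix = "AIRFLOW_CTX_"
    return {
        key[len(prefix):].lower(): value
        for key, value in os.environ.items()
        if key.startswith(prefix)
    }

ctx = context_from_env()
print(ctx["dag_id"], ctx["task_id"])  # daily_etl load_sales
```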

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-hive@9.1.1
