Macros and Utilities

Template functions for partition discovery, date-based partition selection, and metadata queries. These utilities enable dynamic task execution with intelligent partition handling and date-based logic for complex data processing workflows.

Capabilities

Partition Discovery Functions

Template functions for finding partitions dynamically based on various criteria and filters.

def max_partition(
    table: str,
    schema: str = 'default',
    field: str | None = None,
    filter_map: dict | None = None,
    metastore_conn_id: str = 'metastore_default'
) -> str: ...

def closest_ds_partition(
    table: str,
    ds: str,
    before: bool = True,
    schema: str = 'default', 
    metastore_conn_id: str = 'metastore_default'
) -> str | None: ...

Usage Examples:

from datetime import timedelta

from airflow import DAG
from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition
from airflow.providers.apache.hive.operators.hive import HiveOperator

# The provider plugin already exposes these macros to Jinja templates;
# registering them as user_defined_macros, as below, simply makes the
# dependency explicit (or lets you rename them).
dag = DAG(
    'partition_macro_example',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    user_defined_macros={
        'max_partition': max_partition,
        'closest_ds_partition': closest_ds_partition
    }
)

# HiveOperator using max_partition macro
process_latest = HiveOperator(
    task_id='process_latest_partition',
    hql='''
        INSERT OVERWRITE TABLE warehouse.daily_summary
        PARTITION (ds='{{ ds }}')
        SELECT region, SUM(amount) as total_sales
        FROM warehouse.raw_sales
        WHERE ds = '{{ max_partition("warehouse.raw_sales") }}'
        GROUP BY region;
    ''',
    dag=dag
)

# Using closest_ds_partition for backfill scenarios
backfill_process = HiveOperator(
    task_id='backfill_missing_data',
    hql='''
        INSERT OVERWRITE TABLE warehouse.filled_data
        PARTITION (ds='{{ ds }}')
        SELECT *
        FROM warehouse.source_data
        WHERE ds = '{{ closest_ds_partition("warehouse.source_data", ds, True) }}';
    ''',
    dag=dag
)

Maximum Partition Discovery

Find the maximum partition value for a table, with optional filtering and field specification.

Function Details:

  • Purpose: Get the maximum partition value from a Hive table
  • Returns: String representation of the maximum partition value
  • Use Cases: Finding latest available data, determining processing boundaries

Parameters:

  • table: Table name (supports dot notation like "schema.table")
  • schema: Schema name (default: "default", ignored if table uses dot notation)
  • field: Specific partition field to get max value from (inferred if only one partition field)
  • filter_map: Partition key-value filters to narrow search scope
  • metastore_conn_id: Metastore connection identifier
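
For a quick, hedged sketch of how these parameters combine (table, connection, and partition names here are hypothetical): dot notation in table takes precedence over schema, and filter_map narrows the search to matching partitions.

from airflow.providers.apache.hive.macros.hive import max_partition

# Single-partition-key table: the field is inferred automatically
latest_ds = max_partition('analytics.daily_events', metastore_conn_id='metastore_default')

# Multi-partition-key table: name the field and narrow the search with filter_map
latest_us_ds = max_partition(
    'analytics.global_sales',
    field='ds',
    filter_map={'region': 'us'},  # only consider partitions where region=us
    metastore_conn_id='metastore_default',
)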

Usage Examples:

# Template usage in HiveOperator
latest_data_processor = HiveOperator(
    task_id='process_latest_data',
    hql='''
        -- Process the most recent partition available
        CREATE TABLE temp_latest_data AS
        SELECT *
        FROM warehouse.transaction_log
        WHERE ds = '{{ max_partition("warehouse.transaction_log") }}';
        
        -- Aggregate the latest data
        INSERT OVERWRITE TABLE warehouse.daily_metrics
        PARTITION (ds='{{ ds }}')
        SELECT 
            transaction_type,
            COUNT(*) as transaction_count,
            SUM(amount) as total_amount
        FROM temp_latest_data
        GROUP BY transaction_type;
    ''',
    dag=dag
)

# With field specification for multi-partition tables
region_processor = HiveOperator(
    task_id='process_by_region',
    hql='''
        -- Find latest partition for each region
        INSERT OVERWRITE TABLE warehouse.regional_summary
        PARTITION (ds='{{ ds }}')
        SELECT 
            region,
            MAX(processing_timestamp) as latest_processing,
            COUNT(*) as record_count
        FROM warehouse.regional_data
        WHERE ds = '{{ max_partition("warehouse.regional_data", field="ds") }}'
        GROUP BY region;
    ''',
    dag=dag
)

# With filtering to find max within specific criteria
filtered_processor = HiveOperator(
    task_id='process_filtered_max',
    hql='''
        -- Process latest US data only
        SELECT *
        FROM warehouse.global_sales
        WHERE ds = '{{ max_partition("warehouse.global_sales", filter_map={"region": "us"}) }}'
          AND region = 'us';
    ''',
    dag=dag
)

# Direct Python usage in custom operators
def get_latest_partition_date(**context):
    from airflow.providers.apache.hive.macros.hive import max_partition
    
    latest_partition = max_partition(
        table='daily_events',
        schema='analytics',
        metastore_conn_id='metastore_prod'
    )
    
    context['task_instance'].xcom_push(key='latest_partition', value=latest_partition)
    return latest_partition

from airflow.operators.python import PythonOperator

get_partition_task = PythonOperator(
    task_id='get_latest_partition',
    python_callable=get_latest_partition_date,
    dag=dag
)

Closest Date Partition Discovery

Find the date partition closest to a target date, with options for finding the closest before, after, or on either side.

Function Details:

  • Purpose: Find the date partition closest to a target date
  • Returns: Date string in YYYY-MM-DD format or None if no partitions found
  • Use Cases: Backfilling missing data, finding nearest available data for analysis

Parameters:

  • table: Table name (supports dot notation)
  • ds: Target date string in YYYY-MM-DD format
  • before: True for the closest partition on or before the target, False for the closest on or after, None for the closest on either side
  • schema: Schema name (default: "default")
  • metastore_conn_id: Metastore connection identifier
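
To make the before semantics concrete, a hedged sketch (the table name and partition dates are hypothetical); an exact match is returned as-is in every mode:

from airflow.providers.apache.hive.macros.hive import closest_ds_partition

# Suppose 'warehouse.events' has partitions ds=2024-01-12 and ds=2024-01-20
closest_ds_partition('warehouse.events', '2024-01-15', before=True)   # -> '2024-01-12'
closest_ds_partition('warehouse.events', '2024-01-15', before=False)  # -> '2024-01-20'
closest_ds_partition('warehouse.events', '2024-01-15', before=None)   # -> '2024-01-12' (nearest on either side)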

Usage Examples:

# Backfill with closest available data
backfill_operator = HiveOperator(
    task_id='backfill_missing_reports',
    hql='''
        -- Use closest previous date if current date partition doesn't exist
        INSERT OVERWRITE TABLE warehouse.daily_reports
        PARTITION (ds='{{ ds }}')
        SELECT 
            report_type,
            metrics,
            '{{ ds }}' as target_date,
            ds as source_date,
            CASE 
                WHEN ds = '{{ ds }}' THEN 'exact_match'
                ELSE 'backfilled'
            END as data_source_type
        FROM warehouse.source_reports
        -- closest_ds_partition returns '{{ ds }}' itself when that partition
        -- exists, otherwise the nearest earlier one
        WHERE ds = '{{ closest_ds_partition("warehouse.source_reports", ds, True) }}';
    ''',
    dag=dag
)

# Forward-fill with next available data
forward_fill = HiveOperator(
    task_id='forward_fill_predictions',
    hql='''
        -- Use next available date for forecasting
        INSERT OVERWRITE TABLE warehouse.forecast_data
        PARTITION (ds='{{ ds }}')
        SELECT 
            model_id,
            prediction_value,
            confidence_score,
            '{{ ds }}' as forecast_date
        FROM warehouse.model_predictions
        WHERE ds = '{{ closest_ds_partition("warehouse.model_predictions", ds, False) }}';
    ''',
    dag=dag
)

# Flexible closest match (either direction)
flexible_match = HiveOperator(
    task_id='flexible_data_match',
    hql='''
        -- Use closest available data from either direction
        CREATE TEMPORARY TABLE closest_match AS
        SELECT *
        FROM warehouse.reference_data
        WHERE ds = '{{ closest_ds_partition("warehouse.reference_data", ds, None) }}';
        
        -- Join with current processing data
        INSERT OVERWRITE TABLE warehouse.enriched_data
        PARTITION (ds='{{ ds }}')
        SELECT 
            p.transaction_id,
            p.amount,
            r.reference_rate,
            p.amount * r.reference_rate as converted_amount
        FROM warehouse.daily_transactions p
        LEFT JOIN closest_match r ON p.currency_code = r.currency_code
        WHERE p.ds = '{{ ds }}';
    ''',
    dag=dag
)

# Conditional processing based on data availability
conditional_processor = HiveOperator(
    task_id='conditional_data_processing',
    hql='''
        -- Different processing logic based on data freshness
        {% set closest_date = closest_ds_partition("warehouse.external_data", ds, True) %}
        {% set days_old = (macros.datetime.strptime(ds, "%Y-%m-%d") - macros.datetime.strptime(closest_date, "%Y-%m-%d")).days %}
        
        INSERT OVERWRITE TABLE warehouse.processed_data
        PARTITION (ds='{{ ds }}')
        SELECT 
            *,
            {% if days_old <= 1 %}
                'current' as data_freshness
            {% elif days_old <= 7 %}
                'recent' as data_freshness  
            {% else %}
                'stale' as data_freshness
            {% endif %}
        FROM warehouse.external_data
        WHERE ds = '{{ closest_date }}';
    ''',
    dag=dag
)

# Python usage for complex logic
def determine_processing_strategy(**context):
    from airflow.providers.apache.hive.macros.hive import closest_ds_partition
    
    target_date = context['ds']
    
    # Check multiple data sources
    sources = ['warehouse.source_a', 'warehouse.source_b', 'warehouse.source_c']
    available_dates = {}
    
    for source in sources:
        closest_date = closest_ds_partition(
            table=source,
            ds=target_date,
            before=True,
            metastore_conn_id='metastore_prod'
        )
        available_dates[source] = closest_date
    
    # Determine processing strategy based on data availability
    if all(date == target_date for date in available_dates.values()):
        strategy = 'full_processing'
    elif any(date == target_date for date in available_dates.values()):
        strategy = 'partial_processing'
    else:
        strategy = 'backfill_processing'
    
    context['task_instance'].xcom_push(key='processing_strategy', value=strategy)
    context['task_instance'].xcom_push(key='available_dates', value=available_dates)

strategy_task = PythonOperator(
    task_id='determine_strategy',
    python_callable=determine_processing_strategy,
    dag=dag
)

Plugin Integration

The Hive macros are automatically available through the HivePlugin, which registers them with Airflow's macro system.

Plugin Configuration

from airflow.plugins_manager import AirflowPlugin
from airflow.providers.apache.hive.macros.hive import closest_ds_partition, max_partition

class HivePlugin(AirflowPlugin):
    name = 'hive'
    macros = [max_partition, closest_ds_partition]

Automatic Availability:

When the Hive provider is installed, these macros become automatically available in all Jinja2 templates without explicit import:

# These work automatically in any templated field
operator = HiveOperator(
    task_id='auto_macro_usage',
    hql='''
        SELECT * FROM table 
        WHERE ds = '{{ max_partition("warehouse.data") }}'
    ''',
    dag=dag
)

Manual Registration

For custom DAGs or specific use cases, manually register macros:

from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition

dag = DAG(
    'custom_macro_dag',
    default_args=default_args,
    user_defined_macros={
        'hive_max_partition': max_partition,
        'hive_closest_partition': closest_ds_partition,
        'custom_partition_logic': lambda table, **kwargs: max_partition(table, **kwargs)
    }
)
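
The custom names registered above are then usable in any templated field of that DAG, for example:

report_task = HiveOperator(
    task_id='custom_macro_usage',
    hql='''
        SELECT * FROM warehouse.data
        WHERE ds = '{{ hive_max_partition("warehouse.data") }}'
    ''',
    dag=dag
)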

Advanced Macro Patterns

Dynamic Table Selection

Use macros to select tables dynamically based on data availability:

# Select the most recent table from a series
dynamic_table_processor = HiveOperator(
    task_id='process_dynamic_table',
    hql='''
        {% set tables = ['warehouse.hourly_data', 'warehouse.daily_data', 'warehouse.weekly_data'] %}
        {# set inside a for loop does not escape the loop scope, so track the pick in a namespace #}
        {% set ns = namespace(selected_table=None) %}
        {% for table in tables %}
            {% set latest = max_partition(table) %}
            {% if not ns.selected_table and latest and latest >= ds %}
                {% set ns.selected_table = table %}
            {% endif %}
        {% endfor %}
        
        {% if ns.selected_table %}
            INSERT OVERWRITE TABLE warehouse.processed_output
            PARTITION (ds='{{ ds }}')
            SELECT * FROM {{ ns.selected_table }}
            WHERE ds = '{{ max_partition(ns.selected_table) }}';
        {% else %}
            -- Handle the case where no suitable table was found
            INSERT OVERWRITE TABLE warehouse.processed_output
            PARTITION (ds='{{ ds }}')
            SELECT 'No data available' as message;
        {% endif %}
    ''',
    dag=dag
)

Cross-Schema Partition Coordination

Coordinate partition processing across multiple schemas:

cross_schema_processor = HiveOperator(
    task_id='cross_schema_processing',
    hql='''
        -- Find latest partitions across schemas
        {% set sales_max = max_partition('sales.transactions') %}
        {% set inventory_max = max_partition('inventory.stock_levels') %}
        {% set marketing_max = max_partition('marketing.campaigns') %}
        
        -- Use the earliest of the latest partitions to ensure data consistency
        {% set processing_date = [sales_max, inventory_max, marketing_max] | min %}
        
        INSERT OVERWRITE TABLE warehouse.unified_reporting
        PARTITION (ds='{{ ds }}')
        SELECT 
            s.transaction_id,
            s.product_id,
            s.amount,
            i.stock_level,
            m.campaign_id
        FROM sales.transactions s
        LEFT JOIN inventory.stock_levels i ON s.product_id = i.product_id
        LEFT JOIN marketing.campaigns m ON s.customer_id = m.customer_id
        WHERE s.ds = '{{ processing_date }}'
          AND i.ds = '{{ processing_date }}'
          AND m.ds = '{{ processing_date }}';
    ''',
    dag=dag
)

Data Quality Validation

Use partition macros for data quality checks:

def validate_data_consistency(**context):
    from datetime import datetime

    from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition
    
    target_date = context['ds']
    
    # Check if all required sources have data for the target date
    required_sources = [
        'warehouse.source_a',
        'warehouse.source_b', 
        'warehouse.source_c'
    ]
    
    validation_results = {}
    
    for source in required_sources:
        latest_partition = max_partition(source, metastore_conn_id='metastore_prod')
        closest_partition = closest_ds_partition(source, target_date, before=True, metastore_conn_id='metastore_prod')
        
        validation_results[source] = {
            'latest_available': latest_partition,
            'closest_to_target': closest_partition,
            'has_target_date': latest_partition == target_date,
            'days_behind': (
                datetime.strptime(target_date, '%Y-%m-%d') - 
                datetime.strptime(closest_partition, '%Y-%m-%d')
            ).days if closest_partition else None
        }
    
    # Determine if processing should proceed
    all_current = all(result['has_target_date'] for result in validation_results.values())
    # default=0 guards against an empty sequence when no source has any partition
    max_lag = max(
        (
            result['days_behind'] for result in validation_results.values()
            if result['days_behind'] is not None
        ),
        default=0,
    )
    
    if not all_current and max_lag > 3:
        raise ValueError(f"Data quality check failed: max lag is {max_lag} days")
    
    context['task_instance'].xcom_push(key='validation_results', value=validation_results)
    return validation_results

validation_task = PythonOperator(
    task_id='validate_data_consistency',
    python_callable=validate_data_consistency,
    dag=dag
)

Utility Functions

Internal Helper Functions

def _closest_date(
    target_dt: datetime.date, 
    date_list: list[datetime.date], 
    before_target: bool | None = None
) -> datetime.date | None: ...

This internal utility function supports the closest_ds_partition macro by finding the closest date in a list to a target date. While not intended for direct use, it demonstrates the underlying logic for date proximity calculations.
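
A simplified sketch of that proximity logic (not the provider's exact implementation):

from datetime import date, timedelta

def closest_date_sketch(target, dates, before_target=None):
    # Rank candidates by distance to the target; dates on the wrong side
    # of the target are pushed to the end with timedelta.max.
    if before_target is None:
        key = lambda d: abs(target - d)
    elif before_target:
        key = lambda d: target - d if d <= target else timedelta.max
    else:
        key = lambda d: d - target if d >= target else timedelta.max
    return min(dates, key=key) if dates else None

# Closest partition date on or before 2024-01-15
dates = [date(2024, 1, 10), date(2024, 1, 20)]
print(closest_date_sketch(date(2024, 1, 15), dates, before_target=True))  # 2024-01-10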

Best Practices

Error Handling

Always handle cases where partitions might not exist:

safe_processor = HiveOperator(
    task_id='safe_partition_processing',
    hql='''
        {% set latest = max_partition('warehouse.data') %}
        {% if latest %}
            SELECT * FROM warehouse.data WHERE ds = '{{ latest }}';
        {% else %}
            SELECT 'No partitions found' as error_message;
        {% endif %}
    ''',
    dag=dag
)

Performance Considerations

Cache partition lookups for repeated use:

def get_partition_info(**context):
    from airflow.providers.apache.hive.macros.hive import max_partition
    
    # Cache partition information in XCom for reuse
    tables = ['table_a', 'table_b', 'table_c']
    partition_info = {
        table: max_partition(table, metastore_conn_id='metastore_prod')
        for table in tables
    }
    
    context['task_instance'].xcom_push(key='partition_info', value=partition_info)
    return partition_info

cache_task = PythonOperator(task_id='cache_partitions', python_callable=get_partition_info, dag=dag)

# Use cached information in subsequent tasks
process_task = HiveOperator(
    task_id='process_with_cache',
    hql='''
        SELECT * FROM table_a 
        WHERE ds = '{{ task_instance.xcom_pull(task_ids="cache_partitions", key="partition_info")["table_a"] }}'
    ''',
    dag=dag
)

cache_task >> process_task
