CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-apache-hive

Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.

Overview
Eval results
Files

partition-monitoring.mddocs/

Partition Monitoring

Monitor Hive table partitions with flexible sensors for waiting on partition availability. The provider offers three complementary approaches: general partition filters, named partitions, and direct metastore queries for efficient partition detection in various scenarios.

Capabilities

Hive Partition Sensor

General-purpose sensor that waits for partitions using flexible filter expressions with SQL-like syntax.

class HivePartitionSensor:
    template_fields: tuple[str, ...] = ('schema', 'table', 'partition')
    
    def __init__(
        self,
        *,
        table: str,
        partition: str | None = "ds='{{ ds }}'",
        metastore_conn_id: str = 'metastore_default',
        schema: str = 'default',
        poke_interval: int = 180,
        **kwargs
    ): ...
    
    def poke(self, context: 'Context') -> bool: ...

Usage Examples:

from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor

# Wait for daily partition
daily_partition_sensor = HivePartitionSensor(
    task_id='wait_for_daily_data',
    table='warehouse.sales',
    partition="ds='{{ ds }}'",
    metastore_conn_id='metastore_prod',
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,       # Timeout after 1 hour
    dag=dag
)

# Wait for complex partition condition
complex_partition_sensor = HivePartitionSensor(
    task_id='wait_for_regional_data',
    table='analytics.customer_metrics',
    partition="ds='{{ ds }}' AND region='us' AND status='processed'",
    schema='analytics',
    poke_interval=600,
    dag=dag
)

# Wait for partition with date range
range_partition_sensor = HivePartitionSensor(
    task_id='wait_for_data_range',
    table='logs.events',
    partition="ds >= '{{ ds }}' AND ds <= '{{ macros.ds_add(ds, 7) }}'",
    dag=dag
)

# Table with dot notation (schema.table)
dot_notation_sensor = HivePartitionSensor(
    task_id='wait_for_cross_schema',
    table='warehouse.daily_summary',  # Schema extracted from table name
    partition="ds='{{ ds }}' AND type='final'",
    dag=dag
)

Named Hive Partition Sensor

Efficient sensor for waiting on specific partitions using fully qualified partition names, optimized for multiple partition monitoring.

class NamedHivePartitionSensor:
    template_fields: tuple[str, ...] = ('partition_names',)
    
    def __init__(
        self,
        *,
        partition_names: list[str],
        metastore_conn_id: str = 'metastore_default',
        poke_interval: int = 180,
        hook: Any = None,
        **kwargs
    ): ...
    
    @staticmethod
    def parse_partition_name(partition: str) -> tuple[str, str, str]: ...
    def poke_partition(self, partition: str) -> bool: ...
    def poke(self, context: 'Context') -> bool: ...

Usage Examples:

from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

# Wait for single named partition
single_partition_sensor = NamedHivePartitionSensor(
    task_id='wait_for_specific_partition',
    partition_names=['warehouse.sales/ds=2024-01-01/region=us'],
    metastore_conn_id='metastore_prod',
    dag=dag
)

# Wait for multiple partitions (all must exist)
multi_partition_sensor = NamedHivePartitionSensor(
    task_id='wait_for_all_regions',
    partition_names=[
        'analytics.revenue/ds={{ ds }}/region=us',
        'analytics.revenue/ds={{ ds }}/region=eu',
        'analytics.revenue/ds={{ ds }}/region=asia'
    ],
    poke_interval=300,
    dag=dag
)

# Wait for hierarchical partitions
hierarchical_sensor = NamedHivePartitionSensor(
    task_id='wait_for_nested_partitions',
    partition_names=[
        'warehouse.transactions/year=2024/month=01/day=01/hour=00',
        'warehouse.transactions/year=2024/month=01/day=01/hour=01',
        'warehouse.transactions/year=2024/month=01/day=01/hour=02'
    ],
    dag=dag
)

# Using default schema (omitted from partition name)
default_schema_sensor = NamedHivePartitionSensor(
    task_id='wait_default_schema',
    partition_names=['events/ds={{ ds }}/type=user_action'],  # Uses 'default' schema
    dag=dag
)

Metastore Partition Sensor

High-performance sensor that queries the MySQL metastore database directly, bypassing Thrift service overhead for improved efficiency.

class MetastorePartitionSensor:
    template_fields: tuple[str, ...] = ('partition_name', 'table', 'schema')
    
    def __init__(
        self,
        *,
        table: str,
        partition_name: str,
        schema: str = 'default',
        mysql_conn_id: str = 'metastore_mysql',
        **kwargs
    ): ...
    
    def poke(self, context: 'Context') -> bool: ...

Usage Examples:

from airflow.providers.apache.hive.sensors.metastore_partition import MetastorePartitionSensor

# Direct metastore query for single partition
direct_sensor = MetastorePartitionSensor(
    task_id='direct_partition_check',
    table='sales_summary',
    partition_name='ds=2024-01-01',
    schema='warehouse',
    mysql_conn_id='metastore_mysql_prod',
    dag=dag
)

# Complex partition with multiple keys
complex_direct_sensor = MetastorePartitionSensor(
    task_id='complex_direct_check',
    table='user_events',
    partition_name='ds=2024-01-01/region=us/event_type=click',
    schema='analytics',
    poke_interval=120,
    dag=dag
)

# Sub-partitioned table monitoring
subpartition_sensor = MetastorePartitionSensor(
    task_id='subpartition_check',
    table='detailed_logs',
    partition_name='ds=2024-01-01/sub=processed',
    mysql_conn_id='metastore_readonly',
    dag=dag
)

Sensor Configuration

Polling Behavior

All sensors support configurable polling intervals and timeout settings:

sensor = HivePartitionSensor(
    task_id='configurable_sensor',
    table='warehouse.data',
    partition="ds='{{ ds }}'",
    poke_interval=300,      # Poll every 5 minutes
    timeout=7200,           # Timeout after 2 hours
    soft_fail=True,         # Don't fail DAG on timeout
    exponential_backoff=True,  # Increase interval on repeated failures
    dag=dag
)

Performance Optimization

Sensor Selection Guidelines

  • HivePartitionSensor: Use for complex filter expressions and logical operators
  • NamedHivePartitionSensor: Use for multiple specific partitions or when partition names are fully known
  • MetastorePartitionSensor: Use for high-frequency monitoring or when Thrift service performance is suboptimal

Connection Pooling

# Reuse hook instances for multiple sensors
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook

shared_hook = HiveMetastoreHook('metastore_prod')

sensor1 = NamedHivePartitionSensor(
    task_id='sensor1',
    partition_names=['table1/ds={{ ds }}'],
    hook=shared_hook,
    dag=dag
)

sensor2 = NamedHivePartitionSensor(
    task_id='sensor2', 
    partition_names=['table2/ds={{ ds }}'],
    hook=shared_hook,
    dag=dag
)

Advanced Partition Patterns

Dynamic Partition Names

Generate partition names dynamically using Airflow templates:

from datetime import datetime, timedelta

# Generate partition names for the past week
weekly_partitions = [
    f"warehouse.sales/ds={{{{ macros.ds_add(ds, {-i}) }}}}"
    for i in range(7)
]

weekly_sensor = NamedHivePartitionSensor(
    task_id='wait_for_weekly_data',
    partition_names=weekly_partitions,
    dag=dag
)

Conditional Partition Monitoring

Use sensors with conditional logic:

from airflow.operators.python import BranchPythonOperator

def choose_partition_sensor(**context):
    execution_date = context['ds']
    weekday = datetime.strptime(execution_date, '%Y-%m-%d').weekday()
    
    if weekday < 5:  # Weekday
        return 'weekday_partition_sensor'
    else:  # Weekend
        return 'weekend_partition_sensor'

branch_task = BranchPythonOperator(
    task_id='choose_sensor',
    python_callable=choose_partition_sensor,
    dag=dag
)

weekday_sensor = HivePartitionSensor(
    task_id='weekday_partition_sensor',
    table='daily_reports',
    partition="ds='{{ ds }}' AND report_type='business'",
    dag=dag
)

weekend_sensor = HivePartitionSensor(
    task_id='weekend_partition_sensor',
    table='daily_reports', 
    partition="ds='{{ ds }}' AND report_type='maintenance'",
    dag=dag
)

branch_task >> [weekday_sensor, weekend_sensor]

Cross-Schema Partition Dependencies

Monitor partitions across multiple schemas and tables:

# Wait for upstream data from multiple sources
upstream_sensors = [
    NamedHivePartitionSensor(
        task_id=f'wait_for_{source}',
        partition_names=[f'{schema}.{table}/ds={{{{ ds }}}}'],
        dag=dag
    )
    for source, (schema, table) in {
        'crm': ('sales', 'customer_data'),
        'inventory': ('warehouse', 'product_stock'),
        'marketing': ('campaigns', 'ad_performance')
    }.items()
]

# Process after all upstream partitions are available
process_task = HiveOperator(
    task_id='process_combined_data',
    hql='''
        INSERT OVERWRITE TABLE analytics.daily_summary
        PARTITION (ds='{{ ds }}')
        SELECT * FROM (
            SELECT 'crm' as source, * FROM sales.customer_data WHERE ds='{{ ds }}'
            UNION ALL
            SELECT 'inventory' as source, * FROM warehouse.product_stock WHERE ds='{{ ds }}'
            UNION ALL  
            SELECT 'marketing' as source, * FROM campaigns.ad_performance WHERE ds='{{ ds }}'
        ) combined;
    ''',
    dag=dag
)

upstream_sensors >> process_task

Error Handling and Monitoring

Partition Validation

Validate partition existence before processing:

def validate_partition_quality(**context):
    from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook
    
    hook = HiveMetastoreHook('metastore_prod')
    partitions = hook.get_partitions('warehouse', 'sales', f"ds='{context['ds']}'")
    
    if not partitions:
        raise ValueError(f"No partitions found for {context['ds']}")
    
    # Additional quality checks
    partition_info = partitions[0]
    if partition_info.get('numRows', 0) < 1000:
        raise ValueError(f"Partition has insufficient data: {partition_info.get('numRows')} rows")

validate_task = PythonOperator(
    task_id='validate_partition',
    python_callable=validate_partition_quality,
    dag=dag
)

partition_sensor >> validate_task >> process_task

Sensor Alerts and Notifications

Configure alerts for sensor timeouts:

def sensor_failure_callback(context):
    task_instance = context['task_instance']
    send_alert(f"Partition sensor {task_instance.task_id} failed after timeout")

critical_sensor = HivePartitionSensor(
    task_id='critical_data_sensor',
    table='critical_business_data',
    partition="ds='{{ ds }}'",
    timeout=1800,  # 30 minutes
    on_failure_callback=sensor_failure_callback,
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-hive

docs

data-transfers.md

hooks-connections.md

index.md

macros-utilities.md

partition-monitoring.md

query-execution.md

tile.json