Apache Airflow provider package for Apache Hive integration, covering data warehouse connectivity and orchestration.
Monitor Hive table partitions with sensors that wait for partition availability. The provider offers three complementary approaches: general partition filters with SQL-like syntax, fully qualified named partitions, and direct metastore database queries.
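The usage examples below attach tasks to a DAG object named dag. A minimal sketch of that shared setup; the dag_id, schedule, and start date are illustrative assumptions, not part of the provider:
from datetime import datetime
from airflow import DAG

# Minimal DAG the examples below attach to via dag=dag; the id, schedule,
# and start date are placeholders (schedule= requires Airflow 2.4+,
# older versions use schedule_interval=).
dag = DAG(
    dag_id='hive_partition_monitoring',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
)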
General-purpose sensor that waits for partitions using flexible filter expressions with SQL-like syntax.
class HivePartitionSensor:
    template_fields: tuple[str, ...] = ('schema', 'table', 'partition')

    def __init__(
        self,
        *,
        table: str,
        partition: str | None = "ds='{{ ds }}'",
        metastore_conn_id: str = 'metastore_default',
        schema: str = 'default',
        poke_interval: int = 180,
        **kwargs
    ): ...

    def poke(self, context: 'Context') -> bool: ...

Usage Examples:
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
# Wait for daily partition
daily_partition_sensor = HivePartitionSensor(
task_id='wait_for_daily_data',
table='warehouse.sales',
partition="ds='{{ ds }}'",
metastore_conn_id='metastore_prod',
poke_interval=300, # Check every 5 minutes
timeout=3600, # Timeout after 1 hour
dag=dag
)
# Wait for complex partition condition
complex_partition_sensor = HivePartitionSensor(
task_id='wait_for_regional_data',
table='analytics.customer_metrics',
partition="ds='{{ ds }}' AND region='us' AND status='processed'",
schema='analytics',
poke_interval=600,
dag=dag
)
# Wait for partition with date range
range_partition_sensor = HivePartitionSensor(
task_id='wait_for_data_range',
table='logs.events',
partition="ds >= '{{ ds }}' AND ds <= '{{ macros.ds_add(ds, 7) }}'",
dag=dag
)
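On each poke, HivePartitionSensor delegates the check to HiveMetastoreHook.check_for_partition. An equivalent manual check, as a sketch reusing the 'metastore_prod' connection from the first example:
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook

# The same existence check the sensor performs on each poke; the third
# argument is a partition filter expression, as in the sensor's partition=.
hook = HiveMetastoreHook(metastore_conn_id='metastore_prod')
exists = hook.check_for_partition('warehouse', 'sales', "ds='2024-01-01'")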
# Table with dot notation (schema.table)
dot_notation_sensor = HivePartitionSensor(
task_id='wait_for_cross_schema',
table='warehouse.daily_summary', # Schema extracted from table name
partition="ds='{{ ds }}' AND type='final'",
dag=dag
)

Efficient sensor for waiting on specific partitions using fully qualified partition names, optimized for monitoring multiple partitions.
class NamedHivePartitionSensor:
    template_fields: tuple[str, ...] = ('partition_names',)

    def __init__(
        self,
        *,
        partition_names: list[str],
        metastore_conn_id: str = 'metastore_default',
        poke_interval: int = 180,
        hook: Any = None,
        **kwargs
    ): ...

    @staticmethod
    def parse_partition_name(partition: str) -> tuple[str, str, str]: ...

    def poke_partition(self, partition: str) -> bool: ...

    def poke(self, context: 'Context') -> bool: ...

Usage Examples:
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
# Wait for single named partition
single_partition_sensor = NamedHivePartitionSensor(
task_id='wait_for_specific_partition',
partition_names=['warehouse.sales/ds=2024-01-01/region=us'],
metastore_conn_id='metastore_prod',
dag=dag
)
# Wait for multiple partitions (all must exist)
multi_partition_sensor = NamedHivePartitionSensor(
task_id='wait_for_all_regions',
partition_names=[
'analytics.revenue/ds={{ ds }}/region=us',
'analytics.revenue/ds={{ ds }}/region=eu',
'analytics.revenue/ds={{ ds }}/region=asia'
],
poke_interval=300,
dag=dag
)
# Wait for hierarchical partitions
hierarchical_sensor = NamedHivePartitionSensor(
task_id='wait_for_nested_partitions',
partition_names=[
'warehouse.transactions/year=2024/month=01/day=01/hour=00',
'warehouse.transactions/year=2024/month=01/day=01/hour=01',
'warehouse.transactions/year=2024/month=01/day=01/hour=02'
],
dag=dag
)
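The expected partition-name format is schema.table/partition_spec; parse_partition_name splits a name accordingly, falling back to the 'default' schema when the schema prefix is omitted. A quick illustration:
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

# -> ('warehouse', 'sales', 'ds=2024-01-01/region=us')
print(NamedHivePartitionSensor.parse_partition_name('warehouse.sales/ds=2024-01-01/region=us'))

# -> ('default', 'events', 'ds=2024-01-01')  # schema omitted
print(NamedHivePartitionSensor.parse_partition_name('events/ds=2024-01-01'))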
# Using default schema (omitted from partition name)
default_schema_sensor = NamedHivePartitionSensor(
task_id='wait_default_schema',
partition_names=['events/ds={{ ds }}/type=user_action'], # Uses 'default' schema
dag=dag
)

High-performance sensor that queries the MySQL database backing the metastore directly, bypassing Thrift service overhead for improved efficiency.
class MetastorePartitionSensor:
    template_fields: tuple[str, ...] = ('partition_name', 'table', 'schema')

    def __init__(
        self,
        *,
        table: str,
        partition_name: str,
        schema: str = 'default',
        mysql_conn_id: str = 'metastore_mysql',
        **kwargs
    ): ...

    def poke(self, context: 'Context') -> bool: ...

Usage Examples:
from airflow.providers.apache.hive.sensors.metastore_partition import MetastorePartitionSensor
# Direct metastore query for single partition
direct_sensor = MetastorePartitionSensor(
task_id='direct_partition_check',
table='sales_summary',
partition_name='ds=2024-01-01',
schema='warehouse',
mysql_conn_id='metastore_mysql_prod',
dag=dag
)
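Under the hood, the sensor builds a SQL statement against the metastore's backing tables (PARTITIONS, TBLS, DBS) and runs it over the MySQL connection. Roughly, for direct_sensor above; a sketch of the query shape, not the verbatim generated SQL, which may vary by provider version:
# Sketch of the query the sensor issues for direct_sensor above;
# a non-empty result means the partition is registered in the metastore.
generated_sql = """
SELECT 'X'
FROM PARTITIONS A0
LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID
LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID
WHERE B0.TBL_NAME = 'sales_summary'
  AND C0.NAME = 'warehouse'
  AND A0.PART_NAME = 'ds=2024-01-01';
"""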
# Complex partition with multiple keys
complex_direct_sensor = MetastorePartitionSensor(
task_id='complex_direct_check',
table='user_events',
partition_name='ds=2024-01-01/region=us/event_type=click',
schema='analytics',
poke_interval=120,
dag=dag
)
# Sub-partitioned table monitoring
subpartition_sensor = MetastorePartitionSensor(
task_id='subpartition_check',
table='detailed_logs',
partition_name='ds=2024-01-01/sub=processed',
mysql_conn_id='metastore_readonly',
dag=dag
)

All sensors support configurable polling intervals and timeout settings:
sensor = HivePartitionSensor(
task_id='configurable_sensor',
table='warehouse.data',
partition="ds='{{ ds }}'",
poke_interval=300, # Poll every 5 minutes
timeout=7200, # Timeout after 2 hours
soft_fail=True, # Mark the task as skipped instead of failed on timeout
exponential_backoff=True, # Progressively lengthen the wait between pokes
dag=dag
)

# Reuse hook instances for multiple sensors
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook
shared_hook = HiveMetastoreHook('metastore_prod')
sensor1 = NamedHivePartitionSensor(
task_id='sensor1',
partition_names=['table1/ds={{ ds }}'],
hook=shared_hook,
dag=dag
)
sensor2 = NamedHivePartitionSensor(
task_id='sensor2',
partition_names=['table2/ds={{ ds }}'],
hook=shared_hook,
dag=dag
)

Generate partition names dynamically using Airflow templates:
# Generate partition names for the past week; the doubled braces
# escape the literal Jinja {{ }} inside the f-string
weekly_partitions = [
    f"warehouse.sales/ds={{{{ macros.ds_add(ds, {-i}) }}}}"
    for i in range(7)
]
weekly_sensor = NamedHivePartitionSensor(
task_id='wait_for_weekly_data',
partition_names=weekly_partitions,
dag=dag
)

Use sensors with conditional logic:
from datetime import datetime
from airflow.operators.python import BranchPythonOperator

def choose_partition_sensor(**context):
    # 'ds' is the logical date in YYYY-MM-DD form
    execution_date = context['ds']
    weekday = datetime.strptime(execution_date, '%Y-%m-%d').weekday()
    if weekday < 5:  # Weekday
        return 'weekday_partition_sensor'
    else:  # Weekend
        return 'weekend_partition_sensor'
branch_task = BranchPythonOperator(
task_id='choose_sensor',
python_callable=choose_partition_sensor,
dag=dag
)
weekday_sensor = HivePartitionSensor(
task_id='weekday_partition_sensor',
table='daily_reports',
partition="ds='{{ ds }}' AND report_type='business'",
dag=dag
)
weekend_sensor = HivePartitionSensor(
task_id='weekend_partition_sensor',
table='daily_reports',
partition="ds='{{ ds }}' AND report_type='maintenance'",
dag=dag
)
branch_task >> [weekday_sensor, weekend_sensor]

Monitor partitions across multiple schemas and tables:
# Wait for upstream data from multiple sources
upstream_sensors = [
NamedHivePartitionSensor(
task_id=f'wait_for_{source}',
partition_names=[f'{schema}.{table}/ds={{{{ ds }}}}'],
dag=dag
)
for source, (schema, table) in {
'crm': ('sales', 'customer_data'),
'inventory': ('warehouse', 'product_stock'),
'marketing': ('campaigns', 'ad_performance')
}.items()
]
# Process after all upstream partitions are available
from airflow.providers.apache.hive.operators.hive import HiveOperator

process_task = HiveOperator(
task_id='process_combined_data',
hql='''
INSERT OVERWRITE TABLE analytics.daily_summary
PARTITION (ds='{{ ds }}')
SELECT * FROM (
SELECT 'crm' as source, * FROM sales.customer_data WHERE ds='{{ ds }}'
UNION ALL
SELECT 'inventory' as source, * FROM warehouse.product_stock WHERE ds='{{ ds }}'
UNION ALL
SELECT 'marketing' as source, * FROM campaigns.ad_performance WHERE ds='{{ ds }}'
) combined;
''',
dag=dag
)
upstream_sensors >> process_task

Validate partition existence before processing:
def validate_partition_quality(**context):
    from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook

    hook = HiveMetastoreHook('metastore_prod')
    # get_partitions returns a list of dicts mapping partition keys to
    # values, e.g. [{'ds': '2024-01-01'}]
    partitions = hook.get_partitions('warehouse', 'sales', f"ds='{context['ds']}'")
    if not partitions:
        raise ValueError(f"No partitions found for {context['ds']}")
    # Additional sanity check: the partition key matches the expected date
    if partitions[0].get('ds') != context['ds']:
        raise ValueError(f"Unexpected partition returned: {partitions[0]}")

from airflow.operators.python import PythonOperator

validate_task = PythonOperator(
    task_id='validate_partition',
    python_callable=validate_partition_quality,
    dag=dag
)
partition_sensor >> validate_task >> process_task

Configure alerts for sensor timeouts:
def sensor_failure_callback(context):
    # send_alert is a placeholder for your alerting integration
    task_instance = context['task_instance']
    send_alert(f"Partition sensor {task_instance.task_id} failed after timeout")
critical_sensor = HivePartitionSensor(
task_id='critical_data_sensor',
table='critical_business_data',
partition="ds='{{ ds }}'",
timeout=1800, # 30 minutes
on_failure_callback=sensor_failure_callback,
dag=dag
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hive@9.1.1