Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.
Template functions for partition discovery, date-based partition selection, and metadata queries. These utilities enable dynamic task execution with intelligent partition handling and date-based logic for complex data processing workflows.
Template functions for finding partitions dynamically based on various criteria and filters.
# API stub: return the latest value of a table's partition key from the metastore.
def max_partition(
table: str,  # table name; "schema.table" dot notation is honored
schema: str = 'default',  # ignored when `table` already carries a schema prefix
field: str | None = None,  # partition field to scan; inferred when the table has one key
filter_map: dict | None = None,  # fixed key/value filters applied before taking the max
metastore_conn_id: str = 'metastore_default'  # Hive metastore connection to query
) -> str: ...
# API stub: return the date partition of `table` closest to `ds`, or None if none found.
def closest_ds_partition(
table: str,  # table name; dot notation supported
ds: str,  # target date string, YYYY-MM-DD
before: bool = True,  # True: closest before target; False: closest after; None: either side
schema: str = 'default',
metastore_conn_id: str = 'metastore_default'
) -> str | None: ...Usage Examples:
from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition
# Use in DAG templates - these functions are automatically available in Jinja templates
# Registering under user_defined_macros makes the functions callable by these
# names inside this DAG's Jinja-templated fields.
dag = DAG(
'partition_macro_example',
default_args=default_args,
schedule_interval=timedelta(days=1),
user_defined_macros={
'max_partition': max_partition,
'closest_ds_partition': closest_ds_partition
}
)
# HiveOperator using max_partition macro
# The max_partition(...) call is resolved at template-render time against the
# metastore, so the query always reads the newest raw_sales partition.
process_latest = HiveOperator(
task_id='process_latest_partition',
hql='''
INSERT OVERWRITE TABLE warehouse.daily_summary
PARTITION (ds='{{ ds }}')
SELECT region, SUM(amount) as total_sales
FROM warehouse.raw_sales
WHERE ds = '{{ max_partition('warehouse.raw_sales') }}'
GROUP BY region;
''',
dag=dag
)
# Using closest_ds_partition for backfill scenarios
# The third positional argument (True) asks for the nearest partition on or
# before the logical date, so reruns of old dates read the right source data.
backfill_process = HiveOperator(
task_id='backfill_missing_data',
hql='''
INSERT OVERWRITE TABLE warehouse.filled_data
PARTITION (ds='{{ ds }}')
SELECT *
FROM warehouse.source_data
WHERE ds = '{{ closest_ds_partition('warehouse.source_data', ds, True) }}';
''',
dag=dag
)Find the maximum partition value for a table, with optional filtering and field specification.
Function Details:
Parameters:
- table: Table name (supports dot notation like "schema.table")
- schema: Schema name (default: "default"; ignored if table uses dot notation)
- field: Specific partition field to get max value from (inferred if only one partition field)
- filter_map: Partition key-value filters to narrow search scope
- metastore_conn_id: Metastore connection identifier

Usage Examples:
# Template usage in HiveOperator
# Two-step job: stage the newest partition, then aggregate it into metrics.
latest_data_processor = HiveOperator(
task_id='process_latest_data',
hql='''
-- Process the most recent partition available
CREATE TABLE temp_latest_data AS
SELECT *
FROM warehouse.transaction_log
WHERE ds = '{{ max_partition("warehouse.transaction_log") }}';
-- Aggregate the latest data
INSERT OVERWRITE TABLE warehouse.daily_metrics
PARTITION (ds='{{ ds }}')
SELECT
transaction_type,
COUNT(*) as transaction_count,
SUM(amount) as total_amount
FROM temp_latest_data
GROUP BY transaction_type;
''',
dag=dag
)
# With field specification for multi-partition tables
# field="ds" disambiguates which partition key to take the max of when the
# table is partitioned by more than one field.
region_processor = HiveOperator(
task_id='process_by_region',
hql='''
-- Find latest partition for each region
INSERT OVERWRITE TABLE warehouse.regional_summary
PARTITION (ds='{{ ds }}')
SELECT
region,
MAX(processing_timestamp) as latest_processing,
COUNT(*) as record_count
FROM warehouse.regional_data
WHERE ds = '{{ max_partition("warehouse.regional_data", field="ds") }}'
GROUP BY region;
''',
dag=dag
)
# With filtering to find max within specific criteria
# filter_map restricts the metastore scan to partitions where region='us'
# before the maximum is taken.
filtered_processor = HiveOperator(
task_id='process_filtered_max',
hql='''
-- Process latest US data only
SELECT *
FROM warehouse.global_sales
WHERE ds = '{{ max_partition("warehouse.global_sales", filter_map={"region": "us"}) }}'
AND region = 'us';
''',
dag=dag
)
# Direct Python usage in custom operators
def get_latest_partition_date(**context):
    """Fetch the latest partition of analytics.daily_events, push it to XCom, and return it.

    NOTE: ``max_partition`` ignores ``schema`` when the table name already
    uses dot notation, so pass a bare table name together with ``schema``
    (or a dotted name alone) -- never both.  The original example combined
    'warehouse.daily_events' with schema='analytics', which silently queried
    the 'warehouse' schema instead of 'analytics'.
    """
    from airflow.providers.apache.hive.macros.hive import max_partition

    latest_partition = max_partition(
        table='daily_events',  # bare table name so `schema` is honored
        schema='analytics',
        metastore_conn_id='metastore_prod',
    )
    context['task_instance'].xcom_push(key='latest_partition', value=latest_partition)
    return latest_partition
# Wrap the lookup in a PythonOperator so downstream tasks can xcom_pull it.
get_partition_task = PythonOperator(
task_id='get_latest_partition',
python_callable=get_latest_partition_date,
dag=dag
)Find the date partition closest to a target date, with options for finding the closest before, after, or on either side.
Function Details:
Parameters:
- table: Table name (supports dot notation)
- ds: Target date string in YYYY-MM-DD format
- before: True for closest before target, False for closest after, None for closest on either side
- schema: Schema name (default: "default")
- metastore_conn_id: Metastore connection identifier

Usage Examples:
# Backfill with closest available data
# COALESCE prefers the exact {{ ds }} partition and otherwise falls back to
# the nearest earlier partition returned by closest_ds_partition(..., True).
backfill_operator = HiveOperator(
task_id='backfill_missing_reports',
hql='''
-- Use closest previous date if current date partition doesn't exist
INSERT OVERWRITE TABLE warehouse.daily_reports
PARTITION (ds='{{ ds }}')
SELECT
report_type,
metrics,
'{{ ds }}' as target_date,
ds as source_date,
CASE
WHEN ds = '{{ ds }}' THEN 'exact_match'
ELSE 'backfilled'
END as data_source_type
FROM warehouse.source_reports
WHERE ds = COALESCE(
(SELECT ds FROM warehouse.source_reports WHERE ds = '{{ ds }}' LIMIT 1),
'{{ closest_ds_partition("warehouse.source_reports", ds, True) }}'
);
''',
dag=dag
)
# Forward-fill with next available data
# before=False (third argument) selects the nearest partition AFTER {{ ds }}.
forward_fill = HiveOperator(
task_id='forward_fill_predictions',
hql='''
-- Use next available date for forecasting
INSERT OVERWRITE TABLE warehouse.forecast_data
PARTITION (ds='{{ ds }}')
SELECT
model_id,
prediction_value,
confidence_score,
'{{ ds }}' as forecast_date
FROM warehouse.model_predictions
WHERE ds = '{{ closest_ds_partition("warehouse.model_predictions", ds, False) }}';
''',
dag=dag
)
# Flexible closest match (either direction)
# before=None lets the macro pick the nearest partition on either side of ds.
flexible_match = HiveOperator(
task_id='flexible_data_match',
hql='''
-- Use closest available data from either direction
CREATE TEMPORARY TABLE closest_match AS
SELECT *
FROM warehouse.reference_data
WHERE ds = '{{ closest_ds_partition("warehouse.reference_data", ds, None) }}';
-- Join with current processing data
INSERT OVERWRITE TABLE warehouse.enriched_data
PARTITION (ds='{{ ds }}')
SELECT
p.transaction_id,
p.amount,
r.reference_rate,
p.amount * r.reference_rate as converted_amount
FROM warehouse.daily_transactions p
LEFT JOIN closest_match r ON p.currency_code = r.currency_code
WHERE p.ds = '{{ ds }}';
''',
dag=dag
)
# Conditional processing based on data availability
# NOTE(review): this template relies on an `as_datetime` Jinja filter being
# available in the rendering environment -- confirm the Airflow version in
# use actually provides it before relying on this example.
conditional_processor = HiveOperator(
task_id='conditional_data_processing',
hql='''
-- Different processing logic based on data freshness
{% set closest_date = closest_ds_partition("warehouse.external_data", ds, True) %}
{% set days_old = (ds | as_datetime - closest_date | as_datetime).days %}
INSERT OVERWRITE TABLE warehouse.processed_data
PARTITION (ds='{{ ds }}')
SELECT
*,
{% if days_old <= 1 %}
'current' as data_freshness
{% elif days_old <= 7 %}
'recent' as data_freshness
{% else %}
'stale' as data_freshness
{% endif %}
FROM warehouse.external_data
WHERE ds = '{{ closest_date }}';
''',
dag=dag
)
# Python usage for complex logic
def determine_processing_strategy(**context):
    """Pick a processing strategy from the freshness of three upstream sources.

    Pushes 'processing_strategy' and the per-source 'available_dates' map to
    XCom for downstream branching.
    """
    from airflow.providers.apache.hive.macros.hive import closest_ds_partition

    target_date = context['ds']
    sources = ['warehouse.source_a', 'warehouse.source_b', 'warehouse.source_c']
    # Closest partition at/before the target date, looked up per source.
    available_dates = {
        src: closest_ds_partition(
            table=src,
            ds=target_date,
            before=True,
            metastore_conn_id='metastore_prod',
        )
        for src in sources
    }
    # Count how many sources have data exactly on the target date.
    hits = sum(1 for found in available_dates.values() if found == target_date)
    if hits == len(available_dates):
        strategy = 'full_processing'      # every source is current
    elif hits:
        strategy = 'partial_processing'   # some sources are current
    else:
        strategy = 'backfill_processing'  # nothing is current
    context['task_instance'].xcom_push(key='processing_strategy', value=strategy)
    context['task_instance'].xcom_push(key='available_dates', value=available_dates)
# Downstream tasks can xcom_pull 'processing_strategy' to branch on the result.
strategy_task = PythonOperator(
task_id='determine_strategy',
python_callable=determine_processing_strategy,
dag=dag
)The Hive macros are automatically available through the HivePlugin, which registers them with Airflow's macro system.
# Plugin stub: functions listed in `macros` are registered into Airflow's
# global Jinja macro namespace when the provider is installed.
class HivePlugin:
name: str = 'hive'
macros: list = [max_partition, closest_ds_partition]Automatic Availability:
When the Hive provider is installed, these macros become automatically available in all Jinja2 templates without explicit import:
# These work automatically in any templated field
# No user_defined_macros needed -- the provider's plugin registers the names.
operator = HiveOperator(
task_id='auto_macro_usage',
hql='''
SELECT * FROM table
WHERE ds = '{{ max_partition("warehouse.data") }}'
''',
dag=dag
)For custom DAGs or specific use cases, manually register macros:
from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition
# Aliased registration: templates in this DAG call the macros by the custom
# names given in user_defined_macros (including a lambda wrapper).
dag = DAG(
'custom_macro_dag',
default_args=default_args,
user_defined_macros={
'hive_max_partition': max_partition,
'hive_closest_partition': closest_ds_partition,
'custom_partition_logic': lambda table, **kwargs: max_partition(table, **kwargs)
}
)Use macros to select tables dynamically based on data availability:
# Select the most recent table from a series
dynamic_table_processor = HiveOperator(
task_id='process_dynamic_table',
hql='''
{% set tables = ['warehouse.hourly_data', 'warehouse.daily_data', 'warehouse.weekly_data'] %}
{% set selected_table = None %}
{% for table in tables %}
{% set latest = max_partition(table) %}
{% if latest and latest >= ds %}
{% set selected_table = table %}
{% break %}
{% endif %}
{% endfor %}
{% if selected_table %}
INSERT OVERWRITE TABLE warehouse.processed_output
PARTITION (ds='{{ ds }}')
SELECT * FROM {{ selected_table }}
WHERE ds = '{{ max_partition(selected_table) }}';
{% else %}
-- Handle case when no suitable table found
INSERT OVERWRITE TABLE warehouse.processed_output
PARTITION (ds='{{ ds }}')
SELECT 'No data available' as message;
{% endif %}
''',
dag=dag
)Coordinate partition processing across multiple schemas:
# Joining three schemas on one consistent date: `| min` over the ds strings
# picks the lexicographically smallest, which for YYYY-MM-DD strings is also
# the chronologically earliest.
cross_schema_processor = HiveOperator(
task_id='cross_schema_processing',
hql='''
-- Find latest partitions across schemas
{% set sales_max = max_partition('sales.transactions') %}
{% set inventory_max = max_partition('inventory.stock_levels') %}
{% set marketing_max = max_partition('marketing.campaigns') %}
-- Use the earliest of the latest partitions to ensure data consistency
{% set processing_date = [sales_max, inventory_max, marketing_max] | min %}
INSERT OVERWRITE TABLE warehouse.unified_reporting
PARTITION (ds='{{ ds }}')
SELECT
s.transaction_id,
s.product_id,
s.amount,
i.stock_level,
m.campaign_id
FROM sales.transactions s
LEFT JOIN inventory.stock_levels i ON s.product_id = i.product_id
LEFT JOIN marketing.campaigns m ON s.customer_id = m.customer_id
WHERE s.ds = '{{ processing_date }}'
AND i.ds = '{{ processing_date }}'
AND m.ds = '{{ processing_date }}';
''',
dag=dag
)Use partition macros for data quality checks:
def validate_data_consistency(**context):
    """Check that all required sources have partitions for the target date.

    Pushes per-source availability info to XCom, returns it, and raises
    ValueError when any source is both missing the target date and the worst
    lag exceeds 3 days.
    """
    # FIX: the original snippet used datetime.strptime without importing it.
    from datetime import datetime
    from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition

    target_date = context['ds']
    # Check if all required sources have data for the target date
    required_sources = [
        'warehouse.source_a',
        'warehouse.source_b',
        'warehouse.source_c',
    ]
    validation_results = {}
    for source in required_sources:
        latest_partition = max_partition(source, metastore_conn_id='metastore_prod')
        # Use the same metastore connection for both lookups (the original
        # inconsistently queried the default connection here).
        closest_partition = closest_ds_partition(
            source, target_date, before=True, metastore_conn_id='metastore_prod'
        )
        validation_results[source] = {
            'latest_available': latest_partition,
            'closest_to_target': closest_partition,
            'has_target_date': latest_partition == target_date,
            'days_behind': (
                datetime.strptime(target_date, '%Y-%m-%d') -
                datetime.strptime(closest_partition, '%Y-%m-%d')
            ).days if closest_partition else None,
        }
    # Determine if processing should proceed
    all_current = all(result['has_target_date'] for result in validation_results.values())
    # FIX: default=0 protects against every 'days_behind' being None, in which
    # case a bare max() over the empty generator would raise ValueError
    # (the original `if validation_results else 0` guard did not cover this).
    max_lag = max(
        (result['days_behind'] for result in validation_results.values()
         if result['days_behind'] is not None),
        default=0,
    )
    if not all_current and max_lag > 3:
        raise ValueError(f"Data quality check failed: max lag is {max_lag} days")
    context['task_instance'].xcom_push(key='validation_results', value=validation_results)
    return validation_results
# Run the consistency check as its own task; failures abort downstream work.
validation_task = PythonOperator(
task_id='validate_data_consistency',
python_callable=validate_data_consistency,
dag=dag
# Internal helper stub below: picks the date in date_list nearest target_dt,
# restricted to earlier/later dates when before_target is True/False.
)def _closest_date(
target_dt: datetime.date,
date_list: list[datetime.date],
before_target: bool | None = None
) -> datetime.date | None: ...This internal utility function supports the closest_ds_partition macro by finding the closest date in a list to a target date. While not intended for direct use, it demonstrates the underlying logic for date proximity calculations.
Always handle cases where partitions might not exist:
# Defensive template: the {% if latest %} guard handles tables that have no
# partitions yet, where max_partition renders a falsy value.
safe_processor = HiveOperator(
task_id='safe_partition_processing',
hql='''
{% set latest = max_partition('warehouse.data') %}
{% if latest %}
SELECT * FROM warehouse.data WHERE ds = '{{ latest }}';
{% else %}
SELECT 'No partitions found' as error_message;
{% endif %}
''',
dag=dag
)Cache partition lookups for repeated use:
def get_partition_info(**context):
    """Look up the latest partition for a fixed table set and cache it in XCom.

    Returns the table -> latest-partition mapping so later tasks can reuse it
    without hitting the metastore again.
    """
    from airflow.providers.apache.hive.macros.hive import max_partition

    partition_info = {}
    for tbl in ['table_a', 'table_b', 'table_c']:
        # One metastore round-trip per table; results are shared downstream.
        partition_info[tbl] = max_partition(tbl, metastore_conn_id='metastore_prod')
    context['task_instance'].xcom_push(key='partition_info', value=partition_info)
    return partition_info
cache_task = PythonOperator(task_id='cache_partitions', python_callable=get_partition_info, dag=dag)
# Use cached information in subsequent tasks
# xcom_pull re-reads the dict cached by 'cache_partitions'; no extra
# metastore round-trip at render time.
process_task = HiveOperator(
task_id='process_with_cache',
hql='''
SELECT * FROM table_a
WHERE ds = '{{ task_instance.xcom_pull(task_ids="cache_partitions", key="partition_info")["table_a"] }}'
''',
dag=dag
)
cache_task >> process_taskInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hive