Apache Airflow provider package for Hive integration with comprehensive data warehouse connectivity and orchestration capabilities.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-hive@9.1.00
# Apache Airflow Hive Provider

The Apache Airflow Hive provider package enables seamless integration between Apache Airflow and Apache Hive, providing comprehensive data warehouse connectivity and orchestration capabilities. This provider offers a complete suite of operators, hooks, sensors, and transfer operators for executing Hive queries, monitoring partitions, transferring data between systems, and managing Hive Metastore operations within Airflow workflows.

## Package Information

- **Package Name**: apache-airflow-providers-apache-hive
- **Package Type**: Python library (Airflow provider)
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-hive`

## Core Imports
```python
# Hook imports for connecting to Hive services
from airflow.providers.apache.hive.hooks.hive import HiveCliHook, HiveMetastoreHook, HiveServer2Hook

# Operator imports for executing tasks
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator

# Sensor imports for monitoring conditions
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
from airflow.providers.apache.hive.sensors.metastore_partition import MetastorePartitionSensor
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

# Transfer operator imports for data movement
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator
from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator
from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator
# Additional transfer operators available

# Macro imports for template functions
from airflow.providers.apache.hive.macros.hive import max_partition, closest_ds_partition
```
## Basic Usage
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor

# Define DAG
dag = DAG(
    'hive_example',
    default_args={
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Hive data processing pipeline',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

# Wait for partition to be available
wait_for_partition = HivePartitionSensor(
    task_id='wait_for_partition',
    table='warehouse.daily_sales',
    partition="ds='{{ ds }}'",
    metastore_conn_id='hive_metastore_default',
    poke_interval=300,
    timeout=3600,
    dag=dag,
)

# Execute Hive query
process_data = HiveOperator(
    task_id='process_daily_sales',
    hql='''
        INSERT OVERWRITE TABLE warehouse.sales_summary
        PARTITION (ds='{{ ds }}')
        SELECT
            region,
            product_category,
            SUM(amount) as total_sales,
            COUNT(*) as transaction_count
        FROM warehouse.daily_sales
        WHERE ds='{{ ds }}'
        GROUP BY region, product_category;
    ''',
    hive_cli_conn_id='hive_cli_default',
    schema='warehouse',
    dag=dag,
)

# Set task dependencies
wait_for_partition >> process_data
```
## Architecture

The provider is organized around three main connection types and corresponding hooks:

- **HiveCLI**: Command-line interface for executing HQL scripts and commands
- **HiveServer2**: Thrift-based service for JDBC/ODBC connections with query execution
- **Hive Metastore**: Thrift service for metadata operations and partition management

Key components include (see the sketch after this list):

- **Hooks**: Low-level interfaces for connecting to Hive services
- **Operators**: Task-based wrappers for common Hive operations
- **Sensors**: Monitoring components for waiting on conditions
- **Transfers**: Data movement operators between Hive and other systems
- **Macros**: Template functions for partition and metadata operations

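As a minimal sketch of how the connection types map onto hook classes, the snippet below instantiates each hook with its provider-default connection ID (the IDs shown are the documented defaults; the connections themselves still need to be defined in Airflow):

```python
from airflow.providers.apache.hive.hooks.hive import (
    HiveCliHook,
    HiveMetastoreHook,
    HiveServer2Hook,
)

# HiveCLI: shells out to the hive command line to run HQL scripts
cli_hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')

# HiveServer2: Thrift-based query execution and result fetching
hs2_hook = HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')

# Hive Metastore: Thrift metadata service for tables and partitions
metastore_hook = HiveMetastoreHook(metastore_conn_id='metastore_default')
```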
## Capabilities

### Hive Connections and Hooks

Core connectivity to Hive services through CLI, HiveServer2, and Metastore interfaces. Provides connection management, query execution, and metadata operations with support for authentication, connection pooling, and configuration management.
```python { .api }
class HiveCliHook:
    def __init__(self, hive_cli_conn_id: str = 'hive_cli_default', **kwargs): ...
    def run_cli(self, hql: str, schema: str = 'default') -> Any: ...
    def test_hql(self, hql: str) -> None: ...
    def load_file(self, filepath: str, table: str, **kwargs) -> None: ...

class HiveMetastoreHook:
    def __init__(self, metastore_conn_id: str = 'metastore_default'): ...
    def check_for_partition(self, schema: str, table: str, partition: str) -> bool: ...
    def get_table(self, table_name: str, db: str = 'default') -> Any: ...
    def get_partitions(self, schema: str, table_name: str, **kwargs) -> list: ...

class HiveServer2Hook:
    def __init__(self, hiveserver2_conn_id: str = 'hiveserver2_default', **kwargs): ...
    def get_conn(self, schema: str = None) -> Any: ...
    def get_pandas_df(self, sql: str, parameters: list = None, **kwargs) -> 'pd.DataFrame': ...
```
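A brief usage sketch, assuming connections named `hiveserver2_default` and `metastore_default` are configured and that the `warehouse.daily_sales` table from the Basic Usage example exists:

```python
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook, HiveServer2Hook

# Run a query through HiveServer2 and load the result into a pandas DataFrame
hs2 = HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')
df = hs2.get_pandas_df(
    "SELECT region, SUM(amount) AS total FROM warehouse.daily_sales GROUP BY region"
)

# Check partition existence through the metastore
metastore = HiveMetastoreHook(metastore_conn_id='metastore_default')
has_partition = metastore.check_for_partition('warehouse', 'daily_sales', "ds='2024-01-01'")
print(len(df), has_partition)
```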
[Hooks and Connections](./hooks-connections.md)
### Hive Query Execution

Execute HQL scripts and queries with support for templating, parameter substitution, MapReduce configuration, and job monitoring. Includes operators for running ad-hoc queries and collecting table statistics.
```python { .api }
class HiveOperator:
    def __init__(self, *, hql: str, hive_cli_conn_id: str = 'hive_cli_default', **kwargs): ...
    def execute(self, context: 'Context') -> None: ...

class HiveStatsCollectionOperator:
    def __init__(self, *, table: str, partition: Any, **kwargs): ...
    def execute(self, context: 'Context') -> None: ...
```
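A sketch of queue-aware execution and statistics collection, reusing the `dag` object from the Basic Usage example; the `mapred_queue` and `hiveconfs` parameters are assumptions not shown in the stub above, so verify them against the query execution reference:

```python
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator

# Run a templated query on a specific MapReduce queue with extra Hive settings
aggregate_sales = HiveOperator(
    task_id='aggregate_sales',
    hql="SELECT region, COUNT(*) FROM warehouse.daily_sales WHERE ds='{{ ds }}' GROUP BY region",
    hive_cli_conn_id='hive_cli_default',
    schema='warehouse',
    mapred_queue='etl',  # assumed parameter: target YARN queue
    hiveconfs={'hive.exec.dynamic.partition': 'true'},  # assumed parameter: extra -hiveconf settings
    dag=dag,
)

# Collect column-level statistics for one partition of the table
collect_stats = HiveStatsCollectionOperator(
    task_id='collect_sales_stats',
    table='warehouse.daily_sales',
    partition={'ds': '{{ ds }}'},
    dag=dag,
)
```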
[Query Execution](./query-execution.md)
### Partition Monitoring

Monitor Hive table partitions with flexible sensors for waiting on partition availability. Supports general partition filters, named partitions, and direct metastore queries for efficient partition detection.
```python { .api }
class HivePartitionSensor:
    def __init__(self, *, table: str, partition: str = "ds='{{ ds }}'", **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...

class NamedHivePartitionSensor:
    def __init__(self, *, partition_names: list[str], **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...

class MetastorePartitionSensor:
    def __init__(self, *, table: str, partition_name: str, **kwargs): ...
    def poke(self, context: 'Context') -> bool: ...
```
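A short sketch of the remaining sensors, reusing the `dag` from the Basic Usage example; the second table name and the `schema`/`poke_interval` arguments are illustrative assumptions:

```python
from airflow.providers.apache.hive.sensors.metastore_partition import MetastorePartitionSensor
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

# Wait until both daily partitions exist, using fully qualified partition names
wait_named = NamedHivePartitionSensor(
    task_id='wait_named_partitions',
    partition_names=[
        'warehouse.daily_sales/ds={{ ds }}',
        'warehouse.daily_returns/ds={{ ds }}',
    ],
    poke_interval=300,
    dag=dag,
)

# Poll the metastore backing database directly for a single partition
wait_metastore = MetastorePartitionSensor(
    task_id='wait_metastore_partition',
    schema='warehouse',
    table='daily_sales',
    partition_name='ds={{ ds }}',
    dag=dag,
)
```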
[Partition Monitoring](./partition-monitoring.md)
### Data Transfer Operations

Transfer data between Hive and external systems including MySQL, S3, Samba, Vertica, and Microsoft SQL Server. Provides bidirectional data movement with transformation and format conversion capabilities.
```python { .api }
class MySqlToHiveOperator:
    def __init__(self, *, sql: str, hive_table: str, **kwargs): ...

class S3ToHiveOperator:
    def __init__(self, *, s3_key: str, field_dict: dict, hive_table: str, **kwargs): ...

class HiveToMySqlOperator:
    def __init__(self, *, sql: str, mysql_table: str, **kwargs): ...

# Additional transfer operators: MsSqlToHiveOperator, VerticaToHiveOperator, HiveToSambaOperator
```
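A minimal round-trip sketch (MySQL into a partitioned Hive table, then an aggregate pushed back to MySQL), reusing the `dag` from the Basic Usage example and assuming connections named `mysql_default` and `hive_cli_default`; the table names are illustrative:

```python
from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator

# Pull raw orders from MySQL into a Hive partition for the run date
load_orders = MySqlToHiveOperator(
    task_id='load_orders_to_hive',
    sql="SELECT * FROM orders WHERE order_date = '{{ ds }}'",
    hive_table='warehouse.raw_orders',
    partition={'ds': '{{ ds }}'},
    mysql_conn_id='mysql_default',
    hive_cli_conn_id='hive_cli_default',
    dag=dag,
)

# Push an aggregate computed in Hive back into a MySQL reporting table
export_summary = HiveToMySqlOperator(
    task_id='export_summary_to_mysql',
    sql="SELECT region, SUM(amount) FROM warehouse.sales_summary WHERE ds='{{ ds }}' GROUP BY region",
    mysql_table='reporting.sales_summary',
    dag=dag,
)

load_orders >> export_summary
```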
[Data Transfers](./data-transfers.md)
### Template Macros and Utilities

Template functions for partition discovery, date-based partition selection, and metadata queries. Includes utilities for finding maximum partitions and closest date partitions for dynamic task execution.
```python { .api }
def max_partition(table: str, schema: str = 'default', field: str = None, **kwargs) -> str: ...
def closest_ds_partition(table: str, ds: str, before: bool = True, **kwargs) -> str | None: ...
```
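These functions can be called directly in Python, and one way to use them in templated fields is to expose them on a DAG via `user_defined_macros`. A sketch (assumes a reachable metastore connection and reuses the `warehouse.daily_sales` table from earlier examples):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hive.macros.hive import closest_ds_partition, max_partition

# Direct calls query the Hive Metastore for partition values
latest = max_partition('warehouse.daily_sales')
nearest = closest_ds_partition('warehouse.daily_sales', ds='2024-01-15', before=True)
print(latest, nearest)

# Expose the helpers to Jinja templates, e.g. "{{ latest_partition('warehouse.daily_sales') }}"
dag = DAG(
    'hive_macro_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    user_defined_macros={
        'latest_partition': max_partition,
        'closest_partition': closest_ds_partition,
    },
)
```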
[Macros and Utilities](./macros-utilities.md)