# Query Execution

Execute HQL scripts and queries with comprehensive support for templating, parameter substitution, MapReduce configuration, and job monitoring. Includes operators for running ad-hoc queries and collecting detailed table statistics.

## Capabilities

### Hive Query Operator

Primary operator for executing HQL code or Hive scripts within Airflow workflows with full templating and configuration support.

```python { .api }
class HiveOperator:
    template_fields: tuple[str, ...] = (
        'hql', 'schema', 'hive_cli_conn_id', 'mapred_queue',
        'hiveconfs', 'mapred_job_name', 'mapred_queue_priority', 'proxy_user'
    )
    template_ext: tuple[str, ...] = ('.hql', '.sql')

    def __init__(
        self,
        *,
        hql: str,
        hive_cli_conn_id: str = 'hive_cli_default',
        schema: str = 'default',
        hiveconfs: dict | None = None,
        hiveconf_jinja_translate: bool = False,
        script_begin_tag: str | None = None,
        mapred_queue: str | None = None,
        mapred_queue_priority: str | None = None,
        mapred_job_name: str | None = None,
        hive_cli_params: str = '',
        auth: str | None = None,
        proxy_user: str | None = None,
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
    def on_kill(self) -> None: ...
```
**Usage Examples:**

```python
from airflow.providers.apache.hive.operators.hive import HiveOperator

# Basic HQL execution
simple_query = HiveOperator(
    task_id='simple_hive_query',
    hql='''
        SELECT COUNT(*) as record_count
        FROM warehouse.sales
        WHERE ds = '{{ ds }}';
    ''',
    hive_cli_conn_id='hive_production',
    schema='warehouse',
    dag=dag
)

# Complex query with configuration
etl_process = HiveOperator(
    task_id='daily_etl',
    hql='''
        SET hive.exec.dynamic.partition=true;
        SET hive.exec.dynamic.partition.mode=nonstrict;

        INSERT OVERWRITE TABLE warehouse.sales_summary
        PARTITION (ds='{{ ds }}', region)
        SELECT
            product_id,
            SUM(amount) as total_sales,
            COUNT(*) as transaction_count,
            AVG(amount) as avg_sale,
            region
        FROM warehouse.daily_sales
        WHERE ds = '{{ ds }}'
        GROUP BY product_id, region;
    ''',
    hiveconfs={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    mapred_queue='analytics',
    mapred_queue_priority='HIGH',
    mapred_job_name='sales_etl_{{ ds }}',
    dag=dag
)

# Execute script from file
script_execution = HiveOperator(
    task_id='run_hive_script',
    hql='process_customer_data.hql',  # File path relative to the DAG folder
    hiveconf_jinja_translate=True,
    script_begin_tag='-- BEGIN PROCESSING',
    proxy_user='etl_service_account',
    dag=dag
)

# Query with custom parameters
parameterized_query = HiveOperator(
    task_id='parameterized_analysis',
    hql='''
        SELECT {{ params.metric_column }}
        FROM {{ params.source_table }}
        WHERE ds BETWEEN '{{ ds }}' AND '{{ macros.ds_add(ds, 7) }}'
          AND region IN ('{{ params.regions | join("','") }}')
        GROUP BY {{ params.group_by_columns | join(', ') }};
    ''',
    params={
        'metric_column': 'SUM(revenue) as total_revenue',
        'source_table': 'warehouse.sales',
        'regions': ['us', 'eu', 'asia'],
        'group_by_columns': ['product_category', 'customer_segment']
    },
    dag=dag
)
```
### Hive Statistics Collection Operator

Specialized operator for gathering partition statistics using Presto queries and storing results in MySQL for monitoring and optimization.

```python { .api }
class HiveStatsCollectionOperator:
    template_fields: tuple[str, ...] = ('table', 'partition', 'ds', 'dttm')

    def __init__(
        self,
        *,
        table: str,
        partition: Any,
        extra_exprs: dict[str, Any] | None = None,
        excluded_columns: list[str] | None = None,
        assignment_func: Callable[[str, str], dict | None] | None = None,
        metastore_conn_id: str = 'metastore_default',
        presto_conn_id: str = 'presto_default',
        mysql_conn_id: str = 'airflow_db',
        ds: str = '{{ ds }}',
        dttm: str = '{{ logical_date.isoformat() }}',
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
    def get_default_exprs(self, col: str, col_type: str) -> dict: ...
```
**Usage Example:**

```python
from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator

# Basic statistics collection
collect_stats = HiveStatsCollectionOperator(
    task_id='collect_table_stats',
    table='warehouse.customer_transactions',
    partition={'ds': '{{ ds }}', 'region': 'us'},
    metastore_conn_id='metastore_prod',
    presto_conn_id='presto_analytics',
    mysql_conn_id='stats_db',
    dag=dag
)

# Advanced statistics with custom expressions
advanced_stats = HiveStatsCollectionOperator(
    task_id='advanced_stats_collection',
    table='warehouse.sales_data',
    partition={'ds': '{{ ds }}'},
    extra_exprs={
        'revenue_percentile_95': 'APPROX_PERCENTILE(revenue, 0.95)',
        'unique_customers': 'COUNT(DISTINCT customer_id)',
        'avg_order_value': 'AVG(order_total)',
        'max_transaction_time': 'MAX(transaction_timestamp)'
    },
    excluded_columns=['raw_data_blob', 'customer_notes'],
    dag=dag
)

# Custom column assignment function
def custom_assignment(col_name: str, col_type: str) -> dict | None:
    if col_type.lower().startswith('varchar'):
        return {
            (col_name, 'max_length'): f'MAX(LENGTH({col_name}))',
            (col_name, 'avg_length'): f'AVG(LENGTH({col_name}))'
        }
    elif col_type.lower() in ['int', 'bigint', 'double', 'decimal']:
        return {
            (col_name, 'min'): f'MIN({col_name})',
            (col_name, 'max'): f'MAX({col_name})',
            (col_name, 'avg'): f'AVG({col_name})',
            (col_name, 'stddev'): f'STDDEV({col_name})'
        }
    return None

custom_stats = HiveStatsCollectionOperator(
    task_id='custom_stats_collection',
    table='warehouse.product_metrics',
    partition={'ds': '{{ ds }}'},
    assignment_func=custom_assignment,
    dag=dag
)
```
## Template Support

### Airflow Template Fields

Both operators support extensive templating through Airflow's Jinja2 engine:

- **HiveOperator**: `hql`, `schema`, `hive_cli_conn_id`, `mapred_queue`, `hiveconfs`, `mapred_job_name`, `mapred_queue_priority`, `proxy_user`
- **HiveStatsCollectionOperator**: `table`, `partition`, `ds`, `dttm`
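Because these are template fields, values other than `hql` can be rendered per run as well. A minimal sketch (the Airflow Variable name, queue, and connection values below are placeholders, not part of the provider API):

```python
from airflow.providers.apache.hive.operators.hive import HiveOperator

templated_fields_demo = HiveOperator(
    task_id='templated_fields_demo',
    hql="SELECT COUNT(*) FROM {{ params.table }} WHERE ds = '{{ ds }}'",
    # schema, mapred_queue, and mapred_job_name are template fields too,
    # so they are rendered with the same Jinja context as hql:
    schema="{{ var.value.get('hive_schema', 'default') }}",
    mapred_queue='{{ params.queue }}',
    mapred_job_name='{{ dag.dag_id }}__{{ task.task_id }}__{{ ds_nodash }}',
    params={'table': 'warehouse.sales', 'queue': 'analytics'},
    dag=dag
)
```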
### Hiveconf Jinja Translation

When `hiveconf_jinja_translate=True`, the operator automatically translates Hive-style variable substitution to Jinja2 templating:

- `${var}` → `{{ var }}`
- `${hiveconf:var}` → `{{ var }}`
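This lets an existing Hive script written with `hiveconf`-style variables be reused without rewriting it. A brief sketch (the table name is a placeholder):

```python
from airflow.providers.apache.hive.operators.hive import HiveOperator

legacy_hiveconf_query = HiveOperator(
    task_id='legacy_hiveconf_query',
    hql='''
        -- the hiveconf-style reference below is translated to Jinja, then rendered
        SELECT COUNT(*)
        FROM warehouse.events
        WHERE ds = '${hiveconf:ds}';
    ''',
    hiveconf_jinja_translate=True,
    dag=dag
)
```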
### Template File Extensions

HiveOperator automatically processes files with `.hql` and `.sql` extensions as templates, enabling external script management with Jinja2 variable substitution.
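For example, a script stored alongside the DAG (the file name and contents below are hypothetical) is rendered with the same Jinja context as inline HQL:

```python
from airflow.providers.apache.hive.operators.hive import HiveOperator

# daily_rollup.hql (hypothetical file, resolved relative to the DAG folder or
# the DAG's template_searchpath) might contain:
#
#   INSERT OVERWRITE TABLE warehouse.daily_rollup PARTITION (ds='{{ ds }}')
#   SELECT product_id, SUM(amount)
#   FROM warehouse.sales
#   WHERE ds = '{{ ds }}'
#   GROUP BY product_id;

rollup_from_file = HiveOperator(
    task_id='rollup_from_file',
    hql='daily_rollup.hql',  # treated as a template because of its .hql extension
    dag=dag
)
```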
## MapReduce Configuration

### Queue Management

Configure Hadoop scheduler queues and priorities:

```python
# Valid values for mapred_queue_priority
HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW']

operator = HiveOperator(
    task_id='priority_job',
    hql='SELECT * FROM large_table',
    mapred_queue='analytics_high',
    mapred_queue_priority='HIGH',
    mapred_job_name='{{ dag.dag_id }}_{{ task.task_id }}_{{ ds }}',
    dag=dag
)
```
### Job Naming Templates

Customize MapReduce job names for monitoring and debugging:

```python
# The default job-name template can reference hostname, dag_id, task_id, and
# execution_date; mapred_job_name overrides it for this task.
operator = HiveOperator(
    task_id='named_job',
    hql='CREATE TABLE summary AS SELECT * FROM source',
    mapred_job_name='ETL_{{ dag.dag_id }}_{{ task.task_id }}_{{ ts_nodash }}',
    dag=dag
)
```
### Hive Configuration Parameters

Pass runtime configuration via the `hiveconfs` parameter:

```python
operator = HiveOperator(
    task_id='optimized_query',
    hql='SELECT * FROM partitioned_table WHERE ds = "{{ ds }}"',
    hiveconfs={
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'hive.exec.max.dynamic.partitions': '10000',
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.SnappyCodec',
        'mapred.job.queue.name': 'analytics'
    },
    dag=dag
)
```
## Error Handling and Monitoring

### Task Termination

HiveOperator supports graceful task termination through its `on_kill()` method, which stops the running Hive job when the task is cancelled or killed.
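Airflow invokes `on_kill()` automatically, so there is nothing to call from DAG code. For orientation, the general pattern looks roughly like the sketch below (a simplified illustration of hook delegation, not the provider's exact implementation; the operator class is hypothetical):

```python
from airflow.models import BaseOperator
from airflow.providers.apache.hive.hooks.hive import HiveCliHook


class KillableHiveOperator(BaseOperator):
    """Illustrative only: shows how on_kill can delegate cleanup to the hook."""

    def __init__(self, *, hql: str, hive_cli_conn_id: str = 'hive_cli_default', **kwargs):
        super().__init__(**kwargs)
        self.hql = hql
        self.hive_cli_conn_id = hive_cli_conn_id
        self.hook: HiveCliHook | None = None

    def execute(self, context):
        self.hook = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
        self.hook.run_cli(hql=self.hql)  # long-running Hive CLI invocation

    def on_kill(self) -> None:
        # Called by Airflow when the task is externally terminated; forwarding
        # the kill to the hook stops the underlying Hive process as well.
        if self.hook is not None:
            self.hook.kill()
```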
### Script Processing

HiveOperator supports script preprocessing with `script_begin_tag` to skip setup sections and focus on core processing logic:

```python
operator = HiveOperator(
    task_id='process_script',
    hql='full_etl_script.hql',
    script_begin_tag='-- MAIN PROCESSING BEGINS',
    dag=dag
)
```
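When the tag is present, only the portion of the script after it is submitted to Hive; the tag line and everything before it are dropped. A small standalone sketch of that behaviour (the script text is hypothetical):

```python
# Illustration of the begin-tag behaviour: only the text after the tag is kept.
script = """
-- local debugging setup, not wanted in scheduled runs
USE sandbox;

-- MAIN PROCESSING BEGINS
INSERT OVERWRITE TABLE warehouse.etl_output PARTITION (ds='{{ ds }}')
SELECT * FROM warehouse.etl_staging WHERE ds = '{{ ds }}';
"""

begin_tag = '-- MAIN PROCESSING BEGINS'
effective_hql = script.split(begin_tag)[-1]
print(effective_hql)  # only the INSERT statement remains
```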
### Context Access

Both operators have full access to Airflow context variables for dynamic execution:

```python
# A sample of the context variables available in templates (values shown for a
# run with logical date 2024-01-01; object-valued entries are described with
# placeholder strings here):
template_context = {
    'ds': '2024-01-01',                     # logical date as YYYY-MM-DD
    'ds_nodash': '20240101',
    'ts': '2024-01-01T00:00:00+00:00',      # logical date as ISO timestamp
    'dag': '<the running DAG object>',
    'task': '<the current task object>',
    'macros': '<airflow.macros, e.g. macros.ds_add>',
    'params': '<user-defined params dict>',
}
```