# Query Execution

Execute HQL scripts and queries with comprehensive support for templating, parameter substitution, MapReduce configuration, and job monitoring. Includes operators for running ad-hoc queries and collecting detailed table statistics.

## Capabilities

### Hive Query Operator

Primary operator for executing HQL code or Hive scripts within Airflow workflows, with full templating and configuration support.

```python { .api }
class HiveOperator:
    template_fields: tuple[str, ...] = (
        'hql', 'schema', 'hive_cli_conn_id', 'mapred_queue',
        'hiveconfs', 'mapred_job_name', 'mapred_queue_priority', 'proxy_user'
    )
    template_ext: tuple[str, ...] = ('.hql', '.sql')

    def __init__(
        self,
        *,
        hql: str,
        hive_cli_conn_id: str = 'hive_cli_default',
        schema: str = 'default',
        hiveconfs: dict | None = None,
        hiveconf_jinja_translate: bool = False,
        script_begin_tag: str | None = None,
        mapred_queue: str | None = None,
        mapred_queue_priority: str | None = None,
        mapred_job_name: str | None = None,
        hive_cli_params: str = '',
        auth: str | None = None,
        proxy_user: str | None = None,
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
    def on_kill(self) -> None: ...
```

**Usage Examples:**

```python
from airflow.providers.apache.hive.operators.hive import HiveOperator
from datetime import datetime, timedelta

# Basic HQL execution
simple_query = HiveOperator(
    task_id='simple_hive_query',
    hql='''
        SELECT COUNT(*) as record_count
        FROM warehouse.sales
        WHERE ds = '{{ ds }}';
    ''',
    hive_cli_conn_id='hive_production',
    schema='warehouse',
    dag=dag
)

# Complex query with configuration
etl_process = HiveOperator(
    task_id='daily_etl',
    hql='''
        SET hive.exec.dynamic.partition=true;
        SET hive.exec.dynamic.partition.mode=nonstrict;

        INSERT OVERWRITE TABLE warehouse.sales_summary
        PARTITION (ds='{{ ds }}', region)
        SELECT
            product_id,
            SUM(amount) as total_sales,
            COUNT(*) as transaction_count,
            AVG(amount) as avg_sale,
            region
        FROM warehouse.daily_sales
        WHERE ds = '{{ ds }}'
        GROUP BY product_id, region;
    ''',
    hiveconfs={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    mapred_queue='analytics',
    mapred_queue_priority='HIGH',
    mapred_job_name='sales_etl_{{ ds }}',
    dag=dag
)

# Execute script from file
script_execution = HiveOperator(
    task_id='run_hive_script',
    hql='process_customer_data.hql',  # File path relative to the DAG folder
    hiveconf_jinja_translate=True,
    script_begin_tag='-- BEGIN PROCESSING',
    proxy_user='etl_service_account',
    dag=dag
)

# Query with custom parameters
parameterized_query = HiveOperator(
    task_id='parameterized_analysis',
    hql='''
        SELECT {{ params.metric_column }}
        FROM {{ params.source_table }}
        WHERE ds BETWEEN '{{ ds }}' AND '{{ macros.ds_add(ds, 7) }}'
            AND region IN ('{{ params.regions | join("','") }}')
        GROUP BY {{ params.group_by_columns | join(', ') }};
    ''',
    params={
        'metric_column': 'SUM(revenue) as total_revenue',
        'source_table': 'warehouse.sales',
        'regions': ['us', 'eu', 'asia'],
        'group_by_columns': ['product_category', 'customer_segment']
    },
    dag=dag
)
```

### Hive Statistics Collection Operator

Specialized operator for gathering partition statistics using Presto queries and storing results in MySQL for monitoring and optimization.

```python { .api }
class HiveStatsCollectionOperator:
    template_fields: tuple[str, ...] = ('table', 'partition', 'ds', 'dttm')

    def __init__(
        self,
        *,
        table: str,
        partition: Any,
        extra_exprs: dict[str, Any] | None = None,
        excluded_columns: list[str] | None = None,
        assignment_func: callable | None = None,
        metastore_conn_id: str = 'metastore_default',
        presto_conn_id: str = 'presto_default',
        mysql_conn_id: str = 'airflow_db',
        ds: str = '{{ ds }}',
        dttm: str = '{{ logical_date.isoformat() }}',
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
    def get_default_exprs(self, col: str, col_type: str) -> dict: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.operators.hive_stats import HiveStatsCollectionOperator

# Basic statistics collection
collect_stats = HiveStatsCollectionOperator(
    task_id='collect_table_stats',
    table='warehouse.customer_transactions',
    partition={'ds': '{{ ds }}', 'region': 'us'},
    metastore_conn_id='metastore_prod',
    presto_conn_id='presto_analytics',
    mysql_conn_id='stats_db',
    dag=dag
)

# Advanced statistics with custom expressions
advanced_stats = HiveStatsCollectionOperator(
    task_id='advanced_stats_collection',
    table='warehouse.sales_data',
    partition={'ds': '{{ ds }}'},
    extra_exprs={
        'revenue_percentile_95': 'APPROX_PERCENTILE(revenue, 0.95)',
        'unique_customers': 'COUNT(DISTINCT customer_id)',
        'avg_order_value': 'AVG(order_total)',
        'max_transaction_time': 'MAX(transaction_timestamp)'
    },
    excluded_columns=['raw_data_blob', 'customer_notes'],
    dag=dag
)

# Custom column assignment function
def custom_assignment(col_name: str, col_type: str) -> dict | None:
    if col_type.lower().startswith('varchar'):
        return {
            (col_name, 'max_length'): f'MAX(LENGTH({col_name}))',
            (col_name, 'avg_length'): f'AVG(LENGTH({col_name}))'
        }
    elif col_type.lower() in ['int', 'bigint', 'double', 'decimal']:
        return {
            (col_name, 'min'): f'MIN({col_name})',
            (col_name, 'max'): f'MAX({col_name})',
            (col_name, 'avg'): f'AVG({col_name})',
            (col_name, 'stddev'): f'STDDEV({col_name})'
        }
    return None

custom_stats = HiveStatsCollectionOperator(
    task_id='custom_stats_collection',
    table='warehouse.product_metrics',
    partition={'ds': '{{ ds }}'},
    assignment_func=custom_assignment,
    dag=dag
)
```

## Template Support

### Airflow Template Fields

Both operators support extensive templating through Airflow's Jinja2 engine:

- **HiveOperator**: `hql`, `schema`, `hive_cli_conn_id`, `mapred_queue`, `hiveconfs`, `mapred_job_name`, `mapred_queue_priority`, `proxy_user`
- **HiveStatsCollectionOperator**: `table`, `partition`, `ds`, `dttm`

### Hiveconf Jinja Translation

When `hiveconf_jinja_translate=True`, the operator automatically translates Hive-style variable substitution to Jinja2 templating:

- `${var}` → `{{ var }}`
- `${hiveconf:var}` → `{{ var }}`
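
This lets an existing script written with Hive-style variables be reused without rewriting it for Jinja. A minimal sketch, assuming a hypothetical `cleanup.hql` file:

```python
# cleanup.hql (hypothetical file contents):
#   INSERT OVERWRITE TABLE staging.events_clean
#   SELECT * FROM staging.events WHERE ds = '${hiveconf:ds}';
#
# With hiveconf_jinja_translate=True, ${hiveconf:ds} is rewritten to {{ ds }}
# before rendering, so the standard Airflow macro fills in the value.
translated_script = HiveOperator(
    task_id='translated_script',
    hql='cleanup.hql',
    hiveconf_jinja_translate=True,
    dag=dag
)
```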

### Template File Extensions

HiveOperator automatically processes files with `.hql` and `.sql` extensions as templates, enabling external script management with Jinja2 variable substitution.
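
For example, a templated script can live alongside the DAG and be referenced by path; the file name and contents below are illustrative:

```python
# daily_rollup.hql (hypothetical file stored next to the DAG):
#   INSERT OVERWRITE TABLE warehouse.daily_rollup PARTITION (ds='{{ ds }}')
#   SELECT product_id, SUM(amount)
#   FROM warehouse.sales
#   WHERE ds = '{{ ds }}'
#   GROUP BY product_id;
#
# Because the file ends in .hql, its contents are rendered as a Jinja2
# template before being submitted to Hive.
rollup = HiveOperator(
    task_id='daily_rollup',
    hql='daily_rollup.hql',
    dag=dag
)
```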

## MapReduce Configuration

### Queue Management

Configure Hadoop scheduler queues and priorities:

```python
# Queue configuration options
HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW']

operator = HiveOperator(
    task_id='priority_job',
    hql='SELECT * FROM large_table',
    mapred_queue='analytics_high',
    mapred_queue_priority='HIGH',
    mapred_job_name='{{ dag.dag_id }}_{{ task.task_id }}_{{ ds }}',
    dag=dag
)
```

### Job Naming Templates

Customize MapReduce job names for monitoring and debugging:

```python
# Template supports: hostname, dag_id, task_id, execution_date
operator = HiveOperator(
    task_id='named_job',
    hql='CREATE TABLE summary AS SELECT * FROM source',
    mapred_job_name='ETL_{{ dag.dag_id }}_{{ task.task_id }}_{{ ts_nodash }}',
    dag=dag
)
```

### Hive Configuration Parameters

Pass runtime configuration via the `hiveconfs` parameter:

```python
operator = HiveOperator(
    task_id='optimized_query',
    hql='SELECT * FROM partitioned_table WHERE ds = "{{ ds }}"',
    hiveconfs={
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'hive.exec.max.dynamic.partitions': '10000',
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.SnappyCodec',
        'mapred.job.queue.name': 'analytics'
    },
    dag=dag
)
```

## Error Handling and Monitoring

### Task Termination

Both operators support graceful task termination through the `on_kill()` method, which cleanly stops running Hive jobs when tasks are cancelled or killed.
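
As a sketch of how this combines with standard Airflow task controls (the timeout value here is arbitrary), an `execution_timeout` means that when Airflow kills an overrunning task, `on_kill()` also stops the underlying Hive job instead of leaving it running on the cluster:

```python
from datetime import timedelta

# If this task exceeds two hours (or is manually failed/cleared), Airflow
# terminates it and on_kill() stops the corresponding Hive job as well.
bounded_query = HiveOperator(
    task_id='bounded_query',
    hql="SELECT COUNT(*) FROM warehouse.events WHERE ds = '{{ ds }}';",
    execution_timeout=timedelta(hours=2),
    dag=dag
)
```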

### Script Processing

HiveOperator supports script preprocessing with `script_begin_tag` to skip setup sections and focus on core processing logic:

```python
operator = HiveOperator(
    task_id='process_script',
    hql='full_etl_script.hql',
    script_begin_tag='-- MAIN PROCESSING BEGINS',
    dag=dag
)
```
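
For context, a rough illustration of the effect (the script contents are hypothetical): everything up to and including the first occurrence of the tag is dropped, and only the remainder of the script is executed.

```python
# full_etl_script.hql (hypothetical contents)
full_etl_script = '''
-- setup / exploratory statements that should not run in production
CREATE TEMPORARY TABLE scratch AS SELECT * FROM warehouse.raw LIMIT 100;

-- MAIN PROCESSING BEGINS
INSERT OVERWRITE TABLE warehouse.curated
SELECT * FROM warehouse.raw WHERE ds = '{{ ds }}';
'''

# Roughly what remains once the begin tag is applied:
effective_hql = '\n'.join(full_etl_script.split('-- MAIN PROCESSING BEGINS')[1:])
# -> only the INSERT OVERWRITE statement is submitted to Hive
```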

294

295

### Context Access

296

297

Both operators have full access to Airflow context variables for dynamic execution:

298

299

```python
# Available context variables in templates
template_context = {
    'ds': '2024-01-01',                 # Execution date
    'ds_nodash': '20240101',
    'ts': '2024-01-01T00:00:00+00:00',  # Timestamp
    'dag': dag_object,
    'task': task_object,
    'macros': airflow_macros,
    'params': user_defined_params
}
```