# Execution Engine

The execution engine of Apache Spark's Hive integration provides specialized physical plans and execution strategies for Hive table operations, including table scanning, data insertion, and table creation, with performance optimized for Hive-compatible storage formats.

## Physical Execution Plans

### HiveTableScanExec

Physical execution plan for scanning Hive tables with partition pruning and predicate pushdown capabilities.

```scala { .api }
case class HiveTableScanExec(
    requestedAttributes: Seq[Attribute],
    relation: HiveTableRelation,
    partitionPruningPred: Seq[Expression]
) extends LeafExecNode with CodegenSupport {

  def doExecute(): RDD[InternalRow]
  def inputRDDs(): Seq[RDD[InternalRow]]
  def doProduce(ctx: CodegenContext): String
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}
```

**Usage Example:**

A `HiveTableScanExec` node is generated automatically when a query reads a Hive table:

```scala
// This query will generate HiveTableScanExec plan
val result = spark.sql("""
  SELECT id, name, department
  FROM employee
  WHERE department = 'Engineering' AND hire_date > '2020-01-01'
""")

// View execution plan
result.explain(true)
```

**Key Features:**

- **Partition Pruning**: Automatically eliminates irrelevant partitions
- **Predicate Pushdown**: Pushes filters to the storage layer when possible
- **Column Pruning**: Reads only the required columns
- **Code Generation**: Supports whole-stage code generation for better performance
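
These features can also be checked programmatically by pulling the scan nodes out of the physical plan. A minimal sketch, assuming the `result` DataFrame from the example above and that the query actually planned a Hive scan:

```scala
import org.apache.spark.sql.hive.execution.HiveTableScanExec

// Collect any Hive scan nodes from the executed plan and show
// which partition-pruning predicates were attached to them
val scans = result.queryExecution.executedPlan.collect {
  case scan: HiveTableScanExec => scan
}
scans.foreach(s => println(s.partitionPruningPred.mkString(", ")))
```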

### HiveTableRelation

Logical representation of a Hive table used in physical planning.

```scala { .api }
case class HiveTableRelation(
    tableMeta: CatalogTable,
    dataCols: Seq[Attribute],
    partitionCols: Seq[Attribute]
) extends LeafNode with MultiInstanceRelation {

  def output: Seq[Attribute]
  def refresh(): Unit
  def newInstance(): HiveTableRelation
}
```
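
The relation appears in the logical plan before physical planning, so it can be located there directly. A small sketch (`HiveTableRelation` lives in `org.apache.spark.sql.catalyst.catalog`):

```scala
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation

// List the Hive tables referenced by a query's optimized logical plan
val df = spark.sql("SELECT * FROM employee")
val hiveTables = df.queryExecution.optimizedPlan.collect {
  case rel: HiveTableRelation => rel.tableMeta.identifier
}
hiveTables.foreach(println)
```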

## Data Insertion Operations

### InsertIntoHiveTable

Command for inserting data into Hive tables with support for static and dynamic partitioning.

```scala { .api }
case class InsertIntoHiveTable(
    table: CatalogTable,
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean
) extends UnaryCommand {

  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  def innerChildren: Seq[QueryPlan[_]]
}
```

**Usage Examples:**

**Static Partitioning:**

```scala
// Insert into specific partition
spark.sql("""
  INSERT INTO TABLE partitioned_sales PARTITION(year=2023, month=12)
  SELECT transaction_id, amount, customer_id FROM daily_sales
  WHERE date_col = '2023-12-01'
""")

// Overwrite partition
spark.sql("""
  INSERT OVERWRITE TABLE partitioned_sales PARTITION(year=2023, month=12)
  SELECT transaction_id, amount, customer_id FROM corrected_sales
  WHERE date_col = '2023-12-01'
""")
```

**Dynamic Partitioning:**

```scala
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Insert with dynamic partitioning
spark.sql("""
  INSERT INTO TABLE partitioned_sales PARTITION(year, month)
  SELECT transaction_id, amount, customer_id,
         year(transaction_date), month(transaction_date)
  FROM raw_sales
""")
```
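
The same dynamic-partition write can be expressed through the DataFrame API. A sketch, assuming the tables and dynamic-partition settings above; note that `insertInto` matches columns by position, with partition columns last:

```scala
// DataFrame-API counterpart of the dynamic-partition INSERT above
spark.table("raw_sales")
  .selectExpr(
    "transaction_id", "amount", "customer_id",
    "year(transaction_date)", "month(transaction_date)")
  .write
  .mode("append")
  .insertInto("partitioned_sales")
```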

**Conditional Insert:**

```scala
// Pre-create the partition only if it is missing (DDL level)
spark.sql("""
  ALTER TABLE partitioned_sales ADD IF NOT EXISTS PARTITION(year=2023, month=11)
""")

// Write the partition only if it does not already exist
// (corresponds to ifPartitionNotExists = true)
spark.sql("""
  INSERT OVERWRITE TABLE partitioned_sales PARTITION(year=2023, month=11) IF NOT EXISTS
  SELECT * FROM source_data WHERE year=2023 AND month=11
""")
```

## Table Creation Operations

### CreateHiveTableAsSelectCommand

Command for creating Hive tables from query results with configurable storage formats and properties.

```scala { .api }
case class CreateHiveTableAsSelectCommand(
    tableDesc: CatalogTable,
    query: LogicalPlan,
    ignoreIfExists: Boolean
) extends DataWritingCommand {

  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  def innerChildren: Seq[QueryPlan[_]]
}
```

**Usage Examples:**

**Basic CTAS:**

```scala
spark.sql("""
  CREATE TABLE employee_summary AS
  SELECT department,
         COUNT(*) AS employee_count,
         AVG(salary) AS avg_salary
  FROM employee
  GROUP BY department
""")
```

**CTAS with Storage Format:**

```scala
spark.sql("""
  CREATE TABLE employee_orc
  STORED AS ORC
  AS SELECT * FROM employee
""")
```

**CTAS with Partitioning:**

```scala
spark.sql("""
  CREATE TABLE partitioned_employee_summary
  PARTITIONED BY (department)
  STORED AS PARQUET
  AS SELECT id, name, salary, department FROM employee
""")
```

**CTAS with Properties:**

```scala
spark.sql("""
  CREATE TABLE compressed_employee
  STORED AS ORC
  TBLPROPERTIES (
    'orc.compress'='SNAPPY',
    'orc.stripe.size'='67108864'
  )
  AS SELECT * FROM employee
""")
```
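
For comparison, a rough DataFrame-API counterpart to CTAS. A sketch assuming Hive support is enabled; the `fileFormat` option is specific to the `hive` data source, and the target table name here is illustrative:

```scala
// Create a Hive ORC table from a DataFrame, analogous to the ORC CTAS above
spark.table("employee")
  .write
  .format("hive")
  .option("fileFormat", "orc")
  .saveAsTable("employee_orc_df")
```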

## Script Transformation

### ScriptTransformationExec

Execution plan for TRANSFORM queries that pipe rows through external scripts (MapReduce-style processing).

```scala { .api }
case class ScriptTransformationExec(
    input: Seq[Expression],
    script: String,
    output: Seq[Attribute],
    child: SparkPlan,
    ioschema: HiveScriptIOSchema
) extends UnaryExecNode {

  def doExecute(): RDD[InternalRow]
  protected def withNewChildInternal(newChild: SparkPlan): ScriptTransformationExec
}
```

**Usage Example:**

```scala
// Transform using external script
spark.sql("""
  SELECT TRANSFORM(id, name, salary)
  USING 'python process_employee.py'
  AS (processed_id INT, processed_name STRING, salary_grade STRING)
  FROM employee
""")

// Transform with custom input/output format
spark.sql("""
  SELECT TRANSFORM(name, department)
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  USING '/usr/bin/python3 transform_data.py'
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  AS (transformed_name STRING, dept_code STRING)
  FROM employee
""")
```
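
A convenient smoke test for script transformation is the identity script `cat`, which echoes its input unchanged. A minimal sketch:

```scala
// TRANSFORM through 'cat' returns rows unchanged; explain() shows the
// script transformation node in the physical plan
spark.sql("""
  SELECT TRANSFORM(id, name)
  USING 'cat'
  AS (id STRING, name STRING)
  FROM employee
""").explain()
```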

## Hive Query Strategies

### HiveStrategies

Planning strategies specific to the Hive integration that translate logical Hive operations into physical execution plans.

```scala { .api }
object HiveStrategies extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan]

  // Specific strategies
  object Scripts extends Strategy
  object DataSinks extends Strategy
  object DDLStrategy extends Strategy
}
```

**Key Strategies:**

1. **Hive Table Scans**: Optimized scanning of Hive tables
2. **Script Transformations**: Execution of TRANSFORM queries
3. **Data Sinks**: Efficient writing to Hive tables
4. **DDL Operations**: Handling Hive DDL commands
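
Custom strategies can be registered next to the built-in ones through the public `ExperimentalMethods` hook. A minimal sketch with a hypothetical no-op strategy:

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A no-op strategy: returning Nil defers planning to the other strategies
object NoopStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

spark.experimental.extraStrategies = Seq(NoopStrategy)
```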

## Optimization Features

### Predicate Pushdown

Automatic pushdown of filters to reduce data scanning:

```scala
// Filters pushed down to storage layer
val optimizedQuery = spark.sql("""
  SELECT id, name
  FROM large_table
  WHERE year = 2023 AND month = 12 AND status = 'ACTIVE'
""")

// Check pushdown in execution plan
println(optimizedQuery.queryExecution.executedPlan)
```

### Partition Pruning

Elimination of unnecessary partition scans:

```scala
// Only scans relevant partitions
val prunedQuery = spark.sql("""
  SELECT COUNT(*)
  FROM partitioned_sales
  WHERE year IN (2022, 2023) AND month > 6
""")

// Verify partition pruning
println(prunedQuery.queryExecution.optimizedPlan)
```

### Column Pruning

Reading only required columns from storage:

```scala
// Reads only the columns the query needs: 'name', 'salary',
// and the filter column 'department'
val columnPrunedQuery = spark.sql("""
  SELECT name, salary
  FROM employee_with_many_columns
  WHERE department = 'Engineering'
""")
```

## Bucketed Table Support

### Bucketed Reads

Optimized reading from bucketed Hive tables:

```scala
// Create a bucketed table (manager_id added so the join below resolves)
spark.sql("""
  CREATE TABLE bucketed_employee (
    id INT, name STRING, department STRING, salary DOUBLE, manager_id INT
  )
  CLUSTERED BY (id) INTO 4 BUCKETS
  STORED AS ORC
""")

// Join on the bucket column (the planner can exploit compatible bucketing)
val bucketedJoin = spark.sql("""
  SELECT e1.name, e2.name AS manager_name
  FROM bucketed_employee e1
  JOIN bucketed_employee e2 ON e1.manager_id = e2.id
""")
```
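
Whether bucketing was actually exploited shows up as the absence of a shuffle in the plan. A sketch that scans the executed plan for exchange nodes:

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// A bucket-aligned join should plan without a ShuffleExchangeExec
val needsShuffle = bucketedJoin.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}.nonEmpty
println(s"Join requires shuffle: $needsShuffle")
```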

### Sort-Merge Bucket Joins

High-performance joins for bucketed tables:

```scala
// Allow bucket counts to be coalesced so more joins qualify for SMB
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")

// Automatic SMB join for compatible bucketed tables
val smbJoin = spark.sql("""
  SELECT o.order_id, c.customer_name
  FROM bucketed_orders o
  JOIN bucketed_customers c ON o.customer_id = c.customer_id
""")
```

## Configuration and Tuning

### Execution Configuration

```scala
// Configure Hive execution settings
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("hive.exec.max.dynamic.partitions", "5000")
spark.conf.set("hive.exec.max.dynamic.partitions.pernode", "2000")

// Control small files
spark.conf.set("hive.merge.tezfiles", "true")
spark.conf.set("hive.merge.smallfiles.avgsize", "16000000")
```
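
These settings can also be fixed once at session construction instead of being mutated per session. A sketch (values are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Build a Hive-enabled session with dynamic partitioning preconfigured
val spark = SparkSession.builder()
  .appName("hive-execution")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
```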

### Performance Tuning

```scala
// Optimize for large tables
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Configure join strategies
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")

// Shuffle parallelism
spark.conf.set("spark.sql.shuffle.partitions", "200")

// spark.serializer is a static config and cannot be changed at runtime;
// set it when building the session, e.g.
// SparkSession.builder().config("spark.serializer",
//   "org.apache.spark.serializer.KryoSerializer")
```

## Error Handling

### Common Execution Errors

**Partition Not Found:**

```scala
import spark.implicits._

// Relax partition checking for dynamic writes
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Read defensively across a partition range
val safeQuery = spark.sql("""
  SELECT * FROM partitioned_table
  WHERE year = 2023 AND month BETWEEN 1 AND 12
""").filter($"year".isNotNull && $"month".isNotNull)
```
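
Errors such as a partition spec naming a non-existent column surface as `AnalysisException` at planning time, so they can be handled explicitly. A small sketch (the bad column name is deliberately wrong):

```scala
import org.apache.spark.sql.AnalysisException

// A static partition spec naming a non-existent column fails analysis
try {
  spark.sql("""
    INSERT INTO TABLE partitioned_sales PARTITION(not_a_column=1)
    SELECT transaction_id, amount, customer_id FROM daily_sales
  """)
} catch {
  case e: AnalysisException => println(s"Analysis failed: ${e.getMessage}")
}
```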

**Schema Evolution:**

```scala
// Let Spark's native Parquet reader handle Parquet-backed Hive tables,
// which supports schema merging across files
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

// Read with schema merging enabled
val schemaTolerant = spark.read
  .option("mergeSchema", "true")
  .table("evolving_table")
```

**Resource Constraints:**

```scala
// Optimize for limited resources
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
```

## Monitoring and Debugging

### Execution Plan Analysis

```scala
// Build a query against a Hive table
val query = spark.sql("SELECT * FROM large_hive_table WHERE id > 1000")

// Logical and physical plans
query.explain(true)

// Generated code (whole-stage codegen output)
query.queryExecution.debug.codegen()

// Trigger execution to populate runtime statistics
query.queryExecution.executedPlan.execute().count()
```

### Performance Metrics

```scala
// Enable adaptive execution and log plan changes
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.logLevel", "INFO")

// Access metrics after execution
val metrics = query.queryExecution.executedPlan.metrics
metrics.foreach { case (name, metric) =>
  println(s"$name: ${metric.value}")
}
```
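
For ongoing monitoring rather than ad-hoc inspection, a `QueryExecutionListener` can be registered on the session. A minimal sketch that logs per-query wall time:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Log the duration of every successful action on this session
class TimingListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName finished in ${durationNs / 1e6} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
}

spark.listenerManager.register(new TimingListener)
```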

## Types

```scala { .api }
// Base execution node
trait SparkPlan extends QueryPlan[SparkPlan] {
  def execute(): RDD[InternalRow]
  def executeCollect(): Array[InternalRow]
  def metrics: Map[String, SQLMetric]
}

// Leaf execution node
trait LeafExecNode extends SparkPlan {
  final override def children: Seq[SparkPlan] = Nil
  def doExecute(): RDD[InternalRow]
}

// Unary execution node
trait UnaryExecNode extends SparkPlan {
  def child: SparkPlan
  final override def children: Seq[SparkPlan] = child :: Nil
}

// Code generation support
trait CodegenSupport extends SparkPlan {
  def doProduce(ctx: CodegenContext): String
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}

// Data writing command
trait DataWritingCommand extends Command {
  def outputColumnNames: Seq[String]
  def outputColumns: Seq[Attribute]
}

// Command interface
trait Command extends LogicalPlan {
  override def output: Seq[Attribute] = Seq.empty
  override def children: Seq[LogicalPlan] = Seq.empty
  def run(sparkSession: SparkSession): Seq[Row]
}

// Hive script I/O schema
case class HiveScriptIOSchema(
    inputRowFormat: Seq[(String, String)],
    outputRowFormat: Seq[(String, String)],
    inputSerdeClass: Option[String],
    outputSerdeClass: Option[String],
    inputSerdeProps: Seq[(String, String)],
    outputSerdeProps: Seq[(String, String)],
    recordReaderClass: Option[String],
    recordWriterClass: Option[String],
    schemaLess: Boolean
)

// Table partition specification
type TablePartitionSpec = Map[String, String]

// SQL metric for execution statistics
class SQLMetric(
    val metricType: String,
    initValue: Long = 0L
) extends AccumulatorV2[Long, Long]

// Expression for column references and computations
trait Expression extends TreeNode[Expression] {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
}

// Attribute for column metadata
trait Attribute extends Expression with NamedExpression {
  def name: String
  def dataType: DataType
  def nullable: Boolean
  def qualifier: Seq[String]
}
```