
# Apache Spark Hive Integration

Apache Spark SQL Hive Integration provides a compatibility layer for running Hive queries and accessing Hive tables through Spark's distributed processing engine. This module lets organizations leverage Spark's high-performance execution while maintaining compatibility with existing Hive-based data warehousing infrastructure.

## Package Information

- **Package Name**: spark-hive_2.10
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-hive_2.10
- **Version**: 2.2.3
- **Maven Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.10</artifactId>
  <version>2.2.3</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext // deprecated since 2.0.0
import org.apache.spark.sql.hive.HiveUtils
```

## Basic Usage

### Modern Approach (Recommended)

```scala
import org.apache.spark.sql.SparkSession

// Create a SparkSession with Hive support
val spark = SparkSession.builder()
  .appName("Hive Integration Example")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()

// Run HiveQL
val df = spark.sql("SELECT * FROM hive_table")
df.show()

// Access Hive tables through the session
val table = spark.table("my_database.my_table")
table.createOrReplaceTempView("temp_table")
```

### Legacy Approach (Deprecated)

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext()
val hiveContext = new HiveContext(sc)

// Run HiveQL queries
val results = hiveContext.sql("SELECT * FROM hive_table")
results.show()
```

## Architecture

The Spark Hive integration is built around several key components:

- **Hive Compatibility Layer**: Seamless integration with the existing Hive metastore and table formats
- **Query Engine**: Translates HiveQL queries into Spark execution plans via the Catalyst optimizer
- **File Format Support**: Native support for ORC, Parquet, and other Hive-compatible formats
- **UDF Integration**: Executes existing Hive UDFs, UDAFs, and UDTFs within Spark
- **Metastore Integration**: Reads and writes table metadata from the Hive metastore with version compatibility

## Capabilities

### Core Hive Integration

Primary entry points and configuration utilities for Hive integration.

```scala { .api }
// Modern entry point (recommended)
object SparkSession {
  def builder(): Builder
}

class Builder {
  def enableHiveSupport(): Builder
}

// Legacy entry point (deprecated since 2.0.0)
class HiveContext(sparkSession: SparkSession) extends SQLContext(sparkSession) {
  def this(sc: SparkContext)
  def this(sc: JavaSparkContext)
  def newSession(): HiveContext
  def refreshTable(tableName: String): Unit
}

// Configuration utilities
object HiveUtils {
  val hiveExecutionVersion: String
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]

  def withHiveExternalCatalog(sc: SparkContext): SparkContext
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
}
```

[Core Hive Integration](./core-hive-integration.md)
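
As a sketch of how the utilities above can be combined, the following builds a session backed by a temporary in-memory Derby metastore, which is useful for isolated tests; the master URL and app name are illustrative choices, not requirements of the API:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

// Build a throwaway Hive-enabled session: every key/value pair returned by
// newTemporaryConfiguration points the metastore at an in-memory Derby DB.
val builder = SparkSession.builder()
  .master("local[*]")   // illustrative: local mode for tests
  .appName("hive-test")
  .enableHiveSupport()

HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
  .foreach { case (k, v) => builder.config(k, v) }

val spark = builder.getOrCreate()
spark.sql("SHOW DATABASES").show()
```

Because the metastore lives in memory, nothing persists across JVM restarts, which keeps test runs independent of each other.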

### File Format Support

Native support for ORC and Hive-compatible file formats with optimization features.

```scala { .api }
class OrcFileFormat extends FileFormat with DataSourceRegister {
  def shortName(): String
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}

class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}
```

[File Format Support](./file-formats.md)
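
In practice these formats are exercised through the DataFrame reader/writer API rather than instantiated directly. A minimal sketch, assuming `spark` is an existing Hive-enabled `SparkSession` and the output path is illustrative:

```scala
// Write a small DataFrame as ORC, then read it back.
val events = spark.range(0, 100).selectExpr("id", "id % 7 AS bucket")
events.write.mode("overwrite").orc("/tmp/events_orc")

val reloaded = spark.read.orc("/tmp/events_orc")
reloaded.printSchema()  // schema is inferred from the ORC footers
```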

146

147

### UDF Integration

148

149

Comprehensive support for executing Hive user-defined functions within Spark.

150

151

```scala { .api }

152

// Simple UDFs

153

case class HiveSimpleUDF(

154

name: String,

155

funcWrapper: HiveFunctionWrapper,

156

children: Seq[Expression]

157

) extends Expression {

158

def eval(input: InternalRow): Any

159

def prettyName: String

160

}

161

162

// Generic UDFs

163

case class HiveGenericUDF(

164

name: String,

165

funcWrapper: HiveFunctionWrapper,

166

children: Seq[Expression]

167

) extends Expression

168

169

// Aggregate functions (UDAFs)

170

case class HiveUDAFFunction(

171

name: String,

172

funcWrapper: HiveFunctionWrapper,

173

children: Seq[Expression]

174

) extends TypedImperativeAggregate[AggregationBuffer]

175

176

// Table-generating functions (UDTFs)

177

case class HiveGenericUDTF(

178

name: String,

179

funcWrapper: HiveFunctionWrapper,

180

children: Seq[Expression]

181

) extends Generator

182

```

183

184

[UDF Integration](./udf-integration.md)
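
These expression classes are created internally when a Hive function is invoked; application code registers an existing Hive UDF through HiveQL instead. A sketch, where the class name, jar path, and table are hypothetical placeholders for your own UDF:

```scala
// Register an existing Hive UDF class so it can be called from Spark SQL.
spark.sql(
  """CREATE TEMPORARY FUNCTION my_upper
    |AS 'com.example.hive.udf.MyUpper'
    |USING JAR '/path/to/my-udfs.jar'""".stripMargin)

// Spark wraps the Hive class (e.g. as a HiveSimpleUDF) at plan time.
spark.sql("SELECT my_upper(name) FROM people").show()
```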

### Metastore Operations

Interface for interacting with the Hive metastore for database, table, and partition management.

```scala { .api }
trait HiveClient {
  // Version and configuration
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String

  // SQL execution
  def runSqlHive(sql: String): Seq[String]

  // Database operations
  def listTables(dbName: String): Seq[String]
  def setCurrentDatabase(databaseName: String): Unit
  def getDatabase(name: String): CatalogDatabase
  def databaseExists(dbName: String): Boolean

  // Table operations
  def tableExists(dbName: String, tableName: String): Boolean
  def getTable(dbName: String, tableName: String): CatalogTable
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

  // Partition operations
  def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit
  def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

  // Function operations
  def createFunction(db: String, func: CatalogFunction): Unit
  def listFunctions(db: String, pattern: String): Seq[String]
}
```

[Metastore Operations](./metastore-operations.md)
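
`HiveClient` is an internal interface; application code usually goes through the public `spark.catalog` API, which delegates to the Hive metastore when Hive support is enabled. A sketch, with illustrative database and table names:

```scala
// Equivalent public-API operations for common metastore queries.
spark.catalog.listDatabases().show()
spark.catalog.listTables("my_database").show()
println(spark.catalog.tableExists("my_database", "my_table"))
```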

222

223

### Execution Engine

224

225

Physical execution plans and strategies for Hive table operations.

226

227

```scala { .api }

228

case class HiveTableScanExec(

229

requestedAttributes: Seq[Attribute],

230

relation: HiveTableRelation,

231

partitionPruningPred: Seq[Expression]

232

) extends LeafExecNode {

233

def doExecute(): RDD[InternalRow]

234

}

235

236

case class InsertIntoHiveTable(

237

table: CatalogTable,

238

partition: Map[String, Option[String]],

239

query: LogicalPlan,

240

overwrite: Boolean,

241

ifPartitionNotExists: Boolean

242

) extends UnaryCommand {

243

def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]

244

}

245

246

case class CreateHiveTableAsSelectCommand(

247

tableDesc: CatalogTable,

248

query: LogicalPlan,

249

ignoreIfExists: Boolean

250

) extends DataWritingCommand

251

```

252

253

[Execution Engine](./execution-engine.md)
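
These nodes can be observed in a query's physical plan. A sketch, assuming a partitioned Hive table with the illustrative name and partition column below; when the table is not converted to a native reader, a `HiveTableScanExec` node appears in the output:

```scala
// Inspect the physical plan of a query over a partitioned Hive table.
val q = spark.sql("SELECT * FROM my_database.my_table WHERE dt = '2017-01-01'")
q.explain(true)  // the dt filter may surface as a partition-pruning predicate
```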

## Configuration

### Key Hive Configuration Properties

- **spark.sql.hive.metastore.version**: Hive metastore version (default: 1.2.1)
- **spark.sql.hive.metastore.jars**: Location of Hive metastore JARs (`builtin`, `maven`, or a classpath)
- **spark.sql.hive.convertMetastoreParquet**: Use Spark's native Parquet reader for Hive tables (default: true)
- **spark.sql.hive.convertMetastoreOrc**: Use Spark's native ORC reader for Hive tables (default: false in this release)

### Supported Hive Versions

- Hive metastore versions 0.12.0 through 2.1.1
- Default execution version: 1.2.1
- Metastore version is configurable for compatibility
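
These properties must be set before the session is created, since the metastore client is initialized once per application. A sketch with illustrative values; match them to your metastore deployment:

```scala
import org.apache.spark.sql.SparkSession

// Pin the metastore version and jar source at session-build time.
val spark = SparkSession.builder()
  .appName("configured-hive")
  .config("spark.sql.hive.metastore.version", "1.2.1")
  .config("spark.sql.hive.metastore.jars", "builtin")
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .enableHiveSupport()
  .getOrCreate()
```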

## Migration from HiveContext

The `HiveContext` class is deprecated as of Spark 2.0.0. Migration steps:

1. Replace `HiveContext` with `SparkSession.builder().enableHiveSupport()`
2. Update configuration from Hive-specific settings to Spark SQL settings
3. Use `spark.sql()` instead of `hiveContext.sql()` for queries
4. Access the catalog through `spark.catalog` instead of direct metastore calls
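
The steps above amount to a small mechanical rewrite; a before/after sketch with an illustrative table name:

```scala
import org.apache.spark.sql.SparkSession

// Before (deprecated):
//   val hiveContext = new HiveContext(sc)
//   val df = hiveContext.sql("SELECT * FROM sales")
//   hiveContext.refreshTable("sales")

// After:
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val df = spark.sql("SELECT * FROM sales")
spark.catalog.refreshTable("sales")
```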

## Error Handling

Common exceptions and error patterns:

- **AnalysisException**: Thrown for invalid table references or schema mismatches
- **HiveException**: Wraps underlying Hive metastore errors
- **UnsupportedOperationException**: Raised for unsupported Hive features or version incompatibilities
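
A typical defensive pattern is to catch `AnalysisException` around queries whose tables may not exist yet; the table name below is illustrative:

```scala
import org.apache.spark.sql.AnalysisException

// Guard a query against a missing table.
try {
  spark.sql("SELECT * FROM nonexistent_table").show()
} catch {
  case e: AnalysisException =>
    println(s"Query failed during analysis: ${e.getMessage}")
}
```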

286

287

## Types

288

289

```scala { .api }

290

// Configuration entries

291

abstract class ConfigEntry[T] {

292

def key: String

293

def defaultValue: Option[T]

294

def doc: String

295

}

296

297

// Hive version representation

298

abstract class HiveVersion {

299

def fullVersion: String

300

def extraDeps: Seq[String]

301

def exclusions: Seq[String]

302

}

303

304

// Function wrapper for Hive UDFs

305

case class HiveFunctionWrapper(

306

className: String,

307

instance: AnyRef

308

)

309

310

// Table partition specification

311

type TablePartitionSpec = Map[String, String]

312

313

// Catalog types (from Spark SQL)

314

case class CatalogTable(

315

identifier: TableIdentifier,

316

tableType: CatalogTableType,

317

storage: CatalogStorageFormat,

318

schema: StructType,

319

partitionColumnNames: Seq[String] = Seq.empty,

320

bucketSpec: Option[BucketSpec] = None

321

)

322

323

case class CatalogTablePartition(

324

spec: TablePartitionSpec,

325

storage: CatalogStorageFormat,

326

parameters: Map[String, String] = Map.empty

327

)

328

329

case class CatalogDatabase(

330

name: String,

331

description: String,

332

locationUri: String,

333

properties: Map[String, String]

334

)

335

336

case class CatalogFunction(

337

identifier: FunctionIdentifier,

338

className: String,

339

resources: Seq[FunctionResource]

340

)

341

```
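
A `TablePartitionSpec` is a plain map from partition column names to values; a partial spec (as accepted by `listPartitions`) matches any full spec that agrees on every listed key. A self-contained sketch — the matching helper is our own illustration, not a library API:

```scala
// A partition spec maps partition column names to values.
type TablePartitionSpec = Map[String, String]

// Illustrative helper (not a Spark API): does `full` satisfy the partial spec?
def matchesPartialSpec(full: TablePartitionSpec, partial: TablePartitionSpec): Boolean =
  partial.forall { case (k, v) => full.get(k).contains(v) }

val partitions: Seq[TablePartitionSpec] = Seq(
  Map("dt" -> "2017-01-01", "country" -> "US"),
  Map("dt" -> "2017-01-01", "country" -> "DE"),
  Map("dt" -> "2017-01-02", "country" -> "US")
)

// Pruning with a partial spec keeps only the two dt = 2017-01-01 partitions.
val pruned = partitions.filter(matchesPartialSpec(_, Map("dt" -> "2017-01-01")))
```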