Apache Spark Hive integration module that provides support for Hive tables, queries, and SerDes
Native support for Hive file formats, covering traditional Hive table formats (TextFile, SequenceFile, custom SerDes) and Hive-compatible ORC. This enables reading and writing data in Hive-compatible formats while leveraging Spark's performance optimizations.
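To use these APIs, the spark-hive module must be on the classpath and Hive support must be enabled on the SparkSession. A minimal sbt sketch (the Spark version shown is illustrative; match it to your cluster):
// build.sbt -- illustrative coordinates and version
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.4.8" % "provided"
)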
FileFormat implementation for traditional Hive tables supporting various SerDes and storage formats.
⚠️ Implementation Status: Currently write-only. Read operations are not implemented and will throw UnsupportedOperationException.
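A brief sketch of what this means in practice (the table and path names are illustrative): path-based reads through the registered "hive" source are not supported, so reads of Hive tables go through the SQL/table path instead.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
// Not supported: reading files directly through the "hive" data source
// spark.read.format("hive").load("/user/data/text_table")
// Supported: read Hive tables through the SQL/table path (handled by HiveTableScanExec)
val tableDF = spark.table("text_table")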
/**
* FileFormat implementation for Hive tables
* Currently supports WRITING data using Hive SerDes and OutputFormat classes
* Reading is not implemented - use HiveTableScanExec for reading Hive tables
*/
class HiveFileFormat extends FileFormat with DataSourceRegister with Logging {
/** Data source short name for SQL registration */
override def shortName(): String = "hive"
/**
* Schema inference is not supported - throws UnsupportedOperationException
* Schema must be provided from Hive metastore
*/
override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]
): Option[StructType]
/**
* Prepare write operations for Hive tables
* @param sparkSession - Current SparkSession
* @param job - Hadoop Job configuration
* @param options - Write options including SerDe settings
* @param dataSchema - Schema of data to write
* @return OutputWriterFactory for creating individual file writers
*/
override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType
): OutputWriterFactory
/**
* Build reader for scanning Hive table files
* @param sparkSession - Current SparkSession
* @param dataSchema - Schema of data to read
* @param partitionSchema - Schema of partition columns
* @param requiredSchema - Columns actually needed by query
* @param filters - Pushdown predicates
* @param options - Read options including SerDe settings
* @param hadoopConf - Hadoop configuration
* @return Function to create PartitionedFile readers
*/
override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration
): PartitionedFile => Iterator[InternalRow]
}

Usage Examples:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
// Create Hive table with specific file format
spark.sql("""
CREATE TABLE text_table (
id INT,
name STRING,
age INT
)
STORED AS TEXTFILE
LOCATION '/user/data/text_table'
""")
// Read from Hive table (reads go through HiveTableScanExec rather than HiveFileFormat)
val df = spark.sql("SELECT * FROM text_table")
df.show()
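Writes into the table exercise HiveFileFormat's prepareWrite and the table's SerDe. A short sketch appending rows that match the table schema above:
import spark.implicits._
// Columns are resolved by position against the Hive table definition
val newRows = Seq((3, "carol", 41), (4, "dave", 29)).toDF("id", "name", "age")
newRows.write.mode("append").insertInto("text_table")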
// Write to Hive table with custom SerDe
spark.sql("""
CREATE TABLE custom_serde_table (
id INT,
data STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",",
"quoteChar" = "\"",
"escapeChar" = "\\"
)
STORED AS TEXTFILE
""")Hive-compatible ORC file format implementation with optimizations.
⚠️ Implementation Note: There are two ORC implementations in Spark:
- Hive ORC (org.apache.spark.sql.hive.orc.OrcFileFormat) - uses Hive ORC libraries for maximum Hive compatibility
- Native ORC (org.apache.spark.sql.execution.datasources.orc.OrcFileFormat) - uses Apache ORC directly for better performance
The Hive ORC implementation documented here provides better Hive compatibility but may have different performance characteristics.
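Which implementation serves the "orc" data source is configurable; a brief sketch, assuming Spark 2.3+ where the spark.sql.orc.impl and spark.sql.hive.convertMetastoreOrc settings are available:
// "hive"   selects org.apache.spark.sql.hive.orc.OrcFileFormat (documented here)
// "native" selects org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
spark.conf.set("spark.sql.orc.impl", "hive")
// Controls whether metastore ORC tables are converted to the built-in data source read path
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")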
/**
* Hive-compatible ORC file format implementation
* Provides native ORC reading/writing with Hive metastore integration
*/
class OrcFileFormat extends FileFormat with DataSourceRegister with Logging {
/** Data source short name */
override def shortName(): String = "orc"
/**
* Infer schema from ORC files
* @param sparkSession - Current SparkSession
* @param options - Read options
* @param files - ORC files to analyze
* @return Inferred schema or None if cannot infer
*/
override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]
): Option[StructType]
/**
* Build optimized ORC reader with predicate pushdown
* @param sparkSession - Current SparkSession
* @param dataSchema - Schema of data in files
* @param partitionSchema - Partition column schema
* @param requiredSchema - Columns needed by query
* @param filters - Pushdown predicates for ORC row groups
* @param options - Read options
* @param hadoopConf - Hadoop configuration
* @return Reader function for ORC files
*/
override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration
): PartitionedFile => Iterator[InternalRow]
/**
* Prepare ORC write operations
* @param sparkSession - Current SparkSession
* @param job - Hadoop Job for configuration
* @param options - Write options including compression
* @param dataSchema - Schema of data to write
* @return OutputWriterFactory for creating ORC writers
*/
override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType
): OutputWriterFactory
/**
* Check if vectorized reading is supported
* @param requiredSchema - Required columns
* @return true if vectorized reading can be used
*/
override def supportBatch(requiredSchema: StructType): Boolean = true
}

Usage Examples:
// Create ORC table
spark.sql("""
CREATE TABLE orc_table (
id BIGINT,
name STRING,
age INT,
salary DOUBLE
)
STORED AS ORC
LOCATION '/user/data/orc_table'
TBLPROPERTIES (
'orc.compress' = 'SNAPPY',
'orc.bloom.filter.columns' = 'id,name'
)
""")
// Write data to ORC table with compression
df.write
.mode("overwrite")
.option("compression", "snappy")
.format("orc")
.saveAsTable("orc_table")
// Read with predicate pushdown (automatically optimized)
val filtered = spark.sql("""
SELECT name, salary
FROM orc_table
WHERE age > 25 AND salary > 50000
""")
filtered.explain(true) // Shows predicate pushdown

Configuration class for Hive-specific format options.
/**
* Configuration options for Hive data source operations
*/
class HiveOptions(parameters: Map[String, String]) {
/** File format specification (e.g., "textfile", "sequencefile", "orc") */
val fileFormat: Option[String] = parameters.get(HiveOptions.FILE_FORMAT)
/** Input format class name */
val inputFormat: Option[String] = parameters.get(HiveOptions.INPUT_FORMAT)
/** Output format class name */
val outputFormat: Option[String] = parameters.get(HiveOptions.OUTPUT_FORMAT)
/** SerDe class name */
val serde: Option[String] = parameters.get(HiveOptions.SERDE)
/**
* Check if input/output formats are explicitly specified
* @return true if both input and output formats are provided
*/
def hasInputOutputFormat: Boolean = inputFormat.isDefined && outputFormat.isDefined
/**
* Get SerDe properties from options
* @return Map of SerDe-specific properties
*/
def serdeProperties: Map[String, String] = {
parameters.filterKeys(key => !HiveOptions.delimiterOptions.contains(key))
}
}
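From user code these keys are normally supplied as write options on the "hive" data source rather than by constructing HiveOptions directly; a hedged sketch (the table name is illustrative):
// The option keys correspond to the HiveOptions constants below
df.write
  .format("hive")
  .option("fileFormat", "textfile")
  .option("serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  .mode("overwrite")
  .saveAsTable("hive_options_example")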
object HiveOptions {
// Option key constants
val FILE_FORMAT = "fileFormat"
val INPUT_FORMAT = "inputFormat"
val OUTPUT_FORMAT = "outputFormat"
val SERDE = "serde"
// Common delimiter option mappings
val delimiterOptions: Map[String, String] = Map(
"field.delim" -> "field.delim",
"line.delim" -> "line.delim",
"collection.delim" -> "collection.delim",
"mapkey.delim" -> "mapkey.delim"
)
/**
* Get compression configuration for Hive writes
* @param sessionState - Hive SessionState
* @param hadoopConf - Hadoop configuration
* @param compressionCodec - Optional compression codec override
* @return Compression codec to use or None
*/
def getHiveWriteCompression(
sessionState: SessionState,
hadoopConf: Configuration,
compressionCodec: Option[String]
): Option[String]
}

Comprehensive examples for different Hive file formats.
TextFile Format:
// Create table with TextFile format and custom delimiters
spark.sql("""
CREATE TABLE csv_data (
id INT,
name STRING,
email STRING,
age INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim' = ',',
'line.delim' = '\n',
'serialization.format' = ','
)
STORED AS TEXTFILE
LOCATION '/data/csv'
""")
// Load CSV data
spark.sql("""
LOAD DATA INPATH '/input/data.csv'
INTO TABLE csv_data
""")SequenceFile Format:
// Create SequenceFile table
spark.sql("""
CREATE TABLE sequence_data (
key STRING,
value STRING
)
STORED AS SEQUENCEFILE
LOCATION '/data/sequence'
""")
// Write data in SequenceFile format
df.write
.format("hive")
.option("fileFormat", "sequencefile")
.mode("overwrite")
.saveAsTable("sequence_data")Avro Format:
// Create Avro table
spark.sql("""
CREATE TABLE avro_data (
id BIGINT,
name STRING,
metadata MAP<STRING,STRING>
)
STORED AS AVRO
LOCATION '/data/avro'
TBLPROPERTIES (
'avro.schema.literal' = '{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "metadata", "type": {"type": "map", "values": "string"}}
]
}'
)
""")Parquet Format (with Hive compatibility):
// Create Parquet table with Hive metastore
spark.sql("""
CREATE TABLE parquet_data (
id BIGINT,
name STRING,
created_date DATE
)
STORED AS PARQUET
LOCATION '/data/parquet'
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY'
)
""")
// Automatic conversion to Spark's native Parquet reader
// (controlled by spark.sql.hive.convertMetastoreParquet)
val df = spark.sql("SELECT * FROM parquet_data")
df.explain() // Shows either HiveTableRelation or parquet scan

Custom SerDe Integration:
// Register custom SerDe
spark.sql("ADD JAR /path/to/custom-serde.jar")
spark.sql("""
CREATE TABLE json_data (
id INT,
data STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data/json'
""")
// Use custom SerDe for complex data
val jsonDF = spark.sql("SELECT get_json_object(data, '$.user.name') as username FROM json_data")

Multi-Format Table Operations:
// Create partitioned table with different formats per partition
spark.sql("""
CREATE TABLE multi_format_data (
id INT,
name STRING,
value DOUBLE
)
PARTITIONED BY (format_type STRING)
STORED AS TEXTFILE
LOCATION '/data/multi_format'
""")
// Add partitions with different storage formats
spark.sql("""
ALTER TABLE multi_format_data
ADD PARTITION (format_type='text')
LOCATION '/data/multi_format/text'
""")
spark.sql("""
ALTER TABLE multi_format_data
ADD PARTITION (format_type='orc')
LOCATION '/data/multi_format/orc'
""")Compression Configuration:
// Configure compression for different formats
val spark = SparkSession.builder()
.config("spark.sql.hive.convertMetastoreOrc", "true")
.config("spark.sql.orc.compression.codec", "snappy")
.config("spark.sql.parquet.compression.codec", "gzip")
.enableHiveSupport()
.getOrCreate()
// Write with specific compression
df.write
.format("orc")
.option("compression", "zlib")
.mode("overwrite")
.saveAsTable("compressed_table")Vectorized Reading:
// Enable vectorized ORC reading (effective with the native ORC implementation)
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
// Query benefits from vectorization
val result = spark.sql("""
SELECT sum(salary), avg(age)
FROM large_orc_table
WHERE department = 'Engineering'
""")
result.explain() // Shows vectorized operations

Predicate Pushdown:
// ORC predicate pushdown automatically applied
val filtered = spark.sql("""
SELECT name, salary
FROM employee_orc
WHERE hire_date >= '2020-01-01'
AND department IN ('Engineering', 'Sales')
AND salary > 75000
""")
// Check pushdown in query plan
filtered.explain(true)
// Shows: PushedFilters: [IsNotNull(hire_date), GreaterThanOrEqual(hire_date,...)]

Schema Evolution:
// Handle schema evolution in ORC files
spark.conf.set("spark.sql.orc.mergeSchema", "true")
// Read tables with evolved schemas
val evolvedDF = spark.sql("SELECT * FROM evolved_orc_table")
evolvedDF.printSchema() // Shows merged schema from all files

Format Detection and Conversion:
// Check table storage format
val tableInfo = spark.sql("DESCRIBE FORMATTED my_table").collect()
val storageFormat = tableInfo.find(_.getString(0) == "InputFormat").map(_.getString(1))
// Convert table format
spark.sql("""
CREATE TABLE orc_converted
STORED AS ORC
AS SELECT * FROM textfile_table
""")
// Optimize table by converting format
spark.sql("""
INSERT OVERWRITE TABLE existing_table
SELECT * FROM existing_table
""") // Uses current table's optimal formatFile Statistics and Metadata:
// Get file-level statistics for ORC
val stats = spark.sql("""
SELECT
input_file_name() as filename,
count(*) as row_count
FROM orc_table
GROUP BY input_file_name()
""")
stats.show()
// Analyze table statistics
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS")
spark.sql("DESCRIBE EXTENDED my_table").show(100, false)Common error patterns and solutions for file format operations:
import org.apache.spark.sql.AnalysisException
try {
spark.sql("SELECT * FROM malformed_table")
} catch {
case e: AnalysisException if e.getMessage.contains("SerDe") =>
println("SerDe configuration error - check SerDe properties")
case e: AnalysisException if e.getMessage.contains("InputFormat") =>
println("InputFormat error - verify file format configuration")
case e: Exception =>
println(s"File format error: ${e.getMessage}")
}
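An alternative to catching the exception for possibly-missing tables is to probe the catalog first; a small sketch using the same table name as the fallback example below:
val existingOrEmpty =
  if (spark.catalog.tableExists("potentially_missing_table")) {
    spark.table("potentially_missing_table")
  } else {
    spark.emptyDataFrame
  }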
// Handle missing files gracefully
val safeDF = try {
spark.sql("SELECT * FROM potentially_missing_table")
} catch {
case _: AnalysisException =>
spark.emptyDataFrame
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-hive-2-11