Native support for Hive file formats, including traditional Hive tables and a Hive-compatible ORC implementation. This enables reading and writing data in various Hive-compatible formats while leveraging Spark's performance optimizations.
FileFormat implementation for traditional Hive tables supporting various SerDes and storage formats.
⚠️ Implementation Status: Currently write-only. Read operations are not implemented and will throw UnsupportedOperationException.
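A minimal sketch of what the write-only status means in practice (an active SparkSession and a hypothetical table path are assumed; the exact error text may vary by Spark version): writes go through the table APIs, while direct file reads with the "hive" source are rejected and table reads are planned as HiveTableScanExec.
import org.apache.spark.sql.{AnalysisException, SparkSession}
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
try {
  // Direct file reads through the "hive" source are not supported;
  // reads of Hive tables go through the metastore (spark.table / spark.sql)
  // and are planned as HiveTableScanExec rather than HiveFileFormat.buildReader
  spark.read.format("hive").load("/user/data/text_table")
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // roughly: "Hive data source can only be used with tables ..."
}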
/**
* FileFormat implementation for Hive tables
* Currently supports WRITING data using Hive SerDes and OutputFormat classes
* Reading is not implemented - use HiveTableScanExec for reading Hive tables
*/
class HiveFileFormat extends FileFormat with DataSourceRegister with Logging {
/** Data source short name for SQL registration */
override def shortName(): String = "hive"
/**
* Schema inference is not supported - throws UnsupportedOperationException
* Schema must be provided from Hive metastore
*/
override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]
): Option[StructType]
/**
* Prepare write operations for Hive tables
* @param sparkSession - Current SparkSession
* @param job - Hadoop Job configuration
* @param options - Write options including SerDe settings
* @param dataSchema - Schema of data to write
* @return OutputWriterFactory for creating individual file writers
*/
override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType
): OutputWriterFactory
/**
* Build reader for scanning Hive table files - not implemented; throws UnsupportedOperationException
* @param sparkSession - Current SparkSession
* @param dataSchema - Schema of data to read
* @param partitionSchema - Schema of partition columns
* @param requiredSchema - Columns actually needed by query
* @param filters - Pushdown predicates
* @param options - Read options including SerDe settings
* @param hadoopConf - Hadoop configuration
* @return Function to create PartitionedFile readers
*/
override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration
): PartitionedFile => Iterator[InternalRow]
}

Usage Examples:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
// Create Hive table with specific file format
spark.sql("""
CREATE TABLE text_table (
id INT,
name STRING,
age INT
)
STORED AS TEXTFILE
LOCATION '/user/data/text_table'
""")
// Read from Hive table (reads are planned as HiveTableScanExec, not through HiveFileFormat)
val df = spark.sql("SELECT * FROM text_table")
df.show()
// Write to Hive table with custom SerDe
spark.sql("""
CREATE TABLE custom_serde_table (
id INT,
data STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",",
"quoteChar" = "\"",
"escapeChar" = "\\"
)
STORED AS TEXTFILE
""")Hive-compatible ORC file format implementation with optimizations.
⚠️ Implementation Note: There are two ORC implementations in Spark:
- Hive-based (org.apache.spark.sql.hive.orc.OrcFileFormat) - uses the Hive ORC libraries for compatibility
- Native (org.apache.spark.sql.execution.datasources.orc.OrcFileFormat) - uses Apache ORC directly for better performance

The Hive ORC implementation documented here provides better Hive compatibility but may have different performance characteristics.
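As a brief sketch (assuming an active SparkSession named spark), which implementation backs the "orc" source is selected by configuration:
// Choose which OrcFileFormat backs the "orc" data source:
//   "native" -> org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
//   "hive"   -> org.apache.spark.sql.hive.orc.OrcFileFormat (documented here)
spark.conf.set("spark.sql.orc.impl", "hive")
// Whether metastore ORC tables are converted to Spark's data source path at all
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")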
/**
* Hive-compatible ORC file format implementation
* Provides native ORC reading/writing with Hive metastore integration
*/
class OrcFileFormat extends FileFormat with DataSourceRegister with Logging {
/** Data source short name */
override def shortName(): String = "orc"
/**
* Infer schema from ORC files
* @param sparkSession - Current SparkSession
* @param options - Read options
* @param files - ORC files to analyze
* @return Inferred schema or None if cannot infer
*/
override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]
): Option[StructType]
/**
* Build optimized ORC reader with predicate pushdown
* @param sparkSession - Current SparkSession
* @param dataSchema - Schema of data in files
* @param partitionSchema - Partition column schema
* @param requiredSchema - Columns needed by query
* @param filters - Pushdown predicates for ORC row groups
* @param options - Read options
* @param hadoopConf - Hadoop configuration
* @return Reader function for ORC files
*/
override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration
): PartitionedFile => Iterator[InternalRow]
/**
* Prepare ORC write operations
* @param sparkSession - Current SparkSession
* @param job - Hadoop Job for configuration
* @param options - Write options including compression
* @param dataSchema - Schema of data to write
* @return OutputWriterFactory for creating ORC writers
*/
override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType
): OutputWriterFactory
/**
* Check whether the vectorized (batch) reader can be used for the given schema
* Note: vectorized ORC scans are provided by the native implementation
* (spark.sql.orc.impl=native); this Hive-based format reads row-at-a-time
* @param sparkSession - Current SparkSession
* @param dataSchema - Schema of the columns to read
* @return true if vectorized reading can be used
*/
override def supportBatch(sparkSession: SparkSession, dataSchema: StructType): Boolean
}

Usage Examples:
// Create ORC table
spark.sql("""
CREATE TABLE orc_table (
id BIGINT,
name STRING,
age INT,
salary DOUBLE
)
STORED AS ORC
LOCATION '/user/data/orc_table'
TBLPROPERTIES (
'orc.compress' = 'SNAPPY',
'orc.bloom.filter.columns' = 'id,name'
)
""")
// Write data to ORC table with compression
df.write
.mode("overwrite")
.option("compression", "snappy")
.format("orc")
.saveAsTable("orc_table")
// Read with predicate pushdown (automatically optimized)
val filtered = spark.sql("""
SELECT name, salary
FROM orc_table
WHERE age > 25 AND salary > 50000
""")
filtered.explain(true) // Shows predicate pushdown

Configuration class for Hive-specific format options.
/**
* Configuration options for Hive data source operations
*/
class HiveOptions(parameters: Map[String, String]) {
/** File format specification (e.g., "textfile", "sequencefile", "orc") */
val fileFormat: Option[String] = parameters.get(HiveOptions.FILE_FORMAT)
/** Input format class name */
val inputFormat: Option[String] = parameters.get(HiveOptions.INPUT_FORMAT)
/** Output format class name */
val outputFormat: Option[String] = parameters.get(HiveOptions.OUTPUT_FORMAT)
/** SerDe class name */
val serde: Option[String] = parameters.get(HiveOptions.SERDE)
/**
* Check if input/output formats are explicitly specified
* @return true if both input and output formats are provided
*/
def hasInputOutputFormat: Boolean = inputFormat.isDefined && outputFormat.isDefined
/**
* Get SerDe properties from options
* @return Map of SerDe-specific properties
*/
def serdeProperties: Map[String, String] = {
parameters.filterKeys(!HiveOptions.delimiterOptions.contains(_))
}
}
object HiveOptions {
// Option key constants
val FILE_FORMAT = "fileFormat"
val INPUT_FORMAT = "inputFormat"
val OUTPUT_FORMAT = "outputFormat"
val SERDE = "serde"
// Common delimiter option mappings
val delimiterOptions: Map[String, String] = Map(
"field.delim" -> "field.delim",
"line.delim" -> "line.delim",
"collection.delim" -> "collection.delim",
"mapkey.delim" -> "mapkey.delim"
)
/**
* Get compression configuration for Hive writes
* @param sessionState - Hive SessionState
* @param hadoopConf - Hadoop configuration
* @param compressionCodec - Optional compression codec override
* @return Compression codec to use or None
*/
def getHiveWriteCompression(
sessionState: SessionState,
hadoopConf: Configuration,
compressionCodec: Option[String]
): Option[String]
}

Comprehensive examples for different Hive file formats.
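Before the per-format walkthroughs, a short sketch of how the option keys above surface through the DataFrame writer API (the DataFrame and table name are hypothetical):
// "fileFormat" and "serde" correspond to HiveOptions.FILE_FORMAT and HiveOptions.SERDE
df.write
  .format("hive")
  .option("fileFormat", "textfile")
  .option("serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  .mode("overwrite")
  .saveAsTable("lazy_simple_text_table")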
TextFile Format:
// Create table with TextFile format and custom delimiters
spark.sql("""
CREATE TABLE csv_data (
id INT,
name STRING,
email STRING,
age INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim' = ',',
'line.delim' = '\n',
'serialization.format' = ','
)
STORED AS TEXTFILE
LOCATION '/data/csv'
""")
// Load CSV data
spark.sql("""
LOAD DATA INPATH '/input/data.csv'
INTO TABLE csv_data
""")SequenceFile Format:
// Create SequenceFile table
spark.sql("""
CREATE TABLE sequence_data (
key STRING,
value STRING
)
STORED AS SEQUENCEFILE
LOCATION '/data/sequence'
""")
// Write data in SequenceFile format
df.write
.format("hive")
.option("fileFormat", "sequencefile")
.mode("overwrite")
.saveAsTable("sequence_data")Avro Format:
// Create Avro table
spark.sql("""
CREATE TABLE avro_data (
id BIGINT,
name STRING,
metadata MAP<STRING,STRING>
)
STORED AS AVRO
LOCATION '/data/avro'
TBLPROPERTIES (
'avro.schema.literal' = '{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "metadata", "type": {"type": "map", "values": "string"}}
]
}'
)
""")Parquet Format (with Hive compatibility):
// Create Parquet table with Hive metastore
spark.sql("""
CREATE TABLE parquet_data (
id BIGINT,
name STRING,
created_date DATE
)
STORED AS PARQUET
LOCATION '/data/parquet'
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY'
)
""")
// Automatic conversion to Spark's native Parquet reader
// (controlled by spark.sql.hive.convertMetastoreParquet)
val df = spark.sql("SELECT * FROM parquet_data")
df.explain() // Shows either HiveTableRelation or parquet scan

Custom SerDe Integration:
// Register custom SerDe
spark.sql("ADD JAR /path/to/custom-serde.jar")
spark.sql("""
CREATE TABLE json_data (
id INT,
data STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/data/json'
""")
// Use custom SerDe for complex data
val jsonDF = spark.sql("SELECT get_json_object(data, '$.user.name') as username FROM json_data")

Multi-Format Table Operations:
// Create a partitioned table (per-partition storage info is tracked in the Hive metastore)
spark.sql("""
CREATE TABLE multi_format_data (
id INT,
name STRING,
value DOUBLE
)
PARTITIONED BY (format_type STRING)
STORED AS TEXTFILE
LOCATION '/data/multi_format'
""")
// Add partitions backed by different locations (new partitions inherit the table's storage format)
spark.sql("""
ALTER TABLE multi_format_data
ADD PARTITION (format_type='text')
LOCATION '/data/multi_format/text'
""")
spark.sql("""
ALTER TABLE multi_format_data
ADD PARTITION (format_type='orc')
LOCATION '/data/multi_format/orc'
""")Compression Configuration:
// Configure compression for different formats
val spark = SparkSession.builder()
.config("spark.sql.hive.convertMetastoreOrc", "true")
.config("spark.sql.orc.compression.codec", "snappy")
.config("spark.sql.parquet.compression.codec", "gzip")
.enableHiveSupport()
.getOrCreate()
// Write with specific compression
df.write
.format("orc")
.option("compression", "zlib")
.mode("overwrite")
.saveAsTable("compressed_table")Vectorized Reading:
// Enable vectorized ORC reading (only effective with the native ORC implementation)
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
// Query benefits from vectorization
val result = spark.sql("""
SELECT sum(salary), avg(age)
FROM large_orc_table
WHERE department = 'Engineering'
""")
result.explain() // Shows vectorized operations

Predicate Pushdown:
// ORC predicate pushdown automatically applied
val filtered = spark.sql("""
SELECT name, salary
FROM employee_orc
WHERE hire_date >= '2020-01-01'
AND department IN ('Engineering', 'Sales')
AND salary > 75000
""")
// Check pushdown in query plan
filtered.explain(true)
// Shows: PushedFilters: [IsNotNull(hire_date), GreaterThanOrEqual(hire_date,...)]

Schema Evolution:
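A minimal path-based sketch of what schema merging does (paths and data are made up; ORC schema merging requires a Spark version that supports the mergeSchema option for ORC, and import spark.implicits._ for toDF):
import spark.implicits._
// Two ORC writes with different schemas under the same directory
Seq((1L, "alice")).toDF("id", "name")
  .write.mode("append").orc("/tmp/evolved_orc")
Seq((2L, "bob", 30)).toDF("id", "name", "age")
  .write.mode("append").orc("/tmp/evolved_orc")
// With merging enabled, the reader reconciles the per-file schemas
val merged = spark.read.option("mergeSchema", "true").orc("/tmp/evolved_orc")
merged.printSchema() // id, name, age (age is null for rows from the older files)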
// Handle schema evolution in ORC files
spark.conf.set("spark.sql.orc.mergeSchema", "true")
// Read tables with evolved schemas
val evolvedDF = spark.sql("SELECT * FROM evolved_orc_table")
evolvedDF.printSchema() // Shows merged schema from all files

Format Detection and Conversion:
// Check table storage format
val tableInfo = spark.sql("DESCRIBE FORMATTED my_table").collect()
val storageFormat = tableInfo.find(_.getString(0) == "InputFormat").map(_.getString(1))
// Convert table format
spark.sql("""
CREATE TABLE orc_converted
STORED AS ORC
AS SELECT * FROM textfile_table
""")
// Optimize table by converting format
spark.sql("""
INSERT OVERWRITE TABLE existing_table
SELECT * FROM existing_table
""") // Uses current table's optimal formatFile Statistics and Metadata:
// Get file-level statistics for ORC
val stats = spark.sql("""
SELECT
input_file_name() as filename,
count(*) as row_count
FROM orc_table
GROUP BY input_file_name()
""")
stats.show()
// Analyze table statistics
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS")
spark.sql("DESCRIBE EXTENDED my_table").show(100, false)Common error patterns and solutions for file format operations:
import org.apache.spark.sql.AnalysisException
try {
spark.sql("SELECT * FROM malformed_table")
} catch {
case e: AnalysisException if e.getMessage.contains("SerDe") =>
println("SerDe configuration error - check SerDe properties")
case e: AnalysisException if e.getMessage.contains("InputFormat") =>
println("InputFormat error - verify file format configuration")
case e: Exception =>
println(s"File format error: ${e.getMessage}")
}
// Handle missing files gracefully
val safeDF = try {
spark.sql("SELECT * FROM potentially_missing_table")
} catch {
case _: AnalysisException =>
spark.emptyDataFrame
}