Apache Spark's Hive integration provides comprehensive file format support: native, optimized readers and writers for ORC and Parquet, plus compatibility with traditional Hive file formats handled through SerDes.
Spark provides native ORC support with advanced optimizations including vectorized reading, predicate pushdown, and column pruning.
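These optimizations are controlled largely through session configuration. A minimal sketch, assuming an active SparkSession named spark and a dataset with hypothetical id and name columns (both flags shown default to enabled in recent Spark releases, so this is illustrative rather than required):

// Vectorized reading and predicate pushdown for the ORC source
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.filterPushdown", "true")

// Column pruning happens automatically: only `id` and `name` are read from disk,
// and the `id > 100` predicate can be evaluated inside the ORC reader.
val pruned = spark.read.format("orc").load("/path/to/orc/files")
  .select("id", "name")
  .filter("id > 100")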
class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable {

  // DataSourceRegister interface
  def shortName(): String // Returns "orc"

  // FileFormat interface
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory

  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}

Reading ORC Files:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Read ORC files directly
val df = spark.read.format("orc").load("/path/to/orc/files")

// Read ORC table through Hive metastore
val table = spark.table("my_orc_table")

// Read with options
val dfWithOptions = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files")

Writing ORC Files:
// Write DataFrame as ORC
df.write
  .format("orc")
  .option("compression", "snappy")
  .save("/path/to/output/orc")

// Write to Hive table
df.write
  .format("orc")
  .mode("overwrite")
  .saveAsTable("my_new_orc_table")

// Partitioned write
df.write
  .format("orc")
  .partitionBy("year", "month")
  .save("/path/to/partitioned/orc")

class OrcOptions(parameters: CaseInsensitiveMap[String]) {
  def compressionCodec: String
  def enableVectorizedReader: Boolean
  def mergeSchema: Boolean
}

Available ORC Options:
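The options surfaced by this class correspond to the following read/write keys (a summary inferred from the fields above; exact option names and behavior can vary across Spark versions):

- compression — write-side codec (for example snappy, zlib, or none); when unset, the value of spark.sql.orc.compression.codec is used
- mergeSchema — read-side flag; when true, the schema is merged from all ORC files instead of being taken from a single file
- Vectorized reading is typically governed by the session setting spark.sql.orc.enableVectorizedReader rather than a per-read option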
object OrcFileOperator extends Logging {
  def readSchema(files: Seq[String], conf: Option[Configuration]): Option[StructType]
  def listOrcFiles(path: String, hadoopConf: Configuration): Seq[String]
  def getRowCount(file: String, conf: Configuration): Long
}

Usage Example:
import org.apache.spark.sql.hive.orc.OrcFileOperator

// Read schema from ORC files
val schema = OrcFileOperator.readSchema(Seq("/path/to/file.orc"), None)
println(s"Schema: ${schema.get.treeString}")

// Get row count
val conf = spark.sparkContext.hadoopConfiguration
val rowCount = OrcFileOperator.getRowCount("/path/to/file.orc", conf)

object OrcFilters extends Logging {
  def createFilter(filters: Seq[Filter]): Option[SearchArgument]
  def buildSearchArgument(
      dataTypeMap: Map[String, DataType],
      filters: Seq[Filter]
  ): Option[SearchArgument]
}

ORC supports predicate pushdown for equality and comparison operators such as =, <, <=, >, and >=.

Support for traditional Hive file formats using Hive SerDes and input/output formats.
class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory

  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}

Configuration for Hive-compatible file formats and SerDes.
class HiveOptions(parameters: CaseInsensitiveMap[String]) {
  def fileFormat: String
  def inputFormat: String
  def outputFormat: String
  def serde: String
  def serdeProperties: Map[String, String]
}
Text Files:

// Create table with text file format
spark.sql("""
  CREATE TABLE text_table (
    id INT,
    name STRING
  ) USING HIVE
  STORED AS TEXTFILE
""")

Sequence Files:
spark.sql("""
  CREATE TABLE seq_table (
    id INT,
    name STRING
  ) USING HIVE
  STORED AS SEQUENCEFILE
""")

Avro Files:
spark.sql("""
  CREATE TABLE avro_table (
    id INT,
    name STRING
  ) USING HIVE
  STORED AS AVRO
""")

Custom SerDe:
spark.sql("""
  CREATE TABLE custom_table (
    id INT,
    name STRING
  ) USING HIVE
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  WITH SERDEPROPERTIES (
    'field.delim' = '\t',
    'line.delim' = '\n'
  )
  STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""")

While Parquet support is primarily handled by Spark's native Parquet reader, the Hive integration provides compatibility for Hive-created Parquet tables.
// Enable native Parquet reader for Hive tables
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

// Configure Parquet options
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.parquet.mergeSchema", "false")

// Read Hive Parquet table with native reader
val df = spark.table("my_parquet_table")

// Force use of Hive SerDe (not recommended)
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
val dfWithSerde = spark.table("my_parquet_table")

Support for schema evolution in ORC and Parquet formats:
// Enable schema merging for ORC
val df = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/evolved/schema")

// Handle columns missing from older files by supplying an explicit schema;
// columns absent from a file come back as null
// (expectedSchema here stands for the full, current table schema)
val dfWithDefaults = spark.read
  .format("orc")
  .schema(expectedSchema)
  .load("/path/to/files")

ORC Compression Options:
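Commonly accepted codec values for the compression option and for spark.sql.orc.compression.codec include none, uncompressed, snappy, zlib, and lzo; newer Spark releases also accept lz4 and zstd. Check the documentation for your Spark version before relying on a specific codec.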
Setting Compression:

// For writes
df.write
  .format("orc")
  .option("compression", "snappy")
  .save("/path/to/output")

// Global setting
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

File formats support partitioned writes through the DataFrame API as well as Hive-style dynamic partitioning:
// Partitioned write through the DataFrame API
df.write
  .format("orc")
  .partitionBy("year", "month")
  .save("/partitioned/data")

// Dynamic partitioning in Hive tables
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("""
  INSERT INTO TABLE partitioned_table
  PARTITION(year, month)
  SELECT id, name, year, month FROM source_table
""")

Support for bucketed tables for improved join performance:
// Create bucketed table
spark.sql("""
  CREATE TABLE bucketed_table (
    id INT,
    name STRING,
    department STRING
  ) USING HIVE
  CLUSTERED BY (id) INTO 4 BUCKETS
  STORED AS ORC
""")
// Create a Spark-bucketed table through the DataFrame API
// (note: bucketBy uses Spark's bucketing layout, which differs from Hive's,
// and saveAsTable targets a new table rather than the Hive table created above)
df.write
  .format("orc")
  .bucketBy(4, "id")
  .saveAsTable("spark_bucketed_table")
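As a usage sketch (other_bucketed_table is a placeholder and is assumed to be bucketed by id into the same number of buckets), joining two bucketed tables on the bucketing key lets Spark plan the join with little or no shuffling:

// Join two tables bucketed by `id`; with matching bucketing, Spark can plan
// the join without a full shuffle of either side.
val joined = spark.table("spark_bucketed_table")
  .join(spark.table("other_bucketed_table"), "id")

// Inspect the physical plan; fewer (or no) Exchange operators indicates
// that bucketing was used.
joined.explain()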
Common file format errors and solutions:

// Error: Unsupported file format
// Solution: Ensure format is supported or use appropriate SerDe
spark.sql("""
  CREATE TABLE custom_format_table (...)
  STORED AS INPUTFORMAT 'custom.input.format'
  OUTPUTFORMAT 'custom.output.format'
""")

// Error: Schema mismatch between file and table
// Solution: Enable schema evolution or fix schema
spark.conf.set("spark.sql.orc.mergeSchema", "true")

// Error: Unsupported compression codec
// Solution: Use supported codec or install required libraries
df.write.format("orc").option("compression", "snappy").save(path)

Performance Tuning:

// Enable vectorized reading
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
// Configure split size
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") // 128MB
// Enable bloom filters
spark.sql("""
CREATE TABLE optimized_table (...)
USING HIVE
STORED AS ORC
TBLPROPERTIES (
'orc.bloom.filter.columns'='id,name',
'orc.create.index'='true'
)
""")// Control file size during writes
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "67108864") // 64MB
// Coalesce small files
df.coalesce(1).write.format("orc").save(path)// File format interface
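Another common way to bound the number of output files during a partitioned write, sketched here with the year/month partition columns used earlier, is to repartition by the partition columns before writing:

import org.apache.spark.sql.functions.col

// One output task per (year, month) combination keeps the number of files per
// partition directory small; coalesce(1) above has a similar effect globally
// but removes write parallelism.
df.repartition(col("year"), col("month"))
  .write
  .format("orc")
  .partitionBy("year", "month")
  .save("/path/to/partitioned/orc")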
Key Interfaces and Types:

// File format interface
trait FileFormat {
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory
}

// Data source registration
trait DataSourceRegister {
  def shortName(): String
}
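For illustration, the short name returned by shortName() is what .format(...) resolves at read and write time; a fully qualified FileFormat class name is also accepted (the class name below refers to the native ORC implementation and may differ across Spark builds):

// Resolve the source by its registered short name
val byShortName = spark.read.format("orc").load("/path/to/orc/files")

// Resolve the source by a fully qualified class name
val byClassName = spark.read
  .format("org.apache.spark.sql.execution.datasources.orc.OrcFileFormat")
  .load("/path/to/orc/files")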
// Hive file sink configuration
case class FileSinkDesc(
    dirName: String,
    tableInfo: TableDesc,
    compressed: Boolean,
    destTableId: Int,
    compressCodec: String
)

// Table description for Hive
case class TableDesc(
    inputFormat: Class[_ <: InputFormat[_, _]],
    outputFormat: Class[_ <: OutputFormat[_, _]],
    properties: Properties
)
// ORC search argument for predicate pushdown
trait SearchArgument {
  def getLeaves(): java.util.List[PredicateLeaf]
  def getExpression(): ExpressionTree
}

// File status information
case class FileStatus(
    path: String,
    length: Long,
    isDirectory: Boolean,
    blockReplication: Short,
    blockSize: Long,
    modificationTime: Long,
    accessTime: Long
)