Support for Hive-compatible file formats, particularly ORC files with Hive metadata integration.
```scala { .api }
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.sources.DataSourceRegister
```

Primary file format implementation for reading and writing ORC files with full Hive compatibility.

```scala { .api }
class OrcFileFormat extends FileFormat with DataSourceRegister {
/**
* Infer schema from ORC files
* @param sparkSession Active Spark session
* @param options Format-specific options
* @param files Sequence of file statuses to analyze
* @return Inferred schema, or None if it cannot be inferred
*/
def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]
): Option[StructType]
/**
* Prepare write operations for ORC format
* @param sparkSession Active Spark session
* @param job Hadoop job configuration
* @param options Write options and settings
* @param dataSchema Schema of data to write
* @return OutputWriterFactory for creating writers
*/
def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType
): OutputWriterFactory
/**
* Build reader for scanning ORC files
* @param sparkSession Active Spark session
* @param dataSchema Schema of data in files
* @param partitionSchema Schema of partition columns
* @param requiredSchema Schema of required columns
* @param filters Push-down filters for optimization
* @param options Read options and settings
* @param hadoopConf Hadoop configuration
* @return Function that reads a PartitionedFile as an iterator of rows
*/
def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration
): PartitionedFile => Iterator[InternalRow]
/** Short name for this data source format */
def shortName(): String = "orc"
}
```
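Because `OrcFileFormat` registers the short name `orc`, the format can be selected by name from both the DataFrame API and SQL DDL. A minimal sketch follows (the path and table name are placeholders); which implementation backs these calls is governed by `spark.sql.orc.impl`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// "orc" resolves to OrcFileFormat through DataSourceRegister.shortName
val df = spark.read.format("orc").load("/path/to/orc/files")

// SQL DDL resolves USING orc to the same registered source
spark.sql("CREATE TABLE orc_events (id INT, name STRING) USING orc")

// Pick the native or the Hive-based ORC implementation
spark.conf.set("spark.sql.orc.impl", "native")  // or "hive"
```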
Utility operations for ORC file handling and metadata.

```scala { .api }
object OrcFileOperator extends Logging {
/**
* Read schema from ORC file footer
* @param file Path to ORC file
* @param conf Hadoop configuration
* @param ignoreCorruptFiles Whether to ignore corrupt files
* @return Tuple of (schema, user metadata)
*/
def readSchema(
file: Path,
conf: Configuration,
ignoreCorruptFiles: Boolean
): Option[(StructType, Map[String, String])]
/**
* Read ORC file metadata including statistics
* @param files Sequence of ORC files to read
* @param conf Hadoop configuration
* @return Aggregated file metadata
*/
def readFileMetadata(
files: Seq[Path],
conf: Configuration
): Map[String, String]
}
```
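A minimal sketch of footer inspection using the signatures listed above. `OrcFileOperator` is an internal helper, so the exact parameter list here follows this document's listing rather than a guaranteed stable public API, and the file path is a placeholder:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.hive.orc.OrcFileOperator

val hadoopConf = new Configuration()

// Read the schema and user metadata stored in the ORC footer,
// skipping unreadable files when ignoreCorruptFiles is true.
OrcFileOperator.readSchema(new Path("/path/to/file.orc"), hadoopConf, ignoreCorruptFiles = true) match {
  case Some((schema, userMetadata)) =>
    println(schema.treeString)
    println(userMetadata.keys.mkString(", "))
  case None =>
    println("No readable ORC footer found")
}
```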
Integration layer for Hive-specific file format operations.

```scala { .api }
private[hive] class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
/**
* Prepare write using Hive OutputFormat
* @param sparkSession Active Spark session
* @param job Hadoop job for write operation
* @param options Write configuration options
* @param dataSchema Schema of data to be written
* @return OutputWriterFactory using Hive serialization
*/
def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType
): OutputWriterFactory
/**
* Build reader using Hive InputFormat and SerDe
* @param sparkSession Active Spark session
* @param dataSchema Full schema of data files
* @param partitionSchema Schema of partition columns
* @param requiredSchema Schema of columns to read
* @param filters Filters to push down to storage
* @param options Read configuration options
* @param hadoopConf Hadoop configuration
* @return Function to read a PartitionedFile
*/
def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration
): PartitionedFile => Iterator[InternalRow]
}
```
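`HiveFileFormat` is not used directly from user code; it is exercised when Spark writes into a Hive SerDe table through the Hive OutputFormat path. A sketch of a write that takes that path, assuming the metastore ORC conversion is turned off (the table name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

// Keep the Hive SerDe write path instead of converting to the native ORC writer
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")

spark.sql("CREATE TABLE IF NOT EXISTS hive_orc_events (id INT, name STRING) STORED AS ORC")

// Inserting into a Hive SerDe table routes the write through the Hive OutputFormat
Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
  .write
  .mode("append")
  .insertInto("hive_orc_events")
```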
Low-level Java classes for ORC integration.

```scala { .api }
// Note: These are Java classes with Scala signatures for documentation
/**
* Custom ORC record reader optimized for Spark
*/
class SparkOrcNewRecordReader extends RecordReader[NullWritable, VectorizedRowBatch] {
/**
* Initialize the record reader
* @param inputSplit Input split to read
* @param context Task attempt context
*/
def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit
/**
* Read next batch of records
* @return true if more records are available
*/
def nextKeyValue(): Boolean
/**
* Get current key (always null for ORC)
* @return NullWritable key
*/
def getCurrentKey(): NullWritable
/**
* Get current batch of records
* @return VectorizedRowBatch containing records
*/
def getCurrentValue(): VectorizedRowBatch
/**
* Get reading progress as percentage
* @return Progress between 0.0 and 1.0
*/
def getProgress(): Float
/** Close the record reader */
def close(): Unit
}
/**
* Input format for handling symlinked text files
*/
class DelegateSymlinkTextInputFormat extends TextInputFormat {
/**
* Get input splits for symlinked files
* @param job Job configuration
* @return List of input splits
*/
def getSplits(job: JobContext): java.util.List[InputSplit]
}
```

```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
// Read ORC files directly
val orcDF = spark.read
.format("orc")
.option("mergeSchema", "true")
.load("/path/to/orc/files")
orcDF.printSchema()
orcDF.show()
// Read Hive ORC table
val hiveOrcTable = spark.sql("SELECT * FROM hive_orc_table")
hiveOrcTable.explain(true)
```

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._  // needed for toDF on a local Seq
// Create sample data
val data = Seq(
(1, "Alice", 25),
(2, "Bob", 30),
(3, "Charlie", 35)
).toDF("id", "name", "age")
// Write as ORC with Hive compatibility
data.write
.mode(SaveMode.Overwrite)
.option("compression", "snappy")
.format("orc")
.save("/path/to/output/orc")
// Write to Hive table using ORC format
data.write
.mode(SaveMode.Overwrite)
.format("orc")
.saveAsTable("my_database.orc_table")
```

```scala
// Enable schema merging for ORC files
val mergedDF = spark.read
.format("orc")
.option("mergeSchema", "true")
.load("/path/to/orc/files/*")
// Handle schema evolution gracefully
val evolvedDF = spark.read
.format("orc")
.option("recursiveFileLookup", "true")
.load("/path/to/evolved/schema/files")
// Check for schema differences
mergedDF.printSchema()
evolvedDF.printSchema()
```

```scala
// Read options
val orcOptions = Map(
"mergeSchema" -> "true", // Merge schemas from multiple files
"recursiveFileLookup" -> "true", // Recursively look for files
"ignoreCorruptFiles" -> "false", // Fail on corrupt files
"compression" -> "snappy" // Compression codec
)
// Write options
val writeOptions = Map(
"compression" -> "zlib", // zlib, snappy, lzo, lz4, none
"orc.compress" -> "SNAPPY", // ORC compression
"orc.stripe.size" -> "67108864", // 64MB stripe size
"orc.block.size" -> "268435456" // 256MB block size
)
```

```scala
// Configure ORC conversion from Hive metastore
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")Common file format exceptions:
import org.apache.spark.sql.AnalysisException  // used in the catch case below
try {
val df = spark.read.format("orc").load("/path/to/corrupt/files")
df.count()
} catch {
case e: java.io.IOException if e.getMessage.contains("Malformed ORC file") =>
println("ORC file is corrupted")
case e: AnalysisException if e.getMessage.contains("Unable to infer schema") =>
println("Cannot determine schema from ORC files")
}
```

```scala { .api }
case class OrcOptions(
parameters: Map[String, String]
) {
def mergeSchema: Boolean
def ignoreCorruptFiles: Boolean
def recursiveFileLookup: Boolean
def compression: String
}
```
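These fields correspond to per-read and per-write keys in the parameter map; a short sketch of supplying them via the option maps defined in the examples above (`orcOptions` and `writeOptions`):

```scala
// Pass the documented keys as data source options; they are parsed into OrcOptions.
val readDF = spark.read
  .format("orc")
  .options(orcOptions)      // read-option map defined earlier
  .load("/path/to/orc/files")

readDF.write
  .format("orc")
  .options(writeOptions)    // write-option map defined earlier
  .save("/path/to/output/orc")
```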
```scala { .api }
trait FileFormat {
def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]
): Option[StructType]
def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType
): OutputWriterFactory
}