# File Format Support

Support for Hive-compatible file formats, particularly ORC files with Hive metadata integration.

## Core Imports

```scala
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.sources.DataSourceRegister
```

## Capabilities

### ORC File Format

Primary file format implementation for reading and writing ORC files with full Hive compatibility.

```scala { .api }
class OrcFileFormat extends FileFormat with DataSourceRegister {

  /**
   * Infer schema from ORC files
   * @param sparkSession Active Spark session
   * @param options Format-specific options
   * @param files Sequence of file statuses to analyze
   * @return Inferred schema, or None if it cannot be inferred
   */
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]
  ): Option[StructType]

  /**
   * Prepare write operations for ORC format
   * @param sparkSession Active Spark session
   * @param job Hadoop job configuration
   * @param options Write options and settings
   * @param dataSchema Schema of data to write
   * @return OutputWriterFactory for creating writers
   */
  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory

  /**
   * Build reader for scanning ORC files
   * @param sparkSession Active Spark session
   * @param dataSchema Schema of data in files
   * @param partitionSchema Schema of partition columns
   * @param requiredSchema Schema of required columns
   * @param filters Push-down filters for optimization
   * @param options Read options and settings
   * @param hadoopConf Hadoop configuration
   * @return Function to create PartitionedFile readers
   */
  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]

  /** Short name for this data source format */
  def shortName(): String = "orc"
}
```

### ORC File Operations

Utility operations for ORC file handling and metadata.

```scala { .api }
object OrcFileOperator extends Logging {

  /**
   * Read schema from ORC file footer
   * @param file Path to ORC file
   * @param conf Hadoop configuration
   * @param ignoreCorruptFiles Whether to ignore corrupt files
   * @return Tuple of (schema, user metadata), or None if unavailable
   */
  def readSchema(
      file: Path,
      conf: Configuration,
      ignoreCorruptFiles: Boolean
  ): Option[(StructType, Map[String, String])]

  /**
   * Read ORC file metadata including statistics
   * @param files Sequence of ORC files to read
   * @param conf Hadoop configuration
   * @return Aggregated file metadata
   */
  def readFileMetadata(
      files: Seq[Path],
      conf: Configuration
  ): Map[String, String]
}
```

### Hive File Format Integration

Integration layer for Hive-specific file format operations.

```scala { .api }
private[hive] class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {

  /**
   * Prepare write using Hive OutputFormat
   * @param sparkSession Active Spark session
   * @param job Hadoop job for write operation
   * @param options Write configuration options
   * @param dataSchema Schema of data to be written
   * @return OutputWriterFactory using Hive serialization
   */
  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory

  /**
   * Build reader using Hive InputFormat and SerDe
   * @param sparkSession Active Spark session
   * @param dataSchema Full schema of data files
   * @param partitionSchema Schema of partition columns
   * @param requiredSchema Schema of columns to read
   * @param filters Filters to push down to storage
   * @param options Read configuration options
   * @param hadoopConf Hadoop configuration
   * @return Function to read PartitionedFile
   */
  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}
```

### Java Integration Classes

Low-level Java classes for ORC integration.

```scala { .api }
// Note: These are Java classes with Scala signatures for documentation

/**
 * Custom ORC record reader optimized for Spark
 */
class SparkOrcNewRecordReader extends RecordReader[NullWritable, VectorizedRowBatch] {

  /**
   * Initialize the record reader
   * @param inputSplit Input split to read
   * @param context Task attempt context
   */
  def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit

  /**
   * Read next batch of records
   * @return true if more records are available
   */
  def nextKeyValue(): Boolean

  /**
   * Get current key (always null for ORC)
   * @return NullWritable key
   */
  def getCurrentKey(): NullWritable

  /**
   * Get current batch of records
   * @return VectorizedRowBatch containing records
   */
  def getCurrentValue(): VectorizedRowBatch

  /**
   * Get reading progress as a fraction
   * @return Progress between 0.0 and 1.0
   */
  def getProgress(): Float

  /** Close the record reader */
  def close(): Unit
}

/**
 * Input format for handling symlinked text files
 */
class DelegateSymlinkTextInputFormat extends TextInputFormat {

  /**
   * Get input splits for symlinked files
   * @param job Job configuration
   * @return List of input splits
   */
  def getSplits(job: JobContext): java.util.List[InputSplit]
}
```

## Usage Examples

### Reading ORC Files

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Read ORC files directly
val orcDF = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files")

orcDF.printSchema()
orcDF.show()

// Read Hive ORC table
val hiveOrcTable = spark.sql("SELECT * FROM hive_orc_table")
hiveOrcTable.explain(true)
```

### Writing ORC Files

```scala
import org.apache.spark.sql.SaveMode

// Create sample data
val data = Seq(
  (1, "Alice", 25),
  (2, "Bob", 30),
  (3, "Charlie", 35)
).toDF("id", "name", "age")

// Write as ORC with Hive compatibility
data.write
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .format("orc")
  .save("/path/to/output/orc")

// Write to Hive table using ORC format
data.write
  .mode(SaveMode.Overwrite)
  .format("orc")
  .saveAsTable("my_database.orc_table")
```

### Schema Evolution and Merging

```scala
// Enable schema merging for ORC files
val mergedDF = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files/*")

// Handle schema evolution gracefully
val evolvedDF = spark.read
  .format("orc")
  .option("recursiveFileLookup", "true")
  .load("/path/to/evolved/schema/files")

// Check for schema differences
mergedDF.printSchema()
evolvedDF.printSchema()
```

## Configuration Options

### ORC-Specific Options

```scala
// Read options
val orcOptions = Map(
  "mergeSchema" -> "true",          // Merge schemas from multiple files
  "recursiveFileLookup" -> "true",  // Recursively look for files
  "ignoreCorruptFiles" -> "false",  // Fail on corrupt files
  "compression" -> "snappy"         // Compression codec
)

// Write options
val writeOptions = Map(
  "compression" -> "zlib",          // zlib, snappy, lzo, lz4, none
  "orc.compress" -> "SNAPPY",       // ORC compression
  "orc.stripe.size" -> "67108864",  // 64MB stripe size
  "orc.block.size" -> "268435456"   // 256MB block size
)
```

### Hive Integration Settings

```scala
// Configure ORC conversion from Hive metastore
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
```

## Error Handling

Common file format exceptions:

- **CorruptedFileException**: When ORC files are corrupted or unreadable
- **UnsupportedFileFormatException**: When the file format is not supported
- **SchemaIncompatibleException**: When schemas cannot be merged or converted

```scala
import org.apache.spark.sql.AnalysisException

try {
  val df = spark.read.format("orc").load("/path/to/corrupt/files")
  df.count()
} catch {
  case e: java.io.IOException if e.getMessage.contains("Malformed ORC file") =>
    println("ORC file is corrupted")
  case e: AnalysisException if e.getMessage.contains("Unable to infer schema") =>
    println("Cannot determine schema from ORC files")
}
```

## Types

### File Format Types

```scala { .api }
case class OrcOptions(
  parameters: Map[String, String]
) {
  def mergeSchema: Boolean
  def ignoreCorruptFiles: Boolean
  def recursiveFileLookup: Boolean
  def compression: String
}
```

```scala { .api }
trait FileFormat {
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory
}
```