# File Format Support

Apache Spark's Hive integration provides comprehensive support for a variety of file formats, with natively optimized readers and writers for ORC and Parquet as well as compatibility with traditional Hive file formats.

## ORC File Format Support

Spark provides native ORC support with advanced optimizations, including vectorized reading, predicate pushdown, and column pruning.

### OrcFileFormat

```scala { .api }
class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable {
  // DataSourceRegister interface
  def shortName(): String // Returns "orc"

  // FileFormat interface
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory

  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}
```

### Using ORC Format

**Reading ORC Files:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Read ORC files directly
val df = spark.read.format("orc").load("/path/to/orc/files")

// Read ORC table through Hive metastore
val table = spark.table("my_orc_table")

// Read with options
val dfWithOptions = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files")
```

**Writing ORC Files:**

```scala
// Write DataFrame as ORC
df.write
  .format("orc")
  .option("compression", "snappy")
  .save("/path/to/output/orc")

// Write to Hive table
df.write
  .format("orc")
  .mode("overwrite")
  .saveAsTable("my_new_orc_table")

// Partitioned write
df.write
  .format("orc")
  .partitionBy("year", "month")
  .save("/path/to/partitioned/orc")
```

### ORC Configuration Options

```scala { .api }
class OrcOptions(parameters: CaseInsensitiveMap[String]) {
  def compressionCodec: String
  def enableVectorizedReader: Boolean
  def mergeSchema: Boolean
}
```

**Available ORC Options:**

- **compression**: Compression codec ("none", "snappy", "zlib", "lzo", "lz4", "zstd")
- **enableVectorizedReader**: Enable vectorized ORC reader (default: true)
- **mergeSchema**: Merge schemas when reading multiple files (default: false)
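
As a sketch, the options above can be combined on a single read or write; the paths here are placeholders:

```scala
// Merge evolved schemas across files and keep the vectorized reader on
val events = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .option("enableVectorizedReader", "true")
  .load("/data/orc/events")

// The compression option is a write-side setting
events.write
  .format("orc")
  .option("compression", "zstd")
  .save("/data/orc/events_zstd")
```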

### ORC File Operations

```scala { .api }
object OrcFileOperator extends Logging {
  def readSchema(files: Seq[String], conf: Option[Configuration]): Option[StructType]
  def listOrcFiles(path: String, hadoopConf: Configuration): Seq[String]
  def getRowCount(file: String, conf: Configuration): Long
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.hive.orc.OrcFileOperator

// Read schema from ORC files
val schema = OrcFileOperator.readSchema(Seq("/path/to/file.orc"), None)
println(s"Schema: ${schema.get.treeString}")

// Get row count
val conf = spark.sparkContext.hadoopConfiguration
val rowCount = OrcFileOperator.getRowCount("/path/to/file.orc", conf)
```

### ORC Filter Pushdown

```scala { .api }
object OrcFilters extends Logging {
  def createFilter(filters: Seq[Filter]): Option[SearchArgument]
  def buildSearchArgument(
      dataTypeMap: Map[String, DataType],
      filters: Seq[Filter]
  ): Option[SearchArgument]
}
```

ORC supports predicate pushdown for:

- Equality filters (`=`)
- Comparison filters (`<`, `<=`, `>`, `>=`)
- IN predicates
- IS NULL / IS NOT NULL
- String pattern matching (LIKE)
- Logical combinations (AND, OR, NOT)
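
Pushdown is applied automatically when `spark.sql.orc.filterPushdown` is enabled; a quick way to verify it is to look for a `PushedFilters` entry in the physical plan. A minimal sketch (the path and column names are placeholders):

```scala
import spark.implicits._

spark.conf.set("spark.sql.orc.filterPushdown", "true")

val filtered = spark.read
  .format("orc")
  .load("/path/to/orc/files")
  .filter($"year" >= 2020 && $"status".isNotNull)

// Look for PushedFilters: [IsNotNull(status), GreaterThanOrEqual(year,2020)]
filtered.explain()
```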

## Hive File Format Support

Support for traditional Hive file formats using Hive SerDes and input/output formats.

### HiveFileFormat

```scala { .api }
class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory

  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}
```

### HiveOptions

Configuration for Hive-compatible file formats and SerDes.

```scala { .api }
class HiveOptions(parameters: CaseInsensitiveMap[String]) {
  def fileFormat: String
  def inputFormat: String
  def outputFormat: String
  def serde: String
  def serdeProperties: Map[String, String]
}
```
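
These options surface in SQL through the `OPTIONS` clause of a `USING HIVE` table. A sketch (the table name is a placeholder; option keys such as `fileFormat` and `fieldDelim` follow Spark's Hive data source option names):

```scala
spark.sql("""
  CREATE TABLE hive_opts_table (id INT, name STRING)
  USING HIVE
  OPTIONS (
    fileFormat 'textfile',
    fieldDelim ','
  )
""")
```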

### Supported Hive File Formats

**Text Files:**
```scala
// Create table with text file format
spark.sql("""
  CREATE TABLE text_table (
    id INT,
    name STRING
  )
  STORED AS TEXTFILE
""")
```

**Sequence Files:**
```scala
spark.sql("""
  CREATE TABLE seq_table (
    id INT,
    name STRING
  )
  STORED AS SEQUENCEFILE
""")
```

**Avro Files:**
```scala
spark.sql("""
  CREATE TABLE avro_table (
    id INT,
    name STRING
  )
  STORED AS AVRO
""")
```

**Custom SerDe:**
```scala
spark.sql("""
  CREATE TABLE custom_table (
    id INT,
    name STRING
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  WITH SERDEPROPERTIES (
    'field.delim' = '\t',
    'line.delim' = '\n'
  )
  STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""")
```

## Parquet Integration

While Parquet support is primarily handled by Spark's native Parquet reader, the Hive integration provides compatibility for Hive-created Parquet tables.

### Parquet Configuration

```scala
// Enable native Parquet reader for Hive tables (enabled by default)
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

// Configure Parquet options
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.parquet.mergeSchema", "false")
```

### Reading Hive Parquet Tables

```scala
// Read Hive Parquet table with native reader
val df = spark.table("my_parquet_table")

// Force use of Hive SerDe (not recommended)
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
val dfWithSerde = spark.table("my_parquet_table")
```
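
To confirm which code path is in use, inspect the physical plan: with conversion enabled the scan appears as a native `FileScan parquet` node, while the SerDe path shows up as a Hive table scan. A minimal check (the table name is a placeholder):

```scala
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

// Expect a "FileScan parquet" node rather than a Hive table scan
spark.table("my_parquet_table").explain()
```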

## Advanced File Format Features

### Schema Evolution

Support for schema evolution in ORC and Parquet formats:

```scala
// Enable schema merging for ORC
val df = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/evolved/schema")

// Handle missing columns: supply an explicit schema and columns
// absent from older files are read back as null
val dfWithDefaults = spark.read
  .schema(expectedSchema) // a StructType describing the full, current schema
  .format("orc")
  .load("/path/to/files")
```

### Compression Support

**ORC Compression Options:**

- NONE
- ZLIB (the ORC library default; Spark's `spark.sql.orc.compression.codec` defaults to snappy)
- SNAPPY
- LZO
- LZ4
- ZSTD

**Setting Compression:**

```scala
// For writes
df.write
  .format("orc")
  .option("compression", "snappy")
  .save("/path/to/output")

// Global setting
spark.conf.set("spark.sql.orc.compression.codec", "snappy")
```

### Partition Support

File formats support both static and dynamic partitioning:

```scala
// Static partitioning
df.write
  .format("orc")
  .partitionBy("year", "month")
  .save("/partitioned/data")

// Dynamic partitioning in Hive tables
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

spark.sql("""
  INSERT INTO TABLE partitioned_table
  PARTITION(year, month)
  SELECT id, name, year, month FROM source_table
""")
```

### Bucketing

Support for bucketed tables for improved join performance:

```scala
// Create a Hive-bucketed table
spark.sql("""
  CREATE TABLE bucketed_table (
    id INT,
    name STRING,
    department STRING
  )
  CLUSTERED BY (id) INTO 4 BUCKETS
  STORED AS ORC
""")

// Write a Spark-bucketed table (created as a new table)
df.write
  .format("orc")
  .bucketBy(4, "id")
  .saveAsTable("spark_bucketed_table")
```
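
Note that tables written with `bucketBy` use Spark's own bucketing layout, which is not interchangeable with Hive's bucketing. When both sides of a join are bucketed on the join key with the same bucket count, Spark can avoid shuffling them. A sketch (the table names are placeholders):

```scala
// Bucketed scans are on by default; shown here for clarity
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")

// With matching bucket specs on "id", the join-key shuffle can be elided
val joined = spark.table("spark_bucketed_table")
  .join(spark.table("other_bucketed_table"), "id")

joined.explain() // check for the absence of Exchange nodes
```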

## Error Handling

Common file format errors and solutions:

### Unsupported File Format

```scala
// Error: Unsupported file format
// Solution: Ensure format is supported or use appropriate SerDe
spark.sql("""
  CREATE TABLE custom_format_table (...)
  STORED AS INPUTFORMAT 'custom.input.format'
  OUTPUTFORMAT 'custom.output.format'
""")
```

### Schema Mismatch

```scala
// Error: Schema mismatch between file and table
// Solution: enable schema merging on read, or refresh stale table metadata
val merged = spark.read.format("orc").option("mergeSchema", "true").load("/path/to/files")
spark.sql("REFRESH TABLE my_table")
```

### Compression Issues

```scala
// Error: Unsupported compression codec
// Solution: Use supported codec or install required libraries
df.write.format("orc").option("compression", "snappy").save(path)
```

## Performance Tuning

### ORC Optimization

```scala
// Enable vectorized reading
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

// Configure split size
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") // 128MB

// Enable bloom filters
spark.sql("""
  CREATE TABLE optimized_table (...)
  STORED AS ORC
  TBLPROPERTIES (
    'orc.bloom.filter.columns'='id,name',
    'orc.create.index'='true'
  )
""")
```

### File Size Optimization

```scala
// Control file size during writes
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "67108864") // 64MB

// Coalesce small files
df.coalesce(1).write.format("orc").save(path)
```

## Types

```scala { .api }
// File format interface
trait FileFormat {
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory
}

// Data source registration
trait DataSourceRegister {
  def shortName(): String
}

// Hive file sink configuration
case class FileSinkDesc(
  dirName: String,
  tableInfo: TableDesc,
  compressed: Boolean,
  destTableId: Int,
  compressCodec: String
)

// Table description for Hive
case class TableDesc(
  inputFormat: Class[_ <: InputFormat[_, _]],
  outputFormat: Class[_ <: OutputFormat[_, _]],
  properties: Properties
)

// ORC search argument for predicate pushdown
trait SearchArgument {
  def getLeaves(): java.util.List[PredicateLeaf]
  def getExpression(): ExpressionTree
}

// File status information
case class FileStatus(
  path: String,
  length: Long,
  isDirectory: Boolean,
  blockReplication: Short,
  blockSize: Long,
  modificationTime: Long,
  accessTime: Long
)
```