# File Formats

Native support for Hive file formats, covering traditional Hive tables and Hive-compatible ORC files. This enables reading and writing data in a range of Hive-compatible formats while leveraging Spark's performance optimizations.

## Capabilities

### HiveFileFormat

A `FileFormat` implementation for traditional Hive tables that supports the various Hive SerDes and storage formats.

**⚠️ Implementation Status**: Currently write-only. Read operations are not implemented and throw `UnsupportedOperationException`; reading Hive tables is handled by `HiveTableScanExec` instead.
```scala { .api }
/**
 * FileFormat implementation for Hive tables.
 * Currently supports WRITING data using Hive SerDes and OutputFormat classes.
 * Reading is not implemented - use HiveTableScanExec for reading Hive tables.
 */
class HiveFileFormat extends FileFormat with DataSourceRegister with Logging {

  /** Data source short name for SQL registration */
  override def shortName(): String = "hive"

  /**
   * Schema inference is not supported - throws UnsupportedOperationException.
   * The schema must be provided by the Hive metastore.
   */
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  /**
   * Prepare write operations for Hive tables
   * @param sparkSession Current SparkSession
   * @param job Hadoop Job configuration
   * @param options Write options including SerDe settings
   * @param dataSchema Schema of data to write
   * @return OutputWriterFactory for creating individual file writers
   */
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

  /**
   * Build a reader for scanning Hive table files.
   * Not implemented - throws UnsupportedOperationException
   * @param sparkSession Current SparkSession
   * @param dataSchema Schema of data to read
   * @param partitionSchema Schema of partition columns
   * @param requiredSchema Columns actually needed by the query
   * @param filters Pushdown predicates
   * @param options Read options including SerDe settings
   * @param hadoopConf Hadoop configuration
   * @return Function to create PartitionedFile readers
   */
  override def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow]
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Create Hive table with a specific file format
spark.sql("""
  CREATE TABLE text_table (
    id INT,
    name STRING,
    age INT
  )
  STORED AS TEXTFILE
  LOCATION '/user/data/text_table'
""")

// Read from the Hive table (planned as HiveTableScanExec, not HiveFileFormat)
val df = spark.sql("SELECT * FROM text_table")
df.show()

// Create a Hive table with a custom SerDe for writing
spark.sql("""
  CREATE TABLE custom_serde_table (
    id INT,
    data STRING
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
  WITH SERDEPROPERTIES (
    "separatorChar" = ",",
    "quoteChar" = "\"",
    "escapeChar" = "\\"
  )
  STORED AS TEXTFILE
""")
```
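
The same write path is also reachable through the DataFrameWriter API. A minimal sketch (table and column names are illustrative):

```scala
// Write a DataFrame as a Hive table in a specific storage format.
// Only the write path goes through HiveFileFormat; subsequent reads are
// planned as HiveTableScanExec, not FileFormat.buildReader.
val people = spark.range(3).selectExpr("id", "concat('user_', id) AS name")

people.write
  .format("hive")                   // resolves to HiveFileFormat via shortName()
  .option("fileFormat", "textfile") // one of Spark's documented Hive formats
  .mode("overwrite")
  .saveAsTable("people_text")
```

Per Spark's Hive tables documentation, the `fileFormat` option accepts `textfile`, `sequencefile`, `rcfile`, `orc`, `parquet`, and `avro`.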

### OrcFileFormat

Hive-compatible ORC file format implementation with optimizations.

**⚠️ Implementation Note**: There are two ORC implementations in Spark:
- **Hive ORC** (`org.apache.spark.sql.hive.orc.OrcFileFormat`) - uses the Hive ORC libraries for compatibility
- **Core ORC** (`org.apache.spark.sql.execution.datasources.orc.OrcFileFormat`) - uses Apache ORC directly for better performance

The Hive ORC implementation documented here provides better Hive compatibility but may have different performance characteristics.
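
Which of the two implementations actually serves a query is controlled by session configuration; a sketch of the relevant settings:

```scala
// Choose the ORC implementation: "native" (Apache ORC) or "hive" (Hive ORC libraries)
spark.conf.set("spark.sql.orc.impl", "native")

// When true, metastore ORC tables are converted to Spark's built-in ORC
// data source instead of being read through the Hive SerDe path
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
```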

```scala { .api }
/**
 * Hive-compatible ORC file format implementation
 * Provides native ORC reading/writing with Hive metastore integration
 */
class OrcFileFormat extends FileFormat with DataSourceRegister with Logging {

  /** Data source short name */
  override def shortName(): String = "orc"

  /**
   * Infer schema from ORC files
   * @param sparkSession Current SparkSession
   * @param options Read options
   * @param files ORC files to analyze
   * @return Inferred schema, or None if it cannot be inferred
   */
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  /**
   * Build an optimized ORC reader with predicate pushdown
   * @param sparkSession Current SparkSession
   * @param dataSchema Schema of data in files
   * @param partitionSchema Partition column schema
   * @param requiredSchema Columns needed by the query
   * @param filters Pushdown predicates for ORC row groups
   * @param options Read options
   * @param hadoopConf Hadoop configuration
   * @return Reader function for ORC files
   */
  override def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow]

  /**
   * Prepare ORC write operations
   * @param sparkSession Current SparkSession
   * @param job Hadoop Job for configuration
   * @param options Write options including compression
   * @param dataSchema Schema of data to write
   * @return OutputWriterFactory for creating ORC writers
   */
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

  /**
   * Check whether vectorized (batch) reading can be used.
   * Depends on the configured reader implementation and the required schema.
   * @param requiredSchema Required columns
   * @return true if vectorized reading can be used
   */
  override def supportBatch(requiredSchema: StructType): Boolean
}
```

**Usage Examples:**

```scala
// Create ORC table
spark.sql("""
  CREATE TABLE orc_table (
    id BIGINT,
    name STRING,
    age INT,
    salary DOUBLE
  )
  STORED AS ORC
  LOCATION '/user/data/orc_table'
  TBLPROPERTIES (
    'orc.compress' = 'SNAPPY',
    'orc.bloom.filter.columns' = 'id,name'
  )
""")

// Write data to the ORC table with compression
df.write
  .format("orc")
  .option("compression", "snappy")
  .mode("overwrite")
  .saveAsTable("orc_table")

// Read with predicate pushdown (applied automatically)
val filtered = spark.sql("""
  SELECT name, salary
  FROM orc_table
  WHERE age > 25 AND salary > 50000
""")
filtered.explain(true) // shows pushed filters in the plan
```

### HiveOptions Configuration

Configuration class for Hive-specific format options.

```scala { .api }
/**
 * Configuration options for Hive data source operations
 */
class HiveOptions(parameters: Map[String, String]) {

  /** File format specification (e.g., "textfile", "sequencefile", "orc") */
  val fileFormat: Option[String] = parameters.get(HiveOptions.FILE_FORMAT)

  /** Input format class name */
  val inputFormat: Option[String] = parameters.get(HiveOptions.INPUT_FORMAT)

  /** Output format class name */
  val outputFormat: Option[String] = parameters.get(HiveOptions.OUTPUT_FORMAT)

  /** SerDe class name */
  val serde: Option[String] = parameters.get(HiveOptions.SERDE)

  /**
   * Check if input/output formats are explicitly specified
   * @return true if both input and output formats are provided
   */
  def hasInputOutputFormat: Boolean = inputFormat.isDefined && outputFormat.isDefined

  /**
   * Get SerDe properties from the options, excluding the delimiter
   * shortcut options (which are translated separately)
   * @return Map of SerDe-specific properties
   */
  def serdeProperties: Map[String, String] =
    parameters.filter { case (key, _) => !HiveOptions.delimiterOptions.contains(key) }
}

object HiveOptions {
  // Option key constants
  val FILE_FORMAT = "fileFormat"
  val INPUT_FORMAT = "inputFormat"
  val OUTPUT_FORMAT = "outputFormat"
  val SERDE = "serde"

  // Maps user-facing delimiter option names to the underlying SerDe property keys
  val delimiterOptions: Map[String, String] = Map(
    "fieldDelim" -> "field.delim",
    "lineDelim" -> "line.delim",
    "collectionDelim" -> "collection.delim",
    "mapkeyDelim" -> "mapkey.delim"
  )

  /**
   * Get the compression configuration for Hive writes
   * @param sessionState Hive SessionState
   * @param hadoopConf Hadoop configuration
   * @param compressionCodec Optional compression codec override
   * @return Compression codec to use, or None
   */
  def getHiveWriteCompression(
      sessionState: SessionState,
      hadoopConf: Configuration,
      compressionCodec: Option[String]): Option[String]
}
```
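
These options are typically supplied through the DataFrameWriter. A sketch, assuming the Hadoop/Hive classes named below are on the classpath:

```scala
// inputFormat and outputFormat must be specified together
// (see hasInputOutputFormat above)
df.write
  .format("hive")
  .option("inputFormat", "org.apache.hadoop.mapred.SequenceFileInputFormat")
  .option("outputFormat", "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat")
  .option("serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  .mode("overwrite")
  .saveAsTable("seq_table")
```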

### File Format Examples

Comprehensive examples for different Hive file formats.

**TextFile Format:**

```scala
// Create table with TextFile format and custom delimiters
spark.sql("""
  CREATE TABLE csv_data (
    id INT,
    name STRING,
    email STRING,
    age INT
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  WITH SERDEPROPERTIES (
    'field.delim' = ',',
    'line.delim' = '\n',
    'serialization.format' = ','
  )
  STORED AS TEXTFILE
  LOCATION '/data/csv'
""")

// Load CSV data
spark.sql("""
  LOAD DATA INPATH '/input/data.csv'
  INTO TABLE csv_data
""")
```

**SequenceFile Format:**

```scala
// Create SequenceFile table
spark.sql("""
  CREATE TABLE sequence_data (
    key STRING,
    value STRING
  )
  STORED AS SEQUENCEFILE
  LOCATION '/data/sequence'
""")

// Write data in SequenceFile format
df.write
  .format("hive")
  .option("fileFormat", "sequencefile")
  .mode("overwrite")
  .saveAsTable("sequence_data")
```

**Avro Format:**

```scala
// Create Avro table
spark.sql("""
  CREATE TABLE avro_data (
    id BIGINT,
    name STRING,
    metadata MAP<STRING,STRING>
  )
  STORED AS AVRO
  LOCATION '/data/avro'
  TBLPROPERTIES (
    'avro.schema.literal' = '{
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "metadata", "type": {"type": "map", "values": "string"}}
      ]
    }'
  )
""")
```

**Parquet Format (with Hive compatibility):**

```scala
// Create Parquet table with Hive metastore
spark.sql("""
  CREATE TABLE parquet_data (
    id BIGINT,
    name STRING,
    created_date DATE
  )
  STORED AS PARQUET
  LOCATION '/data/parquet'
  TBLPROPERTIES (
    'parquet.compression' = 'SNAPPY'
  )
""")

// Automatic conversion to Spark's native Parquet reader
// (controlled by spark.sql.hive.convertMetastoreParquet)
val df = spark.sql("SELECT * FROM parquet_data")
df.explain() // shows either a HiveTableRelation or a native Parquet scan
```
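
The effect of the conversion flag can be observed directly by toggling it and comparing the plans (table name as in the example above):

```scala
// With conversion enabled, the plan shows a native Parquet FileScan
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.sql("SELECT * FROM parquet_data").explain()

// With conversion disabled, the plan falls back to a Hive table scan
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("SELECT * FROM parquet_data").explain()
```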

### Advanced File Format Operations

**Custom SerDe Integration:**

```scala
// Register the jar providing the custom SerDe
spark.sql("ADD JAR /path/to/custom-serde.jar")

spark.sql("""
  CREATE TABLE json_data (
    id INT,
    data STRING
  )
  ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
  STORED AS TEXTFILE
  LOCATION '/data/json'
""")

// Use SQL functions to extract fields from the complex data
val jsonDF = spark.sql(
  "SELECT get_json_object(data, '$.user.name') AS username FROM json_data")
```

**Multi-Format Table Operations:**

```scala
// Create a partitioned table; in Hive, each partition carries its own
// storage descriptor, so formats can differ per partition
spark.sql("""
  CREATE TABLE multi_format_data (
    id INT,
    name STRING,
    value DOUBLE
  )
  PARTITIONED BY (format_type STRING)
  STORED AS TEXTFILE
  LOCATION '/data/multi_format'
""")

// Add partitions at different locations; note that ADD PARTITION alone does
// not change the storage format - partitions inherit the table's format
// unless their storage descriptor is altered through Hive DDL
spark.sql("""
  ALTER TABLE multi_format_data
  ADD PARTITION (format_type='text')
  LOCATION '/data/multi_format/text'
""")

spark.sql("""
  ALTER TABLE multi_format_data
  ADD PARTITION (format_type='orc')
  LOCATION '/data/multi_format/orc'
""")
```

**Compression Configuration:**

```scala
// Configure compression for different formats
val spark = SparkSession.builder()
  .config("spark.sql.hive.convertMetastoreOrc", "true")
  .config("spark.sql.orc.compression.codec", "snappy")
  .config("spark.sql.parquet.compression.codec", "gzip")
  .enableHiveSupport()
  .getOrCreate()

// Write with a specific compression codec
df.write
  .format("orc")
  .option("compression", "zlib")
  .mode("overwrite")
  .saveAsTable("compressed_table")
```

### Performance Optimization

**Vectorized Reading:**

```scala
// Vectorized ORC reading requires the native (Apache ORC) implementation
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

// Query benefits from vectorization
val result = spark.sql("""
  SELECT sum(salary), avg(age)
  FROM large_orc_table
  WHERE department = 'Engineering'
""")
result.explain() // shows vectorized operations
```

**Predicate Pushdown:**

```scala
// ORC predicate pushdown is applied automatically
val filtered = spark.sql("""
  SELECT name, salary
  FROM employee_orc
  WHERE hire_date >= '2020-01-01'
    AND department IN ('Engineering', 'Sales')
    AND salary > 75000
""")

// Check pushdown in the query plan
filtered.explain(true)
// Shows: PushedFilters: [IsNotNull(hire_date), GreaterThanOrEqual(hire_date,...)]
```
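
Pushdown itself is gated by a session flag; a short sketch:

```scala
// ORC predicate pushdown is controlled by this flag (enabled by default in
// recent Spark versions); filters that cannot be pushed to the file format
// are still evaluated by Spark after the scan
spark.conf.set("spark.sql.orc.filterPushdown", "true")
```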

**Schema Evolution:**

```scala
// Merge schemas across ORC files with evolved schemas
spark.conf.set("spark.sql.orc.mergeSchema", "true")

// Read tables with evolved schemas
val evolvedDF = spark.sql("SELECT * FROM evolved_orc_table")
evolvedDF.printSchema() // shows the schema merged from all files
```

### File Format Utilities

**Format Detection and Conversion:**

```scala
// Check the table's storage format
val tableInfo = spark.sql("DESCRIBE FORMATTED my_table").collect()
val storageFormat = tableInfo.find(_.getString(0) == "InputFormat").map(_.getString(1))

// Convert a table's format with CREATE TABLE ... AS SELECT
spark.sql("""
  CREATE TABLE orc_converted
  STORED AS ORC
  AS SELECT * FROM textfile_table
""")

// Rewrite a table in place using its current storage format
spark.sql("""
  INSERT OVERWRITE TABLE existing_table
  SELECT * FROM existing_table
""")
```

**File Statistics and Metadata:**

```scala
// Per-file row counts for an ORC table
val stats = spark.sql("""
  SELECT
    input_file_name() AS filename,
    count(*) AS row_count
  FROM orc_table
  GROUP BY input_file_name()
""")
stats.show()

// Compute and inspect table statistics
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR ALL COLUMNS")
spark.sql("DESCRIBE EXTENDED my_table").show(100, truncate = false)
```

## Error Handling

Common error patterns and solutions for file format operations:

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT * FROM malformed_table")
} catch {
  case e: AnalysisException if e.getMessage.contains("SerDe") =>
    println("SerDe configuration error - check SerDe properties")
  case e: AnalysisException if e.getMessage.contains("InputFormat") =>
    println("InputFormat error - verify file format configuration")
  case e: Exception =>
    println(s"File format error: ${e.getMessage}")
}

// Handle a missing table gracefully
val safeDF = try {
  spark.sql("SELECT * FROM potentially_missing_table")
} catch {
  case _: AnalysisException => spark.emptyDataFrame
}
```
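
As an alternative to exception-driven control flow, the catalog can be consulted up front; a sketch:

```scala
// Check the catalog before querying rather than catching AnalysisException
val safeDF =
  if (spark.catalog.tableExists("potentially_missing_table"))
    spark.table("potentially_missing_table")
  else
    spark.emptyDataFrame
```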