
# File Formats

Native support for Hive file formats, covering traditional Hive tables and Hive-compatible ORC files. This enables reading and writing data in various Hive-compatible formats while leveraging Spark's performance optimizations.

## Capabilities

### HiveFileFormat

FileFormat implementation for traditional Hive tables, supporting the various Hive SerDes and storage formats.

**⚠️ Implementation Status**: Currently write-only. Read operations are not implemented and will throw `UnsupportedOperationException`.

```scala { .api }
/**
 * FileFormat implementation for Hive tables.
 * Currently supports WRITING data using Hive SerDes and OutputFormat classes.
 * Reading is not implemented - use HiveTableScanExec for reading Hive tables.
 */
class HiveFileFormat extends FileFormat with DataSourceRegister with Logging {

  /** Data source short name for SQL registration */
  override def shortName(): String = "hive"

  /**
   * Schema inference is not supported - throws UnsupportedOperationException.
   * The schema must be provided by the Hive metastore.
   */
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]
  ): Option[StructType]

  /**
   * Prepare write operations for Hive tables
   * @param sparkSession - Current SparkSession
   * @param job - Hadoop Job configuration
   * @param options - Write options including SerDe settings
   * @param dataSchema - Schema of data to write
   * @return OutputWriterFactory for creating individual file writers
   */
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory

  /**
   * Build a reader for scanning Hive table files.
   * Not implemented - throws UnsupportedOperationException.
   * @param sparkSession - Current SparkSession
   * @param dataSchema - Schema of data to read
   * @param partitionSchema - Schema of partition columns
   * @param requiredSchema - Columns actually needed by query
   * @param filters - Pushdown predicates
   * @param options - Read options including SerDe settings
   * @param hadoopConf - Hadoop configuration
   * @return Function to create PartitionedFile readers
   */
  override def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Create a Hive table with a specific file format
spark.sql("""
  CREATE TABLE text_table (
    id INT,
    name STRING,
    age INT
  )
  STORED AS TEXTFILE
  LOCATION '/user/data/text_table'
""")

// Read from the Hive table (planned as HiveTableScanExec;
// HiveFileFormat itself is write-only)
val df = spark.sql("SELECT * FROM text_table")
df.show()

// Create a Hive table with a custom SerDe
spark.sql("""
  CREATE TABLE custom_serde_table (
    id INT,
    data STRING
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
  WITH SERDEPROPERTIES (
    "separatorChar" = ",",
    "quoteChar" = "\"",
    "escapeChar" = "\\"
  )
  STORED AS TEXTFILE
""")
```
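Because HiveFileFormat is write-only, reads of Hive tables are planned through HiveTableScanExec rather than through `buildReader`. A small sketch (reusing the `text_table` created above) to confirm which operator handles the scan:

```scala
// Inspect the physical plan of a read against a Hive-format table;
// expect a HiveTableScan node, not a HiveFileFormat file scan
val plan = spark.sql("SELECT * FROM text_table").queryExecution.executedPlan
println(plan.toString) // look for "Scan hive" / HiveTableScanExec
```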

### OrcFileFormat

Hive-compatible ORC file format implementation with optimizations.

**⚠️ Implementation Note**: There are two ORC implementations in Spark:

- **Hive ORC** (`org.apache.spark.sql.hive.orc.OrcFileFormat`) - uses the Hive ORC libraries for compatibility
- **Core ORC** (`org.apache.spark.sql.execution.datasources.orc.OrcFileFormat`) - uses Apache ORC directly for better performance

The Hive ORC implementation documented here provides better Hive compatibility but may have different performance characteristics.
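Which of the two implementations handles ORC can be selected via configuration; a minimal sketch, assuming a session with Hive support:

```scala
// Select the ORC implementation globally:
//   "native" -> org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
//   "hive"   -> org.apache.spark.sql.hive.orc.OrcFileFormat
spark.conf.set("spark.sql.orc.impl", "hive")

// Independently, control whether metastore ORC tables are converted
// to Spark's data source scan instead of the Hive SerDe read path
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")
```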

```scala { .api }
/**
 * Hive-compatible ORC file format implementation.
 * Provides native ORC reading/writing with Hive metastore integration.
 */
class OrcFileFormat extends FileFormat with DataSourceRegister with Logging {

  /** Data source short name */
  override def shortName(): String = "orc"

  /**
   * Infer schema from ORC files
   * @param sparkSession - Current SparkSession
   * @param options - Read options
   * @param files - ORC files to analyze
   * @return Inferred schema, or None if it cannot be inferred
   */
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]
  ): Option[StructType]

  /**
   * Build optimized ORC reader with predicate pushdown
   * @param sparkSession - Current SparkSession
   * @param dataSchema - Schema of data in files
   * @param partitionSchema - Partition column schema
   * @param requiredSchema - Columns needed by query
   * @param filters - Pushdown predicates for ORC row groups
   * @param options - Read options
   * @param hadoopConf - Hadoop configuration
   * @return Reader function for ORC files
   */
  override def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]

  /**
   * Prepare ORC write operations
   * @param sparkSession - Current SparkSession
   * @param job - Hadoop Job for configuration
   * @param options - Write options including compression
   * @param dataSchema - Schema of data to write
   * @return OutputWriterFactory for creating ORC writers
   */
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType
  ): OutputWriterFactory

  /**
   * Check if vectorized (batch) reading is supported
   * @param requiredSchema - Required columns
   * @return true if vectorized reading can be used
   */
  override def supportBatch(requiredSchema: StructType): Boolean
}
```

**Usage Examples:**

```scala
// Create an ORC table
spark.sql("""
  CREATE TABLE orc_table (
    id BIGINT,
    name STRING,
    age INT,
    salary DOUBLE
  )
  STORED AS ORC
  LOCATION '/user/data/orc_table'
  TBLPROPERTIES (
    'orc.compress' = 'SNAPPY',
    'orc.bloom.filter.columns' = 'id,name'
  )
""")

// Write data to the ORC table with compression
df.write
  .mode("overwrite")
  .option("compression", "snappy")
  .format("orc")
  .saveAsTable("orc_table")

// Read with predicate pushdown (applied automatically)
val filtered = spark.sql("""
  SELECT name, salary
  FROM orc_table
  WHERE age > 25 AND salary > 50000
""")
filtered.explain(true) // shows predicate pushdown in the plan
```

### HiveOptions Configuration

Configuration class for Hive-specific format options.

```scala { .api }
/**
 * Configuration options for Hive data source operations
 */
class HiveOptions(parameters: Map[String, String]) {

  /** File format specification (e.g., "textfile", "sequencefile", "orc") */
  val fileFormat: Option[String] = parameters.get(HiveOptions.FILE_FORMAT)

  /** Input format class name */
  val inputFormat: Option[String] = parameters.get(HiveOptions.INPUT_FORMAT)

  /** Output format class name */
  val outputFormat: Option[String] = parameters.get(HiveOptions.OUTPUT_FORMAT)

  /** SerDe class name */
  val serde: Option[String] = parameters.get(HiveOptions.SERDE)

  /**
   * Check if input/output formats are explicitly specified
   * @return true if both input and output formats are provided
   */
  def hasInputOutputFormat: Boolean = inputFormat.isDefined && outputFormat.isDefined

  /**
   * Get SerDe properties from the options
   * @return Map of SerDe-specific properties
   */
  def serdeProperties: Map[String, String] = {
    parameters.filterKeys(!HiveOptions.delimiterOptions.contains(_))
  }
}

object HiveOptions {
  // Option key constants
  val FILE_FORMAT = "fileFormat"
  val INPUT_FORMAT = "inputFormat"
  val OUTPUT_FORMAT = "outputFormat"
  val SERDE = "serde"

  // Common delimiter option mappings
  val delimiterOptions: Map[String, String] = Map(
    "field.delim" -> "field.delim",
    "line.delim" -> "line.delim",
    "collection.delim" -> "collection.delim",
    "mapkey.delim" -> "mapkey.delim"
  )

  /**
   * Get compression configuration for Hive writes
   * @param sessionState - Hive SessionState
   * @param hadoopConf - Hadoop configuration
   * @param compressionCodec - Optional compression codec override
   * @return Compression codec to use, or None
   */
  def getHiveWriteCompression(
      sessionState: SessionState,
      hadoopConf: Configuration,
      compressionCodec: Option[String]
  ): Option[String]
}
```
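These option keys surface as DataFrameWriter options when targeting the `hive` format. A hedged sketch, assuming `df` is an existing DataFrame (the table name is illustrative):

```scala
// "fileFormat" maps to HiveOptions.FILE_FORMAT; unrecognized keys such as
// "field.delim" are passed through as SerDe properties
df.write
  .format("hive")
  .option("fileFormat", "textfile")
  .option("field.delim", "|")
  .mode("overwrite")
  .saveAsTable("pipe_delimited_table")
```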

### File Format Examples

Comprehensive examples for different Hive file formats.

**TextFile Format:**

```scala
// Create a table with TextFile format and custom delimiters
spark.sql("""
  CREATE TABLE csv_data (
    id INT,
    name STRING,
    email STRING,
    age INT
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  WITH SERDEPROPERTIES (
    'field.delim' = ',',
    'line.delim' = '\n',
    'serialization.format' = ','
  )
  STORED AS TEXTFILE
  LOCATION '/data/csv'
""")

// Load CSV data
spark.sql("""
  LOAD DATA INPATH '/input/data.csv'
  INTO TABLE csv_data
""")
```

**SequenceFile Format:**

```scala
// Create a SequenceFile table
spark.sql("""
  CREATE TABLE sequence_data (
    key STRING,
    value STRING
  )
  STORED AS SEQUENCEFILE
  LOCATION '/data/sequence'
""")

// Write data in SequenceFile format
df.write
  .format("hive")
  .option("fileFormat", "sequencefile")
  .mode("overwrite")
  .saveAsTable("sequence_data")
```

**Avro Format:**

```scala
// Create an Avro table
spark.sql("""
  CREATE TABLE avro_data (
    id BIGINT,
    name STRING,
    metadata MAP<STRING,STRING>
  )
  STORED AS AVRO
  LOCATION '/data/avro'
  TBLPROPERTIES (
    'avro.schema.literal' = '{
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "metadata", "type": {"type": "map", "values": "string"}}
      ]
    }'
  )
""")
```

**Parquet Format (with Hive compatibility):**

```scala
// Create a Parquet table in the Hive metastore
spark.sql("""
  CREATE TABLE parquet_data (
    id BIGINT,
    name STRING,
    created_date DATE
  )
  STORED AS PARQUET
  LOCATION '/data/parquet'
  TBLPROPERTIES (
    'parquet.compression' = 'SNAPPY'
  )
""")

// Automatic conversion to Spark's native Parquet reader
// (controlled by spark.sql.hive.convertMetastoreParquet)
val df = spark.sql("SELECT * FROM parquet_data")
df.explain() // shows either a HiveTableRelation or a parquet scan
```
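The conversion mentioned above can also be toggled at runtime; a small sketch showing both read paths for the `parquet_data` table from the example (plan output abbreviated in the comments):

```scala
// Default: metastore Parquet tables are converted to Spark's native scan
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.sql("SELECT * FROM parquet_data").explain() // FileScan parquet

// Disable conversion to force the Hive SerDe read path
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("SELECT * FROM parquet_data").explain() // HiveTableScan
```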

### Advanced File Format Operations

**Custom SerDe Integration:**

```scala
// Register a custom SerDe jar
spark.sql("ADD JAR /path/to/custom-serde.jar")

spark.sql("""
  CREATE TABLE json_data (
    id INT,
    data STRING
  )
  ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
  STORED AS TEXTFILE
  LOCATION '/data/json'
""")

// Extract fields from the JSON payload
val jsonDF = spark.sql("SELECT get_json_object(data, '$.user.name') AS username FROM json_data")
```

**Multi-Format Table Operations:**

```scala
// Create a partitioned table; partitions can later be given
// different storage formats than the table default
spark.sql("""
  CREATE TABLE multi_format_data (
    id INT,
    name STRING,
    value DOUBLE
  )
  PARTITIONED BY (format_type STRING)
  STORED AS TEXTFILE
  LOCATION '/data/multi_format'
""")

// Add partitions at different locations (a partition's format can then be
// changed with Hive's ALTER TABLE ... PARTITION ... SET FILEFORMAT)
spark.sql("""
  ALTER TABLE multi_format_data
  ADD PARTITION (format_type='text')
  LOCATION '/data/multi_format/text'
""")

spark.sql("""
  ALTER TABLE multi_format_data
  ADD PARTITION (format_type='orc')
  LOCATION '/data/multi_format/orc'
""")
```

**Compression Configuration:**

```scala
// Configure compression for different formats
val spark = SparkSession.builder()
  .config("spark.sql.hive.convertMetastoreOrc", "true")
  .config("spark.sql.orc.compression.codec", "snappy")
  .config("spark.sql.parquet.compression.codec", "gzip")
  .enableHiveSupport()
  .getOrCreate()

// Write with a specific compression codec
df.write
  .format("orc")
  .option("compression", "zlib")
  .mode("overwrite")
  .saveAsTable("compressed_table")
```

### Performance Optimization

**Vectorized Reading:**

```scala
// Enable vectorized ORC reading (requires the native ORC implementation)
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

// Queries over ORC tables benefit from vectorization
val result = spark.sql("""
  SELECT sum(salary), avg(age)
  FROM large_orc_table
  WHERE department = 'Engineering'
""")
result.explain() // shows vectorized (batch) scan operators
```

**Predicate Pushdown:**

```scala
// ORC predicate pushdown is applied automatically
val filtered = spark.sql("""
  SELECT name, salary
  FROM employee_orc
  WHERE hire_date >= '2020-01-01'
    AND department IN ('Engineering', 'Sales')
    AND salary > 75000
""")

// Check pushdown in the query plan
filtered.explain(true)
// Shows: PushedFilters: [IsNotNull(hire_date), GreaterThanOrEqual(hire_date,...)]
```

**Schema Evolution:**

```scala
// Handle schema evolution across ORC files
spark.conf.set("spark.sql.orc.mergeSchema", "true")

// Read a table whose files have evolved schemas
val evolvedDF = spark.sql("SELECT * FROM evolved_orc_table")
evolvedDF.printSchema() // shows the merged schema from all files
```

508

509

### File Format Utilities

510

511

**Format Detection and Conversion:**

512

513

```scala
// Check a table's storage format
val tableInfo = spark.sql("DESCRIBE FORMATTED my_table").collect()
val storageFormat = tableInfo.find(_.getString(0) == "InputFormat").map(_.getString(1))

// Convert a table to another format via CTAS
spark.sql("""
  CREATE TABLE orc_converted
  STORED AS ORC
  AS SELECT * FROM textfile_table
""")

// Rewrite a table in place (e.g., to compact small files)
spark.sql("""
  INSERT OVERWRITE TABLE existing_table
  SELECT * FROM existing_table
""") // rewrites the data in the table's current format
```

**File Statistics and Metadata:**

```scala
// Get file-level statistics for ORC
val stats = spark.sql("""
  SELECT
    input_file_name() AS filename,
    count(*) AS row_count
  FROM orc_table
  GROUP BY input_file_name()
""")
stats.show()

// Compute table and column statistics
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR ALL COLUMNS")
spark.sql("DESCRIBE EXTENDED my_table").show(100, false)
```

## Error Handling

Common error patterns and solutions for file format operations:

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT * FROM malformed_table")
} catch {
  case e: AnalysisException if e.getMessage.contains("SerDe") =>
    println("SerDe configuration error - check SerDe properties")
  case e: AnalysisException if e.getMessage.contains("InputFormat") =>
    println("InputFormat error - verify file format configuration")
  case e: Exception =>
    println(s"File format error: ${e.getMessage}")
}

// Handle a missing table gracefully
val safeDF = try {
  spark.sql("SELECT * FROM potentially_missing_table")
} catch {
  case _: AnalysisException =>
    spark.emptyDataFrame
}
```