or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-hive-integration.md · execution-engine.md · file-formats.md · index.md · metastore-operations.md · udf-integration.md

docs/file-formats.md

0

# File Format Support

1

2

Apache Spark Hive integration provides comprehensive support for various file formats, with native optimized readers and writers for ORC and Parquet formats, as well as compatibility with traditional Hive file formats.

3

4

## ORC File Format Support

5

6

Spark provides native ORC support with advanced optimizations including vectorized reading, predicate pushdown, and column pruning.

7

8

### OrcFileFormat

9

10

```scala { .api }

11

class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable {

12

// DataSourceRegister interface

13

def shortName(): String // Returns "orc"

14

15

// FileFormat interface

16

def inferSchema(

17

sparkSession: SparkSession,

18

options: Map[String, String],

19

files: Seq[FileStatus]

20

): Option[StructType]

21

22

def prepareWrite(

23

sparkSession: SparkSession,

24

job: Job,

25

options: Map[String, String],

26

dataSchema: StructType

27

): OutputWriterFactory

28

29

def buildReader(

30

sparkSession: SparkSession,

31

dataSchema: StructType,

32

partitionSchema: StructType,

33

requiredSchema: StructType,

34

filters: Seq[Filter],

35

options: Map[String, String],

36

hadoopConf: Configuration

37

): PartitionedFile => Iterator[InternalRow]

38

}

39

```

40

41

### Using ORC Format

42

43

**Reading ORC Files:**

44

45

```scala

46

import org.apache.spark.sql.SparkSession

47

48

val spark = SparkSession.builder()

49

.enableHiveSupport()

50

.getOrCreate()

51

52

// Read ORC files directly

53

val df = spark.read.format("orc").load("/path/to/orc/files")

54

55

// Read ORC table through Hive metastore

56

val table = spark.table("my_orc_table")

57

58

// Read with options

59

val dfWithOptions = spark.read

60

.format("orc")

61

.option("mergeSchema", "true")

62

.load("/path/to/orc/files")

63

```

64

65

**Writing ORC Files:**

66

67

```scala

68

// Write DataFrame as ORC

69

df.write

70

.format("orc")

71

.option("compression", "snappy")

72

.save("/path/to/output/orc")

73

74

// Write to Hive table

75

df.write

76

.format("orc")

77

.mode("overwrite")

78

.saveAsTable("my_new_orc_table")

79

80

// Partitioned write

81

df.write

82

.format("orc")

83

.partitionBy("year", "month")

84

.save("/path/to/partitioned/orc")

85

```

86

87

### ORC Configuration Options

88

89

```scala { .api }

90

class OrcOptions(parameters: CaseInsensitiveMap[String]) {

91

def compressionCodec: String

92

def enableVectorizedReader: Boolean

93

def mergeSchema: Boolean

94

}

95

```

96

97

**Available ORC Options:**

98

99

- **compression**: Compression codec ("none", "snappy", "zlib", "lzo", "lz4", "zstd")

100

- **enableVectorizedReader**: Enable vectorized ORC reader (default: true)

101

- **mergeSchema**: Merge schemas when reading multiple files (default: false)

102

103

### ORC File Operations

104

105

```scala { .api }

106

object OrcFileOperator extends Logging {

107

def readSchema(files: Seq[String], conf: Option[Configuration]): Option[StructType]

108

def listOrcFiles(path: String, hadoopConf: Configuration): Seq[String]

109

def getRowCount(file: String, conf: Configuration): Long

110

}

111

```

112

113

**Usage Example:**

114

115

```scala

116

import org.apache.spark.sql.hive.orc.OrcFileOperator

117

118

// Read schema from ORC files

119

val schema = OrcFileOperator.readSchema(Seq("/path/to/file.orc"), None)

120

schema.foreach(s => println(s"Schema: ${s.treeString}"))

121

122

// Get row count

123

val conf = spark.sparkContext.hadoopConfiguration

124

val rowCount = OrcFileOperator.getRowCount("/path/to/file.orc", conf)

125

```

126

127

### ORC Filter Pushdown

128

129

```scala { .api }

130

object OrcFilters extends Logging {

131

def createFilter(filters: Seq[Filter]): Option[SearchArgument]

132

def buildSearchArgument(

133

dataTypeMap: Map[String, DataType],

134

filters: Seq[Filter]

135

): Option[SearchArgument]

136

}

137

```

138

139

ORC supports predicate pushdown for:

140

- Equality filters (`=`)

141

- Comparison filters (`<`, `<=`, `>`, `>=`)

142

- IN predicates

143

- IS NULL / IS NOT NULL

144

- String pattern matching (LIKE)

145

- Logical combinations (AND, OR, NOT)

146

147

## Hive File Format Support

148

149

Support for traditional Hive file formats using Hive SerDes and input/output formats.

150

151

### HiveFileFormat

152

153

```scala { .api }

154

class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {

155

def prepareWrite(

156

sparkSession: SparkSession,

157

job: Job,

158

options: Map[String, String],

159

dataSchema: StructType

160

): OutputWriterFactory

161

162

def buildReader(

163

sparkSession: SparkSession,

164

dataSchema: StructType,

165

partitionSchema: StructType,

166

requiredSchema: StructType,

167

filters: Seq[Filter],

168

options: Map[String, String],

169

hadoopConf: Configuration

170

): PartitionedFile => Iterator[InternalRow]

171

}

172

```

173

174

### HiveOptions

175

176

Configuration for Hive-compatible file formats and SerDes.

177

178

```scala { .api }

179

class HiveOptions(parameters: CaseInsensitiveMap[String]) {

180

def fileFormat: String

181

def inputFormat: String

182

def outputFormat: String

183

def serde: String

184

def serdeProperties: Map[String, String]

185

}

186

```

187

188

### Supported Hive File Formats

189

190

**Text Files:**

191

```scala

192

// Create table with text file format

193

spark.sql("""

194

CREATE TABLE text_table (

195

id INT,

196

name STRING

197

) USING HIVE

198

STORED AS TEXTFILE

199

""")

200

```

201

202

**Sequence Files:**

203

```scala

204

spark.sql("""

205

CREATE TABLE seq_table (

206

id INT,

207

name STRING

208

) USING HIVE

209

STORED AS SEQUENCEFILE

210

""")

211

```

212

213

**Avro Files:**

214

```scala

215

spark.sql("""

216

CREATE TABLE avro_table (

217

id INT,

218

name STRING

219

) USING HIVE

220

STORED AS AVRO

221

""")

222

```

223

224

**Custom SerDe:**

225

```scala

226

spark.sql("""

227

CREATE TABLE custom_table (

228

id INT,

229

name STRING

230

) USING HIVE

231

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'

232

WITH SERDEPROPERTIES (

233

'field.delim' = '\t',

234

'line.delim' = '\n'

235

)

236

STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'

237

OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

238

""")

239

```

240

241

## Parquet Integration

242

243

While Parquet support is primarily handled by Spark's native Parquet reader, the Hive integration provides compatibility for Hive-created Parquet tables.

244

245

### Parquet Configuration

246

247

```scala

248

// Enable native Parquet reader for Hive tables

249

spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

250

251

// Configure Parquet options

252

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

253

spark.conf.set("spark.sql.parquet.mergeSchema", "false")

254

```

255

256

### Reading Hive Parquet Tables

257

258

```scala

259

// Read Hive Parquet table with native reader

260

val df = spark.table("my_parquet_table")

261

262

// Force use of Hive SerDe (not recommended)

263

spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

264

val dfWithSerde = spark.table("my_parquet_table")

265

```

266

267

## Advanced File Format Features

268

269

### Schema Evolution

270

271

Support for schema evolution in ORC and Parquet formats:

272

273

```scala

274

// Enable schema merging for ORC

275

val df = spark.read

276

.format("orc")

277

.option("mergeSchema", "true")

278

.load("/path/to/evolved/schema")

279

280

// Handle files with missing columns by supplying an explicit schema;
// columns absent from a file are read back as null

281

val dfWithDefaults = spark.read

282

.format("orc")

283

.schema(expectedSchema)

284

.load("/path/to/files")

285

```

286

287

### Compression Support

288

289

**ORC Compression Options:**

290

- NONE

291

- ZLIB (ORC library default; Spark's `spark.sql.orc.compression.codec` defaults to snappy)

292

- SNAPPY

293

- LZO

294

- LZ4

295

- ZSTD

296

297

**Setting Compression:**

298

```scala

299

// For writes

300

df.write

301

.format("orc")

302

.option("compression", "snappy")

303

.save("/path/to/output")

304

305

// Global setting

306

spark.conf.set("spark.sql.orc.compression.codec", "snappy")

307

```

308

309

### Partition Support

310

311

File formats support both static and dynamic partitioning:

312

313

```scala

314

// Partitioned write with the DataFrame API (partitions derived from column values)

315

df.write

316

.format("orc")

317

.partitionBy("year", "month")

318

.save("/partitioned/data")

319

320

// Dynamic partitioning in Hive tables

321

spark.sql("SET hive.exec.dynamic.partition = true")

322

spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

323

324

spark.sql("""

325

INSERT INTO TABLE partitioned_table

326

PARTITION(year, month)

327

SELECT id, name, year, month FROM source_table

328

""")

329

```

330

331

### Bucketing

332

333

Support for bucketed tables for improved join performance:

334

335

```scala

336

// Create bucketed table

337

spark.sql("""

338

CREATE TABLE bucketed_table (

339

id INT,

340

name STRING,

341

department STRING

342

) USING HIVE

343

CLUSTERED BY (id) INTO 4 BUCKETS

344

STORED AS ORC

345

""")

346

347

// Write to bucketed table

348

df.write

349

.format("orc")

350

.bucketBy(4, "id")

351

.saveAsTable("bucketed_table")

352

```

353

354

## Error Handling

355

356

Common file format errors and solutions:

357

358

### Unsupported File Format

359

```scala

360

// Error: Unsupported file format

361

// Solution: Ensure format is supported or use appropriate SerDe

362

spark.sql("""

363

CREATE TABLE custom_format_table (...)

364

STORED AS INPUTFORMAT 'custom.input.format'

365

OUTPUTFORMAT 'custom.output.format'

366

""")

367

```

368

369

### Schema Mismatch

370

```scala

371

// Error: Schema mismatch between file and table

372

// Solution: Enable schema evolution or fix schema

373

spark.conf.set("spark.sql.orc.mergeSchema", "true")

374

```

375

376

### Compression Issues

377

```scala

378

// Error: Unsupported compression codec

379

// Solution: Use supported codec or install required libraries

380

df.write.format("orc").option("compression", "snappy").save(path)

381

```

382

383

## Performance Tuning

384

385

### ORC Optimization

386

387

```scala

388

// Enable vectorized reading

389

spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

390

391

// Configure split size

392

spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") // 128MB

393

394

// Enable bloom filters

395

spark.sql("""

396

CREATE TABLE optimized_table (...)

397

USING HIVE

398

STORED AS ORC

399

TBLPROPERTIES (

400

'orc.bloom.filter.columns'='id,name',

401

'orc.create.index'='true'

402

)

403

""")

404

```

405

406

### File Size Optimization

407

408

```scala

409

// Control file size during writes

410

spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "67108864") // 64MB

411

412

// Coalesce small files

413

df.coalesce(1).write.format("orc").save(path)

414

```

415

416

## Types

417

418

```scala { .api }

419

// File format interface

420

trait FileFormat {

421

def inferSchema(

422

sparkSession: SparkSession,

423

options: Map[String, String],

424

files: Seq[FileStatus]

425

): Option[StructType]

426

427

def prepareWrite(

428

sparkSession: SparkSession,

429

job: Job,

430

options: Map[String, String],

431

dataSchema: StructType

432

): OutputWriterFactory

433

}

434

435

// Data source registration

436

trait DataSourceRegister {

437

def shortName(): String

438

}

439

440

// Hive file sink configuration

441

case class FileSinkDesc(

442

dirName: String,

443

tableInfo: TableDesc,

444

compressed: Boolean,

445

destTableId: Int,

446

compressCodec: String

447

)

448

449

// Table description for Hive

450

case class TableDesc(

451

inputFormat: Class[_ <: InputFormat[_, _]],

452

outputFormat: Class[_ <: OutputFormat[_, _]],

453

properties: Properties

454

)

455

456

// ORC search argument for predicate pushdown

457

trait SearchArgument {

458

def getLeaves(): java.util.List[PredicateLeaf]

459

def getExpression(): ExpressionTree

460

}

461

462

// File status information

463

case class FileStatus(

464

path: String,

465

length: Long,

466

isDirectory: Boolean,

467

blockReplication: Short,

468

blockSize: Long,

469

modificationTime: Long,

470

accessTime: Long

471

)

472

```