or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-utilities.md · file-formats.md · hive-client.md · index.md · metastore-integration.md · query-execution.md · session-configuration.md · udf-support.md

docs/file-formats.md

# File Format Support

Support for Hive-compatible file formats, particularly ORC files with Hive metadata integration.

## Core Imports

```scala
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.sources.DataSourceRegister
```

## Capabilities

### ORC File Format

Primary file format implementation for reading and writing ORC files with full Hive compatibility.

```scala { .api }
class OrcFileFormat extends FileFormat with DataSourceRegister {

  /**
   * Infer schema from ORC files
   * @param sparkSession Active Spark session
   * @param options Format-specific options
   * @param files Sequence of file statuses to analyze
   * @return Inferred schema or None if cannot infer
   */
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]

  /**
   * Prepare write operations for ORC format
   * @param sparkSession Active Spark session
   * @param job Hadoop job configuration
   * @param options Write options and settings
   * @param dataSchema Schema of data to write
   * @return OutputWriterFactory for creating writers
   */
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory

  /**
   * Build reader for scanning ORC files
   * @param sparkSession Active Spark session
   * @param dataSchema Schema of data in files
   * @param partitionSchema Schema of partition columns
   * @param requiredSchema Schema of required columns
   * @param filters Push-down filters for optimization
   * @param options Read options and settings
   * @param hadoopConf Hadoop configuration
   * @return Function to create PartitionedFile readers
   */
  def buildReader(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]

  /** Short name for this data source format */
  def shortName(): String = "orc"
}
```

### ORC File Operations

Utility operations for ORC file handling and metadata.

```scala { .api }
object OrcFileOperator extends Logging {

  /**
   * Read schema from ORC file footer
   * @param file Path to ORC file
   * @param conf Hadoop configuration
   * @param ignoreCorruptFiles Whether to ignore corrupt files
   * @return Tuple of (schema, user metadata)
   */
  def readSchema(
    file: Path,
    conf: Configuration,
    ignoreCorruptFiles: Boolean
  ): Option[(StructType, Map[String, String])]

  /**
   * Read ORC file metadata including statistics
   * @param files Sequence of ORC files to read
   * @param conf Hadoop configuration
   * @return Aggregated file metadata
   */
  def readFileMetadata(
    files: Seq[Path],
    conf: Configuration
  ): Map[String, String]
}
```

### Hive File Format Integration

Integration layer for Hive-specific file format operations.

```scala { .api }
private[hive] class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {

  /**
   * Prepare write using Hive OutputFormat
   * @param sparkSession Active Spark session
   * @param job Hadoop job for write operation
   * @param options Write configuration options
   * @param dataSchema Schema of data to be written
   * @return OutputWriterFactory using Hive serialization
   */
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory

  /**
   * Build reader using Hive InputFormat and SerDe
   * @param sparkSession Active Spark session
   * @param dataSchema Full schema of data files
   * @param partitionSchema Schema of partition columns
   * @param requiredSchema Schema of columns to read
   * @param filters Filters to push down to storage
   * @param options Read configuration options
   * @param hadoopConf Hadoop configuration
   * @return Function to read PartitionedFile
   */
  def buildReader(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}
```

### Java Integration Classes

Low-level Java classes for ORC integration.

```scala { .api }
// Note: These are Java classes with Scala signatures for documentation

/**
 * Custom ORC record reader optimized for Spark
 */
class SparkOrcNewRecordReader extends RecordReader[NullWritable, VectorizedRowBatch] {

  /**
   * Initialize the record reader
   * @param inputSplit Input split to read
   * @param context Task attempt context
   */
  def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit

  /**
   * Read next batch of records
   * @return true if more records available
   */
  def nextKeyValue(): Boolean

  /**
   * Get current key (always null for ORC)
   * @return NullWritable key
   */
  def getCurrentKey(): NullWritable

  /**
   * Get current batch of records
   * @return VectorizedRowBatch containing records
   */
  def getCurrentValue(): VectorizedRowBatch

  /**
   * Get reading progress as percentage
   * @return Progress between 0.0 and 1.0
   */
  def getProgress(): Float

  /** Close the record reader */
  def close(): Unit
}

/**
 * Input format for handling symlinked text files
 */
class DelegateSymlinkTextInputFormat extends TextInputFormat {

  /**
   * Get input splits for symlinked files
   * @param job Job configuration
   * @return List of input splits
   */
  def getSplits(job: JobContext): java.util.List[InputSplit]
}
```

## Usage Examples

### Reading ORC Files

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Read ORC files directly
val orcDF = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files")

orcDF.printSchema()
orcDF.show()

// Read Hive ORC table
val hiveOrcTable = spark.sql("SELECT * FROM hive_orc_table")
hiveOrcTable.explain(true)
```

### Writing ORC Files

```scala
import org.apache.spark.sql.SaveMode

// Create sample data
val data = Seq(
  (1, "Alice", 25),
  (2, "Bob", 30),
  (3, "Charlie", 35)
).toDF("id", "name", "age")

// Write as ORC with Hive compatibility
data.write
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .format("orc")
  .save("/path/to/output/orc")

// Write to Hive table using ORC format
data.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("my_database.orc_table")
```

### Schema Evolution and Merging

```scala
// Enable schema merging for ORC files
val mergedDF = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files/*")

// Handle schema evolution gracefully
val evolvedDF = spark.read
  .format("orc")
  .option("recursiveFileLookup", "true")
  .load("/path/to/evolved/schema/files")

// Check for schema differences
mergedDF.printSchema()
evolvedDF.printSchema()
```

## Configuration Options

### ORC-Specific Options

```scala
// Read options
val orcOptions = Map(
  "mergeSchema" -> "true",            // Merge schemas from multiple files
  "recursiveFileLookup" -> "true",    // Recursively look for files
  "ignoreCorruptFiles" -> "false",    // Fail on corrupt files
  "compression" -> "snappy"           // Compression codec
)

// Write options
val writeOptions = Map(
  "compression" -> "zlib",            // zlib, snappy, lzo, lz4, none
  "orc.compress" -> "SNAPPY",         // ORC compression
  "orc.stripe.size" -> "67108864",    // 64MB stripe size
  "orc.block.size" -> "268435456"     // 256MB block size
)
```

### Hive Integration Settings

```scala
// Configure ORC conversion from Hive metastore
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
```

## Error Handling

Common file format exceptions:

- **CorruptedFileException**: When ORC files are corrupted or unreadable
- **UnsupportedFileFormatException**: When file format is not supported
- **SchemaIncompatibleException**: When schemas cannot be merged or converted

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

try {
  val df = spark.read.format("orc").load("/path/to/corrupt/files")
  df.count()
} catch {
  case e: java.io.IOException if e.getMessage.contains("Malformed ORC file") =>
    println("ORC file is corrupted")
  case e: AnalysisException if e.getMessage.contains("Unable to infer schema") =>
    println("Cannot determine schema from ORC files")
}
```

## Types

### File Format Types

```scala { .api }
case class OrcOptions(
  parameters: Map[String, String]
) {
  def mergeSchema: Boolean
  def ignoreCorruptFiles: Boolean
  def recursiveFileLookup: Boolean
  def compression: String
}
```

```scala { .api }
trait FileFormat {
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}
```