or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-conversion.md · configuration.md · data-types.md · file-operations.md · index.md · schema-conversion.md

docs/file-operations.md

0

# File Operations

1

2

This document covers reading and writing Avro files using the Spark Avro data source.

3

4

## Reading Avro Files

5

6

### Basic Reading

7

8

```scala

9

val df = spark.read

10

.format("avro")

11

.load("path/to/avro/files")

12

```

13

14

### Reading with Schema Evolution

15

16

```scala

17

val evolvedSchema = """

18

{

19

"type": "record",

20

"name": "User",

21

"fields": [

22

{"name": "id", "type": "long"},

23

{"name": "name", "type": "string"},

24

{"name": "email", "type": ["null", "string"], "default": null}

25

]

26

}

27

"""

28

29

val df = spark.read

30

.format("avro")

31

.option("avroSchema", evolvedSchema)

32

.load("path/to/files")

33

```

34

35

### Reading Options

36

37

```scala

38

val df = spark.read

39

.format("avro")

40

.option("mode", "PERMISSIVE") // FAILFAST, PERMISSIVE, DROPMALFORMED

41

.option("ignoreCorruptFiles", "true")

42

.option("recursiveFileLookup", "true")

43

.load("path/to/files")

44

```

45

46

## Writing Avro Files

47

48

### Basic Writing

49

50

```scala { .api }

51

def write: DataFrameWriter[Row]

52

def format(source: String): DataFrameWriter[Row]

53

def option(key: String, value: String): DataFrameWriter[Row]

54

def save(path: String): Unit

55

```

56

57

```scala

58

df.write

59

.format("avro")

60

.save("path/to/output")

61

```

62

63

### Writing with Custom Schema

64

65

```scala

66

val outputSchema = """

67

{

68

"type": "record",

69

"name": "OutputRecord",

70

"namespace": "com.example",

71

"fields": [

72

{"name": "id", "type": "long"},

73

{"name": "data", "type": "string"}

74

]

75

}

76

"""

77

78

df.write

79

.format("avro")

80

.option("avroSchema", outputSchema)

81

.save("path/to/output")

82

```

83

84

### Writing with Compression

85

86

```scala

87

df.write

88

.format("avro")

89

.option("compression", "snappy") // snappy, deflate, bzip2, xz, zstandard, uncompressed

90

.save("path/to/output")

91

```

92

93

### Writing with Custom Record Naming

94

95

```scala

96

df.write

97

.format("avro")

98

.option("recordName", "MyRecord")

99

.option("recordNamespace", "com.example.data")

100

.save("path/to/output")

101

```

102

103

## Configuration Options

104

105

### Read Options

106

107

```scala { .api }

108

// Schema-related options

109

option("avroSchema", "JSON schema string") // Custom schema for reading

110

option("avroSchemaUrl", "path/to/schema.avsc") // Schema file location

111

112

// Parse mode options

113

option("mode", "FAILFAST|PERMISSIVE|DROPMALFORMED") // Error handling mode

114

115

// Field matching options

116

option("positionalFieldMatching", "true|false") // Match by position vs name

117

118

// DateTime handling

119

option("datetimeRebaseMode", "EXCEPTION|LEGACY|CORRECTED") // Calendar rebase mode

120

121

// Union type handling

122

option("enableStableIdentifiersForUnionType", "true|false") // Stable union field names

123

124

// File handling (deprecated)

125

option("ignoreExtension", "true|false") // Ignore .avro extension requirement

126

```

127

128

### Write Options

129

130

```scala { .api }

131

// Schema options

132

option("avroSchema", "JSON schema string") // Output schema specification

133

option("recordName", "string") // Top-level record name (default: "topLevelRecord")

134

option("recordNamespace", "string") // Record namespace (default: "")

135

136

// Compression options

137

option("compression", "codec") // snappy, deflate, bzip2, xz, zstandard, uncompressed

138

139

// Field matching

140

option("positionalFieldMatching", "true|false") // Position-based field matching

141

```

142

143

## Schema Inference

144

145

The Avro data source automatically infers schema from Avro file headers:

146

147

```scala

148

val inferredSchema = spark.read

149

.format("avro")

150

.load("path/to/files")

151

.schema

152

153

println(inferredSchema.treeString)

154

```

155

156

### Schema Evolution Support

157

158

```scala

159

// Original schema: {id: long, name: string}

160

// Evolved schema adds optional email field

161

val evolvedDF = spark.read

162

.format("avro")

163

.option("avroSchema", evolvedSchemaJson)

164

.load("original-files")

165

166

// New field will have null values for old records

167

evolvedDF.select("id", "name", "email").show()

168

```

169

170

## Performance Considerations

171

172

### Partitioning

173

```scala

174

df.write

175

.format("avro")

176

.partitionBy("year", "month")

177

.save("partitioned-output")

178

```

179

180

### Compression Performance

181

```scala

182

// Snappy: fast compression/decompression, moderate compression ratio

183

df.write.format("avro").option("compression", "snappy").save("output")

184

185

// Deflate: slower but better compression ratio

186

df.write.format("avro").option("compression", "deflate").save("output")

187

188

// ZSTD: best compression ratio, good performance

189

df.write.format("avro").option("compression", "zstandard").save("output")

190

```

191

192

### Splittable Files

193

Avro files are splittable by default, enabling parallel processing:

194

195

```scala

196

val parallelDF = spark.read

197

.format("avro")

198

.load("large-avro-files/*") // Automatically splits across partitions

199

```

200

201

## Error Handling

202

203

### Parse Modes

204

```scala

205

// FAILFAST: Throw exception on any parsing error

206

val strictDF = spark.read

207

.format("avro")

208

.option("mode", "FAILFAST")

209

.load("path/to/files")

210

211

// PERMISSIVE: Set malformed records to null, continue processing

212

val permissiveDF = spark.read

213

.format("avro")

214

.option("mode", "PERMISSIVE")

215

.load("path/to/files")

216

217

// DROPMALFORMED: Skip malformed records entirely

218

val droppedDF = spark.read

219

.format("avro")

220

.option("mode", "DROPMALFORMED")

221

.load("path/to/files")

222

```

223

224

### Corrupt File Handling

225

```scala

226

val df = spark.read

227

.format("avro")

228

.option("ignoreCorruptFiles", "true") // Skip corrupt files

229

.load("path/to/files")

230

```

231

232

## SQL Integration

233

234

### Creating Tables

235

```sql

236

-- Create table using Avro data source

237

CREATE TABLE avro_table

238

USING avro

239

OPTIONS (path "path/to/avro/files")

240

241

-- Create table with custom schema

242

CREATE TABLE custom_avro_table

243

USING avro

244

OPTIONS (

245

path "path/to/files",

246

avroSchema '{

247

"type": "record",

248

"name": "Record",

249

"fields": [

250

{"name": "id", "type": "long"},

251

{"name": "value", "type": "string"}

252

]

253

}'

254

)

255

```

256

257

### Querying

258

```sql

259

SELECT * FROM avro_table WHERE id > 100;

260

261

INSERT INTO avro_table VALUES (1, 'example');

262

```