# File Operations

The Spark Avro connector provides DataSource API integration for reading and writing Avro files directly through Spark SQL, with automatic schema inference, partitioning support, and optimized I/O performance.

## Reading Avro Files

### Basic File Reading

```scala { .api }
spark.read.format("avro").load(path: String): DataFrame
spark.read.format("avro").load(paths: String*): DataFrame
```

**Basic Usage:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroReader")
  .getOrCreate()

// Read a single file or a directory
val df = spark.read.format("avro").load("path/to/file.avro")

// Read multiple paths
val df2 = spark.read.format("avro").load(
  "path/to/file1.avro",
  "path/to/file2.avro",
  "path/to/directory"
)
```

### Schema Specification

```scala { .api }
spark.read.format("avro").schema(schema: StructType): DataFrameReader
```

**Usage with Custom Schema:**

```scala
import org.apache.spark.sql.types._

val customSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("email", StringType, nullable = true)
))

val df = spark.read
  .format("avro")
  .schema(customSchema)
  .load("path/to/avro/files")
```

### Reading Options

```scala { .api }
spark.read.format("avro").option(key: String, value: String): DataFrameReader
spark.read.format("avro").options(options: Map[String, String]): DataFrameReader
```

**Available Options:**

- `avroSchema`: JSON string of an evolved Avro schema to use for reading
- `avroSchemaUrl`: URL from which to load the Avro schema
- `mode`: Parse mode for handling corrupt records (`PERMISSIVE`, `DROPMALFORMED`, `FAILFAST`)
- `positionalFieldMatching`: Match fields by position instead of by name
- `datetimeRebaseMode`: How to rebase DATE/TIMESTAMP values between calendars (`EXCEPTION`, `LEGACY`, `CORRECTED`)

**Usage Example:**

```scala
val df = spark.read
  .format("avro")
  .option("mode", "DROPMALFORMED")
  .option("avroSchema", evolvedSchemaJson)
  .option("positionalFieldMatching", "true")
  .load("path/to/avro/files")
```

## Writing Avro Files

### Basic File Writing

```scala { .api }
DataFrame.write.format("avro").save(path: String): Unit
DataFrame.write.format("avro").save(): Unit
```

**Basic Usage:**

```scala
// Write DataFrame to Avro files
df.write
  .format("avro")
  .save("path/to/output")

// Write with an explicit save mode
df.write
  .format("avro")
  .mode("overwrite")
  .save("path/to/output")
```

### Write Modes

```scala { .api }
DataFrame.write.format("avro").mode(saveMode: String): DataFrameWriter[Row]
DataFrame.write.format("avro").mode(saveMode: SaveMode): DataFrameWriter[Row]
```

Supported modes:

- `overwrite`: Overwrite existing data
- `append`: Append to existing data
- `ignore`: Silently skip the write if data already exists
- `error` (default): Throw an error if data already exists
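The string and `SaveMode` enum forms are interchangeable. A minimal sketch, assuming an existing DataFrame `df` and a hypothetical output path:

```scala
import org.apache.spark.sql.SaveMode

// String form and enum form set the same mode
df.write.format("avro").mode("append").save("path/to/output")
df.write.format("avro").mode(SaveMode.Append).save("path/to/output")

// Ignore makes re-runs of a batch job no-ops once the output exists
df.write.format("avro").mode(SaveMode.Ignore).save("path/to/output")
```

The enum form catches typos at compile time, which makes it preferable in library code.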

### Partitioning

```scala { .api }
DataFrame.write.format("avro").partitionBy(colNames: String*): DataFrameWriter[Row]
```

**Usage Example:**

```scala
df.write
  .format("avro")
  .partitionBy("year", "month")
  .save("path/to/partitioned/output")
```
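When partitioned output is read back, the partition columns are reconstructed from the directory names, and filters on them prune entire directories. A sketch, assuming the partitioned output written above and an active `spark` session:

```scala
import spark.implicits._

// "year" and "month" reappear as columns derived from the directory layout
val partitioned = spark.read
  .format("avro")
  .load("path/to/partitioned/output")

// Filters on partition columns skip non-matching directories entirely
val jan2023 = partitioned.filter($"year" === 2023 && $"month" === 1)
```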

### Writing Options

```scala { .api }
DataFrame.write.format("avro").option(key: String, value: String): DataFrameWriter[Row]
DataFrame.write.format("avro").options(options: Map[String, String]): DataFrameWriter[Row]
```

**Available Options:**

- `compression`: Compression codec (`uncompressed`, `snappy`, `deflate`, `bzip2`, `xz`, `zstandard`)
- `recordName`: Top-level record name in the output schema (default: `topLevelRecord`)
- `recordNamespace`: Record namespace in the output schema (default: `""`)
- `avroSchema`: Custom output Avro schema as a JSON string

**Usage Example:**

```scala
df.write
  .format("avro")
  .option("compression", "snappy")
  .option("recordName", "UserRecord")
  .option("recordNamespace", "com.example.avro")
  .save("path/to/compressed/output")
```

## Advanced Features

### Schema Evolution

Reading with an evolved schema lets you process older Avro files against a newer, compatible schema; fields added with a default value are filled in automatically:

```scala
val evolvedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at", "type": "long", "default": 0}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchema)
  .load("path/to/older/avro/files")
```

### Predicate Pushdown

The connector supports predicate pushdown, so filters are applied during the file scan rather than after loading:

```scala
import spark.implicits._

val filteredDF = spark.read
  .format("avro")
  .load("path/to/large/dataset")
  .filter($"created_date" >= "2023-01-01")
  .filter($"status" === "active")
```

### Compression Support

Compression is detected automatically on read; on write, the codec is configurable per job:

```scala
// Writing with different compression codecs
df.write.format("avro").option("compression", "snappy").save("snappy-output")
df.write.format("avro").option("compression", "deflate").save("deflate-output")
df.write.format("avro").option("compression", "bzip2").save("bzip2-output")
```

## Error Handling

### Corrupt Record Handling

```scala
// Drop corrupt records
val cleanDF = spark.read
  .format("avro")
  .option("mode", "DROPMALFORMED")
  .load("path/to/files")

// Collect corrupt records in a column
val dfWithCorrupt = spark.read
  .format("avro")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .load("path/to/files")
```
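In `PERMISSIVE` mode the corrupt rows can then be isolated for inspection. A sketch, assuming the `dfWithCorrupt` read above:

```scala
import spark.implicits._

// Rows that failed to parse carry a non-null _corrupt_record value
val badRows = dfWithCorrupt.filter($"_corrupt_record".isNotNull)
badRows.show(truncate = false)

// The clean subset can proceed without the diagnostic column
val goodRows = dfWithCorrupt
  .filter($"_corrupt_record".isNull)
  .drop("_corrupt_record")
```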

### File Extension Handling

```scala
// Include files without the .avro extension
val df = spark.read
  .format("avro")
  .option("ignoreExtension", "true") // deprecated - use pathGlobFilter instead
  .load("path/to/mixed/files")

// Modern approach using pathGlobFilter
val df2 = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.data")
  .load("path/to/files")
```

## DataSource V2 Integration

The connector also provides a DataSource V2 implementation for enhanced performance:

```scala { .api }
class AvroDataSourceV2 extends FileDataSourceV2 {
  def shortName(): String // Returns "avro"
  def getTable(options: CaseInsensitiveStringMap): Table
  def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table
}
```

The DataSource V2 API provides the same functionality with improved performance characteristics and better integration with Spark's Catalyst optimizer.
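In Spark 3.x, whether Avro reads go through the V1 or V2 path is governed by the `spark.sql.sources.useV1SourceList` configuration, which includes `avro` by default. A hedged sketch (the default list shown below is illustrative and may vary by Spark version):

```scala
import org.apache.spark.sql.SparkSession

// Removing "avro" from useV1SourceList routes reads through AvroDataSourceV2
val spark = SparkSession.builder()
  .appName("AvroV2Example")
  .config("spark.sql.sources.useV1SourceList", "csv,json,kafka,orc,parquet,text")
  .getOrCreate()

// Reader and writer code is unchanged; only the execution path differs
val df = spark.read.format("avro").load("path/to/files")
```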

### Supporting V2 Classes

```scala { .api }
class AvroTable extends FileTable
class AvroScan extends FileScan
class AvroScanBuilder extends FileScanBuilder
class AvroPartitionReaderFactory extends FilePartitionReaderFactory
class AvroWrite extends FileWrite
```


These classes work together to provide the complete DataSource V2 implementation, handling table metadata, scan planning, partition reading, and write operations.