# Avro DataSource

The Avro DataSource provides native Spark integration for reading and writing Avro files through the DataSource V2 API. It offers optimized performance with features like predicate pushdown, column pruning, and automatic schema inference.

## Reading Avro Files

### Basic Reading

```scala { .api }
val df = spark.read.format("avro").load(path: String)
val df = spark.read.format("avro").load(paths: String*)
```

**Usage Example:**

```scala
// Read a single file
val df = spark.read.format("avro").load("path/to/file.avro")

// Read multiple files
val df = spark.read.format("avro").load("file1.avro", "file2.avro")

// Read a directory of Avro files
val df = spark.read.format("avro").load("path/to/avro/directory")
```

### Reading with Schema

Specify a schema for consistent structure across files:

```scala { .api }
val df = spark.read.format("avro")
  .option("avroSchema", jsonSchema)
  .load(path)
```

**Usage Example:**

```scala
val userSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

val df = spark.read.format("avro")
  .option("avroSchema", userSchema)
  .load("users/*.avro")
```

### Reading with Options

```scala { .api }
val df = spark.read.format("avro")
  .option("ignoreExtension", "true")
  .option("mode", "PERMISSIVE")
  .load(path)
```

**Common Read Options:**

- `ignoreExtension`: Read files regardless of extension (default: true; deprecated in favor of the general `pathGlobFilter` option)
- `mode`: Error handling mode (`PERMISSIVE`, `DROPMALFORMED`, `FAILFAST`)
- `avroSchema`: Override the schema used for reading
- `recursiveFieldMaxDepth`: Maximum recursion depth for recursive schema fields
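
A minimal sketch combining these options (paths are hypothetical): `FAILFAST` aborts the read on the first malformed record, which is useful for validating data quality up front rather than discovering nulls later.

```scala
// Fail the job on the first corrupt record instead of silently
// nulling it out (PERMISSIVE) or dropping it (DROPMALFORMED).
val strictDf = spark.read.format("avro")
  .option("mode", "FAILFAST")
  .load("ingest/events/*.avro")  // hypothetical path
```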

## Writing Avro Files

### Basic Writing

```scala { .api }
df.write.format("avro").save(path: String)
```

**Usage Example:**

```scala
// The $-column syntax requires: import spark.implicits._
val df = spark.range(1000).select($"id", ($"id" * 2).as("value"))
df.write.format("avro").save("output/numbers.avro")
```

### Writing with Compression

```scala { .api }
df.write.format("avro")
  .option("compression", codec)
  .save(path)
```

**Available Compression Codecs:**

- `uncompressed`
- `snappy` (default, set via `spark.sql.avro.compression.codec`)
- `deflate`
- `bzip2`
- `xz`
- `zstandard`

**Usage Example:**

```scala
df.write.format("avro")
  .option("compression", "snappy")
  .save("output/compressed.avro")
```

### Writing with Custom Schema

```scala { .api }
df.write.format("avro")
  .option("avroSchema", jsonSchema)
  .save(path)
```

**Usage Example:**

```scala
val outputSchema = """{
  "type": "record",
  "name": "Output",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "value", "type": "double"}
  ]
}"""

df.write.format("avro")
  .option("avroSchema", outputSchema)
  .save("output/custom-schema.avro")
```

### Partitioned Writing

```scala { .api }
df.write.format("avro")
  .partitionBy(columns: String*)
  .save(path)
```

**Usage Example:**

```scala
val salesDf = spark.read.format("avro").load("sales.avro")
salesDf.write.format("avro")
  .partitionBy("year", "month")
  .option("compression", "snappy")
  .save("partitioned-sales")
```
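
Reading partitioned output back lets Spark prune partitions from the directory layout: a filter on a partition column skips entire `year=.../month=...` subdirectories before any file I/O happens (a sketch, reusing the path from the example above; the filter values are illustrative):

```scala
// Requires: import spark.implicits._ for the $-column syntax.
// Only the matching year=/month= subdirectories are listed and
// scanned; all other partitions are pruned up front.
val q4Sales = spark.read.format("avro")
  .load("partitioned-sales")
  .filter($"year" === 2024 && $"month" >= 10)
```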

## Advanced Features

### Schema Evolution

The DataSource supports schema evolution when reading multiple files:

```scala
// Reads files with different but compatible schemas
val df = spark.read.format("avro")
  .option("mode", "PERMISSIVE")
  .load("evolving-schema/*.avro")
```

### Predicate Pushdown

Filter predicates are automatically pushed down to the scan to reduce I/O:

```scala
val filteredDf = spark.read.format("avro")
  .load("large-dataset.avro")
  .filter($"age" > 21) // Filter pushed down to the file scan
```

### Column Pruning

Only required columns are read from the files:

```scala
val projectedDf = spark.read.format("avro")
  .load("users.avro")
  .select("name", "email") // Only reads the name and email columns
```

## SQL Integration

Use Avro files directly in SQL queries:

```scala
// Create a temporary view
spark.read.format("avro").load("users.avro").createOrReplaceTempView("users")

// Query with SQL
spark.sql("SELECT name, age FROM users WHERE age > 18")

// Create an external table
spark.sql("""
  CREATE TABLE user_data
  USING avro
  LOCATION 'path/to/avro/files'
""")
```

## DataSource Configuration

### Write Options

```scala { .api }
Map(
  "compression" -> "snappy",          // Compression codec
  "avroSchema" -> jsonSchema,         // Custom output schema
  "recordName" -> "MyRecord",         // Record name for schema generation
  "recordNamespace" -> "com.example"  // Namespace for the generated schema
)
```

### Read Options

```scala { .api }
Map(
  "ignoreExtension" -> "true",     // Ignore file extensions
  "mode" -> "PERMISSIVE",          // Error handling mode
  "avroSchema" -> jsonSchema,      // Override input schema
  "recursiveFieldMaxDepth" -> "5"  // Max recursion depth
)
```
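
Either map can be applied in a single call with `DataFrameReader.options` / `DataFrameWriter.options` instead of chaining `.option(...)` calls (a sketch; `jsonSchema` stands in for a real Avro schema string and the path is hypothetical):

```scala
val readOpts = Map(
  "mode" -> "FAILFAST",
  "avroSchema" -> jsonSchema
)

val df = spark.read.format("avro")
  .options(readOpts)    // applies the whole map at once
  .load("input/*.avro")
```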

## Performance Considerations

### Optimal File Sizes

- Target 128MB-1GB per file for best performance
- Use partitioning for large datasets
- Enable compression for storage efficiency
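
Output file count (and therefore file size) is controlled by the number of Spark partitions at write time, so a common way to hit the target size is to repartition before writing (a sketch; the partition count of 16 is illustrative and workload-dependent):

```scala
// Fewer, larger output files: one Avro file per Spark partition.
df.repartition(16)
  .write.format("avro")
  .option("compression", "snappy")
  .save("output/right-sized")  // hypothetical path
```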

### Schema Inference

```scala
// Slower - the schema must be inferred by opening files at load time
val df1 = spark.read.format("avro").load("many-files/*.avro")

// Efficient - uses the provided schema, skipping inference
val df2 = spark.read.format("avro")
  .option("avroSchema", knownSchema)
  .load("many-files/*.avro")
```

### Compression Trade-offs

- **Snappy**: Fast compression/decompression, moderate compression ratio
- **Deflate**: Better compression ratio, slower than Snappy
- **ZStandard**: Best compression ratio, good performance
- **Uncompressed**: Fastest I/O, largest storage requirements
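
Some codecs expose a tunable compression level through Spark SQL configuration; for deflate this is the `spark.sql.avro.deflate.level` setting (a sketch; level 9 is illustrative and trades write speed for ratio):

```scala
// Higher deflate levels compress harder but write more slowly.
spark.conf.set("spark.sql.avro.deflate.level", "9")

df.write.format("avro")
  .option("compression", "deflate")
  .save("output/max-compression")  // hypothetical path
```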