
# Data Source Operations

Comprehensive file-based data source functionality for reading from and writing to Avro files using Spark's standard DataFrame read/write API. Supports automatic schema inference, configurable compression, and efficient distributed processing of Avro data.

## Capabilities

### Reading Avro Files

Read Avro files using Spark's DataFrameReader with automatic schema inference and configurable options.

```scala { .api }
// Basic read operations
spark.read.format("avro").load(path: String): DataFrame
spark.read.format("avro").load(paths: String*): DataFrame

// With options
spark.read.format("avro")
  .option(key: String, value: String)
  .load(path: String): DataFrame
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Avro Reader Example")
  .getOrCreate()

// Read single file
val df1 = spark.read
  .format("avro")
  .load("path/to/data.avro")

// Read multiple files
val df2 = spark.read
  .format("avro")
  .load("path/to/dir1", "path/to/dir2")

// Read with glob pattern
val df3 = spark.read
  .format("avro")
  .load("path/to/data/*.avro")

// Read with custom schema
val df4 = spark.read
  .format("avro")
  .option("avroSchema", customSchemaJson)
  .load("path/to/data.avro")

// Read ignoring file extensions
val df5 = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("path/to/files_without_extension")
```

### Writing Avro Files

Write DataFrames to Avro files with configurable compression and schema options.

```scala { .api }
// Basic write operations
df.write.format("avro").save(path: String): Unit
df.write.format("avro").mode(saveMode: SaveMode).save(path: String): Unit

// With options
df.write.format("avro")
  .option(key: String, value: String)
  .mode(saveMode: SaveMode)
  .save(path: String): Unit
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SaveMode

val df = spark.table("source_data")

// Basic write
df.write
  .format("avro")
  .save("path/to/output")

// Write with compression
df.write
  .format("avro")
  .option("compression", "snappy")
  .mode(SaveMode.Overwrite)
  .save("path/to/compressed_output")

// Write with custom record name and namespace
df.write
  .format("avro")
  .option("recordName", "MyRecord")
  .option("recordNamespace", "com.example.data")
  .mode(SaveMode.Append)
  .save("path/to/named_output")

// Write with custom schema
val customSchema = """{
  "type": "record",
  "name": "CustomRecord",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "value", "type": "string"}
  ]
}"""

df.write
  .format("avro")
  .option("avroSchema", customSchema)
  .save("path/to/custom_schema_output")
```

## Data Source Options

Comprehensive list of options available for Avro read and write operations.

### Available Options Table

| Option | Default | Description | Scope | Example |
|--------|---------|-------------|-------|---------|
| `avroSchema` | None | Custom Avro schema in JSON format | Read/Write | `"{\"type\": \"record\", ...}"` |
| `recordName` | `"topLevelRecord"` | Top-level record name in Avro schema | Write | `"UserRecord"` |
| `recordNamespace` | `""` | Record namespace in Avro schema | Write | `"com.example.data"` |
| `ignoreExtension` | `true` | Ignore file extensions when reading | Read | `"false"` |
| `compression` | `"snappy"` | Compression codec for write operations | Write | `"deflate"` |

### Schema Inference

Automatic schema detection from Avro files with support for schema evolution.

```scala { .api }
// Schema inference behavior
val inferredSchema = spark.read.format("avro").load(path).schema
```

**Schema Inference Examples:**

```scala
// Inspect inferred schema
val df = spark.read.format("avro").load("path/to/data.avro")
df.printSchema()

// Schema inference with multiple files (uses first readable file)
val multiFileDF = spark.read
  .format("avro")
  .load("path/to/multiple/*.avro")

// Handle corrupt files during schema inference
val robustDF = spark.read
  .format("avro")
  .option("ignoreCorruptFiles", "true")
  .load("path/to/possibly_corrupt/*.avro")
```
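When downstream code depends on a particular shape, the inferred schema can also be checked explicitly rather than just printed. A minimal sketch, assuming a `SparkSession` named `spark`; the two-column layout and the file path are purely illustrative:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical schema the pipeline expects (id: long, value: string)
val expected = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("value", StringType, nullable = true)
))

val checked = spark.read.format("avro").load("path/to/data.avro")

// Fail fast if the inferred schema drifts from the expected one
require(checked.schema == expected,
  s"Schema mismatch: expected $expected but inferred ${checked.schema}")
```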

### File Splitting and Parallelism

Avro files are splittable, enabling efficient distributed processing.

```scala { .api }
// Avro files support splitting for parallel processing
val parallelDF = spark.read
  .format("avro")
  .load("large_avro_file.avro")

// Spark automatically splits files across partitions
val partitionCount = parallelDF.rdd.getNumPartitions
```
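When the default split-based partitioning is not what a job needs, partition counts can be adjusted with standard DataFrame operations before writing. A sketch building on `parallelDF` above; the partition counts and output path are illustrative:

```scala
// repartition(n) redistributes rows via a shuffle; use it to increase parallelism
val widened = parallelDF.repartition(16)

// coalesce(n) merges partitions without a shuffle; use it to reduce output file count
widened.coalesce(4)
  .write
  .format("avro")
  .save("path/to/four_output_files")
```

Each output partition becomes one Avro file, so the partition count directly controls how many files the write produces.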

### Supported Read Options

Configuration options for customizing read behavior:

```scala { .api }
// Available read options
.option("avroSchema", jsonSchemaString)     // Custom schema
.option("ignoreExtension", "true|false")    // Ignore .avro extension requirement
.option("ignoreCorruptFiles", "true|false") // Skip corrupt files during processing
```
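These options can also be supplied in one call via `DataFrameReader.options`, which accepts a `Map[String, String]`. A sketch, assuming a `SparkSession` named `spark`; the schema string and path are illustrative:

```scala
// Illustrative Avro schema for the files being read
val schemaJson = """{
  "type": "record",
  "name": "Event",
  "fields": [{"name": "id", "type": "long"}]
}"""

// All read options collected in one map; keys match the table above
val readOptions = Map(
  "avroSchema"         -> schemaJson,
  "ignoreExtension"    -> "true",
  "ignoreCorruptFiles" -> "true"
)

val events = spark.read
  .format("avro")
  .options(readOptions) // applies all three options at once
  .load("path/to/events")
```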

### Supported Write Options

Configuration options for customizing write behavior:

```scala { .api }
// Available write options
.option("avroSchema", jsonSchemaString)  // Custom output schema
.option("recordName", recordName)        // Top-level record name
.option("recordNamespace", namespace)    // Record namespace
.option("compression", compressionCodec) // Compression: snappy, deflate, bzip2, xz, uncompressed
```
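The write options combine the same way via `DataFrameWriter.options`. A sketch, assuming a DataFrame `df`; the record name, namespace, and path are illustrative:

```scala
import org.apache.spark.sql.SaveMode

// All write options collected in one map; keys match the table above
val writeOptions = Map(
  "recordName"      -> "Event",            // top-level record name
  "recordNamespace" -> "com.example.data", // record namespace
  "compression"     -> "deflate"           // one of: snappy, deflate, bzip2, xz, uncompressed
)

df.write
  .format("avro")
  .options(writeOptions)
  .mode(SaveMode.Overwrite)
  .save("path/to/deflate_output")
```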

### Compression Support

Multiple compression codecs are supported for write operations:

**Supported Compression Codecs:**

```scala
// Compression options
"snappy"       // Default - balanced compression and speed
"deflate"      // Good compression ratio, configurable level
"bzip2"        // High compression ratio, slower
"xz"           // High compression ratio, slower
"uncompressed" // No compression
```

**Compression Examples:**

```scala
// Snappy compression (default)
df.write.format("avro").save("snappy_output")

// Deflate with custom level (configured via Spark config)
spark.conf.set("spark.sql.avro.deflate.level", "6")
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("deflate_output")

// High compression with bzip2
df.write
  .format("avro")
  .option("compression", "bzip2")
  .save("bzip2_output")
```

### Error Handling

Common error scenarios and handling strategies:

```scala
// Handle missing files
try {
  val df = spark.read.format("avro").load("missing_file.avro")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    println(s"File not found: ${e.getMessage}")
}

// Handle schema evolution conflicts
try {
  val df = spark.read
    .format("avro")
    .option("avroSchema", strictSchema)
    .load("evolved_schema_files/*.avro")
} catch {
  case e: Exception =>
    println(s"Schema compatibility issue: ${e.getMessage}")
}
```
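As an alternative to try/catch, `scala.util.Try` turns the failure into a value that can be pattern matched or recovered from. A sketch using the same missing-file scenario, assuming a `SparkSession` named `spark`:

```scala
import scala.util.{Failure, Success, Try}

// Try captures the thrown exception instead of propagating it
Try(spark.read.format("avro").load("missing_file.avro")) match {
  case Success(loaded) => loaded.show()
  case Failure(e)      => println(s"Read failed: ${e.getMessage}")
}
```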