# Avro Functions

The Avro functions provide high-level operations for converting between binary Avro data and Spark SQL columns. These functions handle schema-based serialization and deserialization with full type mapping and configurable error handling.

Available in both Scala and Python APIs with equivalent functionality.

## Core Functions

### from_avro

Converts binary Avro data to Catalyst rows using a specified Avro schema.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String): Column
```

```python { .api }
def from_avro(data: ColumnOrName, jsonFormatSchema: str, options: Optional[Dict[str, str]] = None) -> Column
```

**Parameters:**

- `data`: Column containing binary Avro data
- `jsonFormatSchema`: Avro schema in JSON string format
- `options` (Python only): Optional dictionary of parsing options

**Returns:** Column with deserialized Catalyst data matching the Avro schema

**Usage Example:**

Scala:
```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col

val avroSchema = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val df = spark.read.format("avro").load("users.avro")
val decodedDf = df.select(from_avro(col("binary_data"), avroSchema).as("user"))
```

Python:
```python
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col

avro_schema = '{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
df = spark.read.format("avro").load("users.avro")
decoded_df = df.select(from_avro(col("binary_data"), avro_schema).alias("user"))
```

### from_avro (with options)

Converts binary Avro data to Catalyst rows with additional parsing options.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
```

**Parameters:**

- `data`: Column containing binary Avro data
- `jsonFormatSchema`: Avro schema in JSON string format
- `options`: Map of parsing options (`mode`, `datetimeRebaseMode`, etc.)

**Returns:** Column with deserialized Catalyst data

**Usage Example:**
```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
).asJava

val decodedDf = df.select(from_avro(col("binary_data"), avroSchema, options).as("user"))
```

### to_avro

Converts Catalyst data to binary Avro format using the data's inferred schema.

```scala { .api }
def to_avro(data: Column): Column
```

```python { .api }
def to_avro(data: ColumnOrName, jsonFormatSchema: str = "") -> Column
```

**Parameters:**

- `data`: Column containing Catalyst data to serialize
- `jsonFormatSchema` (Python): Optional output Avro schema in JSON string format

**Returns:** Column with binary Avro data

**Usage Example:**

Scala:
```scala
val encodedDf = df.select(to_avro(col("user_struct")).as("avro_data"))
```

Python:
```python
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.functions import col

encoded_df = df.select(to_avro(col("user_struct")).alias("avro_data"))
```

### to_avro (with schema)

Converts Catalyst data to binary Avro format using a specified output schema.

```scala { .api }
def to_avro(data: Column, jsonFormatSchema: String): Column
```

**Parameters:**

- `data`: Column containing Catalyst data to serialize
- `jsonFormatSchema`: Target Avro schema in JSON string format

**Returns:** Column with binary Avro data conforming to the specified schema

**Usage Example:**
```scala
val outputSchema = """{"type":"record","name":"UserOutput","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val encodedDf = df.select(to_avro(col("user_struct"), outputSchema).as("avro_data"))
```

### schema_of_avro

Returns the DDL-formatted Spark SQL schema corresponding to an Avro schema.

```scala { .api }
def schema_of_avro(jsonFormatSchema: String): Column
```

**Parameters:**

- `jsonFormatSchema`: Avro schema in JSON string format

**Returns:** Column containing the DDL schema string

**Usage Example:**
```scala
val schemaDf = spark.sql("SELECT schema_of_avro('" + avroSchema + "') AS spark_schema")
schemaDf.show(false)
// +---------------------------+
// |spark_schema               |
// +---------------------------+
// |struct<name:string,age:int>|
// +---------------------------+
```

### schema_of_avro (with options)

Returns the DDL-formatted Spark SQL schema with additional parsing options.

```scala { .api }
def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column
```

**Parameters:**

- `jsonFormatSchema`: Avro schema in JSON string format
- `options`: Map of schema conversion options

**Returns:** Column containing the DDL schema string

**Usage Example:**
```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map(
  "enableStableIdentifiersForUnionType" -> "true",
  "recursiveFieldMaxDepth" -> "10"
).asJava

val schemaDf = spark.range(1).select(schema_of_avro(avroSchema, options).as("spark_schema"))
```

**Note:** The `schema_of_avro` functions are only available in the Scala API. Python users should use Spark SQL to access them:

```python
# Python - use SQL to access schema_of_avro
schema_df = spark.sql(f"SELECT schema_of_avro('{avro_schema}') AS spark_schema")
```
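
Schema strings embedded in SQL queries, as above, make quoting mistakes easy. One way to avoid them (a sketch using only the standard library; the field layout mirrors the `User` record used throughout this page) is to build the schema as a dict and serialize it once with `json.dumps`:

```python
import json

# Build the Avro schema as a Python dict, then serialize it once.
# This avoids hand-escaping quotes inside a JSON string literal.
user_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}

avro_schema = json.dumps(user_schema)
print(avro_schema)
```

The resulting `avro_schema` string can be passed to `from_avro`, `to_avro`, or `schema_of_avro` exactly like the hand-written literals in the examples above.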

## Deprecated Functions

### Legacy Package Functions

The following functions are deprecated and maintained only for backward compatibility. Use the functions from `org.apache.spark.sql.avro.functions` instead.

```scala { .api }
// DEPRECATED: Use org.apache.spark.sql.avro.functions.from_avro instead
@deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
def from_avro(data: Column, jsonFormatSchema: String): Column

// DEPRECATED: Use org.apache.spark.sql.avro.functions.to_avro instead
@deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
def to_avro(data: Column): Column
```

## Error Handling

All functions support different error handling modes through the `mode` option:

- **PERMISSIVE** (default): Sets corrupt records to null and continues processing
- **DROPMALFORMED**: Ignores corrupt records completely
- **FAILFAST**: Throws an exception on the first corrupt record

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map("mode" -> "FAILFAST").asJava
val strictDf = df.select(from_avro(col("binary_data"), avroSchema, options).as("user"))
```
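
The three modes differ only in what happens to a record that fails to deserialize. The behavior described by the bullets above can be sketched in plain Python (illustrative only — this is not Spark's implementation; `decode` is a hypothetical stand-in for Avro deserialization):

```python
def apply_mode(records, decode, mode="PERMISSIVE"):
    """Illustrates how each mode treats corrupt records."""
    out = []
    for rec in records:
        try:
            out.append(decode(rec))
        except ValueError:
            if mode == "PERMISSIVE":
                out.append(None)   # corrupt record becomes null
            elif mode == "DROPMALFORMED":
                continue           # corrupt record is skipped
            elif mode == "FAILFAST":
                raise              # first corrupt record aborts the job
    return out

# Toy decoder: odd numbers are "corrupt", even numbers "decode".
def decode(x):
    if x % 2:
        raise ValueError(f"corrupt record: {x}")
    return x * 10

print(apply_mode([2, 3, 4], decode, "PERMISSIVE"))     # [20, None, 40]
print(apply_mode([2, 3, 4], decode, "DROPMALFORMED"))  # [20, 40]
```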

## Type Mapping

The functions automatically handle conversion between Avro types and Spark SQL types:

- **Avro primitive types** → Spark SQL primitive types (string, int, long, float, double, boolean, bytes)
- **Avro records** → Spark SQL structs
- **Avro arrays** → Spark SQL arrays
- **Avro maps** → Spark SQL maps
- **Avro unions** → nullable types for `["null", T]` unions; other unions become Spark SQL structs with one nullable field per branch
- **Avro enums** → Spark SQL strings
- **Avro logical types** → Appropriate Spark SQL types (timestamps, dates, decimals)
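
As a rough illustration of the primitive and union rules above, here is a hand-written lookup (not Spark's actual converter — real conversion also handles records, logical types, and nesting, and the `memberN` field names are an assumption about default union naming):

```python
# Illustrative mapping of Avro primitive type names to Spark SQL DDL names.
AVRO_TO_SPARK = {
    "string": "string", "int": "int", "long": "bigint",
    "float": "float", "double": "double",
    "boolean": "boolean", "bytes": "binary",
}

def to_spark_type(avro_type):
    """Map an Avro primitive or union to a Spark SQL type description."""
    if isinstance(avro_type, list):
        branches = [t for t in avro_type if t != "null"]
        if len(branches) == 1:
            # A union with null maps to the nullable form of the other branch.
            return to_spark_type(branches[0]) + " (nullable)"
        # Other unions become structs with one field per branch.
        return "struct<" + ",".join(
            f"member{i}:{to_spark_type(t)}" for i, t in enumerate(branches)) + ">"
    return AVRO_TO_SPARK[avro_type]

print(to_spark_type("long"))              # bigint
print(to_spark_type(["null", "string"]))  # string (nullable)
```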

## Exception Handling

The Avro functions may throw specific exceptions for schema and type incompatibilities:

### Exception Classes

```scala { .api }
// Schema incompatibility errors (internal)
class IncompatibleSchemaException extends RuntimeException

// Unsupported Avro type errors (internal)
class UnsupportedAvroTypeException extends RuntimeException
```

**Common Error Scenarios:**

- **IncompatibleSchemaException**: Thrown when the provided schema is incompatible with the data
- **UnsupportedAvroTypeException**: Thrown when encountering unsupported Avro types
- **AnalysisException**: Thrown for invalid schema JSON or malformed parameters
- **SparkException**: Thrown for runtime errors during processing

**Error Handling Best Practices:**
```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.SparkException

try {
  val result = df.select(from_avro(col("data"), invalidSchema).as("parsed"))
  result.collect()
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
  case e: SparkException =>
    println(s"Runtime processing error: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
}
```