
# Column Functions

Core functions for working with Avro binary data in DataFrame columns. These functions enable conversion between Spark's internal Catalyst format and Avro binary format within SQL queries and DataFrame operations.

## Capabilities

### from_avro Function

Converts a binary column containing Avro-format data into its corresponding Catalyst (Spark SQL) value. The specified schema must match the data being read.

```scala { .api }
/**
 * Converts a binary column of avro format into its corresponding catalyst value.
 *
 * @param data the binary column containing Avro data
 * @param jsonFormatSchema the avro schema in JSON string format
 * @return Column with decoded Catalyst data
 * @since 2.4.0
 */
@Experimental
def from_avro(data: Column, jsonFormatSchema: String): Column
```

**Usage Examples:**

```scala
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// Define the Avro schema as a JSON string
val userSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

// Convert binary Avro data to structured columns
val df = spark.table("avro_data")
val decodedDF = df.select(
  from_avro(col("avro_bytes"), userSchema).as("user_data")
)

// Access nested fields from the decoded struct
val expandedDF = decodedDF.select(
  col("user_data.name").as("user_name"),
  col("user_data.age").as("user_age"),
  col("user_data.email").as("user_email")
)
```

### to_avro Function

Converts a column into binary Avro format. The column's data is serialized using an Avro schema derived from its Catalyst data type.

```scala { .api }
/**
 * Converts a column into binary of avro format.
 *
 * @param data the data column to convert
 * @return Column with Avro binary data
 * @since 2.4.0
 */
@Experimental
def to_avro(data: Column): Column
```

**Usage Examples:**

```scala
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, struct}

// Convert structured data to Avro binary
val df = spark.table("users")
val avroDF = df.select(
  to_avro(struct(
    col("name"),
    col("age"),
    col("email")
  )).as("avro_data")
)

// Store the binary Avro data for later processing
avroDF.write
  .format("parquet")
  .save("path/to/avro_binary_data")

// Convert an entire row to Avro
val rowAsAvroDF = df.select(
  to_avro(struct(col("*"))).as("row_as_avro")
)
```
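The two functions compose into a round trip: encode a struct to Avro bytes, then decode those bytes back into a struct. A sketch, reusing the `userSchema` JSON string and the column names from the examples above:

```scala
// Round trip: struct -> Avro binary -> struct
val roundTrip = df
  .select(to_avro(struct(col("name"), col("age"), col("email"))).as("bytes"))
  .select(from_avro(col("bytes"), userSchema).as("user"))
  .select(col("user.name"), col("user.age"), col("user.email"))
```

For the round trip to succeed, the JSON schema passed to `from_avro` must match the schema that `to_avro` derived from the struct's Catalyst type.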

### Error Handling

Both functions surface schema mismatches and malformed data as errors:

- **`from_avro`**: throws a runtime exception when the schema does not match the data or the binary payload is corrupted
- **`to_avro`**: may throw `IncompatibleSchemaException` for data types that cannot be converted to Avro

**Common Error Scenarios:**

```scala
// Schema mismatch: decoding record data with a string schema
val wrongSchema = """{"type": "string"}"""
val df = spark.table("complex_avro_data")
// This will fail at runtime if avro_bytes contains record data
val failingDF = df.select(from_avro(col("avro_bytes"), wrongSchema))

// Unsupported type conversion
val unsupportedDF = df.select(
  // This may fail for data types with no Avro equivalent
  to_avro(col("some_complex_column"))
)
```
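Because both functions are lazily evaluated, these exceptions surface only when an action forces the query to run, not when `select` is called. A sketch of catching a decoding failure at action time (table, column, and schema names are illustrative, carried over from the examples above):

```scala
import scala.util.{Failure, Success, Try}

val decoded = df.select(from_avro(col("avro_bytes"), userSchema).as("user"))

// The select above succeeds even for bad data; the error appears here
Try(decoded.collect()) match {
  case Success(rows) => println(s"Decoded ${rows.length} rows")
  case Failure(e)    => println(s"Avro decoding failed: ${e.getMessage}")
}
```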

## Expression Implementation Details

### AvroDataToCatalyst Expression

The `from_avro` function is implemented as a code-generated Catalyst expression:

```scala { .api }
case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
  extends UnaryExpression with ExpectsInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType
  override def nullable: Boolean = true
  override def prettyName: String = "from_avro"
}
```

### CatalystDataToAvro Expression

The `to_avro` function is implemented as a code-generated Catalyst expression:

```scala { .api }
case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
  override def dataType: DataType = BinaryType
  override def prettyName: String = "to_avro"
}
```
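The public functions are thin wrappers that attach these expressions to a `Column`. A sketch of how the wiring looks in Spark 2.4's `org.apache.spark.sql.avro` package (shown for orientation; consult the Spark source for the authoritative definitions):

```scala
// Each function wraps its expression in a new Column
@Experimental
def from_avro(data: Column, jsonFormatSchema: String): Column = {
  new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
}

@Experimental
def to_avro(data: Column): Column = {
  new Column(CatalystDataToAvro(data.expr))
}
```

This is why schema problems surface at execution time: the wrappers only build the expression tree, and deserialization happens when Catalyst evaluates it.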