# Column Functions

Core functions for working with Avro binary data in DataFrame columns. They convert between Spark's internal Catalyst format and Avro binary format inside Spark SQL queries built with the DataFrame API.

## Capabilities

### from_avro Function

Converts a binary column containing Avro-format data into its corresponding Catalyst (Spark SQL) value. The specified schema must match the schema the data was written with. Otherwise the behavior is undefined: the query may fail or return arbitrary results.

```scala { .api }
/**
 * Converts a binary column of avro format into its corresponding catalyst value.
 *
 * @param data the binary column containing Avro data
 * @param jsonFormatSchema the avro schema in JSON string format
 * @return Column with decoded Catalyst data
 * @since 2.4.0
 */
@Experimental
def from_avro(data: Column, jsonFormatSchema: String): Column
```

**Usage Examples:**

```scala
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// Define the Avro schema as a JSON string
val userSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

// Decode the binary Avro column into a struct column
// (assumes an active SparkSession named `spark`, as in spark-shell)
val df = spark.table("avro_data")
val decodedDF = df.select(
  from_avro(col("avro_bytes"), userSchema).as("user_data")
)

// Access nested fields of the decoded struct
val expandedDF = decodedDF.select(
  col("user_data.name").as("user_name"),
  col("user_data.age").as("user_age"),
  col("user_data.email").as("user_email")
)
```
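`from_avro` expects each value to be the plain binary encoding of a single Avro datum, which is what Apache Avro's `GenericDatumWriter` writes to a `BinaryEncoder`; bytes with extra framing, such as an Avro container-file header, would not decode this way. The sketch below assumes a local `SparkSession` and the `spark-avro` module on the classpath. The `encode` helper and the sample records are hypothetical, chosen only for illustration.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").appName("from-avro-sketch").getOrCreate()
import spark.implicits._

val schemaJson = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}"""
val schema = new Schema.Parser().parse(schemaJson)

// Hypothetical helper: encode one record as a bare Avro datum (no container-file header)
def encode(name: String, age: Int): Array[Byte] = {
  val record = new GenericData.Record(schema)
  record.put("name", name)
  record.put("age", Int.box(age))
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
  encoder.flush() // the binary encoder buffers, so flush before reading the bytes
  out.toByteArray
}

val raw = Seq(encode("alice", 30), encode("bob", 25)).toDF("avro_bytes")

// Decode with the same schema the bytes were written with, then flatten the struct
val decoded = raw
  .select(from_avro(col("avro_bytes"), schemaJson).as("user"))
  .select("user.*")

decoded.show()
```

Passing the exact JSON used for encoding keeps the reader in step with the writer; a different schema, even a compatible-looking one, falls under the undefined behavior described above.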
### to_avro Function

Converts a column into binary Avro format. The value is serialized with an Avro schema that Spark derives from the column's Catalyst data type; this signature takes no explicit schema.

```scala { .api }
/**
 * Converts a column into binary of avro format.
 *
 * @param data the data column to convert
 * @return Column with Avro binary data
 * @since 2.4.0
 */
@Experimental
def to_avro(data: Column): Column
```

**Usage Examples:**

```scala
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, struct}

// Pack selected columns into a struct and serialize it to Avro binary
val df = spark.table("users")
val avroDF = df.select(
  to_avro(struct(
    col("name"),
    col("age"),
    col("email")
  )).as("avro_data")
)

// Store the binary Avro column for later processing; the Avro bytes
// are kept in a Parquet binary column, not in Avro container files
avroDF.write
  .format("parquet")
  .save("path/to/avro_binary_data")

// Serialize every column of each row into a single Avro record
val rowAsAvroDF = df.select(
  to_avro(struct(col("*"))).as("row_as_avro")
)
```
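Since `to_avro` derives its Avro schema from the Catalyst type, decoding its output with `from_avro` needs that same derived schema. A minimal round-trip sketch, assuming `to_avro` performs the derivation with `SchemaConverters.toAvroType` from the `org.apache.spark.sql.avro` package (this is not shown in the signature above) and a local `SparkSession`; the sample rows are hypothetical.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.avro._ // from_avro, to_avro and SchemaConverters
import org.apache.spark.sql.functions.{col, struct}

val spark = SparkSession.builder().master("local[1]").appName("avro-round-trip").getOrCreate()
import spark.implicits._

// `name` is a nullable string column; `age` is a non-nullable int column
val users = Seq[(String, Int)](("alice", 30), (null, 41)).toDF("name", "age")

// struct(...) is never null, so to_avro writes a plain record rather than a union
val packed = users.select(to_avro(struct(col("name"), col("age"))).as("payload"))

// The struct has the same fields, in the same order, as users.schema, so deriving the
// Avro schema from it should reproduce the schema to_avro serialized with (assumption)
val readerSchema = SchemaConverters.toAvroType(users.schema, nullable = false).toString

val roundTripped = packed
  .select(from_avro(col("payload"), readerSchema).as("user"))
  .select("user.*")

roundTripped.show()
```

Deriving the reader schema from the same converter, rather than writing the JSON by hand, avoids subtle mismatches such as a different union branch order for nullable fields.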
### Error Handling

Neither function validates the data when the query is defined. Because Spark evaluates lazily, data-related errors surface only when an action executes the query:

- **`from_avro`**: When the schema does not match the data, the behavior is undefined. Mismatched or corrupted input may raise a runtime exception, but it can also decode silently into incorrect values.
- **`to_avro`**: May throw `IncompatibleSchemaException` when a Catalyst type has no Avro equivalent, such as a map whose keys are not strings.

**Common Error Scenarios:**

```scala
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, map}

// Schema mismatch: the behavior is undefined
val wrongSchema = """{"type": "string"}"""
val df = spark.table("complex_avro_data")
// If avro_bytes holds record data, this may fail or return garbage,
// but only once an action (show, collect, write, ...) runs
val failingDF = df.select(from_avro(col("avro_bytes"), wrongSchema))

// Unsupported type conversion: Avro maps only allow string keys,
// so serializing a map with long keys fails when the query executes
val unsupportedDF = spark.range(10).select(
  to_avro(map(col("id"), col("id")))
)
```
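Because the `to_avro` failure only appears once the query runs, it can be cheaper to check a column's type on the driver before building the query. This sketch assumes that `to_avro` derives its writer schema with `SchemaConverters.toAvroType`, so a type that this converter rejects would also fail inside `to_avro`. `avroCompatible` is a hypothetical helper, and it catches any non-fatal exception rather than naming a specific exception class.

```scala
import scala.util.Try

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Hypothetical helper: true if the Catalyst type can be mapped to an Avro schema.
// Try only captures non-fatal exceptions, so fatal errors still propagate.
def avroCompatible(dataType: DataType, nullable: Boolean = false): Boolean =
  Try(SchemaConverters.toAvroType(dataType, nullable)).isSuccess

val stringKeyed = MapType(StringType, LongType)
val longKeyed = MapType(LongType, LongType)

println(s"map<string,bigint> compatible: ${avroCompatible(stringKeyed)}")
println(s"map<bigint,bigint> compatible: ${avroCompatible(longKeyed)}")
```

In a job, the check would be applied to a field's type, for example `df.schema("some_column").dataType` with a hypothetical column name, before calling `to_avro`.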
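## Expression Implementation Details

### AvroDataToCatalyst Expression

The `from_avro` function is implemented as a Catalyst expression. It supports code generation, although the generated code delegates the actual decoding to the expression's `nullSafeEval` method. The excerpt below shows only its type-related members:

```scala { .api }
case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
  extends UnaryExpression with ExpectsInputTypes {

  // Only binary input is accepted
  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

  // The result type is derived from the Avro schema
  // (avroSchema is the parsed jsonFormatSchema; its definition is omitted here)
  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType

  // The decoded value is always treated as nullable
  override def nullable: Boolean = true

  override def prettyName: String = "from_avro"
}
```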
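The `dataType` member determines the shape of `from_avro`'s output. Calling `SchemaConverters.toSqlType` directly on the `userSchema` from the usage example shows the mapping: record fields become struct fields, and the `["null", "string"]` union becomes a nullable string. A minimal sketch; it needs only the `spark-avro` module and Avro on the classpath, not a running `SparkSession`.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Same schema as in the from_avro usage example
val userSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

// toSqlType returns the Catalyst type plus whether the top-level value is nullable
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(userSchema))

println(sqlType.dataType.simpleString)
println(s"top-level nullable: ${sqlType.nullable}")
```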
### CatalystDataToAvro Expression

The `to_avro` function is also implemented as a Catalyst expression whose generated code delegates serialization to its `nullSafeEval` method. Its result is always binary:

```scala { .api }
case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
  // The output is always Avro-encoded bytes
  override def dataType: DataType = BinaryType

  override def prettyName: String = "to_avro"
}
```
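The excerpt omits how `CatalystDataToAvro` chooses its writer schema. Assuming it applies `SchemaConverters.toAvroType` to the child's data type and nullability, the sketch below shows the schema produced for a struct with a nullable string field. Nullable fields become unions with the value type first and `"null"` second, the reverse of the `["null", "string"]` order in the `from_avro` usage example. Because Avro encodes a union value by its branch index, a schema that lists the branches in a different order would likely misread data written by `to_avro` when passed to the two-argument `from_avro`.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// A struct with a nullable string and a non-nullable int
val catalystType = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = false)))

// Derive the Avro schema the way to_avro is assumed to; struct(...) columns are non-null
val writerSchema: Schema = SchemaConverters.toAvroType(catalystType, nullable = false)

// Branches of the union generated for the nullable `name` field
val nameBranches = writerSchema.getField("name").schema().getTypes

println(writerSchema.toString(true))
```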