# Binary Data Conversion

This document covers converting between binary Avro data and Spark DataFrame columns using the built-in functions.

## Core Functions

```scala { .api }
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.Column

/**
 * Converts a binary column of Avro format into its corresponding Catalyst value.
 * @param data the binary column containing Avro data
 * @param jsonFormatSchema the Avro schema in JSON string format
 * @return Column with decoded Catalyst data
 */
def from_avro(data: Column, jsonFormatSchema: String): Column

/**
 * Converts a binary column of Avro format with options.
 * @param data the binary column containing Avro data
 * @param jsonFormatSchema the Avro schema in JSON string format
 * @param options options to control parsing behavior
 * @return Column with decoded Catalyst data
 */
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column

/**
 * Converts a column into binary Avro format.
 * @param data the data column to encode
 * @return Column with binary Avro data
 */
def to_avro(data: Column): Column

/**
 * Converts a column into binary Avro format with custom schema.
 * @param data the data column to encode
 * @param jsonFormatSchema user-specified output Avro schema in JSON format
 * @return Column with binary Avro data
 */
def to_avro(data: Column, jsonFormatSchema: String): Column
```

## Basic Usage

### Decoding Binary Avro Data

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val avroSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

// Assuming df has a column 'avro_data' containing binary Avro data
val decodedDF = df.select(
  from_avro(col("avro_data"), avroSchema).as("user_data")
)

// Extract individual fields
val expandedDF = decodedDF.select(
  col("user_data.id").as("user_id"),
  col("user_data.name").as("user_name"),
  col("user_data.email").as("user_email")
)
```

### Encoding Data to Binary Avro

```scala
// Convert struct data to binary Avro
val encodedDF = df.select(
  to_avro(struct(
    col("id"),
    col("name"),
    col("email")
  )).as("avro_data")
)

// With custom schema
val outputSchema = """
{
  "type": "record",
  "name": "OutputUser",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}
"""

val customEncodedDF = df.select(
  to_avro(struct(col("id"), col("name")), outputSchema).as("avro_data")
)
```

## Advanced Usage

### Using Conversion Options

```scala
import scala.collection.JavaConverters._

val options = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
).asJava

val decodedDF = df.select(
  from_avro(col("avro_data"), avroSchema, options).as("decoded")
)
```

### Handling Complex Nested Data

```scala
val nestedSchema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "customer", "type": {
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
      ]
    }},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Item",
        "fields": [
          {"name": "productId", "type": "long"},
          {"name": "quantity", "type": "int"},
          {"name": "price", "type": "double"}
        ]
      }
    }}
  ]
}
"""

val decodedOrderDF = df.select(
  from_avro(col("order_avro"), nestedSchema).as("order")
)

// Access nested fields
val flattenedDF = decodedOrderDF.select(
  col("order.orderId"),
  col("order.customer.name").as("customer_name"),
  size(col("order.items")).as("item_count"),
  col("order.items").as("items")
)
```

### Array and Map Conversion

```scala
val arraySchema = """
{
  "type": "array",
  "items": "string"
}
"""

val mapSchema = """
{
  "type": "map",
  "values": "int"
}
"""

// Decode array
val arrayDF = df.select(
  from_avro(col("string_array_avro"), arraySchema).as("strings")
)

// Decode map
val mapDF = df.select(
  from_avro(col("map_avro"), mapSchema).as("int_map")
)

// Encode array and map
val encodedCollectionsDF = df.select(
  to_avro(col("string_array")).as("array_avro"),
  to_avro(col("int_map")).as("map_avro")
)
```

## Union Type Handling

```scala
val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "data", "type": [
      "null",
      {"type": "record", "name": "TextData", "fields": [{"name": "text", "type": "string"}]},
      {"type": "record", "name": "NumericData", "fields": [{"name": "value", "type": "double"}]}
    ]}
  ]
}
"""

val unionOptions = Map(
  "enableStableIdentifiersForUnionType" -> "true"
).asJava

val decodedUnionDF = df.select(
  from_avro(col("event_avro"), unionSchema, unionOptions).as("event")
)

// Access union members (with stable identifiers)
val processedDF = decodedUnionDF.select(
  col("event.id"),
  when(col("event.data.member_textdata").isNotNull,
    col("event.data.member_textdata.text"))
    .when(col("event.data.member_numericdata").isNotNull,
      col("event.data.member_numericdata.value").cast("string"))
    .as("data_value")
)
```

## Error Handling and Parse Modes

### Parse Mode Options

```scala
// FAILFAST mode - throw exception on parsing errors
val failfastOptions = Map("mode" -> "FAILFAST").asJava
val strictDF = df.select(
  from_avro(col("avro_data"), schema, failfastOptions).as("decoded")
)

// PERMISSIVE mode - set malformed records to null
val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava
val lenientDF = df.select(
  from_avro(col("avro_data"), schema, permissiveOptions).as("decoded")
)

// Filter out null results from parsing errors
val cleanDF = lenientDF.filter(col("decoded").isNotNull)
```

### Handling Invalid Schema

```scala
try {
  val invalidSchema = """{"type": "invalid"}"""
  val result = df.select(from_avro(col("data"), invalidSchema))
  result.show()
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
```

## Performance Optimization

### Caching Decoded Data

```scala
// Cache expensive decode operations
val decodedDF = df.select(
  from_avro(col("avro_data"), complexSchema).as("decoded")
).cache()

// Use cached result multiple times
val summary1 = decodedDF.groupBy("decoded.category").count()
val summary2 = decodedDF.filter(col("decoded.amount") > 100)
```

### Projection Pushdown

```scala
// Only decode needed fields to improve performance
val projectedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}
"""

// Decode only required fields instead of full schema
val efficientDF = df.select(
  from_avro(col("avro_data"), projectedSchema).as("user")
).select("user.id", "user.name")
```

## Integration with Streaming

### Structured Streaming Usage

```scala
val streamingDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "avro-topic")
  .load()

// Decode Avro data from Kafka value
val decodedStream = streamingDF.select(
  from_avro(col("value"), avroSchema).as("decoded_data")
)

val query = decodedStream.writeStream
  .format("console")
  .outputMode("append")
  .start()
```

### Encoding for Kafka Output

```scala
val encodedStream = processedDF.select(
  col("key"),
  to_avro(struct(col("*"))).as("value")
)

val kafkaQuery = encodedStream.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .start()
```

## SQL Function Usage

```sql
-- from_avro and to_avro are available in SQL by default; no registration needed
SELECT
  from_avro(avro_column, '{
    "type": "record",
    "name": "Data",
    "fields": [
      {"name": "id", "type": "long"},
      {"name": "value", "type": "string"}
    ]
  }') as decoded_data
FROM source_table;

-- Encode data to Avro
SELECT
  to_avro(struct(id, name, email)) as avro_data
FROM user_table;
```