# Binary Data Functions

The Spark Avro connector provides SQL functions for converting between binary Avro data and Spark SQL structures. These functions enable processing of Avro-encoded columns within DataFrames, which is useful when working with message queues, event streams, and embedded binary Avro data.
## Core Functions

### from_avro Function

Converts binary Avro data to Spark SQL structures using a provided schema.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String): Column

def from_avro(
    data: Column,
    jsonFormatSchema: String,
    options: java.util.Map[String, String]
): Column
```

**Parameters:**
- `data`: Column containing binary Avro data (BinaryType)
- `jsonFormatSchema`: Avro schema in JSON string format
- `options`: Additional options for deserialization (optional)

**Returns:** Column with deserialized Spark SQL data structure
**Basic Usage:**

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.table("kafka_topic")
val decodedDF = df.select(
  from_avro(col("value"), schema).as("user_data")
)

decodedDF.select(
  col("user_data.id"),
  col("user_data.name"),
  col("user_data.email")
).show()
```
**Usage with Options:**

```scala
import scala.collection.JavaConverters._

val options = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
).asJava

val decodedDF = df.select(
  from_avro(col("avro_bytes"), schema, options).as("parsed_data")
)
```
### to_avro Function

Converts Spark SQL data structures to binary Avro format.

```scala { .api }
def to_avro(data: Column): Column

def to_avro(data: Column, jsonFormatSchema: String): Column
```

**Parameters:**
- `data`: Column with Spark SQL data structure
- `jsonFormatSchema`: Target Avro schema in JSON format (optional)

**Returns:** Column with binary Avro data (BinaryType)
**Basic Usage:**

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val df = spark.table("users")

// Convert the entire row to Avro
val avroDF = df.select(
  to_avro(struct(col("*"))).as("avro_data")
)

// Convert specific columns to Avro
val avroDF2 = df.select(
  col("id"),
  to_avro(struct(
    col("name"),
    col("email"),
    col("created_at")
  )).as("user_avro")
)
```
**Usage with Custom Schema:**

```scala
val outputSchema = """
{
  "type": "record",
  "name": "UserOutput",
  "namespace": "com.example",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "full_name", "type": "string"},
    {"name": "contact_email", "type": ["null", "string"]}
  ]
}
"""

val avroDF = df.select(
  to_avro(
    struct(
      col("id").as("user_id"),
      col("name").as("full_name"),
      col("email").as("contact_email")
    ),
    outputSchema
  ).as("formatted_avro")
)
```
## Advanced Usage Patterns

### Roundtrip Processing

Converting data from Avro to Spark SQL structures, processing it, and serializing back to Avro:

```scala
val schema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "user_id", "type": "long"},
    {"name": "properties", "type": {"type": "map", "values": "string"}}
  ]
}
"""

val processedDF = df
  .select(from_avro(col("event_data"), schema).as("event"))
  .select(
    col("event.event_id"),
    col("event.timestamp"),
    col("event.user_id"),
    col("event.properties")
  )
  .filter(col("timestamp") > unix_timestamp() - 3600) // Last hour only
  .withColumn("processed_at", current_timestamp())
  .select(
    to_avro(struct(
      col("event_id"),
      col("timestamp"),
      col("user_id"),
      col("properties"),
      col("processed_at")
    )).as("processed_event")
  )
```
### Kafka Integration

A common pattern for processing Avro messages from Kafka:

```scala
val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "avro-events")
  .load()

val schema = """
{
  "type": "record",
  "name": "KafkaEvent",
  "fields": [
    {"name": "event_type", "type": "string"},
    {"name": "payload", "type": "string"},
    {"name": "metadata", "type": {"type": "map", "values": "string"}}
  ]
}
"""

val decodedDF = kafkaDF.select(
  col("key").cast("string").as("message_key"),
  from_avro(col("value"), schema).as("event_data"),
  col("timestamp").as("kafka_timestamp")
).select(
  col("message_key"),
  col("event_data.event_type"),
  col("event_data.payload"),
  col("event_data.metadata"),
  col("kafka_timestamp")
)
```
### Nested Schema Handling

Working with complex nested types:

```scala
val nestedSchema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer", "type": {
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "address", "type": {
          "type": "record",
          "name": "Address",
          "fields": [
            {"name": "street", "type": "string"},
            {"name": "city", "type": "string"},
            {"name": "country", "type": "string"}
          ]
        }}
      ]
    }},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Item",
        "fields": [
          {"name": "sku", "type": "string"},
          {"name": "quantity", "type": "int"},
          {"name": "price", "type": "double"}
        ]
      }
    }}
  ]
}
"""

val orderDF = df.select(
  from_avro(col("order_data"), nestedSchema).as("order")
).select(
  col("order.order_id"),
  col("order.customer.name").as("customer_name"),
  col("order.customer.address.city").as("city"),
  size(col("order.items")).as("item_count"),
  // Use a double literal (0.0D) for the zero value: a plain 0.0 parses as a
  // decimal literal, which would conflict with the double result of the merge
  expr("aggregate(order.items, 0.0D, (acc, item) -> acc + item.price * item.quantity)").as("total_amount")
)
```
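The `aggregate` higher-order function above folds the `items` array into a running total: a zero value plus a merge function. As a point of reference, the same computation over an in-memory collection in plain Scala (no Spark required) is a `foldLeft`; the `Item` case class here is a stand-in for the Avro record:

```scala
// Plain-Scala analogue of the SQL aggregate over order.items
case class Item(sku: String, quantity: Int, price: Double)

val items = Seq(
  Item("SKU-1", 2, 9.99),
  Item("SKU-2", 1, 25.00)
)

// foldLeft plays the role of aggregate: zero value, then a merge function
val totalAmount = items.foldLeft(0.0)((acc, item) => acc + item.price * item.quantity)
// 2 * 9.99 + 1 * 25.00 = 44.98
```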
## Options for from_avro

When using the three-parameter version of `from_avro`, the following options are supported:

### Mode Options

The `mode` option controls how corrupt records are handled:

- `FAILFAST` (default): Throw an exception on the first corrupt record
- `PERMISSIVE`: Process corrupt records as a null result instead of failing

Note that `from_avro` does not support a drop-malformed mode; to drop bad records, parse in `PERMISSIVE` mode and filter out the nulls:

```scala
val options = Map("mode" -> "PERMISSIVE").asJava
val cleanDF = df.select(
  from_avro(col("data"), schema, options).as("parsed")
).filter(col("parsed").isNotNull) // drop records that failed to parse
```
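Fail-fast versus permissive parsing mirrors a familiar plain-Scala pattern. This standalone sketch (no Spark involved, and `decode` is a hypothetical stand-in for Avro deserialization) illustrates the two semantics with `scala.util.Try`:

```scala
import scala.util.Try

// Simulated decoder: fails on malformed input, like Avro deserialization would
def decode(bytes: String): Int =
  if (bytes.nonEmpty && bytes.forall(_.isDigit)) bytes.toInt
  else throw new IllegalArgumentException(s"malformed record: $bytes")

val records = Seq("1", "oops", "3")

// PERMISSIVE-like: each corrupt record becomes None (null in Spark)
val permissive = records.map(r => Try(decode(r)).toOption)

// FAILFAST-like: the first corrupt record fails the whole batch
val failFast = Try(records.map(decode))
```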
### DateTime Rebase Mode

The `datetimeRebaseMode` option controls rebasing of DATE and TIMESTAMP values between the legacy hybrid Julian calendar and the proleptic Gregorian calendar:

- `EXCEPTION`: Throw an exception for dates and timestamps that are ambiguous between the two calendars
- `LEGACY`: Rebase values from the legacy hybrid Julian calendar
- `CORRECTED`: Read values as-is, interpreting them in the proleptic Gregorian calendar

```scala
val options = Map("datetimeRebaseMode" -> "CORRECTED").asJava
val df = sourceDF.select(
  from_avro(col("event_data"), schema, options).as("event")
)
```
### Schema Evolution Support

The `avroSchema` option allows specifying an evolved reader schema that differs from the schema the data was encoded with:

```scala
val evolvedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null}
  ]
}
"""

val options = Map("avroSchema" -> evolvedSchema).asJava
val df = oldDataDF.select(
  from_avro(col("user_bytes"), originalSchema, options).as("user")
)
```
## Error Handling

### Schema Mismatch Errors

When the provided schema does not match the binary data:

```scala
import org.apache.spark.sql.AnalysisException

try {
  val result = df.select(
    from_avro(col("data"), invalidSchema).as("parsed")
  ).collect()
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
```
### Null Handling

The binary functions handle null input gracefully: a null Avro payload deserializes to a null result, so an explicit guard is optional:

```scala
// null binary data results in null output; the when/otherwise guard
// below simply makes that behavior explicit
val dfWithNulls = df.select(
  when(col("data").isNull, lit(null))
    .otherwise(from_avro(col("data"), schema))
    .as("parsed_data")
)
```
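Outside Spark, the same null-in/null-out contract is what `Option.map` gives you in plain Scala. A standalone sketch (the `decodeUser` function is a hypothetical stand-in for `from_avro`):

```scala
// Hypothetical decoder standing in for from_avro
def decodeUser(bytes: Array[Byte]): String = new String(bytes, "UTF-8")

// Option models a nullable column: None passes straight through unchanged,
// mirroring from_avro's null-in/null-out behavior
val present: Option[Array[Byte]] = Some("alice".getBytes("UTF-8"))
val missing: Option[Array[Byte]] = None

val decodedPresent = present.map(decodeUser) // Some("alice")
val decodedMissing = missing.map(decodeUser) // None
```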
## Performance Considerations

### Caching Parsed Data

When repeatedly accessing parsed Avro data, cache the decoded DataFrame so deserialization happens only once:

```scala
val parsedDF = df.select(
  from_avro(col("avro_data"), schema).as("parsed")
).cache()

// Multiple operations on the same parsed data
val aggregated = parsedDF.groupBy("parsed.category").count()
val filtered = parsedDF.filter(col("parsed.amount") > 100)
```
### Schema Registry Integration

For production use with schema registries:

```scala
// Pseudo-code for schema registry integration
def getSchemaFromRegistry(schemaId: Int): String = {
  // Fetch schema from Confluent Schema Registry or similar
  schemaRegistry.getSchemaById(schemaId).toString
}

val schemaId = 42
val schema = getSchemaFromRegistry(schemaId)

val decodedDF = kafkaDF.select(
  from_avro(col("value"), schema).as("event_data")
)
```
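Registry lookups are network calls, so production code usually caches fetched schemas by id rather than refetching on every batch. A minimal thread-safe cache sketch in plain Scala, where `fetchSchema` is a hypothetical stand-in for the registry client call:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical registry fetch; in practice this would call the
// schema registry client
def fetchSchema(schemaId: Int): String =
  s"""{"type": "record", "name": "Schema$schemaId", "fields": []}"""

// Thread-safe, lazily populated cache: each schema id is typically
// fetched only once and then served from memory
object SchemaCache {
  private val cache = TrieMap.empty[Int, String]
  def get(schemaId: Int): String =
    cache.getOrElseUpdate(schemaId, fetchSchema(schemaId))
}

val schema = SchemaCache.get(42)      // fetched and cached
val schemaAgain = SchemaCache.get(42) // served from the cache
```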