
# Binary Data Functions

The Spark Avro connector provides SQL functions for converting between binary Avro data and Spark SQL structures. These functions enable processing of Avro-encoded columns within DataFrames, which is useful when working with message queues, event streams, and embedded binary Avro data.
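These functions operate on Avro's raw binary encoding: single records with no container-file header. As a standalone illustration of that format (a sketch, assuming the Avro Java library that the connector depends on is available), the same encode/decode roundtrip can be done without Spark:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

val schemaJson =
  """{"type": "record", "name": "User", "fields": [
      {"name": "id", "type": "long"},
      {"name": "name", "type": "string"}
    ]}"""
val schema = new Schema.Parser().parse(schemaJson)

// Build a record and serialize it with the raw binary encoding
val record = new GenericData.Record(schema)
record.put("id", 42L)
record.put("name", "Ada")

val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
encoder.flush()
val bytes = out.toByteArray // what a BinaryType column would hold

// Deserialize the bytes back into a record
val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
val decoded = new GenericDatumReader[GenericRecord](schema).read(null, decoder)
```

This is exactly the byte layout `from_avro` consumes and `to_avro` produces for a `BinaryType` column.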

## Core Functions

### from_avro Function

Converts binary Avro data to Spark SQL structures using a provided schema.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String): Column

def from_avro(
  data: Column,
  jsonFormatSchema: String,
  options: java.util.Map[String, String]
): Column
```

**Parameters:**

- `data`: Column containing binary Avro data (BinaryType)
- `jsonFormatSchema`: Avro schema as a JSON string
- `options`: Additional options for deserialization (optional)

**Returns:** Column with the deserialized Spark SQL data structure

**Basic Usage:**

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.table("kafka_topic")
val decodedDF = df.select(
  from_avro(col("value"), schema).as("user_data")
)

decodedDF.select(
  col("user_data.id"),
  col("user_data.name"),
  col("user_data.email")
).show()
```

**Usage with Options:**

```scala
import scala.collection.JavaConverters._

val options = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
).asJava

val decodedDF = df.select(
  from_avro(col("avro_bytes"), schema, options).as("parsed_data")
)
```

### to_avro Function

Converts Spark SQL data structures to binary Avro format.

```scala { .api }
def to_avro(data: Column): Column

def to_avro(data: Column, jsonFormatSchema: String): Column
```

**Parameters:**

- `data`: Column with a Spark SQL data structure
- `jsonFormatSchema`: Target Avro schema in JSON format (optional)

**Returns:** Column with binary Avro data (BinaryType)

**Basic Usage:**

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val df = spark.table("users")

// Convert the entire row to Avro
val avroDF = df.select(
  to_avro(struct(col("*"))).as("avro_data")
)

// Convert specific columns to Avro
val avroDF2 = df.select(
  col("id"),
  to_avro(struct(
    col("name"),
    col("email"),
    col("created_at")
  )).as("user_avro")
)
```

**Usage with Custom Schema:**

```scala
val outputSchema = """
{
  "type": "record",
  "name": "UserOutput",
  "namespace": "com.example",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "full_name", "type": "string"},
    {"name": "contact_email", "type": ["null", "string"]}
  ]
}
"""

val avroDF = df.select(
  to_avro(
    struct(
      col("id").as("user_id"),
      col("name").as("full_name"),
      col("email").as("contact_email")
    ),
    outputSchema
  ).as("formatted_avro")
)
```

## Advanced Usage Patterns

### Roundtrip Processing

Convert Avro data to Spark SQL structures, process it, and serialize back to Avro:

```scala
val schema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "user_id", "type": "long"},
    {"name": "properties", "type": {"type": "map", "values": "string"}}
  ]
}
"""

val processedDF = df
  .select(from_avro(col("event_data"), schema).as("event"))
  .select(
    col("event.event_id"),
    col("event.timestamp"),
    col("event.user_id"),
    col("event.properties")
  )
  .filter(col("timestamp") > unix_timestamp() - 3600) // Last hour only
  .withColumn("processed_at", current_timestamp())
  .select(
    to_avro(struct(
      col("event_id"),
      col("timestamp"),
      col("user_id"),
      col("properties"),
      col("processed_at")
    )).as("processed_event")
  )
```

### Kafka Integration

Common pattern for processing Avro messages from Kafka:

```scala
val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "avro-events")
  .load()

val schema = """
{
  "type": "record",
  "name": "KafkaEvent",
  "fields": [
    {"name": "event_type", "type": "string"},
    {"name": "payload", "type": "string"},
    {"name": "metadata", "type": {"type": "map", "values": "string"}}
  ]
}
"""

val decodedDF = kafkaDF.select(
  col("key").cast("string").as("message_key"),
  from_avro(col("value"), schema).as("event_data"),
  col("timestamp").as("kafka_timestamp")
).select(
  col("message_key"),
  col("event_data.event_type"),
  col("event_data.payload"),
  col("event_data.metadata"),
  col("kafka_timestamp")
)
```

### Nested Schema Handling

Working with complex nested types:

```scala
val nestedSchema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer", "type": {
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "address", "type": {
          "type": "record",
          "name": "Address",
          "fields": [
            {"name": "street", "type": "string"},
            {"name": "city", "type": "string"},
            {"name": "country", "type": "string"}
          ]
        }}
      ]
    }},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Item",
        "fields": [
          {"name": "sku", "type": "string"},
          {"name": "quantity", "type": "int"},
          {"name": "price", "type": "double"}
        ]
      }
    }}
  ]
}
"""

val orderDF = df.select(
  from_avro(col("order_data"), nestedSchema).as("order")
).select(
  col("order.order_id"),
  col("order.customer.name").as("customer_name"),
  col("order.customer.address.city").as("city"),
  size(col("order.items")).as("item_count"),
  // 0.0D keeps the accumulator a DOUBLE, matching price * quantity
  expr("aggregate(order.items, 0.0D, (acc, item) -> acc + item.price * item.quantity)").as("total_amount")
)
```

## Options for from_avro

When using the three-parameter version of `from_avro`, the following options are supported:

### Mode Options

- `PERMISSIVE` (default): Parse all records, setting corrupt records to null
- `DROPMALFORMED`: Drop records that don't match the schema
- `FAILFAST`: Throw an exception on the first corrupt record

```scala
val options = Map("mode" -> "DROPMALFORMED").asJava
val cleanDF = df.select(
  from_avro(col("data"), schema, options).as("parsed")
).filter(col("parsed").isNotNull)
```

### DateTime Rebase Mode

Controls handling of DATE and TIMESTAMP values:

- `EXCEPTION`: Throw an exception for dates that need rebasing
- `LEGACY`: Rebase using the legacy hybrid Julian calendar
- `CORRECTED`: Read datetime values as-is, in the proleptic Gregorian calendar

```scala
val options = Map("datetimeRebaseMode" -> "CORRECTED").asJava
val df = sourceDF.select(
  from_avro(col("event_data"), schema, options).as("event")
)
```

### Schema Evolution Support

The `avroSchema` option allows specifying an evolved schema different from the one the data was encoded with:

```scala
val evolvedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null}
  ]
}
"""

val options = Map("avroSchema" -> evolvedSchema).asJava
val df = oldDataDF.select(
  from_avro(col("user_bytes"), originalSchema, options).as("user")
)
```

## Error Handling

### Schema Mismatch Errors

When the provided schema doesn't match the binary data:

```scala
import org.apache.spark.sql.AnalysisException

try {
  val result = df.select(
    from_avro(col("data"), invalidSchema).as("parsed")
  ).collect()
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
```
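Because an invalid schema string only surfaces once the query is analyzed or executed, it can also be caught up front by parsing the JSON with the Avro library's own parser (a sketch; assumes `org.apache.avro` is on the classpath, as it is wherever the connector runs):

```scala
import org.apache.avro.{Schema, SchemaParseException}

// Parse the schema eagerly so a bad schema fails fast, before any Spark job runs
def validateAvroSchema(json: String): Either[String, Schema] =
  try Right(new Schema.Parser().parse(json))
  catch { case e: SchemaParseException => Left(e.getMessage) }

// A valid record schema parses...
val ok = validateAvroSchema(
  """{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}""")

// ...while an unknown type name is reported as an error
val bad = validateAvroSchema(
  """{"type": "recrod", "name": "User", "fields": []}""")
```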

347

348

### Null Handling

349

350

Binary functions handle null input gracefully:

351

352

```scala

353

// null binary data results in null output

354

val dfWithNulls = df.select(

355

when(col("data").isNull, lit(null))

356

.otherwise(from_avro(col("data"), schema))

357

.as("parsed_data")

358

)

359

```

360

361

## Performance Considerations

362

363

### Caching Parsed Data

364

365

When repeatedly accessing parsed Avro data:

366

367

```scala

368

val parsedDF = df.select(

369

from_avro(col("avro_data"), schema).as("parsed")

370

).cache()

371

372

// Multiple operations on the same parsed data

373

val aggregated = parsedDF.groupBy("parsed.category").count()

374

val filtered = parsedDF.filter(col("parsed.amount") > 100)

375

```

### Schema Registry Integration

For production use with schema registries:

```scala
// Pseudo-code for schema registry integration
def getSchemaFromRegistry(schemaId: Int): String = {
  // Fetch the schema from Confluent Schema Registry or similar
  schemaRegistry.getSchemaById(schemaId).toString
}

val schemaId = 42
val schema = getSchemaFromRegistry(schemaId)

val decodedDF = kafkaDF.select(
  from_avro(col("value"), schema).as("event_data")
)
```