or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-conversion.md · configuration.md · data-types.md · file-operations.md · index.md · schema-conversion.md

binary-conversion.mddocs/

0

# Binary Data Conversion

This document covers converting between binary Avro data and Spark DataFrame columns using the built-in functions.

## Core Functions

```scala { .api }
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.Column

/**
 * Converts a binary column of Avro format into its corresponding Catalyst value.
 * @param data the binary column containing Avro data
 * @param jsonFormatSchema the Avro schema in JSON string format
 * @return Column with decoded Catalyst data
 */
def from_avro(data: Column, jsonFormatSchema: String): Column

/**
 * Converts a binary column of Avro format with options.
 * @param data the binary column containing Avro data
 * @param jsonFormatSchema the Avro schema in JSON string format
 * @param options options to control parsing behavior
 * @return Column with decoded Catalyst data
 */
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column

/**
 * Converts a column into binary Avro format.
 * @param data the data column to encode
 * @return Column with binary Avro data
 */
def to_avro(data: Column): Column

/**
 * Converts a column into binary Avro format with custom schema.
 * @param data the data column to encode
 * @param jsonFormatSchema user-specified output Avro schema in JSON format
 * @return Column with binary Avro data
 */
def to_avro(data: Column, jsonFormatSchema: String): Column
```

42

43

## Basic Usage

### Decoding Binary Avro Data

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val avroSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

// Assuming df has a column 'avro_data' containing binary Avro data
val decodedDF = df.select(
  from_avro(col("avro_data"), avroSchema).as("user_data")
)

// Extract individual fields
val expandedDF = decodedDF.select(
  col("user_data.id").as("user_id"),
  col("user_data.name").as("user_name"),
  col("user_data.email").as("user_email")
)
```

75

76

### Encoding Data to Binary Avro

```scala
// Convert struct data to binary Avro
val encodedDF = df.select(
  to_avro(struct(
    col("id"),
    col("name"),
    col("email")
  )).as("avro_data")
)

// With custom schema
val outputSchema = """
{
  "type": "record",
  "name": "OutputUser",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}
"""

val customEncodedDF = df.select(
  to_avro(struct(col("id"), col("name")), outputSchema).as("avro_data")
)
```

104

105

## Advanced Usage

### Using Conversion Options

```scala
import scala.collection.JavaConverters._

val options = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
).asJava

val decodedDF = df.select(
  from_avro(col("avro_data"), avroSchema, options).as("decoded")
)
```

121

122

### Handling Complex Nested Data

```scala
val nestedSchema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "customer", "type": {
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
      ]
    }},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Item",
        "fields": [
          {"name": "productId", "type": "long"},
          {"name": "quantity", "type": "int"},
          {"name": "price", "type": "double"}
        ]
      }
    }}
  ]
}
"""

val decodedOrderDF = df.select(
  from_avro(col("order_avro"), nestedSchema).as("order")
)

// Access nested fields
val flattenedDF = decodedOrderDF.select(
  col("order.orderId"),
  col("order.customer.name").as("customer_name"),
  size(col("order.items")).as("item_count"),
  col("order.items").as("items")
)
```

167

168

### Array and Map Conversion

```scala
val arraySchema = """
{
  "type": "array",
  "items": "string"
}
"""

val mapSchema = """
{
  "type": "map",
  "values": "int"
}
"""

// Decode array
val arrayDF = df.select(
  from_avro(col("string_array_avro"), arraySchema).as("strings")
)

// Decode map
val mapDF = df.select(
  from_avro(col("map_avro"), mapSchema).as("int_map")
)

// Encode array and map
val encodedCollectionsDF = df.select(
  to_avro(col("string_array")).as("array_avro"),
  to_avro(col("int_map")).as("map_avro")
)
```

201

202

## Union Type Handling

```scala
val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "data", "type": [
      "null",
      {"type": "record", "name": "TextData", "fields": [{"name": "text", "type": "string"}]},
      {"type": "record", "name": "NumericData", "fields": [{"name": "value", "type": "double"}]}
    ]}
  ]
}
"""

val unionOptions = Map(
  "enableStableIdentifiersForUnionType" -> "true"
).asJava

val decodedUnionDF = df.select(
  from_avro(col("event_avro"), unionSchema, unionOptions).as("event")
)

// Access union members (with stable identifiers)
val processedDF = decodedUnionDF.select(
  col("event.id"),
  when(col("event.data.member_textdata").isNotNull,
    col("event.data.member_textdata.text"))
    .when(col("event.data.member_numericdata").isNotNull,
      col("event.data.member_numericdata.value").cast("string"))
    .as("data_value")
)
```

238

239

## Error Handling and Parse Modes

### Parse Mode Options

```scala
// FAILFAST mode - throw exception on parsing errors
val failfastOptions = Map("mode" -> "FAILFAST").asJava
val strictDF = df.select(
  from_avro(col("avro_data"), schema, failfastOptions).as("decoded")
)

// PERMISSIVE mode - set malformed records to null
val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava
val lenientDF = df.select(
  from_avro(col("avro_data"), schema, permissiveOptions).as("decoded")
)

// Filter out null results from parsing errors
val cleanDF = lenientDF.filter(col("decoded").isNotNull)
```

### Handling Invalid Schema

```scala
try {
  val invalidSchema = """{"type": "invalid"}"""
  val result = df.select(from_avro(col("data"), invalidSchema))
  result.show()
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
```

272

273

## Performance Optimization

### Caching Decoded Data

```scala
// Cache expensive decode operations
val decodedDF = df.select(
  from_avro(col("avro_data"), complexSchema).as("decoded")
).cache()

// Use cached result multiple times
val summary1 = decodedDF.groupBy("decoded.category").count()
val summary2 = decodedDF.filter(col("decoded.amount") > 100)
```

### Projection Pushdown

```scala
// Only decode needed fields to improve performance
val projectedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}
"""

// Decode only required fields instead of full schema
val efficientDF = df.select(
  from_avro(col("avro_data"), projectedSchema).as("user")
).select("user.id", "user.name")
```

306

307

## Integration with Streaming

### Structured Streaming Usage

```scala
val streamingDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "avro-topic")
  .load()

// Decode Avro data from Kafka value
val decodedStream = streamingDF.select(
  from_avro(col("value"), avroSchema).as("decoded_data")
)

val query = decodedStream.writeStream
  .format("console")
  .outputMode("append")
  .start()
```

### Encoding for Kafka Output

```scala
val encodedStream = processedDF.select(
  col("key"),
  to_avro(struct(col("*"))).as("value")
)

val kafkaQuery = encodedStream.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .start()
```

341

342

## SQL Function Usage

```sql
-- from_avro and to_avro are built-in SQL functions; no registration is needed
SELECT
  from_avro(avro_column, '{
    "type": "record",
    "name": "Data",
    "fields": [
      {"name": "id", "type": "long"},
      {"name": "value", "type": "string"}
    ]
  }') as decoded_data
FROM source_table;

-- Encode data to Avro
SELECT
  to_avro(struct(id, name, email)) as avro_data
FROM user_table;
```