or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-types.md · encoders.md · error-handling.md · index.md · row-operations.md · streaming-operations.md · utilities.md

docs/encoders.md

# Encoders

Type-safe conversion between JVM objects and Spark SQL representations for distributed serialization. Encoders provide efficient serialization while preserving type information in Spark's catalyst optimizer.

## Capabilities

### Base Encoder Interface

Core encoder interface for type conversion.

```scala { .api }
/**
 * Converts JVM objects to/from Spark SQL representation
 * @tparam T The JVM type being encoded
 */
trait Encoder[T] extends Serializable {
  /** Returns the schema for the encoded representation */
  def schema: StructType

  /** ClassTag for type T */
  def clsTag: ClassTag[T]
}
```

### Agnostic Encoder Interface

Implementation-agnostic encoder with additional type information.

```scala { .api }
/**
 * Implementation-agnostic encoder
 * @tparam T The type being encoded
 */
trait AgnosticEncoder[T] extends Encoder[T] {
  /** Whether this encoder represents a primitive type */
  def isPrimitive: Boolean

  /** Whether the encoded type can be null */
  def nullable: Boolean

  /** The Spark SQL data type for this encoder */
  def dataType: DataType

  /** Whether serialization allows lenient type conversion */
  def lenientSerialization: Boolean

  /** Whether this encoder represents a struct type */
  def isStruct: Boolean
}
```

### Encoder Implementations

Specific encoder implementations for different data types.

```scala { .api }
/**
 * Encoder for Option types
 * @tparam E Element type encoder
 */
case class OptionEncoder[E](elementEncoder: AgnosticEncoder[E]) extends AgnosticEncoder[Option[E]]

/**
 * Encoder for Array types
 * @tparam E Element type encoder
 */
case class ArrayEncoder[E](
  elementEncoder: AgnosticEncoder[E],
  containsNull: Boolean = true
) extends AgnosticEncoder[Array[E]]

/**
 * Encoder for Iterable collections
 * @tparam C Collection type
 * @tparam E Element type
 */
case class IterableEncoder[C <: Iterable[E], E](
  elementEncoder: AgnosticEncoder[E],
  containsNull: Boolean = true
) extends AgnosticEncoder[C]

/**
 * Encoder for Map types
 * @tparam C Map collection type
 * @tparam K Key type
 * @tparam V Value type
 */
case class MapEncoder[C <: Map[K, V], K, V](
  keyEncoder: AgnosticEncoder[K],
  valueEncoder: AgnosticEncoder[V],
  valueContainsNull: Boolean = true
) extends AgnosticEncoder[C]

/**
 * Represents a field in a struct encoder
 */
case class EncoderField(
  name: String,
  enc: AgnosticEncoder[_],
  nullable: Boolean,
  metadata: Metadata = Metadata.empty,
  readMethod: Option[String] = None,
  writeMethod: Option[String] = None
) {
  /** Convert to StructField for schema representation */
  def structField: StructField = StructField(name, enc.dataType, nullable, metadata)
}
```

### Row Encoder

Specialized encoder for Row objects.

```scala { .api }
/**
 * Encoder for Row objects
 */
object RowEncoder {
  /** Create encoder for the given schema */
  def apply(schema: StructType): Encoder[Row]
}
```

## Usage Examples

**Working with primitive encoders:**

```scala
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.types._

// Primitive type encoders are typically provided by the system
// but you can work with their properties

def analyzeEncoder[T](encoder: AgnosticEncoder[T]): Unit = {
  println(s"Is primitive: ${encoder.isPrimitive}")
  println(s"Is nullable: ${encoder.nullable}")
  println(s"Data type: ${encoder.dataType}")
  println(s"Schema: ${encoder.schema}")
  println(s"Is struct: ${encoder.isStruct}")
  println(s"Lenient serialization: ${encoder.lenientSerialization}")
}
```

**Option encoder usage:**

```scala
// Working with Option types in encoders
case class UserProfile(
  id: Long,
  name: String,
  email: Option[String], // Optional field
  age: Option[Int] // Optional field
)

// The encoder system handles Option types automatically
// Option[String] becomes nullable StringType
// Option[Int] becomes nullable IntegerType

def processOptionalFields[T](optionEncoder: OptionEncoder[T]): Unit = {
  val elementEncoder = optionEncoder.elementEncoder
  println(s"Element type: ${elementEncoder.dataType}")
  println(s"Option is nullable: ${optionEncoder.nullable}") // true
}
```

**Array encoder usage:**

```scala
// Working with Array types
case class Order(
  id: String,
  items: Array[String], // Array of strings
  quantities: Array[Int], // Array of integers
  tags: Array[Option[String]] // Array of optional strings
)

def processArrayEncoder[E](arrayEncoder: ArrayEncoder[E]): Unit = {
  val elementEncoder = arrayEncoder.elementEncoder
  println(s"Element type: ${elementEncoder.dataType}")
  println(s"Contains null: ${arrayEncoder.containsNull}")

  // Array schema will be ArrayType(elementType, containsNull)
  val arrayType = arrayEncoder.dataType.asInstanceOf[ArrayType]
  println(s"Array element type: ${arrayType.elementType}")
  println(s"Array contains null: ${arrayType.containsNull}")
}
```

**Collection encoder usage:**

```scala
import scala.collection.mutable

// Working with different collection types
case class Analytics(
  userIds: List[String], // List collection
  scores: Vector[Double], // Vector collection
  tags: Set[String], // Set collection
  buffer: mutable.Buffer[Int] // Mutable collection
)

def processIterableEncoder[C <: Iterable[E], E](iterableEncoder: IterableEncoder[C, E]): Unit = {
  val elementEncoder = iterableEncoder.elementEncoder
  println(s"Collection element type: ${elementEncoder.dataType}")
  println(s"Collection contains null: ${iterableEncoder.containsNull}")
}
```

**Map encoder usage:**

```scala
// Working with Map types
case class Configuration(
  settings: Map[String, String], // String to String mapping
  counters: Map[String, Int], // String to Int mapping
  metadata: Map[String, Option[String]] // String to optional String
)

def processMapEncoder[C <: Map[K, V], K, V](mapEncoder: MapEncoder[C, K, V]): Unit = {
  val keyEncoder = mapEncoder.keyEncoder
  val valueEncoder = mapEncoder.valueEncoder

  println(s"Key type: ${keyEncoder.dataType}")
  println(s"Value type: ${valueEncoder.dataType}")
  println(s"Value contains null: ${mapEncoder.valueContainsNull}")

  // Map schema will be MapType(keyType, valueType, valueContainsNull)
  val mapType = mapEncoder.dataType.asInstanceOf[MapType]
  println(s"Map key type: ${mapType.keyType}")
  println(s"Map value type: ${mapType.valueType}")
  println(s"Map value contains null: ${mapType.valueContainsNull}")
}
```

**Struct field encoding:**

```scala
// Working with struct fields
case class Person(
  id: Long,
  name: String,
  email: Option[String],
  addresses: Array[Address],
  metadata: Map[String, String]
)

case class Address(
  street: String,
  city: String,
  zipCode: String
)

// Encoder fields represent the structure
val personFields = Array(
  EncoderField("id", longEncoder, nullable = false),
  EncoderField("name", stringEncoder, nullable = false),
  EncoderField("email", optionStringEncoder, nullable = true),
  EncoderField("addresses", addressArrayEncoder, nullable = false),
  EncoderField("metadata", stringMapEncoder, nullable = false)
)

def processEncoderField(field: EncoderField): Unit = {
  println(s"Field name: ${field.name}")
  println(s"Field nullable: ${field.nullable}")
  println(s"Field type: ${field.enc.dataType}")

  // Access metadata if present
  if (field.metadata != Metadata.empty) {
    println(s"Field has metadata")
  }
}
```

**Row encoder usage:**

```scala
import org.apache.spark.sql.{Row, Encoder}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

// Create schema for Row encoder
val schema = StructType(Array(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("scores", ArrayType(DoubleType, containsNull = false), nullable = false),
  StructField("metadata", MapType(StringType, StringType, valueContainsNull = true), nullable = true)
))

// Create Row encoder
val rowEncoder: Encoder[Row] = RowEncoder(schema)

// Use encoder properties
println(s"Row encoder schema: ${rowEncoder.schema}")
println(s"Row encoder class tag: ${rowEncoder.clsTag}")

// Create rows that match the schema
val row1 = Row(1L, "Alice", Array(95.5, 87.2, 92.0), Map("department" -> "Engineering"))
val row2 = Row(2L, "Bob", Array(88.0, 91.5), null) // null metadata

// The encoder ensures type safety for these rows
```

**Custom encoder patterns:**

```scala
// Working with encoders in custom functions
def processEncodedData[T](data: T, encoder: Encoder[T]): Unit = {
  val schema = encoder.schema
  println(s"Processing data with schema: ${schema.treeString}")

  // Access schema fields
  schema.fields.foreach { field =>
    println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
  }
}

// Type-safe encoder operations
def validateEncoder[T](encoder: AgnosticEncoder[T]): Boolean = {
  val schema = encoder.schema
  val dataType = encoder.dataType

  // Ensure schema matches data type
  schema.fields.length match {
    case 0 if encoder.isPrimitive => true
    case n if n > 0 && encoder.isStruct => true
    case _ => false
  }
}
```

**Encoder composition patterns:**

```scala
// Building complex encoders from simpler ones
case class NestedData(
  simple: String,
  optional: Option[Int],
  collection: List[Double],
  mapping: Map[String, Boolean],
  nested: InnerData
)

case class InnerData(value: String, count: Int)

// Encoders compose naturally:
// - NestedData encoder contains:
//   - String encoder for simple
//   - OptionEncoder[Int] for optional
//   - IterableEncoder[List[Double], Double] for collection
//   - MapEncoder[Map[String, Boolean], String, Boolean] for mapping
//   - Struct encoder for nested InnerData

def analyzeNestedEncoder(encoder: AgnosticEncoder[NestedData]): Unit = {
  println(s"Nested data schema:")
  println(encoder.schema.treeString)

  // The schema will show the full nested structure
  // with appropriate nullability and types
}
```