# Encoders

Type-safe conversion between JVM objects and Spark SQL representations for distributed serialization. Encoders provide efficient serialization while preserving type information in Spark's Catalyst optimizer.

## Capabilities
### Base Encoder Interface

Core encoder interface for type conversion.

```scala { .api }
/**
 * Converts JVM objects to/from Spark SQL representation
 * @tparam T The JVM type being encoded
 */
trait Encoder[T] extends Serializable {

  /** Returns the schema for the encoded representation */
  def schema: StructType

  /** ClassTag for type T */
  def clsTag: ClassTag[T]
}
```
### Agnostic Encoder Interface

Implementation-agnostic encoder with additional type information.

```scala { .api }
/**
 * Implementation-agnostic encoder
 * @tparam T The type being encoded
 */
trait AgnosticEncoder[T] extends Encoder[T] {

  /** Whether this encoder represents a primitive type */
  def isPrimitive: Boolean

  /** Whether the encoded type can be null */
  def nullable: Boolean

  /** The Spark SQL data type for this encoder */
  def dataType: DataType

  /** Whether serialization allows lenient type conversion */
  def lenientSerialization: Boolean

  /** Whether this encoder represents a struct type */
  def isStruct: Boolean
}
```
### Encoder Implementations

Specific encoder implementations for different data types.

```scala { .api }
/**
 * Encoder for Option types
 * @tparam E Element type encoder
 */
case class OptionEncoder[E](elementEncoder: AgnosticEncoder[E]) extends AgnosticEncoder[Option[E]]

/**
 * Encoder for Array types
 * @tparam E Element type encoder
 */
case class ArrayEncoder[E](
  elementEncoder: AgnosticEncoder[E],
  containsNull: Boolean = true
) extends AgnosticEncoder[Array[E]]

/**
 * Encoder for Iterable collections
 * @tparam C Collection type
 * @tparam E Element type
 */
case class IterableEncoder[C <: Iterable[E], E](
  elementEncoder: AgnosticEncoder[E],
  containsNull: Boolean = true
) extends AgnosticEncoder[C]

/**
 * Encoder for Map types
 * @tparam C Map collection type
 * @tparam K Key type
 * @tparam V Value type
 */
case class MapEncoder[C <: Map[K, V], K, V](
  keyEncoder: AgnosticEncoder[K],
  valueEncoder: AgnosticEncoder[V],
  valueContainsNull: Boolean = true
) extends AgnosticEncoder[C]

/**
 * Represents a field in a struct encoder
 */
case class EncoderField(
  name: String,
  enc: AgnosticEncoder[_],
  nullable: Boolean,
  metadata: Metadata = Metadata.empty,
  readMethod: Option[String] = None,
  writeMethod: Option[String] = None
) {
  /** Convert to StructField for schema representation */
  def structField: StructField = StructField(name, enc.dataType, nullable, metadata)
}
```
### Row Encoder

Specialized encoder for Row objects.

```scala { .api }
/**
 * Encoder for Row objects
 */
object RowEncoder {
  /** Create encoder for the given schema */
  def apply(schema: StructType): Encoder[Row]
}
```
## Usage Examples

**Working with primitive encoders:**

```scala
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.types._

// Primitive type encoders are typically provided by the system
// but you can work with their properties

def analyzeEncoder[T](encoder: AgnosticEncoder[T]): Unit = {
  println(s"Is primitive: ${encoder.isPrimitive}")
  println(s"Is nullable: ${encoder.nullable}")
  println(s"Data type: ${encoder.dataType}")
  println(s"Schema: ${encoder.schema}")
  println(s"Is struct: ${encoder.isStruct}")
  println(s"Lenient serialization: ${encoder.lenientSerialization}")
}
```
**Option encoder usage:**

```scala
// Working with Option types in encoders
case class UserProfile(
  id: Long,
  name: String,
  email: Option[String], // Optional field
  age: Option[Int]       // Optional field
)

// The encoder system handles Option types automatically
// Option[String] becomes nullable StringType
// Option[Int] becomes nullable IntegerType

def processOptionalFields[T](optionEncoder: OptionEncoder[T]): Unit = {
  val elementEncoder = optionEncoder.elementEncoder
  println(s"Element type: ${elementEncoder.dataType}")
  println(s"Option is nullable: ${optionEncoder.nullable}") // true
}
```
**Array encoder usage:**

```scala
// Working with Array types
case class Order(
  id: String,
  items: Array[String],       // Array of strings
  quantities: Array[Int],     // Array of integers
  tags: Array[Option[String]] // Array of optional strings
)

def processArrayEncoder[E](arrayEncoder: ArrayEncoder[E]): Unit = {
  val elementEncoder = arrayEncoder.elementEncoder
  println(s"Element type: ${elementEncoder.dataType}")
  println(s"Contains null: ${arrayEncoder.containsNull}")

  // Array schema will be ArrayType(elementType, containsNull)
  val arrayType = arrayEncoder.dataType.asInstanceOf[ArrayType]
  println(s"Array element type: ${arrayType.elementType}")
  println(s"Array contains null: ${arrayType.containsNull}")
}
```
**Collection encoder usage:**

```scala
import scala.collection.mutable

// Working with different collection types
case class Analytics(
  userIds: List[String],      // List collection
  scores: Vector[Double],     // Vector collection
  tags: Set[String],          // Set collection
  buffer: mutable.Buffer[Int] // Mutable collection
)

def processIterableEncoder[C <: Iterable[E], E](iterableEncoder: IterableEncoder[C, E]): Unit = {
  val elementEncoder = iterableEncoder.elementEncoder
  println(s"Collection element type: ${elementEncoder.dataType}")
  println(s"Collection contains null: ${iterableEncoder.containsNull}")
}
```
**Map encoder usage:**

```scala
// Working with Map types
case class Configuration(
  settings: Map[String, String],        // String to String mapping
  counters: Map[String, Int],           // String to Int mapping
  metadata: Map[String, Option[String]] // String to optional String
)

def processMapEncoder[C <: Map[K, V], K, V](mapEncoder: MapEncoder[C, K, V]): Unit = {
  val keyEncoder = mapEncoder.keyEncoder
  val valueEncoder = mapEncoder.valueEncoder

  println(s"Key type: ${keyEncoder.dataType}")
  println(s"Value type: ${valueEncoder.dataType}")
  println(s"Value contains null: ${mapEncoder.valueContainsNull}")

  // Map schema will be MapType(keyType, valueType, valueContainsNull)
  val mapType = mapEncoder.dataType.asInstanceOf[MapType]
  println(s"Map key type: ${mapType.keyType}")
  println(s"Map value type: ${mapType.valueType}")
  println(s"Map value contains null: ${mapType.valueContainsNull}")
}
```
**Struct field encoding:**

```scala
// Working with struct fields
case class Person(
  id: Long,
  name: String,
  email: Option[String],
  addresses: Array[Address],
  metadata: Map[String, String]
)

case class Address(
  street: String,
  city: String,
  zipCode: String
)

// Encoder fields represent the structure
val personFields = Array(
  EncoderField("id", longEncoder, nullable = false),
  EncoderField("name", stringEncoder, nullable = false),
  EncoderField("email", optionStringEncoder, nullable = true),
  EncoderField("addresses", addressArrayEncoder, nullable = false),
  EncoderField("metadata", stringMapEncoder, nullable = false)
)

def processEncoderField(field: EncoderField): Unit = {
  println(s"Field name: ${field.name}")
  println(s"Field nullable: ${field.nullable}")
  println(s"Field type: ${field.enc.dataType}")

  // Access metadata if present
  if (field.metadata != Metadata.empty) {
    println(s"Field has metadata")
  }
}
```
**Row encoder usage:**

```scala
import org.apache.spark.sql.{Row, Encoder}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

// Create schema for Row encoder
val schema = StructType(Array(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("scores", ArrayType(DoubleType, containsNull = false), nullable = false),
  StructField("metadata", MapType(StringType, StringType, valueContainsNull = true), nullable = true)
))

// Create Row encoder
val rowEncoder: Encoder[Row] = RowEncoder(schema)

// Use encoder properties
println(s"Row encoder schema: ${rowEncoder.schema}")
println(s"Row encoder class tag: ${rowEncoder.clsTag}")

// Create rows that match the schema
val row1 = Row(1L, "Alice", Array(95.5, 87.2, 92.0), Map("department" -> "Engineering"))
val row2 = Row(2L, "Bob", Array(88.0, 91.5), null) // null metadata

// The encoder ensures type safety for these rows
```
**Custom encoder patterns:**

```scala
// Working with encoders in custom functions
def processEncodedData[T](data: T, encoder: Encoder[T]): Unit = {
  val schema = encoder.schema
  println(s"Processing data with schema: ${schema.treeString}")

  // Access schema fields
  schema.fields.foreach { field =>
    println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
  }
}

// Type-safe encoder operations
def validateEncoder[T](encoder: AgnosticEncoder[T]): Boolean = {
  val schema = encoder.schema
  val dataType = encoder.dataType

  // Ensure schema matches data type
  schema.fields.length match {
    case 0 if encoder.isPrimitive => true
    case n if n > 0 && encoder.isStruct => true
    case _ => false
  }
}
```
**Encoder composition patterns:**

```scala
// Building complex encoders from simpler ones
case class NestedData(
  simple: String,
  optional: Option[Int],
  collection: List[Double],
  mapping: Map[String, Boolean],
  nested: InnerData
)

case class InnerData(value: String, count: Int)

// Encoders compose naturally:
// - NestedData encoder contains:
//   - String encoder for simple
//   - OptionEncoder[Int] for optional
//   - IterableEncoder[List[Double], Double] for collection
//   - MapEncoder[Map[String, Boolean], String, Boolean] for mapping
//   - Struct encoder for nested InnerData

def analyzeNestedEncoder(encoder: AgnosticEncoder[NestedData]): Unit = {
  println(s"Nested data schema:")
  println(encoder.schema.treeString)

  // The schema will show the full nested structure
  // with appropriate nullability and types
}
```