# Serialization

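Apache Spark provides a pluggable serialization framework with two built-in implementations: Java serialization (the default) and Kryo, which is typically faster and more compact. The configured serializer is used for data sent over the network or cached in serialized form, including shuffle data, serialized RDD storage, and broadcast variables. Task closures are always serialized with Java serialization.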

## Core Serialization Interfaces

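### Serializer

```scala { .api }
abstract class Serializer {
  def newInstance(): SerializerInstance

  // Internal (private[spark]), false by default. True only if reordering the bytes of
  // serialized objects in a stream is equivalent to reordering the objects themselves;
  // Spark's serialized sort-based shuffle requires it.
  private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
}
```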

### SerializerInstance

```scala { .api }
abstract class SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}
```
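
The interfaces above can be exercised directly, without a SparkContext. The following is a minimal sketch that assumes `spark-core` is on the classpath. `SerializerRoundTrip` and `roundTrip` are hypothetical names used only for illustration. The sketch serializes a value to a `ByteBuffer` with each built-in serializer and reads it back with the same `SerializerInstance`.

```scala
import java.nio.ByteBuffer
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance}

object SerializerRoundTrip {
  // Serialize a value into a ByteBuffer, then deserialize it with the same instance.
  def roundTrip[T: ClassTag](ser: SerializerInstance, value: T): T = {
    val bytes: ByteBuffer = ser.serialize(value)
    ser.deserialize[T](bytes)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Spark intends a SerializerInstance for use by one thread at a time.
    val javaSer = new JavaSerializer(conf).newInstance()
    val kryoSer = new KryoSerializer(conf).newInstance()

    val value = Map("a" -> 1, "b" -> 2)
    println(roundTrip(javaSer, value))
    println(roundTrip(kryoSer, value))
  }
}
```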

### Serialization Streams

```scala { .api }
abstract class SerializationStream {
  def writeObject[T: ClassTag](t: T): SerializationStream
  def flush(): Unit
  def close(): Unit
  def writeKey[T: ClassTag](key: T): SerializationStream
  def writeValue[T: ClassTag](value: T): SerializationStream
  def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream
}

abstract class DeserializationStream {
  def readObject[T: ClassTag](): T
  def readKey[T: ClassTag](): T
  def readValue[T: ClassTag](): T
  def close(): Unit
  def asIterator: Iterator[Any]
  def asKeyValueIterator: Iterator[(Any, Any)]
}
```
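
The next example is a sketch of the stream API. It assumes `spark-core` is on the classpath, and `StreamRoundTrip` is a hypothetical helper. Key-value pairs are written through a `SerializationStream` into memory, then read back with `asKeyValueIterator`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object StreamRoundTrip {
  // Write key-value pairs to an in-memory stream, then read them back.
  def writeAndRead(pairs: Seq[(String, Int)]): List[(Any, Any)] = {
    val ser = new KryoSerializer(new SparkConf()).newInstance()

    val bytesOut = new ByteArrayOutputStream()
    val out = ser.serializeStream(bytesOut)
    pairs.foreach { case (k, v) =>
      out.writeKey(k)
      out.writeValue(v)
    }
    out.close() // flushes buffered output and closes the underlying stream

    val in = ser.deserializeStream(new ByteArrayInputStream(bytesOut.toByteArray))
    // asKeyValueIterator reads alternating keys and values until end of stream,
    // then closes the deserialization stream itself.
    in.asKeyValueIterator.toList
  }

  def main(args: Array[String]): Unit =
    println(writeAndRead(Seq("a" -> 1, "b" -> 2)))
}
```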

## Built-in Serializer Implementations

### JavaSerializer

```scala { .api }
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
  def newInstance(): SerializerInstance
  // supportsRelocationOfSerializedObjects is not overridden, so it stays false:
  // Java-serialized output cannot be relocated
}

private[spark] class JavaSerializerInstance(
    counterReset: Int,
    extraDebugInfo: Boolean,
    defaultClassLoader: ClassLoader) extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}
```

### KryoSerializer

```scala { .api }
class KryoSerializer(conf: SparkConf) extends Serializer with Logging with Serializable {
  def newInstance(): SerializerInstance
  // True while Kryo auto-reset is enabled (the default); a registrator that
  // disables auto-reset makes Kryo output non-relocatable
  def supportsRelocationOfSerializedObjects: Boolean
}

private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
  extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}
```
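
The following check compares output size for the two built-in serializers. It is a sketch that assumes `spark-core` is on the classpath; `SerializedSizes` and `Point` are hypothetical. Java serialization writes a full class descriptor (name, serialVersionUID, field names and types), while Kryo writes only the class name for an unregistered class. Exact byte counts vary by Spark and Kryo version, so none are shown.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

object SerializedSizes {
  case class Point(x: Int, y: Int)

  // Serialize the same value with both serializers and return (javaBytes, kryoBytes).
  def sizes(value: Point): (Int, Int) = {
    val conf = new SparkConf()
    val javaBytes = new JavaSerializer(conf).newInstance().serialize(value).remaining()
    val kryoBytes = new KryoSerializer(conf).newInstance().serialize(value).remaining()
    (javaBytes, kryoBytes)
  }

  def main(args: Array[String]): Unit = {
    val (javaBytes, kryoBytes) = sizes(Point(3, 4))
    println(s"Java: $javaBytes bytes, Kryo: $kryoBytes bytes")
  }
}
```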

## Kryo Registration

### KryoRegistrator

```scala { .api }
trait KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}
```

### Built-in Registrators

```scala { .api }
class SparkKryoRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}
```

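## Serialization Configuration

Key configuration properties for controlling serialization behavior:

### General Serialization

- `spark.serializer` - Serializer class for objects sent over the network or cached in serialized form (default: `org.apache.spark.serializer.JavaSerializer`; set to `org.apache.spark.serializer.KryoSerializer` for Kryo)
- `spark.closure.serializer` - No longer configurable: this setting was removed in Spark 2.0, and closures are always serialized with Java serialization
- `spark.serializer.objectStreamReset` - How often `JavaSerializer` resets its object stream so that cached object references can be garbage-collected (default: every 100 objects; `-1` disables the periodic reset)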

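### Kryo Configuration

- `spark.kryo.classesToRegister` - Comma-separated list of custom class names to register with Kryo
- `spark.kryo.registrator` - Comma-separated list of custom `KryoRegistrator` class names
- `spark.kryo.registrationRequired` - If `true`, Kryo throws an exception when it serializes an unregistered class instead of writing the class name with each object (default: `false`)
- `spark.kryo.referenceTracking` - Track references to the same object; necessary for object graphs with cycles and useful when they contain repeated objects (default: `true`)
- `spark.kryo.unsafe` - Use Kryo's unsafe-based input/output streams, which can be substantially faster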
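
You can check the effect of `spark.kryo.registrationRequired` without a cluster. This sketch assumes `spark-core` is on the classpath; `RegistrationRequiredDemo` and `Point` are hypothetical names. It serializes the same object twice, once without registering `Point` and once after registering it through `spark.kryo.classesToRegister`.

```scala
import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object RegistrationRequiredDemo {
  case class Point(x: Int, y: Int)

  // Serialize a Point with registration required, optionally registering Point first.
  def canSerialize(registerPoint: Boolean): Boolean = {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
    if (registerPoint) {
      conf.set("spark.kryo.classesToRegister", classOf[Point].getName)
    }
    val ser = new KryoSerializer(conf).newInstance()
    // Without registration, Kryo throws instead of writing the class name.
    Try(ser.serialize(Point(1, 2))).isSuccess
  }

  def main(args: Array[String]): Unit = {
    println(s"unregistered: ${canSerialize(registerPoint = false)}")
    println(s"registered:   ${canSerialize(registerPoint = true)}")
  }
}
```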

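### Buffer Configuration

- `spark.kryoserializer.buffer` - Initial size of Kryo's serialization buffer, one buffer per core on each worker; it grows up to the maximum as needed (default: `64k`)
- `spark.kryoserializer.buffer.max` - Maximum Kryo buffer size; must be larger than any single object you serialize and less than `2048m` (default: `64m`)
- `spark.serializer.buffer.size` - Buffer size for other serializers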
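
The sketch below shows how `spark.kryoserializer.buffer.max` bounds object size. It assumes `spark-core` is on the classpath; `KryoBufferDemo` is a hypothetical name. A 2 MiB byte array should fail to serialize with a 1 MiB maximum and succeed with the 64 MiB default.

```scala
import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object KryoBufferDemo {
  // Try to serialize a byte array of `payloadBytes` with the given maximum buffer size.
  def fits(payloadBytes: Int, maxBuffer: String): Boolean = {
    val conf = new SparkConf()
      .set("spark.kryoserializer.buffer", "64k")         // initial buffer (the default)
      .set("spark.kryoserializer.buffer.max", maxBuffer) // upper bound for one object
    val ser = new KryoSerializer(conf).newInstance()
    // Objects larger than buffer.max fail with a "Buffer overflow" error.
    Try(ser.serialize(new Array[Byte](payloadBytes))).isSuccess
  }

  def main(args: Array[String]): Unit = {
    val twoMiB = 2 * 1024 * 1024
    println(s"1m max:  ${fits(twoMiB, "1m")}")
    println(s"64m max: ${fits(twoMiB, "64m")}")
  }
}
```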

## Usage Examples

### Basic Kryo Configuration

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Kryo Example")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "false")
  .set("spark.kryoserializer.buffer", "64k")     // default initial buffer size
  .set("spark.kryoserializer.buffer.max", "64m") // default maximum buffer size

val sc = new SparkContext(conf)
```

### Custom Kryo Registrator

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

// Define custom classes
case class Person(name: String, age: Int, addresses: List[String])
case class Company(name: String, employees: List[Person])

// Custom registrator: Spark calls registerClasses on every Kryo instance it creates
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Person])
    kryo.register(classOf[Company])
    kryo.register(classOf[Array[Person]]) // task results of collect() are arrays
    // A Scala List instance is either :: (non-empty) or Nil
    kryo.register(classOf[scala.collection.immutable.::[_]])
    kryo.register(Nil.getClass)
  }
}

// Configure Spark to use the custom registrator. With registrationRequired=true,
// Kryo fails on any class not registered here or by Spark itself; depending on
// the job it may report further classes (e.g. collection wrappers) to register.
val conf = new SparkConf()
  .setAppName("Custom Kryo")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)
  .set("spark.kryo.registrationRequired", "true") // Require registration

val sc = new SparkContext(conf)

// Use custom objects
val people = sc.parallelize(Seq(
  Person("Alice", 25, List("123 Main St", "456 Oak Ave")),
  Person("Bob", 30, List("789 Pine St"))
))

val companies = sc.parallelize(Seq(
  Company("TechCorp", List(
    Person("Alice", 25, List("123 Main St")),
    Person("Bob", 30, List("789 Pine St"))
  ))
))

val result = companies.flatMap(_.employees).collect()
```

### Class Registration via Configuration

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Register classes via configuration instead of a custom registrator.
// Names must be fully qualified JVM class names (here Person and Company are assumed
// to live in package com.example); Scala's :: is $colon$colon and the Nil object is Nil$.
val conf = new SparkConf()
  .setAppName("Class Registration")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.classesToRegister",
    "com.example.Person," +
    "com.example.Company," +
    "scala.collection.immutable.List," +
    "scala.collection.immutable.$colon$colon," +
    "scala.collection.immutable.Nil$"
  )

val sc = new SparkContext(conf)
```
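
`SparkConf.registerKryoClasses` is an alternative that takes `Class` objects instead of strings, so you do not have to spell out JVM names by hand. In this sketch (`RegisterKryoClassesDemo` is a hypothetical name), the call appends the class names to `spark.kryo.classesToRegister` and also sets `spark.serializer` to `KryoSerializer`.

```scala
import org.apache.spark.SparkConf

object RegisterKryoClassesDemo {
  case class Person(name: String, age: Int, addresses: List[String])
  case class Company(name: String, employees: List[Person])

  // registerKryoClasses records the classes and switches the serializer to Kryo.
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("Class Registration")
      .registerKryoClasses(Array(classOf[Person], classOf[Company]))

  def main(args: Array[String]): Unit = {
    val conf = buildConf()
    println(conf.get("spark.serializer"))
    println(conf.get("spark.kryo.classesToRegister"))
  }
}
```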

### Performance Comparison

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.util.Random

case class LargeObject(
  id: Long,
  name: String,
  data: Array[Double],
  metadata: Map[String, String]
)

// Generate test data
val random = new Random(42)
val testData = (1 to 10000).map { i =>
  LargeObject(
    id = i,
    name = s"object_$i",
    data = Array.fill(100)(random.nextDouble()),
    metadata = Map(
      "type" -> "test",
      "version" -> "1.0",
      "created" -> System.currentTimeMillis().toString
    )
  )
}

// Time caching the data in serialized form. MEMORY_ONLY_SER sends every partition
// through the configured serializer; plain cache() (MEMORY_ONLY) stores deserialized
// objects and would barely exercise the serializer at all.
def timeSerializedCaching(serializerClass: String): Long = {
  val conf = new SparkConf()
    .setAppName(s"Serialization Test ($serializerClass)")
    .setMaster("local[*]")
    .set("spark.serializer", serializerClass)
  val sc = new SparkContext(conf)
  try {
    val start = System.currentTimeMillis()
    val rdd = sc.parallelize(testData).persist(StorageLevel.MEMORY_ONLY_SER)
    rdd.count()
    System.currentTimeMillis() - start
  } finally {
    sc.stop()
  }
}

// Compare performance. A single run is noisy and the first run also pays for JVM
// warm-up, so repeat the measurement (or swap the order) before drawing conclusions.
val javaTime = timeSerializedCaching("org.apache.spark.serializer.JavaSerializer")
val kryoTime = timeSerializedCaching("org.apache.spark.serializer.KryoSerializer")

println(s"Java serialization time: ${javaTime}ms")
println(s"Kryo serialization time: ${kryoTime}ms")
println(s"Kryo speedup: ${javaTime.toDouble / math.max(kryoTime, 1L)}x")
```

### Custom Serialization for Complex Types

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

// Custom class with complex serialization needs
case class TimestampedEvent(
  timestamp: LocalDateTime,
  eventType: String,
  data: Map[String, Any],
  metadata: Option[String]
)

// Custom Kryo serializer for LocalDateTime: stores the value as an ISO-8601 string.
// The read signature matches Kryo 4.x, the version bundled with Spark 3.x.
class LocalDateTimeSerializer extends Serializer[LocalDateTime] {
  private val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME

  override def write(kryo: Kryo, output: Output, dateTime: LocalDateTime): Unit = {
    output.writeString(dateTime.format(formatter))
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[LocalDateTime]): LocalDateTime = {
    LocalDateTime.parse(input.readString(), formatter)
  }
}

// Registrator with custom serializers
class AdvancedKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[TimestampedEvent])
    kryo.register(classOf[LocalDateTime], new LocalDateTimeSerializer)
    kryo.register(classOf[Some[_]])
    kryo.register(None.getClass)

    // Settings made here override spark.kryo.referenceTracking and
    // spark.kryo.registrationRequired; setting those in SparkConf is usually clearer.
    kryo.setReferences(true)            // Enable object reference tracking (Spark's default)
    kryo.setRegistrationRequired(false) // Allow unregistered classes (e.g. concrete Map classes)
  }
}

val conf = new SparkConf()
  .setAppName("Advanced Kryo")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[AdvancedKryoRegistrator].getName)

val sc = new SparkContext(conf)

// Use complex objects
val events = sc.parallelize(Seq(
  TimestampedEvent(
    LocalDateTime.now(),
    "user_login",
    Map("userId" -> 123, "sessionId" -> "abc123"),
    Some("Mobile app login")
  ),
  TimestampedEvent(
    LocalDateTime.now().minusHours(1),
    "page_view",
    Map("page" -> "/home", "duration" -> 30.5),
    None
  )
))

val result = events.filter(_.eventType == "user_login").collect()
```
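
You can unit-test a custom serializer against a bare `Kryo` instance before wiring it into Spark. This sketch repeats `LocalDateTimeSerializer` so that it is self-contained. It assumes Kryo 4.x; in Kryo 5 the `read` method takes `Class[_ <: T]` instead. `LocalDateTimeSerializerCheck` is a hypothetical name.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Same serializer as in the example above: stores LocalDateTime as an ISO-8601 string.
class LocalDateTimeSerializer extends Serializer[LocalDateTime] {
  private val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME

  override def write(kryo: Kryo, output: Output, dateTime: LocalDateTime): Unit =
    output.writeString(dateTime.format(formatter))

  // Kryo 4.x signature
  override def read(kryo: Kryo, input: Input, `type`: Class[LocalDateTime]): LocalDateTime =
    LocalDateTime.parse(input.readString(), formatter)
}

object LocalDateTimeSerializerCheck {
  // Write a value with a standalone Kryo instance and read it back.
  def roundTrip(value: LocalDateTime): LocalDateTime = {
    val kryo = new Kryo()
    kryo.register(classOf[LocalDateTime], new LocalDateTimeSerializer)

    val output = new Output(256, -1) // 256-byte initial buffer, no maximum
    kryo.writeObject(output, value)
    val bytes = output.toBytes
    output.close()

    val input = new Input(bytes)
    try kryo.readObject(input, classOf[LocalDateTime])
    finally input.close()
  }

  def main(args: Array[String]): Unit = {
    val now = LocalDateTime.now()
    println(roundTrip(now) == now)
  }
}
```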

### Broadcast Variable Serialization

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Large lookup table that benefits from efficient serialization
val largeLookupTable = (1 to 100000).map { i =>
  i -> s"value_$i"
}.toMap

// Broadcast variables are serialized with spark.serializer and, when
// spark.broadcast.compress is true, compressed with spark.io.compression.codec.
// Both of those values below are Spark's defaults, set explicitly for clarity.
val conf = new SparkConf()
  .setAppName("Broadcast Serialization")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.broadcast.compress", "true")
  .set("spark.io.compression.codec", "lz4")

val sc = new SparkContext(conf)

// Broadcast once instead of shipping a copy of the table with every task
val broadcastLookup = sc.broadcast(largeLookupTable)

val data = sc.parallelize(1 to 10000)
val enrichedData = data.map { id =>
  val lookup = broadcastLookup.value
  (id, lookup.getOrElse(id, "unknown"))
}

val result = enrichedData.collect()

// Remove cached copies from the executors once the broadcast is no longer needed
broadcastLookup.unpersist()
```

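## Best Practices

### Serializer Selection

1. **Use Kryo for production**: The Spark tuning guide notes that Kryo is often as much as 10x faster and more compact than Java serialization; the actual gain depends on your data types
2. **Register frequently used classes**: Kryo writes the full class name of an unregistered class along with each object, which costs space and time
3. **Benchmark your workload**: Test both serializers with your specific data types
4. **Consider compression**: For serialized RDD storage, `spark.rdd.compress` saves memory and disk at the cost of extra CPU time
5. **Monitor serialization overhead**: The stage page of the Spark UI reports per-task deserialization time and result serialization time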

### Kryo Configuration

1. **Increase buffer sizes**: If a job fails with a Kryo "Buffer overflow" error, raise `spark.kryoserializer.buffer.max` above the size of the largest object you serialize
2. **Enable registration requirement**: Set `spark.kryo.registrationRequired=true` to catch unregistered classes early
3. **Use custom serializers**: Implement efficient serialization for complex types
4. **Tune reference tracking**: `spark.kryo.referenceTracking` is enabled by default and is required for object graphs with cycles; disable it only if your objects contain no shared or cyclic references
5. **Test thoroughly**: Verify serialization and deserialization of every data type you use

### Performance Optimization

1. **Avoid nested collections**: Flatten deeply nested structures of many small objects when possible
2. **Use primitive collections**: Prefer arrays of primitives (e.g. `Array[Int]`) over collections of boxed values
3. **Minimize object creation**: Reuse objects in serialization-heavy operations
4. **Profile serialization costs**: Identify expensive serialization operations
5. **Consider data formats**: Use efficient formats like Avro or Parquet for persistent storage
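
The next sketch illustrates the primitive-collections point. It assumes `spark-core` is on the classpath; `PrimitiveVsBoxed` is a hypothetical name. It compares Kryo output for the same integers stored as an `Array[Int]` and as a `List[Int]`, whose elements are boxed `java.lang.Integer` objects. Only the relative sizes are meaningful; exact counts depend on the Spark and Kryo versions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object PrimitiveVsBoxed {
  // Returns (arrayBytes, listBytes) for the integers 1..n serialized with Kryo.
  def sizes(n: Int): (Int, Int) = {
    val ser = new KryoSerializer(new SparkConf()).newInstance()
    val values = (1 to n).toArray
    val arrayBytes = ser.serialize(values).remaining()       // Array[Int]: primitive ints
    val listBytes = ser.serialize(values.toList).remaining() // List[Int]: boxed elements
    (arrayBytes, listBytes)
  }

  def main(args: Array[String]): Unit = {
    val (arrayBytes, listBytes) = sizes(1000)
    println(s"Array[Int]: $arrayBytes bytes, List[Int]: $listBytes bytes")
  }
}
```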

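### Common Pitfalls

1. **Serialization failures**: Test serialization of every custom class before running at scale
2. **Class loading issues**: Ensure all required classes, including custom registrators and serializers, are on the executor classpath
3. **Version compatibility**: Don't use Java- or Kryo-serialized data as a long-term storage format; it may not be readable across Spark, Scala, or Kryo versions
4. **Closure serialization**: Closures are always serialized with Java serialization. Capturing a large object inflates every task (broadcast it instead), and referencing a field of a non-serializable enclosing class captures the whole instance and fails with "Task not serializable"
5. **Singleton objects**: Scala `object`s are not shipped with closures; each executor JVM initializes its own copy, so state mutated on the driver is not visible on executors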
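
The following is a sketch of the closure-capture pitfall. It runs in local mode with `spark-core` on the classpath; `Scaler` and `ClosureCaptureDemo` are hypothetical names. Referencing `factor` directly inside `map` would capture the whole non-serializable `Scaler`. Copying the field into a local `val` first keeps the closure down to a single `Int`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper that is deliberately NOT Serializable.
class Scaler(val factor: Int) {
  def scale(rdd: RDD[Int]): RDD[Int] = {
    // rdd.map(_ * factor) would reference this.factor, capturing the whole Scaler,
    // and fail with "Task not serializable". A local copy captures only the Int.
    val f = factor
    rdd.map(_ * f)
  }
}

object ClosureCaptureDemo {
  def run(): Array[Int] = {
    val conf = new SparkConf().setAppName("Closure capture").setMaster("local[1]")
    val sc = new SparkContext(conf)
    try new Scaler(3).scale(sc.parallelize(1 to 4)).collect()
    finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", "))
}
```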