or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

accumulators.md, application-context.md, broadcast-variables.md, index.md, java-api.md, partitioning.md, rdd-operations.md, serialization.md, storage-persistence.md

docs/serialization.md

0

# Serialization

1

2

Spark serializes data whenever it is shuffled between cluster nodes, broadcast, or stored. It ships two serializers: Java serialization (the default) and Kryo, which is faster and produces more compact output.

3

4

## Capabilities

5

6

### Serializer Base Class

7

8

Abstract base class for all serialization implementations in Spark.

9

10

```scala { .api }

11

/**
 * Base class for serializers used by Spark.
 *
 * A `Serializer` is a factory for [[SerializerInstance]]s; each instance is meant for
 * thread-local use, so call [[newInstance]] once per thread.
 */
abstract class Serializer {

  /**
   * Class loader explicitly configured through [[setDefaultClassLoader]].
   * `None` means no loader has been set.
   */
  @volatile protected var defaultClassLoader: Option[ClassLoader] = None

  /**
   * Sets the class loader that instances created afterwards should use.
   *
   * @param classLoader loader to use for deserialization
   * @return this serializer, for call chaining
   */
  def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
    defaultClassLoader = Some(classLoader)
    this
  }

  /** Suggested buffer size for serialization, in bytes (64 KiB). */
  def defaultBufferSize: Int = 65536

  /** Creates a new serializer instance for thread-local use. */
  def newInstance(): SerializerInstance

  /**
   * Whether the bytes of individually serialized objects may be reordered or concatenated
   * within a stream without changing what is deserialized.
   *
   * Defaults to `false`. Wrongly answering `true` lets callers reorder bytes the format cannot
   * tolerate, so a serializer must opt in explicitly once it knows its format allows it.
   */
  def supportsRelocationOfSerializedObjects: Boolean = false
}

/**
 * A serializer bound to one thread.
 *
 * Obtained from `Serializer.newInstance()`. An instance should not be shared between threads.
 */
abstract class SerializerInstance {

  // ---- Streaming many objects ----

  /** Wraps `s` in a stream that writes a sequence of objects. */
  def serializeStream(s: OutputStream): SerializationStream

  /** Wraps `s` in a stream that reads back a sequence of objects. */
  def deserializeStream(s: InputStream): DeserializationStream

  // ---- Single-object round trips ----

  /** Encodes `t` and returns the bytes as a buffer. */
  def serialize[T: ClassTag](t: T): ByteBuffer

  /**
   * Encodes `t`, taking a caller-supplied `buffer`.
   *
   * NOTE(review): none of the SerializerInstance implementations in this document override
   * this overload, so as written they would not compile against this class. Confirm whether
   * it belongs in the API.
   */
  def serialize[T: ClassTag](t: T, buffer: ByteBuffer): ByteBuffer

  /** Decodes one object from `bytes`. */
  def deserialize[T: ClassTag](bytes: ByteBuffer): T

  /** Same as the single-argument overload, but additionally takes a `ClassLoader`. */
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
}

43

```

44

45

### Serialization Streams

46

47

Streaming interfaces for serializing multiple objects efficiently.

48

49

```scala { .api }

50

/**
 * Stream for writing a sequence of serialized objects.
 *
 * Subclasses must implement [[writeObject]], [[flush]] and [[close]]. The key, value and bulk
 * writers default to [[writeObject]], so a serializer without a dedicated key/value encoding
 * does not have to re-implement them. Override them when it does have one.
 */
abstract class SerializationStream extends Closeable {

  /** Writes one object and returns this stream, so calls can be chained. */
  def writeObject[T: ClassTag](t: T): SerializationStream

  /** Writes the key of a key-value pair. Defaults to [[writeObject]]. */
  def writeKey[T: ClassTag](key: T): SerializationStream = writeObject(key)

  /** Writes the value of a key-value pair. Defaults to [[writeObject]]. */
  def writeValue[T: ClassTag](value: T): SerializationStream = writeObject(value)

  /** Writes every remaining element of `iter`, in order, and returns this stream. */
  def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream = {
    iter.foreach(elem => writeObject(elem))
    this
  }

  /** Flushes any buffered output. */
  def flush(): Unit

  /** Closes the stream. */
  def close(): Unit
}

/**
 * Stream for reading back a sequence of serialized objects.
 *
 * Subclasses must implement [[readObject]] and [[close]]. `readObject` is expected to throw
 * `java.io.EOFException` once the input is exhausted. The key/value readers and the iterator
 * views are derived from those two methods.
 */
abstract class DeserializationStream extends Closeable {

  /** Reads the next object; throws `java.io.EOFException` at end of input. */
  def readObject[T: ClassTag](): T

  /** Reads the key of a key-value pair. Defaults to [[readObject]]. */
  def readKey[T: ClassTag](): T = readObject[T]()

  /** Reads the value of a key-value pair. Defaults to [[readObject]]. */
  def readValue[T: ClassTag](): T = readObject[T]()

  /** Closes the stream. */
  def close(): Unit

  /** Lazily iterates over all remaining objects; the stream is closed when input runs out. */
  def asIterator: Iterator[Any] = readUntilEof(readObject[Any]())

  /** Lazily iterates over remaining (key, value) pairs; the stream is closed when input runs out. */
  def asKeyValueIterator: Iterator[(Any, Any)] =
    readUntilEof((readKey[Any](), readValue[Any]()))

  /**
   * Evaluates `next` repeatedly until it throws `EOFException`.
   * Closes the stream exactly once at that point.
   */
  private def readUntilEof[A](next: => A): Iterator[A] =
    Iterator
      .continually {
        try Some(next)
        catch {
          case _: java.io.EOFException =>
            close()
            None
        }
      }
      .takeWhile(_.isDefined)
      .collect { case Some(elem) => elem }
}

91

```

92

93

### Java Serializer

94

95

Default serializer using Java's built-in serialization mechanism.

96

97

```scala { .api }

98

/**
 * Serializer using Java's built-in serialization.
 *
 * The serializer is itself `Externalizable`. Java serialization recreates it through the public
 * no-arg constructor and then restores its settings in [[readExternal]].
 *
 * @param conf Spark configuration
 */
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {

  /**
   * Passed to every JavaSerializerInstance (and from there to each JavaSerializationStream).
   * Read from `spark.serializer.objectStreamReset`, which controls how many objects are written
   * before the object stream is reset (default 100).
   */
  private var counterReset: Int = conf.getInt("spark.serializer.objectStreamReset", 100)

  /** No-arg constructor required by `Externalizable`; starts from a default SparkConf. */
  def this() = this(new SparkConf())

  override def newInstance(): SerializerInstance = {
    // Prefer an explicitly configured loader, else the calling thread's context loader.
    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
    new JavaSerializerInstance(counterReset, classLoader, this)
  }

  /**
   * Java serialization output cannot be relocated. An object stream starts with a header and
   * may refer back to objects written earlier in the same stream, so its bytes cannot be
   * reordered or concatenated record by record.
   */
  override def supportsRelocationOfSerializedObjects: Boolean = false

  // `Externalizable` declares these as public interface methods; narrowing them to
  // `private[spark]` is rejected by the compiler.
  override def writeExternal(out: ObjectOutput): Unit = out.writeInt(counterReset)

  override def readExternal(in: ObjectInput): Unit = {
    counterReset = in.readInt()
  }
}

/**
 * Java serializer instance.
 *
 * @param counterReset       forwarded to every JavaSerializationStream this instance creates
 * @param defaultClassLoader class loader handed to JavaDeserializationStream by default
 * @param javaSerializer     the serializer that created this instance
 */
private[spark] class JavaSerializerInstance(
    counterReset: Int,
    defaultClassLoader: ClassLoader,
    javaSerializer: JavaSerializer
) extends SerializerInstance {

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = serializeStream(bos)
    // Close even if writing fails, so the stream never leaks.
    try out.writeObject(t) finally out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
    readSingle[T](deserializeStream(asInputStream(bytes)))

  /** Like [[deserialize]], but resolves classes through `loader` instead of the default one. */
  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
    readSingle[T](new JavaDeserializationStream(asInputStream(bytes), loader))

  override def serializeStream(s: OutputStream): SerializationStream = {
    new JavaSerializationStream(s, counterReset)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    new JavaDeserializationStream(s, defaultClassLoader)
  }

  /** Reads exactly one object from `in` and closes it afterwards. */
  private def readSingle[T: ClassTag](in: DeserializationStream): T =
    try in.readObject[T]() finally in.close()

  /**
   * Exposes the remaining bytes of `bytes` as an InputStream without moving the caller's
   * position. Heap buffers are read in place, and `arrayOffset` is honoured so sliced
   * buffers are read correctly. Direct and read-only buffers have no accessible array, so
   * their bytes are copied first.
   */
  private def asInputStream(bytes: ByteBuffer): InputStream =
    if (bytes.hasArray) {
      new ByteArrayInputStream(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    } else {
      val copy = new Array[Byte](bytes.remaining())
      bytes.duplicate().get(copy)
      new ByteArrayInputStream(copy)
    }
}

148

```

149

150

### Kryo Serializer

151

152

High-performance serializer using the Kryo serialization library.

153

154

```scala { .api }

155

/**
 * Serializer using the Kryo serialization library.
 *
 * @param conf Spark configuration
 */
class KryoSerializer(conf: SparkConf) extends Serializer with Logging with Serializable {

  override def newInstance(): SerializerInstance = {
    this.synchronized {
      new KryoSerializerInstance(this, useUnsafe, usePool)
    }
  }

  /**
   * Kryo writes no stream header, so records can be relocated as long as Kryo's auto-reset is
   * enabled (it is by default). A custom registrator may switch auto-reset off, so inspect a
   * fully configured Kryo instead of hard-coding the answer.
   */
  override def supportsRelocationOfSerializedObjects: Boolean = {
    // Kryo has no public getter for this flag, hence the reflective read.
    // NOTE(review): confirm the field name for the Kryo version in use.
    val autoResetField = classOf[Kryo].getDeclaredField("autoReset")
    autoResetField.setAccessible(true)
    autoResetField.getBoolean(newKryo())
  }

  /**
   * Initial Kryo buffer size, in bytes. `spark.kryoserializer.buffer` is read in KiB, so it is
   * converted here. Throws `ArithmeticException` if the result does not fit in an `Int`.
   */
  def bufferSize: Int = Math.toIntExact(conf.get(KRYO_SERIALIZER_BUFFER_SIZE) * 1024L)

  /**
   * Maximum Kryo buffer size, in bytes. `spark.kryoserializer.buffer.max` is read in MiB.
   * Throws `ArithmeticException` if the result does not fit in an `Int`.
   */
  def maxBufferSize: Int =
    Math.toIntExact(conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE) * 1024L * 1024L)

  /** Whether to use unsafe I/O. */
  def useUnsafe: Boolean = conf.get(KRYO_USE_UNSAFE)

  /** Whether to use object pools. */
  def usePool: Boolean = conf.get(KRYO_USE_POOL)

  /**
   * Registrator classes named in `spark.kryo.registrator`, which accepts a comma-separated
   * list of fully qualified class names. Classes are loaded with the context class loader.
   */
  def registratorClasses: Seq[Class[_]] =
    conf.getOption("spark.kryo.registrator").toSeq
      .flatMap(_.split(','))
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(className => Class.forName(className, true, Thread.currentThread.getContextClassLoader))

  /** First configured registrator class, if any. Kept for compatibility; prefer [[registratorClasses]]. */
  def registratorClass: Option[Class[_]] = registratorClasses.headOption

  /** Whether Kryo rejects classes that were not registered in advance. */
  def registrationRequired: Boolean = conf.get(KRYO_REGISTRATION_REQUIRED)

  /** Creates a Kryo instance configured from `conf`, with every custom registrator applied. */
  def newKryo(): Kryo = {
    val kryo = new Kryo()

    kryo.setRegistrationRequired(registrationRequired)
    kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))

    // Common array types. `Array[Any]` and `Array[Object]` erase to the same JVM class,
    // so that class is registered once.
    Seq[Class[_]](
      classOf[Array[Object]],
      classOf[Array[String]],
      classOf[Array[Byte]],
      classOf[Array[Int]],
      classOf[Array[Long]],
      classOf[Array[Double]]
    ).foreach(cls => kryo.register(cls))

    // Apply custom registrators in the order they were listed.
    registratorClasses.foreach { cls =>
      val registrator = cls.getDeclaredConstructor().newInstance().asInstanceOf[KryoRegistrator]
      registrator.registerClasses(kryo)
    }

    kryo
  }

  /**
   * Output buffer for KryoSerializerInstance. It starts at [[bufferSize]] bytes and may grow to
   * [[maxBufferSize]], but never below the initial size.
   * NOTE(review): [[useUnsafe]] is not applied here; wire in Kryo's unsafe I/O classes if needed.
   */
  def newKryoOutput(): Output = new Output(bufferSize, math.max(bufferSize, maxBufferSize))

  /** Empty input buffer for KryoSerializerInstance; bytes are supplied later via `setBuffer`. */
  def newKryoInput(): Input = new Input()
}

/**
 * Hook for registering application classes with Kryo.
 *
 * Implementations are named in `spark.kryo.registrator` and are applied to each Kryo instance
 * that `KryoSerializer.newKryo()` creates.
 */
trait KryoRegistrator {

  /** Registers classes (and, optionally, custom serializers) on the given `kryo`. */
  def registerClasses(kryo: Kryo): Unit
}

/**
 * Kryo serializer instance.
 *
 * Holds its own Kryo, Output and Input and reuses them on every call, so it must stay
 * thread-local.
 *
 * @param ks        serializer providing the configured Kryo and its buffers
 * @param useUnsafe forwarded to the Kryo serialization streams
 * @param usePool   not used by this instance (kept for interface compatibility)
 */
private[spark] class KryoSerializerInstance(
    ks: KryoSerializer,
    useUnsafe: Boolean,
    usePool: Boolean
) extends SerializerInstance {

  // Per-instance Kryo state, created once and never reassigned.
  private[this] val kryo: Kryo = ks.newKryo()
  private[this] val output: Output = ks.newKryoOutput()
  private[this] val input: Input = ks.newKryoInput()

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    output.reset()
    kryo.writeClassAndObject(output, t)
    ByteBuffer.wrap(output.toBytes)
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    if (bytes.hasArray) {
      // Read heap buffers in place; arrayOffset matters for sliced buffers.
      input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    } else {
      // Direct and read-only buffers expose no backing array. Copy the remaining bytes
      // from a duplicate so the caller's position is left untouched.
      val copy = new Array[Byte](bytes.remaining())
      bytes.duplicate().get(copy)
      input.setBuffer(copy)
    }
    kryo.readClassAndObject(input).asInstanceOf[T]
  }

  /** Like [[deserialize]], but resolves classes through `loader` for this call only. */
  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    val previousLoader = kryo.getClassLoader
    try {
      kryo.setClassLoader(loader)
      deserialize[T](bytes)
    } finally {
      kryo.setClassLoader(previousLoader)
    }
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    new KryoSerializationStream(ks, s, useUnsafe)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    new KryoDeserializationStream(ks, s, useUnsafe)
  }
}

260

```

261

262

### Serializer Configuration

263

264

Configuration options for different serialization strategies.

265

266

```scala { .api }

267

/**
 * Spark configuration keys for serialization.
 */
object SerializationConfig {

  /** Fully qualified class name of the serializer; Java serialization by default. */
  val SERIALIZER = ConfigBuilder("spark.serializer")
    .version("0.5.0")
    .stringConf
    .createWithDefault("org.apache.spark.serializer.JavaSerializer")

  /** Initial Kryo buffer size. Values without a unit suffix are read as KiB; default "64k". */
  val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
    .version("1.4.0")
    .bytesConf(ByteUnit.KiB)
    .createWithDefaultString("64k")

  /** Maximum Kryo buffer size. Values without a unit suffix are read as MiB; default "64m". */
  val KRYO_SERIALIZER_MAX_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer.max")
    .version("1.4.0")
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("64m")

  /** Whether Kryo uses its unsafe-based I/O; off by default. */
  val KRYO_USE_UNSAFE = ConfigBuilder("spark.kryo.unsafe")
    .version("2.1.0")
    .booleanConf
    .createWithDefault(false)

  /** Whether Kryo instances are pooled; on by default. */
  val KRYO_USE_POOL = ConfigBuilder("spark.kryo.pool")
    .version("3.0.0")
    .booleanConf
    .createWithDefault(true)

  /** Whether Kryo rejects unregistered classes; off by default. */
  val KRYO_REGISTRATION_REQUIRED = ConfigBuilder("spark.kryo.registrationRequired")
    .version("1.1.0")
    .booleanConf
    .createWithDefault(false)
}

303

```

304

305

### Custom Serializers

306

307

Creating custom serializers for specific data types or performance requirements.

308

309

```scala { .api }

310

/**
 * Example: serializer that hands out CustomDomainSerializerInstance objects.
 *
 * @param conf Spark configuration (not read by this example)
 */
class CustomDomainSerializer(conf: SparkConf) extends Serializer {

  /** Each call returns a brand-new instance; the instance class declares no fields. */
  override def newInstance(): SerializerInstance = {
    val instance = new CustomDomainSerializerInstance()
    instance
  }
}

/**
 * Example instance: writes [[CustomDomainObject]] in a compact hand-rolled format and
 * everything else with Java serialization.
 *
 * Every payload starts with a one-byte tag naming the format that follows, so `deserialize`
 * picks the decoder from the bytes themselves. Dispatching on the requested `ClassTag` is not
 * reliable: a caller may deserialize with a wider type (e.g. `Any`), which would send
 * custom-encoded bytes to the Java decoder.
 */
class CustomDomainSerializerInstance extends SerializerInstance {

  /** Tag for the compact CustomDomainObject encoding. */
  private[this] val CustomTag: Byte = 0

  /** Tag for the Java-serialization fallback. */
  private[this] val JavaTag: Byte = 1

  override def serialize[T: ClassTag](t: T): ByteBuffer = t match {
    case obj: CustomDomainObject =>
      // Layout: tag (1 byte) | id (4 bytes) | name as UTF-8 (remaining bytes).
      // The buffer is sized from the actual name, so long names cannot overflow it.
      val nameBytes = obj.name.getBytes(java.nio.charset.StandardCharsets.UTF_8)
      val buffer = ByteBuffer.allocate(1 + 4 + nameBytes.length)
      buffer.put(CustomTag).putInt(obj.id).put(nameBytes)
      buffer.flip()
      buffer
    case other =>
      // Fall back to Java serialization, prefixed with its own tag.
      val bos = new ByteArrayOutputStream()
      bos.write(JavaTag.toInt)
      val oos = new ObjectOutputStream(bos)
      try oos.writeObject(other) finally oos.close()
      ByteBuffer.wrap(bos.toByteArray)
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = decode[T](bytes, None)

  /** Like [[deserialize]], but Java-encoded payloads resolve their classes through `loader`. */
  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
    decode[T](bytes, Some(loader))

  /**
   * Decodes a tagged payload. It reads from a duplicate, so the caller's buffer position is
   * not consumed.
   */
  private def decode[T](bytes: ByteBuffer, loader: Option[ClassLoader]): T = {
    val in = bytes.duplicate()
    in.get() match {
      case CustomTag =>
        val id = in.getInt()
        val nameBytes = new Array[Byte](in.remaining())
        in.get(nameBytes)
        CustomDomainObject(id, new String(nameBytes, java.nio.charset.StandardCharsets.UTF_8))
          .asInstanceOf[T]
      case JavaTag =>
        val payload = new Array[Byte](in.remaining())
        in.get(payload)
        val ois = objectInputStream(new ByteArrayInputStream(payload), loader)
        try ois.readObject().asInstanceOf[T] finally ois.close()
      case unknown =>
        throw new java.io.StreamCorruptedException(
          s"Unknown CustomDomainSerializer format tag: $unknown")
    }
  }

  /** Java object input that resolves classes through `loader` when one is supplied. */
  private def objectInputStream(in: InputStream, loader: Option[ClassLoader]): ObjectInputStream =
    loader match {
      case Some(cl) =>
        new ObjectInputStream(in) {
          override protected def resolveClass(desc: java.io.ObjectStreamClass): Class[_] =
            try Class.forName(desc.getName, false, cl)
            catch { case _: ClassNotFoundException => super.resolveClass(desc) }
        }
      case None =>
        new ObjectInputStream(in)
    }

  // Streaming is not implemented in this example.
  override def serializeStream(s: OutputStream): SerializationStream = ???
  override def deserializeStream(s: InputStream): DeserializationStream = ???
}

/** Domain object that CustomDomainSerializerInstance encodes compactly. */
case class CustomDomainObject(id: Int, name: String)

357

```

358

359

### Kryo Registration Examples

360

361

```scala { .api }

362

/**
 * Custom Kryo registrator for application-specific classes.
 *
 * Kryo looks up registrations by an object's runtime class. Register the concrete classes
 * that actually occur at runtime, not the traits or interfaces they implement.
 */
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Register custom classes
    kryo.register(classOf[CustomDomainObject])
    kryo.register(classOf[Array[CustomDomainObject]])

    // Register with custom serializer
    kryo.register(classOf[ComplexObject], new ComplexObjectSerializer())

    // Register collection types
    kryo.register(classOf[scala.collection.mutable.WrappedArray.ofRef[_]])
    // `immutable.Map` is a trait. `Map(...)` yields specialised runtime classes (empty,
    // Map1 to Map4, then a hash-based map), and registering the trait matches none of them.
    (0 to 5).foreach { size =>
      kryo.register(Map((1 to size).map(i => i -> i): _*).getClass)
    }

    // Register common third-party classes
    kryo.register(classOf[org.joda.time.DateTime])
    kryo.register(classOf[java.util.UUID])
  }
}

/**
 * Custom Kryo serializer for [[ComplexObject]].
 *
 * `metadata` is written entry by entry instead of through `kryo.writeObject`/`readObject`.
 * Kryo's `readObject` must be given the concrete class that was written. A Scala `Map`'s
 * runtime class (e.g. `Map2`, a hash map) is not the `Map` trait, so reading it back as
 * `classOf[Map[String, String]]` cannot work.
 */
class ComplexObjectSerializer extends com.esotericsoftware.kryo.Serializer[ComplexObject] {

  override def write(kryo: Kryo, output: Output, obj: ComplexObject): Unit = {
    output.writeInt(obj.id)
    output.writeString(obj.data)
    // Entry count (variable-length encoded), followed by alternating keys and values.
    output.writeInt(obj.metadata.size, true)
    obj.metadata.foreach { case (key, value) =>
      output.writeString(key)
      output.writeString(value)
    }
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[ComplexObject]): ComplexObject = {
    val id = input.readInt()
    val data = input.readString()
    val entryCount = input.readInt(true)
    val metadata = (0 until entryCount).map { _ =>
      val key = input.readString()
      val value = input.readString()
      key -> value
    }.toMap
    ComplexObject(id, data, metadata)
  }
}

/** Example object with a custom Kryo serializer. */
case class ComplexObject(id: Int, data: String, metadata: Map[String, String])

403

```

404

405

**Usage Examples:**

406

407

```scala

408

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}

// Java Serializer (default)
val javaConf = new SparkConf()
  .setAppName("Java Serialization Example")
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val sc1 = new SparkContext(javaConf)
// ... work with sc1 ...
// Only one SparkContext may be active per JVM, so stop this one before creating the next.
sc1.stop()

// Kryo Serializer (recommended for performance)
val kryoConf = new SparkConf()
  .setAppName("Kryo Serialization Example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  .set("spark.kryoserializer.buffer", "128k")
  .set("spark.kryoserializer.buffer.max", "128m")
  // MyKryoRegistrator does not register Person, so strict registration would make the
  // shuffle below fail. Switch this on only once every serialized class is registered.
  .set("spark.kryo.registrationRequired", "false")

val sc2 = new SparkContext(kryoConf)

// Example with custom objects
case class Person(id: Int, name: String, age: Int)

val people = sc2.parallelize(Array(
  Person(1, "Alice", 25),
  Person(2, "Bob", 30),
  Person(3, "Charlie", 35)
))

// Serialization happens automatically during shuffles
val grouped = people.groupBy(_.age / 10) // Triggers serialization
val result = grouped.collect()

// Broadcasting also uses serialization
val lookup = Map(1 -> "Manager", 2 -> "Developer", 3 -> "Analyst")
val broadcastLookup = sc2.broadcast(lookup)

val enriched = people.map { person =>
  val roles = broadcastLookup.value
  (person, roles.getOrElse(person.id, "Unknown"))
}

sc2.stop()

453

```

454

455

**Java Examples:**

456

457

```java

458

import org.apache.spark.SparkConf;

459

import org.apache.spark.api.java.JavaSparkContext;

460

import org.apache.spark.serializer.KryoRegistrator;

461

import com.esotericsoftware.kryo.Kryo;

462

463

// Kryo configuration in Java

464

SparkConf conf = new SparkConf()

465

.setAppName("Java Kryo Example")

466

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

467

.set("spark.kryo.registrator", MyJavaKryoRegistrator.class.getName());

468

469

JavaSparkContext sc = new JavaSparkContext(conf);

470

471

// Custom Kryo registrator in Java

472

public class MyJavaKryoRegistrator implements KryoRegistrator {

473

@Override

474

public void registerClasses(Kryo kryo) {

475

kryo.register(Person.class);

476

kryo.register(Person[].class);

477

}

478

}

479

480

public class Person implements Serializable {

481

private int id;

482

private String name;

483

484

// Constructors, getters, setters...

485

}

486

```

487

488

## Performance Considerations

489

490

### Java vs Kryo Serialization

491

492

**Java Serialization:**

493

- Pros: Built-in, handles complex object graphs, version compatibility

494

- Cons: Slower, larger serialized size, reflection overhead

495

496

**Kryo Serialization:**

497

- Pros: significantly faster and more compact than Java serialization (the Spark tuning guide cites up to 10x), with less CPU overhead

498

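- Cons: does not support every `Serializable` type; classes should be registered in advance for best performance, because unregistered classes are written with their full class name; less tolerant of schema evolution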

499

500

### Configuration Best Practices

501

502

```scala

503

// Typical Kryo configuration, applied as one batch of key/value settings
val conf = new SparkConf().setAll(Seq(
  "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
  "spark.kryo.registrator" -> "com.mycompany.MyKryoRegistrator",
  "spark.kryoserializer.buffer" -> "256k",        // Increase if needed
  "spark.kryoserializer.buffer.max" -> "256m",    // Increase for large objects
  "spark.kryo.unsafe" -> "true",                  // Use unsafe I/O for performance
  "spark.kryo.registrationRequired" -> "false"    // Set true only after thorough testing
))

511

```

512

513

### Common Serialization Issues

514

515

```scala

516

// Avoid: Non-serializable objects in closures
val nonSerializable = new Database() // Not serializable
rdd.map(x => nonSerializable.lookup(x)) // Fails with "Task not serializable"

// Solution: broadcast only the serializable settings and build the non-serializable object on
// the executor, once per partition (mapPartitions) rather than once per record (map)
val config = Map("url" -> "jdbc:...")
val broadcastConfig = sc.broadcast(config)
rdd.mapPartitions { records =>
  val db = new Database(broadcastConfig.value("url"))
  records.map(x => db.lookup(x))
}

// Avoid: Large objects in closures
val largeLookup = loadLargeMap() // Will be serialized with every task
rdd.map(x => largeLookup.get(x))

// Solution: Use broadcast variables
val broadcastLookup = sc.broadcast(largeLookup)
rdd.map(x => broadcastLookup.value.get(x))

535

```

536

537

Proper serialization configuration is crucial for Spark performance, especially in shuffle-heavy workloads and when using broadcast variables or accumulators.