or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconsumer.mddynamodb-streams.mdindex.mdpartitioning.mdproducer.mdserialization.mdtable-api.md

serialization.mddocs/

# Serialization and Deserialization

Kinesis-specific serialization interfaces. They expose stream metadata during deserialization and let the serializer choose the target stream for each record.

## Capabilities

### Kinesis Deserialization Schema

Extended deserialization interface that gives access to the Kinesis record metadata: partition key, sequence number, approximate arrival timestamp, stream name, and shard ID.

```java { .api }

11

@PublicEvolving

12

public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

13

14

/**

15

* Initialize the deserialization schema with context.

16

*

17

* @param context Initialization context with runtime information

18

* @throws Exception On initialization errors

19

*/

20

default void open(DeserializationSchema.InitializationContext context) throws Exception;

21

22

/**

23

* Deserialize a Kinesis record with full metadata access.

24

*

25

* @param recordValue Raw record data bytes

26

* @param partitionKey Partition key used for the record

27

* @param seqNum Sequence number of the record within the shard

28

* @param approxArrivalTimestamp Approximate arrival timestamp in milliseconds

29

* @param stream Name of the Kinesis stream

30

* @param shardId ID of the shard containing this record

31

* @return Deserialized object of type T

32

* @throws IOException On deserialization errors

33

*/

34

T deserialize(byte[] recordValue, String partitionKey, String seqNum,

35

long approxArrivalTimestamp, String stream, String shardId) throws IOException;

36

}

37

```

### Kinesis Serialization Schema

Extended serialization interface that lets the schema choose the target stream for each record. This enables dynamic stream routing based on record content.

```java { .api }

44

@PublicEvolving

45

public interface KinesisSerializationSchema<T> extends Serializable {

46

47

/**

48

* Initialize the serialization schema with context.

49

*

50

* @param context Initialization context with runtime information

51

* @throws Exception On initialization errors

52

*/

53

default void open(InitializationContext context) throws Exception;

54

55

/**

56

* Serialize an object to ByteBuffer for Kinesis.

57

*

58

* @param element Object to serialize

59

* @return Serialized data as ByteBuffer

60

*/

61

ByteBuffer serialize(T element);

62

63

/**

64

* Determine the target stream for this record.

65

*

66

* @param element Object to determine stream for

67

* @return Target stream name

68

*/

69

String getTargetStream(T element);

70

}

71

```

### Schema Wrapper

Internal adapter that lets a standard Flink `DeserializationSchema` be used as a `KinesisDeserializationSchema`. Only the record bytes are passed to the wrapped schema; the Kinesis metadata parameters are ignored.

```java { .api }

78

@Internal

79

public class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {

80

81

/**

82

* Create wrapper around standard deserialization schema.

83

*

84

* @param deserializationSchema Standard Flink deserialization schema to wrap

85

*/

86

public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);

87

88

/**

89

* Deserialize using wrapped schema, ignoring Kinesis metadata.

90

*

91

* @param recordValue Raw record data bytes

92

* @param partitionKey Partition key (ignored by wrapper)

93

* @param seqNum Sequence number (ignored by wrapper)

94

* @param approxArrivalTimestamp Arrival timestamp (ignored by wrapper)

95

* @param stream Stream name (ignored by wrapper)

96

* @param shardId Shard ID (ignored by wrapper)

97

* @return Deserialized object using wrapped schema

98

* @throws IOException On deserialization errors

99

*/

100

@Override

101

public T deserialize(byte[] recordValue, String partitionKey, String seqNum,

102

long approxArrivalTimestamp, String stream, String shardId) throws IOException;

103

104

/**

105

* Get type information from wrapped schema.

106

*

107

* @return Type information from wrapped deserialization schema

108

*/

109

@Override

110

public TypeInformation<T> getProducedType();

111

112

/**

113

* Open the wrapped deserialization schema.

114

*

115

* @param context Initialization context

116

* @throws Exception On initialization errors

117

*/

118

@Override

119

public void open(DeserializationSchema.InitializationContext context) throws Exception;

120

}

121

```

## Usage Examples

### Custom Deserialization with Metadata

```java

128

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

129

import org.apache.flink.api.common.typeinfo.TypeInformation;

130

import com.fasterxml.jackson.databind.ObjectMapper;

131

import java.nio.charset.StandardCharsets;

132

133

public class EventDeserializationSchema implements KinesisDeserializationSchema<EventWithMetadata> {

134

private transient ObjectMapper objectMapper;

135

136

@Override

137

public void open(DeserializationSchema.InitializationContext context) throws Exception {

138

objectMapper = new ObjectMapper();

139

}

140

141

@Override

142

public EventWithMetadata deserialize(byte[] recordValue, String partitionKey, String seqNum,

143

long approxArrivalTimestamp, String stream, String shardId)

144

throws IOException {

145

// Parse the JSON payload

146

String json = new String(recordValue, StandardCharsets.UTF_8);

147

Event event = objectMapper.readValue(json, Event.class);

148

149

// Create enriched event with Kinesis metadata

150

EventWithMetadata enrichedEvent = new EventWithMetadata();

151

enrichedEvent.setEvent(event);

152

enrichedEvent.setPartitionKey(partitionKey);

153

enrichedEvent.setSequenceNumber(seqNum);

154

enrichedEvent.setArrivalTimestamp(approxArrivalTimestamp);

155

enrichedEvent.setStreamName(stream);

156

enrichedEvent.setShardId(shardId);

157

158

return enrichedEvent;

159

}

160

161

@Override

162

public TypeInformation<EventWithMetadata> getProducedType() {

163

return TypeInformation.of(EventWithMetadata.class);

164

}

165

}

166

```

### Event-Time Extraction from Metadata

```java

171

public class TimestampedEventSchema implements KinesisDeserializationSchema<TimestampedEvent> {

172

private transient ObjectMapper objectMapper;

173

174

@Override

175

public void open(DeserializationSchema.InitializationContext context) throws Exception {

176

objectMapper = new ObjectMapper();

177

}

178

179

@Override

180

public TimestampedEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,

181

long approxArrivalTimestamp, String stream, String shardId)

182

throws IOException {

183

String json = new String(recordValue, StandardCharsets.UTF_8);

184

JsonNode node = objectMapper.readTree(json);

185

186

TimestampedEvent event = new TimestampedEvent();

187

event.setData(node.get("data").asText());

188

189

// Use event timestamp if available, otherwise use Kinesis arrival time

190

if (node.has("timestamp")) {

191

event.setEventTime(node.get("timestamp").asLong());

192

} else {

193

event.setEventTime(approxArrivalTimestamp);

194

}

195

196

// Add processing metadata

197

event.setProcessingTime(System.currentTimeMillis());

198

event.setPartitionKey(partitionKey);

199

200

return event;

201

}

202

203

@Override

204

public TypeInformation<TimestampedEvent> getProducedType() {

205

return TypeInformation.of(TimestampedEvent.class);

206

}

207

}

208

```

### Multi-Format Deserialization

```java

213

public class MultiFormatDeserializationSchema implements KinesisDeserializationSchema<GenericRecord> {

214

private transient ObjectMapper jsonMapper;

215

private transient AvroDeserializer avroDeserializer;

216

217

@Override

218

public void open(DeserializationSchema.InitializationContext context) throws Exception {

219

jsonMapper = new ObjectMapper();

220

avroDeserializer = new AvroDeserializer();

221

}

222

223

@Override

224

public GenericRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,

225

long approxArrivalTimestamp, String stream, String shardId)

226

throws IOException {

227

// Detect format based on stream name or content

228

if (stream.endsWith("-json")) {

229

return deserializeJson(recordValue);

230

} else if (stream.endsWith("-avro")) {

231

return deserializeAvro(recordValue);

232

} else {

233

// Auto-detect based on content

234

return autoDetectAndDeserialize(recordValue);

235

}

236

}

237

238

private GenericRecord deserializeJson(byte[] data) throws IOException {

239

String json = new String(data, StandardCharsets.UTF_8);

240

return jsonMapper.readValue(json, GenericRecord.class);

241

}

242

243

private GenericRecord deserializeAvro(byte[] data) throws IOException {

244

return avroDeserializer.deserialize(data);

245

}

246

247

private GenericRecord autoDetectAndDeserialize(byte[] data) throws IOException {

248

// Simple heuristic: JSON starts with '{' or '['

249

if (data.length > 0 && (data[0] == '{' || data[0] == '[')) {

250

return deserializeJson(data);

251

} else {

252

return deserializeAvro(data);

253

}

254

}

255

256

@Override

257

public TypeInformation<GenericRecord> getProducedType() {

258

return TypeInformation.of(GenericRecord.class);

259

}

260

}

261

```

### Custom Serialization with Stream Routing

```java

266

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

267

import java.nio.ByteBuffer;

268

import java.nio.charset.StandardCharsets;

269

270

public class EventSerializationSchema implements KinesisSerializationSchema<Event> {

271

private transient ObjectMapper objectMapper;

272

273

@Override

274

public void open(InitializationContext context) throws Exception {

275

objectMapper = new ObjectMapper();

276

}

277

278

@Override

279

public ByteBuffer serialize(Event element) {

280

try {

281

// Add processing timestamp

282

element.setProcessedAt(System.currentTimeMillis());

283

284

// Serialize to JSON

285

String json = objectMapper.writeValueAsString(element);

286

return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));

287

} catch (Exception e) {

288

throw new RuntimeException("Failed to serialize event", e);

289

}

290

}

291

292

@Override

293

public String getTargetStream(Event element) {

294

// Route to different streams based on event properties

295

if (element.isHighPriority()) {

296

return "high-priority-events";

297

} else if (element.getEventType().equals("ERROR")) {

298

return "error-events";

299

} else {

300

return "normal-events";

301

}

302

}

303

}

304

```

### Dynamic Stream Routing with Tenant Isolation

```java

309

public class TenantAwareSerializationSchema implements KinesisSerializationSchema<TenantEvent> {

310

private transient ObjectMapper objectMapper;

311

312

@Override

313

public void open(InitializationContext context) throws Exception {

314

objectMapper = new ObjectMapper();

315

}

316

317

@Override

318

public ByteBuffer serialize(TenantEvent element) {

319

try {

320

// Enrich with metadata

321

element.setIngestionTime(System.currentTimeMillis());

322

element.setProcessingRegion(System.getProperty("aws.region", "unknown"));

323

324

String json = objectMapper.writeValueAsString(element);

325

return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));

326

} catch (Exception e) {

327

throw new RuntimeException("Failed to serialize tenant event", e);

328

}

329

}

330

331

@Override

332

public String getTargetStream(TenantEvent element) {

333

// Route to tenant-specific streams for isolation

334

String tenantId = element.getTenantId();

335

String eventType = element.getEventType();

336

337

return String.format("tenant-%s-events-%s", tenantId, eventType.toLowerCase());

338

}

339

}

340

```

### Binary Data Serialization

```java

345

public class BinaryDataSchema implements KinesisSerializationSchema<BinaryDataEvent> {

346

347

@Override

348

public ByteBuffer serialize(BinaryDataEvent element) {

349

// Handle binary data directly

350

ByteBuffer buffer = ByteBuffer.allocate(element.getDataSize() + 8);

351

352

// Add header with timestamp

353

buffer.putLong(element.getTimestamp());

354

355

// Add binary payload

356

buffer.put(element.getBinaryData());

357

358

buffer.flip();

359

return buffer;

360

}

361

362

@Override

363

public String getTargetStream(BinaryDataEvent element) {

364

// Route based on data type

365

return "binary-data-" + element.getDataType();

366

}

367

}

368

```

### Compression Support

```java

373

import java.io.ByteArrayOutputStream;

374

import java.util.zip.GZIPOutputStream;

375

import java.util.zip.GZIPInputStream;

376

377

public class CompressedSerializationSchema implements KinesisSerializationSchema<LargeEvent> {

378

private transient ObjectMapper objectMapper;

379

380

@Override

381

public void open(InitializationContext context) throws Exception {

382

objectMapper = new ObjectMapper();

383

}

384

385

@Override

386

public ByteBuffer serialize(LargeEvent element) {

387

try {

388

// Serialize to JSON first

389

String json = objectMapper.writeValueAsString(element);

390

byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);

391

392

// Compress if data is large

393

if (jsonBytes.length > 1024) {

394

return ByteBuffer.wrap(compress(jsonBytes));

395

} else {

396

return ByteBuffer.wrap(jsonBytes);

397

}

398

} catch (Exception e) {

399

throw new RuntimeException("Failed to serialize and compress event", e);

400

}

401

}

402

403

private byte[] compress(byte[] data) throws IOException {

404

ByteArrayOutputStream baos = new ByteArrayOutputStream();

405

try (GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) {

406

gzipOut.write(data);

407

}

408

return baos.toByteArray();

409

}

410

411

@Override

412

public String getTargetStream(LargeEvent element) {

413

return element.getStreamName();

414

}

415

}

416

```

## Error Handling in Serialization

### Robust Deserialization with Fallbacks

```java

423

public class RobustDeserializationSchema implements KinesisDeserializationSchema<Either<Event, ErrorRecord>> {

424

private transient ObjectMapper objectMapper;

425

private transient Logger logger;

426

427

@Override

428

public void open(DeserializationSchema.InitializationContext context) throws Exception {

429

objectMapper = new ObjectMapper();

430

logger = LoggerFactory.getLogger(getClass());

431

}

432

433

@Override

434

public Either<Event, ErrorRecord> deserialize(byte[] recordValue, String partitionKey, String seqNum,

435

long approxArrivalTimestamp, String stream, String shardId) {

436

try {

437

String json = new String(recordValue, StandardCharsets.UTF_8);

438

Event event = objectMapper.readValue(json, Event.class);

439

return Either.left(event);

440

} catch (Exception e) {

441

logger.warn("Failed to deserialize record from stream {} shard {}: {}",

442

stream, shardId, e.getMessage());

443

444

ErrorRecord errorRecord = new ErrorRecord();

445

errorRecord.setRawData(recordValue);

446

errorRecord.setError(e.getMessage());

447

errorRecord.setStreamName(stream);

448

errorRecord.setShardId(shardId);

449

errorRecord.setSequenceNumber(seqNum);

450

errorRecord.setTimestamp(approxArrivalTimestamp);

451

452

return Either.right(errorRecord);

453

}

454

}

455

456

@Override

457

public TypeInformation<Either<Event, ErrorRecord>> getProducedType() {

458

return new EitherTypeInfo<>(

459

TypeInformation.of(Event.class),

460

TypeInformation.of(ErrorRecord.class)

461

);

462

}

463

}

464

```

## Performance Considerations

### Efficient Object Reuse

```java

471

public class EfficientDeserializationSchema implements KinesisDeserializationSchema<Event> {

472

private transient ObjectMapper objectMapper;

473

private transient Event reusableEvent; // Reuse objects to reduce GC pressure

474

475

@Override

476

public void open(DeserializationSchema.InitializationContext context) throws Exception {

477

objectMapper = new ObjectMapper();

478

reusableEvent = new Event();

479

}

480

481

@Override

482

public Event deserialize(byte[] recordValue, String partitionKey, String seqNum,

483

long approxArrivalTimestamp, String stream, String shardId) throws IOException {

484

// Reuse object and reset fields

485

reusableEvent.reset();

486

487

// Efficient parsing without creating intermediate objects

488

JsonParser parser = objectMapper.getFactory().createParser(recordValue);

489

// ... parse fields directly into reusableEvent

490

491

return reusableEvent.copy(); // Return copy for thread safety

492

}

493

494

@Override

495

public TypeInformation<Event> getProducedType() {

496

return TypeInformation.of(Event.class);

497

}

498

}

499

```

### Batch Serialization Optimization

```java

504

public class BatchOptimizedSchema implements KinesisSerializationSchema<List<Event>> {

505

private transient ObjectMapper objectMapper;

506

private transient ByteArrayOutputStream buffer;

507

508

@Override

509

public void open(InitializationContext context) throws Exception {

510

objectMapper = new ObjectMapper();

511

buffer = new ByteArrayOutputStream(4096); // Pre-allocated buffer

512

}

513

514

@Override

515

public ByteBuffer serialize(List<Event> elements) {

516

try {

517

buffer.reset(); // Reuse buffer

518

519

// Efficient batch serialization

520

objectMapper.writeValue(buffer, elements);

521

522

return ByteBuffer.wrap(buffer.toByteArray());

523

} catch (Exception e) {

524

throw new RuntimeException("Failed to serialize event batch", e);

525

}

526

}

527

528

@Override

529

public String getTargetStream(List<Event> elements) {

530

// Route based on first event in batch

531

return elements.isEmpty() ? "default-stream" : elements.get(0).getStreamName();

532

}

533

}