or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdclient-management.mdindex.mdmessage-consumption.mdmessage-production.mdmessage-reading.mdschema-serialization.mdtransaction-support.md

schema-serialization.mddocs/

0

# Schema and Serialization

1

2

Type-safe message serialization with built-in schemas, custom serialization formats, and comprehensive type system integration for robust message handling.

3

4

## Capabilities

5

6

### Schema Interface

7

8

Core interface for message serialization and deserialization with type safety.

9

10

```java { .api }

11

/**

12

* Schema interface for message serialization/deserialization

13

* Provides type-safe conversion between domain objects and byte arrays

14

*/

15

interface Schema<T> {

16

/** Encode object to byte array */

17

byte[] encode(T message);

18

19

/** Decode byte array to object */

20

T decode(byte[] bytes);

21

22

/** Decode with schema version */

23

T decode(byte[] bytes, byte[] schemaVersion);

24

25

/** Decode ByteBuffer to object */

26

T decode(ByteBuffer data);

27

28

/** Decode ByteBuffer with schema version */

29

T decode(ByteBuffer data, byte[] schemaVersion);

30

31

/** Get schema information */

32

SchemaInfo getSchemaInfo();

33

34

/** Check if schema fetching is required */

35

boolean requireFetchingSchemaInfo();

36

37

/** Configure schema information */

38

void configureSchemaInfo(String topicName, String componentName, SchemaInfo schemaInfo);

39

40

/** Clone schema instance */

41

Schema<T> clone();

42

43

/** Validate message against schema */

44

void validate(byte[] message);

45

46

/** Check if schema supports schema versioning */

47

boolean supportSchemaVersioning();

48

49

/** Configure schema information */

50

void configureSchemaInfo(String topic, String componentName, SchemaInfo schemaInfo);

51

52

/** Set schema info provider */

53

void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider);

54

55

/** Get native schema object */

56

Optional<Object> getNativeSchema();

57

}

58

```

59

60

### Built-in Schemas

61

62

Pre-defined schemas for common data types.

63

64

```java { .api }

65

interface Schema<T> {

66

/** Byte array schema (no serialization) */

67

static final Schema<byte[]> BYTES = BytesSchema.of();

68

69

/** ByteBuffer schema */

70

static final Schema<ByteBuffer> BYTEBUFFER = ByteBufferSchema.of();

71

72

/** String schema (UTF-8 encoding) */

73

static final Schema<String> STRING = StringSchema.utf8();

74

75

/** 8-bit integer schema */

76

static final Schema<Byte> INT8 = ByteSchema.of();

77

78

/** 16-bit integer schema */

79

static final Schema<Short> INT16 = ShortSchema.of();

80

81

/** 32-bit integer schema */

82

static final Schema<Integer> INT32 = IntSchema.of();

83

84

/** 64-bit integer schema */

85

static final Schema<Long> INT64 = LongSchema.of();

86

87

/** Single precision float schema */

88

static final Schema<Float> FLOAT = FloatSchema.of();

89

90

/** Double precision float schema */

91

static final Schema<Double> DOUBLE = DoubleSchema.of();

92

93

/** Boolean schema */

94

static final Schema<Boolean> BOOL = BooleanSchema.of();

95

96

/** Date schema */

97

static final Schema<Date> DATE = DateSchema.of();

98

99

/** Time schema */

100

static final Schema<Time> TIME = TimeSchema.of();

101

102

/** Timestamp schema */

103

static final Schema<Timestamp> TIMESTAMP = TimestampSchema.of();

104

105

/** Instant schema */

106

static final Schema<Instant> INSTANT = InstantSchema.of();

107

108

/** LocalDate schema */

109

static final Schema<LocalDate> LOCAL_DATE = LocalDateSchema.of();

110

111

/** LocalTime schema */

112

static final Schema<LocalTime> LOCAL_TIME = LocalTimeSchema.of();

113

114

/** LocalDateTime schema */

115

static final Schema<LocalDateTime> LOCAL_DATE_TIME = LocalDateTimeSchema.of();

116

}

117

```

118

119

**Basic Schema Usage:**

120

121

```java

122

import org.apache.pulsar.client.api.*;

123

124

// String producer/consumer

125

Producer<String> stringProducer = client.newProducer(Schema.STRING)

126

.topic("string-topic")

127

.create();

128

129

Consumer<String> stringConsumer = client.newConsumer(Schema.STRING)

130

.topic("string-topic")

131

.subscriptionName("string-sub")

132

.subscribe();

133

134

// Integer producer/consumer

135

Producer<Integer> intProducer = client.newProducer(Schema.INT32)

136

.topic("int-topic")

137

.create();

138

139

Consumer<Integer> intConsumer = client.newConsumer(Schema.INT32)

140

.topic("int-topic")

141

.subscriptionName("int-sub")

142

.subscribe();

143

144

// Send and receive typed messages

145

stringProducer.send("Hello World");

146

intProducer.send(42);

147

148

String stringMsg = stringConsumer.receive().getValue();

149

Integer intMsg = intConsumer.receive().getValue();

150

```

151

152

### Complex Schema Factory Methods

153

154

Factory methods for creating schemas for complex data types.

155

156

```java { .api }

157

interface Schema<T> {

158

/** Create Avro schema for POJO */

159

static <T> Schema<T> AVRO(Class<T> pojo);

160

161

/** Create Avro schema with custom AvroSchema */

162

static <T> Schema<T> AVRO(org.apache.avro.Schema avroSchema);

163

164

/** Create JSON schema for POJO */

165

static <T> Schema<T> JSON(Class<T> pojo);

166

167

/** Create JSON schema with custom configuration */

168

static <T> Schema<T> JSON(SchemaDefinition<T> schemaDefinition);

169

170

/** Create Protobuf schema */

171

static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> protobufClass);

172

173

/** Create native Protobuf schema */

174

static <T> Schema<T> PROTOBUF_NATIVE(Class<T> protobufNativeClass);

175

176

/** Create native Protobuf schema with descriptor */

177

static <T> Schema<T> PROTOBUF_NATIVE(Class<T> clazz, com.google.protobuf.Descriptors.Descriptor descriptor);

178

179

/** Create KeyValue schema with separate encoding */

180

static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> keySchema, Schema<V> valueSchema);

181

182

/** Create KeyValue schema with specified encoding type */

183

static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> keySchema, Schema<V> valueSchema, KeyValueEncodingType keyValueEncodingType);

184

185

/** Create KeyValue schema with custom configuration */

186

static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> keyClass, Class<V> valueClass, SchemaType type);

187

188

/** Create generic Avro schema */

189

static Schema<GenericRecord> GENERIC_AVRO(org.apache.avro.Schema avroSchema);

190

191

/** Create auto-consuming schema */

192

static Schema<GenericRecord> AUTO_CONSUME();

193

194

/** Create auto-producing bytes schema */

195

static Schema<byte[]> AUTO_PRODUCE_BYTES();

196

197

/** Create auto-producing bytes schema with validator */

198

static Schema<byte[]> AUTO_PRODUCE_BYTES(SchemaValidator<byte[]> validator);

199

}

200

```

201

202

**Complex Schema Examples:**

203

204

```java

205

// AVRO schema for POJO

206

public class User {

207

public String name;

208

public int age;

209

public String email;

210

}

211

212

Schema<User> userSchema = Schema.AVRO(User.class);

213

Producer<User> userProducer = client.newProducer(userSchema)

214

.topic("user-topic")

215

.create();

216

217

User user = new User();

218

user.name = "Alice";

219

user.age = 30;

220

user.email = "alice@example.com";

221

userProducer.send(user);

222

223

// JSON schema

224

Schema<User> jsonUserSchema = Schema.JSON(User.class);

225

Producer<User> jsonProducer = client.newProducer(jsonUserSchema)

226

.topic("json-user-topic")

227

.create();

228

229

// KeyValue schema

230

Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(

231

Schema.STRING,

232

Schema.JSON(User.class)

233

);

234

235

Producer<KeyValue<String, User>> kvProducer = client.newProducer(kvSchema)

236

.topic("kv-topic")

237

.create();

238

239

KeyValue<String, User> kv = new KeyValue<>("user-123", user);

240

kvProducer.send(kv);

241

242

// Protobuf schema (assuming UserProto is generated protobuf class)

243

Schema<UserProto> protoSchema = Schema.PROTOBUF(UserProto.class);

244

Producer<UserProto> protoProducer = client.newProducer(protoSchema)

245

.topic("proto-topic")

246

.create();

247

```

248

249

### SchemaInfo and Configuration

250

251

Schema metadata and configuration classes.

252

253

```java { .api }

254

/**

255

* Schema information metadata

256

*/

257

interface SchemaInfo {

258

/** Get schema name */

259

String getName();

260

261

/** Get schema data */

262

byte[] getSchema();

263

264

/** Get schema type */

265

SchemaType getType();

266

267

/** Get schema properties */

268

Map<String, String> getProperties();

269

270

/** Get timestamp */

271

long getTimestamp();

272

}

273

274

/**

275

* Schema definition builder for custom configuration

276

*/

277

class SchemaDefinition<T> {

278

/** Create builder */

279

static <T> SchemaDefinitionBuilder<T> builder();

280

281

/** Get POJO class */

282

Class<T> getPojo();

283

284

/** Get properties */

285

Map<String, String> getProperties();

286

287

/** Get JSON properties */

288

String getJsonDef();

289

290

/** Check if JSR310 conversion is enabled */

291

boolean getJsr310ConversionEnabled();

292

293

/** Get always allow null setting */

294

boolean getAlwaysAllowNull();

295

296

/** Get schema reader */

297

SchemaReader<T> getSchemaReader();

298

299

/** Get schema writer */

300

SchemaWriter<T> getSchemaWriter();

301

302

interface SchemaDefinitionBuilder<T> {

303

/** Set POJO class */

304

SchemaDefinitionBuilder<T> withPojo(Class<T> pojo);

305

306

/** Add property */

307

SchemaDefinitionBuilder<T> withProperty(String key, String value);

308

309

/** Set properties */

310

SchemaDefinitionBuilder<T> withProperties(Map<String, String> properties);

311

312

/** Set JSON definition */

313

SchemaDefinitionBuilder<T> withJsonDef(String jsonDef);

314

315

/** Enable JSR310 conversion */

316

SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled);

317

318

/** Set always allow null */

319

SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);

320

321

/** Set schema reader */

322

SchemaDefinitionBuilder<T> withSchemaReader(SchemaReader<T> schemaReader);

323

324

/** Set schema writer */

325

SchemaDefinitionBuilder<T> withSchemaWriter(SchemaWriter<T> schemaWriter);

326

327

/** Build schema definition */

328

SchemaDefinition<T> build();

329

}

330

}

331

```

332

333

### KeyValue Schema Support

334

335

Support for key-value pair messages with separate schemas for keys and values.

336

337

```java { .api }

338

/**

339

* KeyValue pair container

340

*/

341

class KeyValue<K, V> {

342

/** Create KeyValue pair */

343

static <K, V> KeyValue<K, V> of(K key, V value);

344

345

/** Get key */

346

K getKey();

347

348

/** Get value */

349

V getValue();

350

}

351

352

/**

353

* KeyValue encoding types

354

*/

355

enum KeyValueEncodingType {

356

/** Separate encoding for key and value */

357

SEPARATED,

358

/** Inline encoding */

359

INLINE

360

}

361

```

362

363

**KeyValue Schema Examples:**

364

365

```java

366

// KeyValue with separate encoding (default)

367

Schema<KeyValue<String, Integer>> kvSchema = Schema.KeyValue(

368

Schema.STRING,

369

Schema.INT32

370

);

371

372

Producer<KeyValue<String, Integer>> producer = client.newProducer(kvSchema)

373

.topic("kv-topic")

374

.create();

375

376

KeyValue<String, Integer> kv = KeyValue.of("counter", 42);

377

producer.send(kv);

378

379

// KeyValue with inline encoding

380

Schema<KeyValue<String, User>> inlineKvSchema = Schema.KeyValue(

381

Schema.STRING,

382

Schema.JSON(User.class),

383

KeyValueEncodingType.INLINE

384

);

385

386

// Complex nested KeyValue

387

Schema<KeyValue<User, List<String>>> nestedSchema = Schema.KeyValue(

388

Schema.JSON(User.class),

389

Schema.STRING // Will be used for JSON serialization of List<String>

390

);

391

```

392

393

### Generic Record Support

394

395

Support for schema-less and generic record handling.

396

397

```java { .api }

398

/**

399

* Generic record interface for schema-less data

400

*/

401

interface GenericRecord {

402

/** Get schema version */

403

byte[] getSchemaVersion();

404

405

/** Get all fields */

406

List<Field> getFields();

407

408

/** Get field by index */

409

Object getField(int index);

410

411

/** Get field by name */

412

Object getField(String fieldName);

413

414

/** Get native object */

415

Object getNativeObject();

416

}

417

418

/**

419

* Field definition

420

*/

421

interface Field {

422

/** Get field name */

423

String getName();

424

425

/** Get field index */

426

int getIndex();

427

}

428

```

429

430

**Generic Record Examples:**

431

432

```java

433

// Auto-consuming schema

434

Schema<GenericRecord> autoSchema = Schema.AUTO_CONSUME();

435

Consumer<GenericRecord> consumer = client.newConsumer(autoSchema)

436

.topic("mixed-schema-topic")

437

.subscriptionName("auto-consumer")

438

.subscribe();

439

440

// Handle messages with different schemas

441

Message<GenericRecord> message = consumer.receive();

442

GenericRecord record = message.getValue();

443

444

// Access fields generically

445

Object nameField = record.getField("name");

446

Object ageField = record.getField("age");

447

448

// Auto-producing schema

449

Schema<byte[]> autoProduceSchema = Schema.AUTO_PRODUCE_BYTES();

450

Producer<byte[]> producer = client.newProducer(autoProduceSchema)

451

.topic("auto-produce-topic")

452

.create();

453

```

454

455

### Schema Validation

456

457

Schema validation and compatibility checking.

458

459

```java { .api }

460

/**

461

* Schema validator interface

462

*/

463

interface SchemaValidator<T> {

464

/** Validate message against schema */

465

void validate(T message);

466

}

467

468

/**

469

* Schema compatibility strategy

470

*/

471

enum SchemaCompatibilityStrategy {

472

/** Full compatibility (read/write with all versions) */

473

FULL,

474

/** Backward compatibility (read old, write new) */

475

BACKWARD,

476

/** Forward compatibility (read new, write old) */

477

FORWARD,

478

/** Full transitive compatibility */

479

FULL_TRANSITIVE,

480

/** Backward transitive compatibility */

481

BACKWARD_TRANSITIVE,

482

/** Forward transitive compatibility */

483

FORWARD_TRANSITIVE,

484

/** No compatibility checks */

485

NONE

486

}

487

```

488

489

## Supporting Types and Enums

490

491

```java { .api }

492

enum SchemaType {

493

NONE,

494

STRING,

495

JSON,

496

PROTOBUF,

497

AVRO,

498

BOOLEAN,

499

INT8,

500

INT16,

501

INT32,

502

INT64,

503

FLOAT,

504

DOUBLE,

505

DATE,

506

TIME,

507

TIMESTAMP,

508

INSTANT,

509

LOCAL_DATE,

510

LOCAL_TIME,

511

LOCAL_DATE_TIME,

512

PROTOBUF_NATIVE,

513

KEY_VALUE,

514

BYTES,

515

AUTO,

516

AUTO_CONSUME,

517

AUTO_PUBLISH

518

}

519

520

class SchemaSerializationException extends RuntimeException {

521

SchemaSerializationException(String message);

522

SchemaSerializationException(String message, Throwable cause);

523

}

524

525

interface SchemaInfoProvider {

526

/** Get schema information by version */

527

CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion);

528

529

/** Get latest schema information */

530

CompletableFuture<SchemaInfo> getLatestSchema();

531

532

/** Get topic name */

533

String getTopicName();

534

}

535

536

interface SchemaReader<T> {

537

/** Read object from input stream */

538

T read(InputStream inputStream);

539

540

/** Read object from byte array */

541

T read(byte[] bytes);

542

}

543

544

interface SchemaWriter<T> {

545

/** Write object to output stream */

546

void write(T obj, OutputStream outputStream);

547

548

/** Write object to byte array */

549

byte[] write(T obj);

550

}

551

```