# Schema and Serialization

Type-safe message serialization for Pulsar producers and consumers. This covers:
- built-in schemas for primitive, string, and date/time types;
- factory methods for Avro, JSON, Protobuf, and key/value schemas;
- generic-record support for topics whose schema is only known at runtime.

## Capabilities

### Schema Interface

Core interface for type-safe message serialization and deserialization.

```java { .api }
11
/**
12
* Schema interface for message serialization/deserialization
13
* Provides type-safe conversion between domain objects and byte arrays
14
*/
15
interface Schema<T> {
16
/** Encode object to byte array */
17
byte[] encode(T message);
18
19
/** Decode byte array to object */
20
T decode(byte[] bytes);
21
22
/** Decode with schema version */
23
T decode(byte[] bytes, byte[] schemaVersion);
24
25
/** Decode ByteBuffer to object */
26
T decode(ByteBuffer data);
27
28
/** Decode ByteBuffer with schema version */
29
T decode(ByteBuffer data, byte[] schemaVersion);
30
31
/** Get schema information */
32
SchemaInfo getSchemaInfo();
33
34
/** Check if schema fetching is required */
35
boolean requireFetchingSchemaInfo();
36
37
/** Configure schema information */
38
void configureSchemaInfo(String topicName, String componentName, SchemaInfo schemaInfo);
39
40
/** Clone schema instance */
41
Schema<T> clone();
42
43
/** Validate message against schema */
44
void validate(byte[] message);
45
46
/** Check if schema supports schema versioning */
47
boolean supportSchemaVersioning();
48
49
/** Configure schema information */
50
void configureSchemaInfo(String topic, String componentName, SchemaInfo schemaInfo);
51
52
/** Set schema info provider */
53
void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider);
54
55
/** Get native schema object */
56
Optional<Object> getNativeSchema();
57
}
58
```

### Built-in Schemas

Pre-defined schemas for common primitive, string, and date/time types.

```java { .api }
65
interface Schema<T> {
66
/** Byte array schema (no serialization) */
67
static final Schema<byte[]> BYTES = BytesSchema.of();
68
69
/** ByteBuffer schema */
70
static final Schema<ByteBuffer> BYTEBUFFER = ByteBufferSchema.of();
71
72
/** String schema (UTF-8 encoding) */
73
static final Schema<String> STRING = StringSchema.utf8();
74
75
/** 8-bit integer schema */
76
static final Schema<Byte> INT8 = ByteSchema.of();
77
78
/** 16-bit integer schema */
79
static final Schema<Short> INT16 = ShortSchema.of();
80
81
/** 32-bit integer schema */
82
static final Schema<Integer> INT32 = IntSchema.of();
83
84
/** 64-bit integer schema */
85
static final Schema<Long> INT64 = LongSchema.of();
86
87
/** Single precision float schema */
88
static final Schema<Float> FLOAT = FloatSchema.of();
89
90
/** Double precision float schema */
91
static final Schema<Double> DOUBLE = DoubleSchema.of();
92
93
/** Boolean schema */
94
static final Schema<Boolean> BOOL = BooleanSchema.of();
95
96
/** Date schema */
97
static final Schema<Date> DATE = DateSchema.of();
98
99
/** Time schema */
100
static final Schema<Time> TIME = TimeSchema.of();
101
102
/** Timestamp schema */
103
static final Schema<Timestamp> TIMESTAMP = TimestampSchema.of();
104
105
/** Instant schema */
106
static final Schema<Instant> INSTANT = InstantSchema.of();
107
108
/** LocalDate schema */
109
static final Schema<LocalDate> LOCAL_DATE = LocalDateSchema.of();
110
111
/** LocalTime schema */
112
static final Schema<LocalTime> LOCAL_TIME = LocalTimeSchema.of();
113
114
/** LocalDateTime schema */
115
static final Schema<LocalDateTime> LOCAL_DATE_TIME = LocalDateTimeSchema.of();
116
}
117
```

**Basic Schema Usage:**

```java
122
import org.apache.pulsar.client.api.*;
123
124
// String producer/consumer
125
Producer<String> stringProducer = client.newProducer(Schema.STRING)
126
.topic("string-topic")
127
.create();
128
129
Consumer<String> stringConsumer = client.newConsumer(Schema.STRING)
130
.topic("string-topic")
131
.subscriptionName("string-sub")
132
.subscribe();
133
134
// Integer producer/consumer
135
Producer<Integer> intProducer = client.newProducer(Schema.INT32)
136
.topic("int-topic")
137
.create();
138
139
Consumer<Integer> intConsumer = client.newConsumer(Schema.INT32)
140
.topic("int-topic")
141
.subscriptionName("int-sub")
142
.subscribe();
143
144
// Send and receive typed messages
145
stringProducer.send("Hello World");
146
intProducer.send(42);
147
148
String stringMsg = stringConsumer.receive().getValue();
149
Integer intMsg = intConsumer.receive().getValue();
150
```

### Complex Schema Factory Methods

Factory methods for creating schemas for structured types (Avro, JSON, Protobuf), key/value pairs, and auto-detected topic schemas.

```java { .api }
157
interface Schema<T> {
158
/** Create Avro schema for POJO */
159
static <T> Schema<T> AVRO(Class<T> pojo);
160
161
/** Create Avro schema with custom AvroSchema */
162
static <T> Schema<T> AVRO(org.apache.avro.Schema avroSchema);
163
164
/** Create JSON schema for POJO */
165
static <T> Schema<T> JSON(Class<T> pojo);
166
167
/** Create JSON schema with custom configuration */
168
static <T> Schema<T> JSON(SchemaDefinition<T> schemaDefinition);
169
170
/** Create Protobuf schema */
171
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> protobufClass);
172
173
/** Create native Protobuf schema */
174
static <T> Schema<T> PROTOBUF_NATIVE(Class<T> protobufNativeClass);
175
176
/** Create native Protobuf schema with descriptor */
177
static <T> Schema<T> PROTOBUF_NATIVE(Class<T> clazz, com.google.protobuf.Descriptors.Descriptor descriptor);
178
179
/** Create KeyValue schema with separate encoding */
180
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> keySchema, Schema<V> valueSchema);
181
182
/** Create KeyValue schema with specified encoding type */
183
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> keySchema, Schema<V> valueSchema, KeyValueEncodingType keyValueEncodingType);
184
185
/** Create KeyValue schema with custom configuration */
186
static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> keyClass, Class<V> valueClass, SchemaType type);
187
188
/** Create generic Avro schema */
189
static Schema<GenericRecord> GENERIC_AVRO(org.apache.avro.Schema avroSchema);
190
191
/** Create auto-consuming schema */
192
static Schema<GenericRecord> AUTO_CONSUME();
193
194
/** Create auto-producing bytes schema */
195
static Schema<byte[]> AUTO_PRODUCE_BYTES();
196
197
/** Create auto-producing bytes schema with validator */
198
static Schema<byte[]> AUTO_PRODUCE_BYTES(SchemaValidator<byte[]> validator);
199
}
200
```

**Complex Schema Examples:**

```java
205
// AVRO schema for POJO
206
public class User {
207
public String name;
208
public int age;
209
public String email;
210
}
211
212
Schema<User> userSchema = Schema.AVRO(User.class);
213
Producer<User> userProducer = client.newProducer(userSchema)
214
.topic("user-topic")
215
.create();
216
217
User user = new User();
218
user.name = "Alice";
219
user.age = 30;
220
user.email = "alice@example.com";
221
userProducer.send(user);
222
223
// JSON schema
224
Schema<User> jsonUserSchema = Schema.JSON(User.class);
225
Producer<User> jsonProducer = client.newProducer(jsonUserSchema)
226
.topic("json-user-topic")
227
.create();
228
229
// KeyValue schema
230
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(
231
Schema.STRING,
232
Schema.JSON(User.class)
233
);
234
235
Producer<KeyValue<String, User>> kvProducer = client.newProducer(kvSchema)
236
.topic("kv-topic")
237
.create();
238
239
KeyValue<String, User> kv = new KeyValue<>("user-123", user);
240
kvProducer.send(kv);
241
242
// Protobuf schema (assuming UserProto is generated protobuf class)
243
Schema<UserProto> protoSchema = Schema.PROTOBUF(UserProto.class);
244
Producer<UserProto> protoProducer = client.newProducer(protoSchema)
245
.topic("proto-topic")
246
.create();
247
```

### SchemaInfo and Configuration

Schema metadata and configuration classes.

```java { .api }
254
/**
255
* Schema information metadata
256
*/
257
interface SchemaInfo {
258
/** Get schema name */
259
String getName();
260
261
/** Get schema data */
262
byte[] getSchema();
263
264
/** Get schema type */
265
SchemaType getType();
266
267
/** Get schema properties */
268
Map<String, String> getProperties();
269
270
/** Get timestamp */
271
long getTimestamp();
272
}
273
274
/**
275
* Schema definition builder for custom configuration
276
*/
277
class SchemaDefinition<T> {
278
/** Create builder */
279
static <T> SchemaDefinitionBuilder<T> builder();
280
281
/** Get POJO class */
282
Class<T> getPojo();
283
284
/** Get properties */
285
Map<String, String> getProperties();
286
287
/** Get JSON properties */
288
String getJsonDef();
289
290
/** Check if JSR310 conversion is enabled */
291
boolean getJsr310ConversionEnabled();
292
293
/** Get always allow null setting */
294
boolean getAlwaysAllowNull();
295
296
/** Get schema reader */
297
SchemaReader<T> getSchemaReader();
298
299
/** Get schema writer */
300
SchemaWriter<T> getSchemaWriter();
301
302
interface SchemaDefinitionBuilder<T> {
303
/** Set POJO class */
304
SchemaDefinitionBuilder<T> withPojo(Class<T> pojo);
305
306
/** Add property */
307
SchemaDefinitionBuilder<T> withProperty(String key, String value);
308
309
/** Set properties */
310
SchemaDefinitionBuilder<T> withProperties(Map<String, String> properties);
311
312
/** Set JSON definition */
313
SchemaDefinitionBuilder<T> withJsonDef(String jsonDef);
314
315
/** Enable JSR310 conversion */
316
SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled);
317
318
/** Set always allow null */
319
SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);
320
321
/** Set schema reader */
322
SchemaDefinitionBuilder<T> withSchemaReader(SchemaReader<T> schemaReader);
323
324
/** Set schema writer */
325
SchemaDefinitionBuilder<T> withSchemaWriter(SchemaWriter<T> schemaWriter);
326
327
/** Build schema definition */
328
SchemaDefinition<T> build();
329
}
330
}
331
```

### KeyValue Schema Support

Support for key-value pair messages, with separate schemas for the key and the value.

```java { .api }
338
/**
339
* KeyValue pair container
340
*/
341
class KeyValue<K, V> {
342
/** Create KeyValue pair */
343
static <K, V> KeyValue<K, V> of(K key, V value);
344
345
/** Get key */
346
K getKey();
347
348
/** Get value */
349
V getValue();
350
}
351
/**
 * Controls where the key of a {@code KeyValue} message is stored.
 */
enum KeyValueEncodingType {
    /** Key is encoded into the message key; value is encoded into the message payload. */
    SEPARATED,
    /** Key and value are encoded together into the message payload (the default). */
    INLINE
}
```

**KeyValue Schema Examples:**

```java
366
// KeyValue with separate encoding (default)
367
Schema<KeyValue<String, Integer>> kvSchema = Schema.KeyValue(
368
Schema.STRING,
369
Schema.INT32
370
);
371
372
Producer<KeyValue<String, Integer>> producer = client.newProducer(kvSchema)
373
.topic("kv-topic")
374
.create();
375
376
KeyValue<String, Integer> kv = KeyValue.of("counter", 42);
377
producer.send(kv);
378
379
// KeyValue with inline encoding
380
Schema<KeyValue<String, User>> inlineKvSchema = Schema.KeyValue(
381
Schema.STRING,
382
Schema.JSON(User.class),
383
KeyValueEncodingType.INLINE
384
);
385
386
// Complex nested KeyValue
387
Schema<KeyValue<User, List<String>>> nestedSchema = Schema.KeyValue(
388
Schema.JSON(User.class),
389
Schema.STRING // Will be used for JSON serialization of List<String>
390
);
391
```

### Generic Record Support

Support for reading records generically, without a compile-time POJO class, when the schema is only known at runtime.

```java { .api }
398
/**
399
* Generic record interface for schema-less data
400
*/
401
interface GenericRecord {
402
/** Get schema version */
403
byte[] getSchemaVersion();
404
405
/** Get all fields */
406
List<Field> getFields();
407
408
/** Get field by index */
409
Object getField(int index);
410
411
/** Get field by name */
412
Object getField(String fieldName);
413
414
/** Get native object */
415
Object getNativeObject();
416
}
417
/**
 * Field definition within a {@link GenericRecord}.
 */
interface Field {

    /** Get the field name. */
    String getName();

    /** Get the field's position within the record. */
    int getIndex();
}
```

**Generic Record Examples:**

```java
433
// Auto-consuming schema
434
Schema<GenericRecord> autoSchema = Schema.AUTO_CONSUME();
435
Consumer<GenericRecord> consumer = client.newConsumer(autoSchema)
436
.topic("mixed-schema-topic")
437
.subscriptionName("auto-consumer")
438
.subscribe();
439
440
// Handle messages with different schemas
441
Message<GenericRecord> message = consumer.receive();
442
GenericRecord record = message.getValue();
443
444
// Access fields generically
445
Object nameField = record.getField("name");
446
Object ageField = record.getField("age");
447
448
// Auto-producing schema
449
Schema<byte[]> autoProduceSchema = Schema.AUTO_PRODUCE_BYTES();
450
Producer<byte[]> producer = client.newProducer(autoProduceSchema)
451
.topic("auto-produce-topic")
452
.create();
453
```

### Schema Validation

Interfaces for validating payloads, and the strategies used to check compatibility between schema versions.

```java { .api }
/**
 * Validates messages against a schema.
 *
 * @param <T> the message type being validated
 */
interface SchemaValidator<T> {

    /** Validate a message against the schema. */
    void validate(T message);
}

/**
 * Strategy used to decide whether a new schema version is compatible with
 * previously registered versions.
 */
enum SchemaCompatibilityStrategy {
    /** Both backward and forward compatible with the last schema version. */
    FULL,
    /** Consumers using the new schema can read data written with the last schema version. */
    BACKWARD,
    /** Consumers using the last schema version can read data written with the new schema. */
    FORWARD,
    /** Both backward and forward compatible with all previous schema versions. */
    FULL_TRANSITIVE,
    /** Like BACKWARD, but checked against all previous schema versions. */
    BACKWARD_TRANSITIVE,
    /** Like FORWARD, but checked against all previous schema versions. */
    FORWARD_TRANSITIVE,
    /** No compatibility checks. */
    NONE
}
```

## Supporting Types and Enums

```java { .api }
/**
 * Type of a schema, as reported by {@code SchemaInfo#getType()}.
 */
enum SchemaType {
    NONE,

    // Text and structured formats
    STRING,
    JSON,
    PROTOBUF,
    AVRO,

    // Boolean and numeric primitives
    BOOLEAN,
    INT8,
    INT16,
    INT32,
    INT64,
    FLOAT,
    DOUBLE,

    // Date and time types
    DATE,
    TIME,
    TIMESTAMP,
    INSTANT,
    LOCAL_DATE,
    LOCAL_TIME,
    LOCAL_DATE_TIME,

    // Native Protobuf, composite and raw types
    PROTOBUF_NATIVE,
    KEY_VALUE,
    BYTES,

    // Auto schema types
    AUTO,
    AUTO_CONSUME,
    AUTO_PUBLISH
}

520
class SchemaSerializationException extends RuntimeException {
521
SchemaSerializationException(String message);
522
SchemaSerializationException(String message, Throwable cause);
523
}
524
525
interface SchemaInfoProvider {
526
/** Get schema information by version */
527
CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion);
528
529
/** Get latest schema information */
530
CompletableFuture<SchemaInfo> getLatestSchema();
531
532
/** Get topic name */
533
String getTopicName();
534
}
535
/**
 * Custom deserializer used by a {@code SchemaDefinition}.
 *
 * @param <T> the type produced by this reader
 */
interface SchemaReader<T> {

    /** Read an object from an input stream. */
    T read(InputStream inputStream);

    /** Read an object from a byte array. */
    T read(byte[] bytes);
}

/**
 * Custom serializer used by a {@code SchemaDefinition}.
 *
 * @param <T> the type consumed by this writer
 */
interface SchemaWriter<T> {

    /** Write an object to an output stream. */
    void write(T obj, OutputStream outputStream);

    /** Write an object to a byte array. */
    byte[] write(T obj);
}
```