Apache Pulsar Java client library for distributed pub-sub messaging platform
—
Type-safe message serialization with built-in schemas, custom serialization formats, and comprehensive type system integration for robust message handling.
Core interface for message serialization and deserialization with type safety.
/**
* Schema interface for message serialization/deserialization
* Provides type-safe conversion between domain objects and byte arrays
*/
interface Schema<T> {
/** Encode object to byte array */
byte[] encode(T message);
/** Decode byte array to object */
T decode(byte[] bytes);
/** Decode with schema version */
T decode(byte[] bytes, byte[] schemaVersion);
/** Decode ByteBuffer to object */
T decode(ByteBuffer data);
/** Decode ByteBuffer with schema version */
T decode(ByteBuffer data, byte[] schemaVersion);
/** Get schema information */
SchemaInfo getSchemaInfo();
/** Check if schema fetching is required */
boolean requireFetchingSchemaInfo();
/** Configure schema information */
void configureSchemaInfo(String topicName, String componentName, SchemaInfo schemaInfo);
/** Clone schema instance */
Schema<T> clone();
/** Validate message against schema */
void validate(byte[] message);
/** Check if schema supports schema versioning */
boolean supportSchemaVersioning();
/** Configure schema information */
void configureSchemaInfo(String topic, String componentName, SchemaInfo schemaInfo);
/** Set schema info provider */
void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider);
/** Get native schema object */
Optional<Object> getNativeSchema();
}Pre-defined schemas for common data types.
interface Schema<T> {
/** Byte array schema (no serialization) */
static final Schema<byte[]> BYTES = BytesSchema.of();
/** ByteBuffer schema */
static final Schema<ByteBuffer> BYTEBUFFER = ByteBufferSchema.of();
/** String schema (UTF-8 encoding) */
static final Schema<String> STRING = StringSchema.utf8();
/** 8-bit integer schema */
static final Schema<Byte> INT8 = ByteSchema.of();
/** 16-bit integer schema */
static final Schema<Short> INT16 = ShortSchema.of();
/** 32-bit integer schema */
static final Schema<Integer> INT32 = IntSchema.of();
/** 64-bit integer schema */
static final Schema<Long> INT64 = LongSchema.of();
/** Single precision float schema */
static final Schema<Float> FLOAT = FloatSchema.of();
/** Double precision float schema */
static final Schema<Double> DOUBLE = DoubleSchema.of();
/** Boolean schema */
static final Schema<Boolean> BOOL = BooleanSchema.of();
/** Date schema */
static final Schema<Date> DATE = DateSchema.of();
/** Time schema */
static final Schema<Time> TIME = TimeSchema.of();
/** Timestamp schema */
static final Schema<Timestamp> TIMESTAMP = TimestampSchema.of();
/** Instant schema */
static final Schema<Instant> INSTANT = InstantSchema.of();
/** LocalDate schema */
static final Schema<LocalDate> LOCAL_DATE = LocalDateSchema.of();
/** LocalTime schema */
static final Schema<LocalTime> LOCAL_TIME = LocalTimeSchema.of();
/** LocalDateTime schema */
static final Schema<LocalDateTime> LOCAL_DATE_TIME = LocalDateTimeSchema.of();
}Basic Schema Usage:
import org.apache.pulsar.client.api.*;
// String producer/consumer
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("string-topic")
.create();
Consumer<String> stringConsumer = client.newConsumer(Schema.STRING)
.topic("string-topic")
.subscriptionName("string-sub")
.subscribe();
// Integer producer/consumer
Producer<Integer> intProducer = client.newProducer(Schema.INT32)
.topic("int-topic")
.create();
Consumer<Integer> intConsumer = client.newConsumer(Schema.INT32)
.topic("int-topic")
.subscriptionName("int-sub")
.subscribe();
// Send and receive typed messages
stringProducer.send("Hello World");
intProducer.send(42);
String stringMsg = stringConsumer.receive().getValue();
Integer intMsg = intConsumer.receive().getValue();Factory methods for creating schemas for complex data types.
interface Schema<T> {
/** Create Avro schema for POJO */
static <T> Schema<T> AVRO(Class<T> pojo);
/** Create Avro schema with custom AvroSchema */
static <T> Schema<T> AVRO(org.apache.avro.Schema avroSchema);
/** Create JSON schema for POJO */
static <T> Schema<T> JSON(Class<T> pojo);
/** Create JSON schema with custom configuration */
static <T> Schema<T> JSON(SchemaDefinition<T> schemaDefinition);
/** Create Protobuf schema */
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> protobufClass);
/** Create native Protobuf schema */
static <T> Schema<T> PROTOBUF_NATIVE(Class<T> protobufNativeClass);
/** Create native Protobuf schema with descriptor */
static <T> Schema<T> PROTOBUF_NATIVE(Class<T> clazz, com.google.protobuf.Descriptors.Descriptor descriptor);
/** Create KeyValue schema with separate encoding */
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> keySchema, Schema<V> valueSchema);
/** Create KeyValue schema with specified encoding type */
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> keySchema, Schema<V> valueSchema, KeyValueEncodingType keyValueEncodingType);
/** Create KeyValue schema with custom configuration */
static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> keyClass, Class<V> valueClass, SchemaType type);
/** Create generic Avro schema */
static Schema<GenericRecord> GENERIC_AVRO(org.apache.avro.Schema avroSchema);
/** Create auto-consuming schema */
static Schema<GenericRecord> AUTO_CONSUME();
/** Create auto-producing bytes schema */
static Schema<byte[]> AUTO_PRODUCE_BYTES();
/** Create auto-producing bytes schema with validator */
static Schema<byte[]> AUTO_PRODUCE_BYTES(SchemaValidator<byte[]> validator);
}Complex Schema Examples:
// AVRO schema for POJO
public class User {
public String name;
public int age;
public String email;
}
Schema<User> userSchema = Schema.AVRO(User.class);
Producer<User> userProducer = client.newProducer(userSchema)
.topic("user-topic")
.create();
User user = new User();
user.name = "Alice";
user.age = 30;
user.email = "alice@example.com";
userProducer.send(user);
// JSON schema
Schema<User> jsonUserSchema = Schema.JSON(User.class);
Producer<User> jsonProducer = client.newProducer(jsonUserSchema)
.topic("json-user-topic")
.create();
// KeyValue schema
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(
Schema.STRING,
Schema.JSON(User.class)
);
Producer<KeyValue<String, User>> kvProducer = client.newProducer(kvSchema)
.topic("kv-topic")
.create();
KeyValue<String, User> kv = new KeyValue<>("user-123", user);
kvProducer.send(kv);
// Protobuf schema (assuming UserProto is generated protobuf class)
Schema<UserProto> protoSchema = Schema.PROTOBUF(UserProto.class);
Producer<UserProto> protoProducer = client.newProducer(protoSchema)
.topic("proto-topic")
.create();Schema metadata and configuration classes.
/**
* Schema information metadata
*/
interface SchemaInfo {
/** Get schema name */
String getName();
/** Get schema data */
byte[] getSchema();
/** Get schema type */
SchemaType getType();
/** Get schema properties */
Map<String, String> getProperties();
/** Get timestamp */
long getTimestamp();
}
/**
* Schema definition builder for custom configuration
*/
class SchemaDefinition<T> {
/** Create builder */
static <T> SchemaDefinitionBuilder<T> builder();
/** Get POJO class */
Class<T> getPojo();
/** Get properties */
Map<String, String> getProperties();
/** Get JSON properties */
String getJsonDef();
/** Check if JSR310 conversion is enabled */
boolean getJsr310ConversionEnabled();
/** Get always allow null setting */
boolean getAlwaysAllowNull();
/** Get schema reader */
SchemaReader<T> getSchemaReader();
/** Get schema writer */
SchemaWriter<T> getSchemaWriter();
interface SchemaDefinitionBuilder<T> {
/** Set POJO class */
SchemaDefinitionBuilder<T> withPojo(Class<T> pojo);
/** Add property */
SchemaDefinitionBuilder<T> withProperty(String key, String value);
/** Set properties */
SchemaDefinitionBuilder<T> withProperties(Map<String, String> properties);
/** Set JSON definition */
SchemaDefinitionBuilder<T> withJsonDef(String jsonDef);
/** Enable JSR310 conversion */
SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled);
/** Set always allow null */
SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);
/** Set schema reader */
SchemaDefinitionBuilder<T> withSchemaReader(SchemaReader<T> schemaReader);
/** Set schema writer */
SchemaDefinitionBuilder<T> withSchemaWriter(SchemaWriter<T> schemaWriter);
/** Build schema definition */
SchemaDefinition<T> build();
}
}Support for key-value pair messages with separate schemas for keys and values.
/**
* KeyValue pair container
*/
class KeyValue<K, V> {
/** Create KeyValue pair */
static <K, V> KeyValue<K, V> of(K key, V value);
/** Get key */
K getKey();
/** Get value */
V getValue();
}
/**
* KeyValue encoding types
*/
enum KeyValueEncodingType {
/** Separate encoding for key and value */
SEPARATED,
/** Inline encoding */
INLINE
}KeyValue Schema Examples:
// KeyValue with separate encoding (default)
Schema<KeyValue<String, Integer>> kvSchema = Schema.KeyValue(
Schema.STRING,
Schema.INT32
);
Producer<KeyValue<String, Integer>> producer = client.newProducer(kvSchema)
.topic("kv-topic")
.create();
KeyValue<String, Integer> kv = KeyValue.of("counter", 42);
producer.send(kv);
// KeyValue with inline encoding
Schema<KeyValue<String, User>> inlineKvSchema = Schema.KeyValue(
Schema.STRING,
Schema.JSON(User.class),
KeyValueEncodingType.INLINE
);
// Complex nested KeyValue
Schema<KeyValue<User, List<String>>> nestedSchema = Schema.KeyValue(
Schema.JSON(User.class),
Schema.STRING // Will be used for JSON serialization of List<String>
);Support for schema-less and generic record handling.
/**
* Generic record interface for schema-less data
*/
interface GenericRecord {
/** Get schema version */
byte[] getSchemaVersion();
/** Get all fields */
List<Field> getFields();
/** Get field by index */
Object getField(int index);
/** Get field by name */
Object getField(String fieldName);
/** Get native object */
Object getNativeObject();
}
/**
* Field definition
*/
interface Field {
/** Get field name */
String getName();
/** Get field index */
int getIndex();
}Generic Record Examples:
// Auto-consuming schema
Schema<GenericRecord> autoSchema = Schema.AUTO_CONSUME();
Consumer<GenericRecord> consumer = client.newConsumer(autoSchema)
.topic("mixed-schema-topic")
.subscriptionName("auto-consumer")
.subscribe();
// Handle messages with different schemas
Message<GenericRecord> message = consumer.receive();
GenericRecord record = message.getValue();
// Access fields generically
Object nameField = record.getField("name");
Object ageField = record.getField("age");
// Auto-producing schema
Schema<byte[]> autoProduceSchema = Schema.AUTO_PRODUCE_BYTES();
Producer<byte[]> producer = client.newProducer(autoProduceSchema)
.topic("auto-produce-topic")
.create();Schema validation and compatibility checking.
/**
* Schema validator interface
*/
interface SchemaValidator<T> {
/** Validate message against schema */
void validate(T message);
}
/**
* Schema compatibility strategy
*/
enum SchemaCompatibilityStrategy {
/** Full compatibility (read/write with all versions) */
FULL,
/** Backward compatibility (read old, write new) */
BACKWARD,
/** Forward compatibility (read new, write old) */
FORWARD,
/** Full transitive compatibility */
FULL_TRANSITIVE,
/** Backward transitive compatibility */
BACKWARD_TRANSITIVE,
/** Forward transitive compatibility */
FORWARD_TRANSITIVE,
/** No compatibility checks */
NONE
}enum SchemaType {
NONE,
STRING,
JSON,
PROTOBUF,
AVRO,
BOOLEAN,
INT8,
INT16,
INT32,
INT64,
FLOAT,
DOUBLE,
DATE,
TIME,
TIMESTAMP,
INSTANT,
LOCAL_DATE,
LOCAL_TIME,
LOCAL_DATE_TIME,
PROTOBUF_NATIVE,
KEY_VALUE,
BYTES,
AUTO,
AUTO_CONSUME,
AUTO_PUBLISH
}
class SchemaSerializationException extends RuntimeException {
SchemaSerializationException(String message);
SchemaSerializationException(String message, Throwable cause);
}
interface SchemaInfoProvider {
/** Get schema information by version */
CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion);
/** Get latest schema information */
CompletableFuture<SchemaInfo> getLatestSchema();
/** Get topic name */
String getTopicName();
}
interface SchemaReader<T> {
/** Read object from input stream */
T read(InputStream inputStream);
/** Read object from byte array */
T read(byte[] bytes);
}
interface SchemaWriter<T> {
/** Write object to output stream */
void write(T obj, OutputStream outputStream);
/** Write object to byte array */
byte[] write(T obj);
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-client