CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-client

Apache Pulsar Java client library for distributed pub-sub messaging platform

Pending
Overview
Eval results
Files

schema-serialization.mddocs/

Schema and Serialization

Type-safe message serialization with built-in schemas, custom serialization formats, and comprehensive type system integration for robust message handling.

Capabilities

Schema Interface

Core interface for message serialization and deserialization with type safety.

/**
 * Schema interface for message serialization/deserialization
 * Provides type-safe conversion between domain objects and byte arrays
 */
interface Schema<T> {
    /** Encode object to byte array */
    byte[] encode(T message);
    
    /** Decode byte array to object */
    T decode(byte[] bytes);
    
    /** Decode with schema version */
    T decode(byte[] bytes, byte[] schemaVersion);
    
    /** Decode ByteBuffer to object */
    T decode(ByteBuffer data);
    
    /** Decode ByteBuffer with schema version */
    T decode(ByteBuffer data, byte[] schemaVersion);
    
    /** Get schema information */
    SchemaInfo getSchemaInfo();
    
    /** Check if schema fetching is required */
    boolean requireFetchingSchemaInfo();
    
    /** Configure schema information */
    void configureSchemaInfo(String topicName, String componentName, SchemaInfo schemaInfo);
    
    /** Clone schema instance */
    Schema<T> clone();
    
    /** Validate message against schema */
    void validate(byte[] message);
    
    /** Check if schema supports schema versioning */
    boolean supportSchemaVersioning();
    
    /** Configure schema information */
    void configureSchemaInfo(String topic, String componentName, SchemaInfo schemaInfo);
    
    /** Set schema info provider */
    void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider);
    
    /** Get native schema object */
    Optional<Object> getNativeSchema();
}

Built-in Schemas

Pre-defined schemas for common data types.

interface Schema<T> {
    /** Byte array schema (no serialization) */
    static final Schema<byte[]> BYTES = BytesSchema.of();
    
    /** ByteBuffer schema */
    static final Schema<ByteBuffer> BYTEBUFFER = ByteBufferSchema.of();
    
    /** String schema (UTF-8 encoding) */
    static final Schema<String> STRING = StringSchema.utf8();
    
    /** 8-bit integer schema */
    static final Schema<Byte> INT8 = ByteSchema.of();
    
    /** 16-bit integer schema */
    static final Schema<Short> INT16 = ShortSchema.of();
    
    /** 32-bit integer schema */
    static final Schema<Integer> INT32 = IntSchema.of();
    
    /** 64-bit integer schema */
    static final Schema<Long> INT64 = LongSchema.of();
    
    /** Single precision float schema */
    static final Schema<Float> FLOAT = FloatSchema.of();
    
    /** Double precision float schema */
    static final Schema<Double> DOUBLE = DoubleSchema.of();
    
    /** Boolean schema */
    static final Schema<Boolean> BOOL = BooleanSchema.of();
    
    /** Date schema */
    static final Schema<Date> DATE = DateSchema.of();
    
    /** Time schema */
    static final Schema<Time> TIME = TimeSchema.of();
    
    /** Timestamp schema */
    static final Schema<Timestamp> TIMESTAMP = TimestampSchema.of();
    
    /** Instant schema */
    static final Schema<Instant> INSTANT = InstantSchema.of();
    
    /** LocalDate schema */
    static final Schema<LocalDate> LOCAL_DATE = LocalDateSchema.of();
    
    /** LocalTime schema */
    static final Schema<LocalTime> LOCAL_TIME = LocalTimeSchema.of();
    
    /** LocalDateTime schema */
    static final Schema<LocalDateTime> LOCAL_DATE_TIME = LocalDateTimeSchema.of();
}

Basic Schema Usage:

import org.apache.pulsar.client.api.*;

// String producer/consumer
Producer<String> stringProducer = client.newProducer(Schema.STRING)
    .topic("string-topic")
    .create();

Consumer<String> stringConsumer = client.newConsumer(Schema.STRING)
    .topic("string-topic")
    .subscriptionName("string-sub")
    .subscribe();

// Integer producer/consumer
Producer<Integer> intProducer = client.newProducer(Schema.INT32)
    .topic("int-topic")
    .create();

Consumer<Integer> intConsumer = client.newConsumer(Schema.INT32)
    .topic("int-topic")
    .subscriptionName("int-sub")
    .subscribe();

// Send and receive typed messages
stringProducer.send("Hello World");
intProducer.send(42);

String stringMsg = stringConsumer.receive().getValue();
Integer intMsg = intConsumer.receive().getValue();

Complex Schema Factory Methods

Factory methods for creating schemas for complex data types.

interface Schema<T> {
    /** Create Avro schema for POJO */
    static <T> Schema<T> AVRO(Class<T> pojo);
    
    /** Create Avro schema with custom AvroSchema */
    static <T> Schema<T> AVRO(org.apache.avro.Schema avroSchema);
    
    /** Create JSON schema for POJO */
    static <T> Schema<T> JSON(Class<T> pojo);
    
    /** Create JSON schema with custom configuration */
    static <T> Schema<T> JSON(SchemaDefinition<T> schemaDefinition);
    
    /** Create Protobuf schema */
    static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> protobufClass);
    
    /** Create native Protobuf schema */
    static <T> Schema<T> PROTOBUF_NATIVE(Class<T> protobufNativeClass);
    
    /** Create native Protobuf schema with descriptor */
    static <T> Schema<T> PROTOBUF_NATIVE(Class<T> clazz, com.google.protobuf.Descriptors.Descriptor descriptor);
    
    /** Create KeyValue schema with separate encoding */
    static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> keySchema, Schema<V> valueSchema);
    
    /** Create KeyValue schema with specified encoding type */
    static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> keySchema, Schema<V> valueSchema, KeyValueEncodingType keyValueEncodingType);
    
    /** Create KeyValue schema with custom configuration */
    static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> keyClass, Class<V> valueClass, SchemaType type);
    
    /** Create generic Avro schema */
    static Schema<GenericRecord> GENERIC_AVRO(org.apache.avro.Schema avroSchema);
    
    /** Create auto-consuming schema */
    static Schema<GenericRecord> AUTO_CONSUME();
    
    /** Create auto-producing bytes schema */
    static Schema<byte[]> AUTO_PRODUCE_BYTES();
    
    /** Create auto-producing bytes schema with validator */
    static Schema<byte[]> AUTO_PRODUCE_BYTES(SchemaValidator<byte[]> validator);
}

Complex Schema Examples:

// AVRO schema for POJO
public class User {
    public String name;
    public int age;
    public String email;
}

Schema<User> userSchema = Schema.AVRO(User.class);
Producer<User> userProducer = client.newProducer(userSchema)
    .topic("user-topic")
    .create();

User user = new User();
user.name = "Alice";
user.age = 30;
user.email = "alice@example.com";
userProducer.send(user);

// JSON schema
Schema<User> jsonUserSchema = Schema.JSON(User.class);
Producer<User> jsonProducer = client.newProducer(jsonUserSchema)
    .topic("json-user-topic")
    .create();

// KeyValue schema
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(
    Schema.STRING, 
    Schema.JSON(User.class)
);

Producer<KeyValue<String, User>> kvProducer = client.newProducer(kvSchema)
    .topic("kv-topic")
    .create();

KeyValue<String, User> kv = new KeyValue<>("user-123", user);
kvProducer.send(kv);

// Protobuf schema (assuming UserProto is generated protobuf class)
Schema<UserProto> protoSchema = Schema.PROTOBUF(UserProto.class);
Producer<UserProto> protoProducer = client.newProducer(protoSchema)
    .topic("proto-topic")
    .create();

SchemaInfo and Configuration

Schema metadata and configuration classes.

/**
 * Schema information metadata
 */
interface SchemaInfo {
    /** Get schema name */
    String getName();
    
    /** Get schema data */
    byte[] getSchema();
    
    /** Get schema type */
    SchemaType getType();
    
    /** Get schema properties */
    Map<String, String> getProperties();
    
    /** Get timestamp */
    long getTimestamp();
}

/**
 * Schema definition builder for custom configuration
 */
class SchemaDefinition<T> {
    /** Create builder */
    static <T> SchemaDefinitionBuilder<T> builder();
    
    /** Get POJO class */
    Class<T> getPojo();
    
    /** Get properties */
    Map<String, String> getProperties();
    
    /** Get JSON properties */
    String getJsonDef();
    
    /** Check if JSR310 conversion is enabled */
    boolean getJsr310ConversionEnabled();
    
    /** Get always allow null setting */
    boolean getAlwaysAllowNull();
    
    /** Get schema reader */
    SchemaReader<T> getSchemaReader();
    
    /** Get schema writer */
    SchemaWriter<T> getSchemaWriter();
    
    interface SchemaDefinitionBuilder<T> {
        /** Set POJO class */
        SchemaDefinitionBuilder<T> withPojo(Class<T> pojo);
        
        /** Add property */
        SchemaDefinitionBuilder<T> withProperty(String key, String value);
        
        /** Set properties */
        SchemaDefinitionBuilder<T> withProperties(Map<String, String> properties);
        
        /** Set JSON definition */
        SchemaDefinitionBuilder<T> withJsonDef(String jsonDef);
        
        /** Enable JSR310 conversion */
        SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled);
        
        /** Set always allow null */
        SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);
        
        /** Set schema reader */
        SchemaDefinitionBuilder<T> withSchemaReader(SchemaReader<T> schemaReader);
        
        /** Set schema writer */
        SchemaDefinitionBuilder<T> withSchemaWriter(SchemaWriter<T> schemaWriter);
        
        /** Build schema definition */
        SchemaDefinition<T> build();
    }
}

KeyValue Schema Support

Support for key-value pair messages with separate schemas for keys and values.

/**
 * KeyValue pair container
 */
class KeyValue<K, V> {
    /** Create KeyValue pair */
    static <K, V> KeyValue<K, V> of(K key, V value);
    
    /** Get key */
    K getKey();
    
    /** Get value */
    V getValue();
}

/**
 * KeyValue encoding types
 */
enum KeyValueEncodingType {
    /** Separate encoding for key and value */
    SEPARATED,
    /** Inline encoding */
    INLINE
}

KeyValue Schema Examples:

// KeyValue with separate encoding (default)
Schema<KeyValue<String, Integer>> kvSchema = Schema.KeyValue(
    Schema.STRING, 
    Schema.INT32
);

Producer<KeyValue<String, Integer>> producer = client.newProducer(kvSchema)
    .topic("kv-topic")
    .create();

KeyValue<String, Integer> kv = KeyValue.of("counter", 42);
producer.send(kv);

// KeyValue with inline encoding
Schema<KeyValue<String, User>> inlineKvSchema = Schema.KeyValue(
    Schema.STRING,
    Schema.JSON(User.class),
    KeyValueEncodingType.INLINE
);

// Complex nested KeyValue
Schema<KeyValue<User, List<String>>> nestedSchema = Schema.KeyValue(
    Schema.JSON(User.class),
    Schema.STRING // Will be used for JSON serialization of List<String>
);

Generic Record Support

Support for schema-less and generic record handling.

/**
 * Generic record interface for schema-less data
 */
interface GenericRecord {
    /** Get schema version */
    byte[] getSchemaVersion();
    
    /** Get all fields */
    List<Field> getFields();
    
    /** Get field by index */
    Object getField(int index);
    
    /** Get field by name */
    Object getField(String fieldName);
    
    /** Get native object */
    Object getNativeObject();
}

/**
 * Field definition
 */
interface Field {
    /** Get field name */
    String getName();
    
    /** Get field index */
    int getIndex();
}

Generic Record Examples:

// Auto-consuming schema
Schema<GenericRecord> autoSchema = Schema.AUTO_CONSUME();
Consumer<GenericRecord> consumer = client.newConsumer(autoSchema)
    .topic("mixed-schema-topic")
    .subscriptionName("auto-consumer")
    .subscribe();

// Handle messages with different schemas
Message<GenericRecord> message = consumer.receive();
GenericRecord record = message.getValue();

// Access fields generically
Object nameField = record.getField("name");
Object ageField = record.getField("age");

// Auto-producing schema
Schema<byte[]> autoProduceSchema = Schema.AUTO_PRODUCE_BYTES();
Producer<byte[]> producer = client.newProducer(autoProduceSchema)
    .topic("auto-produce-topic")
    .create();

Schema Validation

Schema validation and compatibility checking.

/**
 * Schema validator interface
 */
interface SchemaValidator<T> {
    /** Validate message against schema */
    void validate(T message);
}

/**
 * Schema compatibility strategy
 */
enum SchemaCompatibilityStrategy {
    /** Full compatibility (read/write with all versions) */
    FULL,
    /** Backward compatibility (read old, write new) */
    BACKWARD,
    /** Forward compatibility (read new, write old) */
    FORWARD,
    /** Full transitive compatibility */
    FULL_TRANSITIVE,
    /** Backward transitive compatibility */
    BACKWARD_TRANSITIVE,
    /** Forward transitive compatibility */
    FORWARD_TRANSITIVE,
    /** No compatibility checks */
    NONE
}

Supporting Types and Enums

enum SchemaType {
    NONE,
    STRING,
    JSON,
    PROTOBUF,
    AVRO,
    BOOLEAN,
    INT8,
    INT16,
    INT32,
    INT64,
    FLOAT,
    DOUBLE,
    DATE,
    TIME,
    TIMESTAMP,
    INSTANT,
    LOCAL_DATE,
    LOCAL_TIME,
    LOCAL_DATE_TIME,
    PROTOBUF_NATIVE,
    KEY_VALUE,
    BYTES,
    AUTO,
    AUTO_CONSUME,
    AUTO_PUBLISH
}

class SchemaSerializationException extends RuntimeException {
    SchemaSerializationException(String message);
    SchemaSerializationException(String message, Throwable cause);
}

interface SchemaInfoProvider {
    /** Get schema information by version */
    CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion);
    
    /** Get latest schema information */
    CompletableFuture<SchemaInfo> getLatestSchema();
    
    /** Get topic name */
    String getTopicName();
}

interface SchemaReader<T> {
    /** Read object from input stream */
    T read(InputStream inputStream);
    
    /** Read object from byte array */
    T read(byte[] bytes);
}

interface SchemaWriter<T> {
    /** Write object to output stream */
    void write(T obj, OutputStream outputStream);
    
    /** Write object to byte array */
    byte[] write(T obj);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-client

docs

authentication-security.md

client-management.md

index.md

message-consumption.md

message-production.md

message-reading.md

schema-serialization.md

transaction-support.md

tile.json