CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-client

Apache Pulsar Java client library for distributed pub-sub messaging platform

Pending
Overview
Eval results
Files

message-production.mddocs/

Message Production

Publishing messages to topics with support for batching, compression, encryption, custom routing strategies, and advanced delivery options.

Capabilities

Producer Interface

Core interface for publishing messages to Pulsar topics.

/**
 * Interface for producing messages to topics
 * Thread-safe and can be used concurrently from multiple threads
 */
interface Producer<T> extends Closeable {
    /** Get producer name */
    String getProducerName();
    
    /** Get topic name */
    String getTopic();
    
    /** Send message synchronously */
    MessageId send(T message) throws PulsarClientException;
    
    /** Send message asynchronously */
    CompletableFuture<MessageId> sendAsync(T message);
    
    /** Create a typed message builder for advanced message options */
    TypedMessageBuilder<T> newMessage();
    
    /** Create a typed message builder with different schema */
    <V> TypedMessageBuilder<V> newMessage(Schema<V> schema);
    
    /** Create a typed message builder for transactional messages */
    TypedMessageBuilder<T> newMessage(Transaction txn);
    
    /** Get last sequence ID sent by this producer */
    long getLastSequenceId();
    
    /** Get number of partitions for the topic */
    int getNumOfPartitions();
    
    /** Get producer statistics */
    ProducerStats getStats();
    
    /** Check if producer is connected to broker */
    boolean isConnected();
    
    /** Get timestamp of last disconnection */
    long getLastDisconnectedTimestamp();
    
    /** Flush all pending messages synchronously */
    void flush() throws PulsarClientException;
    
    /** Flush all pending messages asynchronously */
    CompletableFuture<Void> flushAsync();
    
    /** Close producer */
    void close() throws PulsarClientException;
    
    /** Close producer asynchronously */
    CompletableFuture<Void> closeAsync();
}

Usage Examples:

import org.apache.pulsar.client.api.*;

// Simple message sending
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();

MessageId msgId = producer.send("Hello World");

// Asynchronous sending
CompletableFuture<MessageId> future = producer.sendAsync("Async message");
future.thenAccept(messageId -> {
    System.out.println("Message sent: " + messageId);
}).exceptionally(throwable -> {
    System.err.println("Failed to send: " + throwable.getMessage());
    return null;
});

// Flush pending messages
producer.flush();

// Get statistics
ProducerStats stats = producer.getStats();
System.out.println("Messages sent: " + stats.getNumMsgsSent());

ProducerBuilder Configuration

Builder interface for configuring and creating Producer instances.

/**
 * Builder for configuring and creating Producer instances
 */
interface ProducerBuilder<T> extends Serializable, Cloneable {
    /** Create the producer synchronously */
    Producer<T> create() throws PulsarClientException;
    
    /** Create the producer asynchronously */
    CompletableFuture<Producer<T>> createAsync();
    
    /** Clone the builder */
    ProducerBuilder<T> clone();
    
    /** Set topic name (required) */
    ProducerBuilder<T> topic(String topicName);
    
    /** Set producer name (optional, auto-generated if not set) */
    ProducerBuilder<T> producerName(String producerName);
    
    /** Set send timeout (default: 30 seconds) */
    ProducerBuilder<T> sendTimeout(int sendTimeout, TimeUnit unit);
    
    /** Set max pending messages (default: 1000) */
    ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);
    
    /** Set max pending messages across partitions */
    ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
    
    /** Block if queue is full (default: false) */
    ProducerBuilder<T> blockIfQueueFull(boolean blockIfQueueFull);
    
    /** Set message routing mode */
    ProducerBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode);
    
    /** Set hashing scheme for message routing */
    ProducerBuilder<T> hashingScheme(HashingScheme hashingScheme);
    
    /** Set custom message router */
    ProducerBuilder<T> messageRouter(MessageRouter messageRouter);
    
    /** Set compression type */
    ProducerBuilder<T> compressionType(CompressionType compressionType);
    
    /** Enable message batching (default: true) */
    ProducerBuilder<T> enableBatching(boolean enableBatching);
    
    /** Set batching max messages (default: 1000) */
    ProducerBuilder<T> batchingMaxMessages(int batchingMaxMessages);
    
    /** Set batching max publish delay */
    ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit);
    
    /** Set batching max bytes */
    ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes);
    
    /** Set batching partition switch frequency */
    ProducerBuilder<T> batchingPartitionSwitchFrequencyByPublishDelay(int frequency);
    
    /** Set batch builder */
    ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder);
    
    /** Set initial sequence ID */
    ProducerBuilder<T> initialSequenceId(long initialSequenceId);
    
    /** Add property */
    ProducerBuilder<T> property(String key, String value);
    
    /** Set properties */
    ProducerBuilder<T> properties(Map<String, String> properties);
    
    /** Add producer interceptor */
    ProducerBuilder<T> intercept(ProducerInterceptor<T> interceptor);
    
    /** Enable chunking for large messages */
    ProducerBuilder<T> enableChunking(boolean enableChunking);
    
    /** Set chunk max message size */
    ProducerBuilder<T> chunkMaxMessageSize(int chunkMaxMessageSize);
    
    /** Set producer access mode */
    ProducerBuilder<T> accessMode(ProducerAccessMode accessMode);
    
    /** Enable lazy start of producers */
    ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean enableLazyStartPartitionedProducers);
    
    /** Enable multi-schema support */
    ProducerBuilder<T> enableMultiSchema(boolean enableMultiSchema);
}

Encryption Configuration

Configure message encryption for producers.

interface ProducerBuilder<T> {
    /** Set crypto key reader */
    ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
    
    /** Add encryption key */
    ProducerBuilder<T> addEncryptionKey(String key);
    
    /** Set default crypto key reader using public key path */
    ProducerBuilder<T> defaultCryptoKeyReader(String publicKeyPath);
    
    /** Set default crypto key reader using key store */
    ProducerBuilder<T> defaultCryptoKeyReader(Map<String, String> publicKeys);
    
    /** Set crypto failure action */
    ProducerBuilder<T> cryptoFailureAction(ProducerCryptoFailureAction action);
}

Producer Configuration Examples:

// Basic producer configuration
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .producerName("my-producer")
    .sendTimeout(60, TimeUnit.SECONDS)
    .compressionType(CompressionType.LZ4)
    .create();

// Batching configuration
Producer<byte[]> producer = client.newProducer()
    .topic("batch-topic")
    .enableBatching(true)
    .batchingMaxMessages(100)
    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
    .batchingMaxBytes(1024 * 1024)
    .create();

// Custom routing
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("partitioned-topic")
    .messageRoutingMode(MessageRoutingMode.CustomPartition)
    .messageRouter(new CustomMessageRouter())
    .create();

// Encryption configuration
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("encrypted-topic")
    .addEncryptionKey("my-key")
    .cryptoKeyReader(new MyCryptoKeyReader())
    .cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
    .create();

TypedMessageBuilder

Builder interface for creating messages with advanced options.

/**
 * Builder for creating typed messages with advanced properties
 */
interface TypedMessageBuilder<T> {
    /** Send message synchronously and return MessageId */
    MessageId send() throws PulsarClientException;
    
    /** Send message asynchronously */
    CompletableFuture<MessageId> sendAsync();
    
    /** Set message key for routing and compaction */  
    TypedMessageBuilder<T> key(String key);
    
    /** Set message key as bytes */
    TypedMessageBuilder<T> keyBytes(byte[] key);
    
    /** Set ordering key for ordered delivery */
    TypedMessageBuilder<T> orderingKey(byte[] orderingKey);
    
    /** Set message value */
    TypedMessageBuilder<T> value(T value);
    
    /** Add message property */
    TypedMessageBuilder<T> property(String name, String value);
    
    /** Set message properties */
    TypedMessageBuilder<T> properties(Map<String, String> properties);
    
    /** Set event time timestamp */
    TypedMessageBuilder<T> eventTime(long timestamp);
    
    /** Set sequence ID */
    TypedMessageBuilder<T> sequenceId(long sequenceId);
    
    /** Set replication clusters */
    TypedMessageBuilder<T> replicationClusters(List<String> clusters);
    
    /** Disable replication */
    TypedMessageBuilder<T> disableReplication();
    
    /** Set delivery time (delayed message) */
    TypedMessageBuilder<T> deliverAt(long timestamp);
    
    /** Set delivery delay */
    TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit);
    
    /** Load configuration from map */
    TypedMessageBuilder<T> loadConf(Map<String, Object> config);
}

Message Builder Examples:

// Simple message with key
MessageId msgId = producer.newMessage()
    .key("user-123")
    .value("User data")
    .send();

// Message with properties and event time
MessageId msgId = producer.newMessage()
    .value("Event data")
    .property("source", "mobile-app")
    .property("version", "1.0")
    .eventTime(System.currentTimeMillis())
    .send();

// Delayed message delivery
MessageId msgId = producer.newMessage()
    .value("Delayed message")
    .deliverAfter(5, TimeUnit.MINUTES)
    .send();

// Async message with callback
producer.newMessage()
    .value("Async message")
    .sendAsync()
    .thenAccept(messageId -> System.out.println("Sent: " + messageId))
    .exceptionally(ex -> {
        System.err.println("Failed: " + ex.getMessage());
        return null;
    });

Producer Statistics

Interface for accessing producer statistics and metrics.

/**
 * Producer statistics interface
 */
interface ProducerStats {
    /** Number of messages sent */
    long getNumMsgsSent();
    
    /** Number of bytes sent */
    long getNumBytesSent();
    
    /** Number of send failures */
    long getNumSendFailed();
    
    /** Number of acknowledgments received */
    long getNumAcksReceived();
    
    /** Send rate in messages per second */
    double getSendMsgsRate();
    
    /** Send rate in bytes per second */
    double getSendBytesRate();
    
    /** 50th percentile send latency in milliseconds */
    double getSendLatencyMillis50pct();
    
    /** 75th percentile send latency in milliseconds */
    double getSendLatencyMillis75pct();
    
    /** 95th percentile send latency in milliseconds */
    double getSendLatencyMillis95pct();
    
    /** 99th percentile send latency in milliseconds */
    double getSendLatencyMillis99pct();
    
    /** 99.9th percentile send latency in milliseconds */
    double getSendLatencyMillis999pct();
    
    /** Maximum send latency in milliseconds */
    double getSendLatencyMillisMax();
    
    /** Total messages sent since creation */
    long getTotalMsgsSent();
    
    /** Total bytes sent since creation */
    long getTotalBytesSent();
    
    /** Total send failures since creation */
    long getTotalSendFailed();
    
    /** Total acknowledgments received since creation */
    long getTotalAcksReceived();
    
    /** Current pending queue size */
    int getPendingQueueSize();
}

Supporting Types and Enums

enum MessageRoutingMode {
    /** Route to single partition */
    SinglePartition,
    /** Round-robin across partitions */
    RoundRobinPartition,
    /** Use custom partitioning logic */
    CustomPartition
}

enum HashingScheme {
    /** Java String hash */
    JavaStringHash,
    /** Murmur3 32-bit hash */
    Murmur3_32Hash
}

enum CompressionType {
    NONE,
    LZ4,
    ZLIB,
    ZSTD,
    SNAPPY
}

enum ProducerAccessMode {
    /** Multiple producers allowed */
    Shared,
    /** Single producer only */
    Exclusive,
    /** Wait for exclusive access */
    WaitForExclusive
}

enum ProducerCryptoFailureAction {
    /** Fail the send operation */
    FAIL,
    /** Send message unencrypted */
    SEND
}

interface MessageRouter {
    /** Choose partition for message */
    int choosePartition(Message<?> msg, TopicMetadata metadata);
}

interface BatcherBuilder {
    /** Build batch message container */
    BatchMessageContainer build();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-client

docs

authentication-security.md

client-management.md

index.md

message-consumption.md

message-production.md

message-reading.md

schema-serialization.md

transaction-support.md

tile.json