Apache Pulsar Java client library for the distributed pub-sub messaging platform
—
Publishing messages to topics with support for batching, compression, encryption, custom routing strategies, and advanced delivery options.
Core interface for publishing messages to Pulsar topics.
/**
 * Interface for producing messages to topics.
 *
 * <p>Thread-safe and can be used concurrently from multiple threads.
 *
 * @param <T> type of the message value passed to {@code send} / {@code sendAsync}
 */
interface Producer<T> extends Closeable {
    /** @return the producer name */
    String getProducerName();

    /** @return the topic name */
    String getTopic();

    /**
     * Send a message synchronously.
     *
     * @param message the message to send
     * @return the {@code MessageId} returned for the sent message
     * @throws PulsarClientException declared failure type of the synchronous send
     */
    MessageId send(T message) throws PulsarClientException;

    /**
     * Send a message asynchronously.
     *
     * @param message the message to send
     * @return a future carrying the {@code MessageId}
     */
    CompletableFuture<MessageId> sendAsync(T message);

    /** Create a typed message builder for advanced message options. */
    TypedMessageBuilder<T> newMessage();

    /**
     * Create a typed message builder with a different schema.
     *
     * @param schema schema used for the message being built
     * @param <V> value type described by {@code schema}
     */
    <V> TypedMessageBuilder<V> newMessage(Schema<V> schema);

    /**
     * Create a typed message builder for transactional messages.
     *
     * @param txn transaction the message is built for
     */
    TypedMessageBuilder<T> newMessage(Transaction txn);

    /** @return the last sequence ID sent by this producer */
    long getLastSequenceId();

    /** @return the number of partitions for the topic */
    int getNumOfPartitions();

    /** @return the producer statistics */
    ProducerStats getStats();

    /** @return whether the producer is connected to the broker */
    boolean isConnected();

    /** @return the timestamp of the last disconnection */
    long getLastDisconnectedTimestamp();

    /**
     * Flush all pending messages synchronously.
     *
     * @throws PulsarClientException declared failure type of the synchronous flush
     */
    void flush() throws PulsarClientException;

    /** Flush all pending messages asynchronously. */
    CompletableFuture<Void> flushAsync();

    /**
     * Close the producer.
     *
     * @throws PulsarClientException declared failure type of the synchronous close
     */
    void close() throws PulsarClientException;

    /** Close the producer asynchronously. */
    CompletableFuture<Void> closeAsync();
}

Usage Examples:
import org.apache.pulsar.client.api.*;

// Simple message sending
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();
MessageId msgId = producer.send("Hello World");

// Asynchronous sending
CompletableFuture<MessageId> future = producer.sendAsync("Async message");
future.thenAccept(messageId -> {
    System.out.println("Message sent: " + messageId);
}).exceptionally(throwable -> {
    System.err.println("Failed to send: " + throwable.getMessage());
    return null;
});

// Flush pending messages
producer.flush();

// Get statistics
ProducerStats stats = producer.getStats();
System.out.println("Messages sent: " + stats.getNumMsgsSent());

Builder interface for configuring and creating Producer instances.
/**
 * Builder for configuring and creating Producer instances.
 *
 * @param <T> message value type of the producers this builder creates
 */
interface ProducerBuilder<T> extends Serializable, Cloneable {
    /**
     * Create the producer synchronously.
     *
     * @throws PulsarClientException declared failure type of the synchronous creation
     */
    Producer<T> create() throws PulsarClientException;

    /** Create the producer asynchronously. */
    CompletableFuture<Producer<T>> createAsync();

    /** Clone the builder. */
    ProducerBuilder<T> clone();

    /** Set topic name (required). */
    ProducerBuilder<T> topic(String topicName);

    /** Set producer name (optional, auto-generated if not set). */
    ProducerBuilder<T> producerName(String producerName);

    /** Set send timeout (default: 30 seconds). */
    ProducerBuilder<T> sendTimeout(int sendTimeout, TimeUnit unit);

    /** Set max pending messages (default: 1000). */
    ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);

    /** Set max pending messages across partitions. */
    ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);

    /** Block if queue is full (default: false). */
    ProducerBuilder<T> blockIfQueueFull(boolean blockIfQueueFull);

    /** Set message routing mode. */
    ProducerBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode);

    /** Set hashing scheme for message routing. */
    ProducerBuilder<T> hashingScheme(HashingScheme hashingScheme);

    /** Set custom message router. */
    ProducerBuilder<T> messageRouter(MessageRouter messageRouter);

    /** Set compression type. */
    ProducerBuilder<T> compressionType(CompressionType compressionType);

    /** Enable message batching (default: true). */
    ProducerBuilder<T> enableBatching(boolean enableBatching);

    /** Set batching max messages (default: 1000). */
    ProducerBuilder<T> batchingMaxMessages(int batchingMaxMessages);

    /** Set batching max publish delay. */
    ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit);

    /** Set batching max bytes. */
    ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes);

    /**
     * Set batching partition switch frequency.
     *
     * <p>NOTE(review): the published Pulsar {@code ProducerBuilder} Javadoc appears to name this
     * setter {@code roundRobinRouterBatchingPartitionSwitchFrequency(int)} — confirm against the
     * client version being documented before relying on this signature.
     */
    ProducerBuilder<T> batchingPartitionSwitchFrequencyByPublishDelay(int frequency);

    /** Set batch builder. */
    ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder);

    /** Set initial sequence ID. */
    ProducerBuilder<T> initialSequenceId(long initialSequenceId);

    /** Add a single property. */
    ProducerBuilder<T> property(String key, String value);

    /** Set properties from a map. */
    ProducerBuilder<T> properties(Map<String, String> properties);

    /** Add producer interceptor. */
    ProducerBuilder<T> intercept(ProducerInterceptor<T> interceptor);

    /** Enable chunking for large messages. */
    ProducerBuilder<T> enableChunking(boolean enableChunking);

    /** Set chunk max message size. */
    ProducerBuilder<T> chunkMaxMessageSize(int chunkMaxMessageSize);

    /** Set producer access mode. */
    ProducerBuilder<T> accessMode(ProducerAccessMode accessMode);

    /** Enable lazy start of producers. */
    ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean enableLazyStartPartitionedProducers);

    /** Enable multi-schema support. */
    ProducerBuilder<T> enableMultiSchema(boolean enableMultiSchema);
}

Configure message encryption for producers.
/**
 * Encryption-related methods of {@code ProducerBuilder}, listed separately from the
 * general configuration methods.
 */
interface ProducerBuilder<T> {
    /** Set crypto key reader. */
    ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

    /** Add encryption key. */
    ProducerBuilder<T> addEncryptionKey(String key);

    /** Set default crypto key reader using a public key path. */
    ProducerBuilder<T> defaultCryptoKeyReader(String publicKeyPath);

    /** Set default crypto key reader using a map of public keys. */
    ProducerBuilder<T> defaultCryptoKeyReader(Map<String, String> publicKeys);

    /** Set crypto failure action. */
    ProducerBuilder<T> cryptoFailureAction(ProducerCryptoFailureAction action);
}

Producer Configuration Examples:
// Each example declares its own variable so the snippet compiles as a single unit.

// Basic producer configuration
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .producerName("my-producer")
    .sendTimeout(60, TimeUnit.SECONDS)
    .compressionType(CompressionType.LZ4)
    .create();

// Batching configuration
Producer<byte[]> batchingProducer = client.newProducer()
    .topic("batch-topic")
    .enableBatching(true)
    .batchingMaxMessages(100)
    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
    .batchingMaxBytes(1024 * 1024)
    .create();

// Custom routing
Producer<String> routingProducer = client.newProducer(Schema.STRING)
    .topic("partitioned-topic")
    .messageRoutingMode(MessageRoutingMode.CustomPartition)
    .messageRouter(new CustomMessageRouter())
    .create();

// Encryption configuration
Producer<String> encryptingProducer = client.newProducer(Schema.STRING)
    .topic("encrypted-topic")
    .addEncryptionKey("my-key")
    .cryptoKeyReader(new MyCryptoKeyReader())
    .cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
    .create();

Builder interface for creating messages with advanced options.
/**
 * Builder for creating typed messages with advanced properties.
 *
 * @param <T> type of the message value set through {@link #value(Object)}
 */
interface TypedMessageBuilder<T> {
    /**
     * Send the message synchronously and return its MessageId.
     *
     * @throws PulsarClientException declared failure type of the synchronous send
     */
    MessageId send() throws PulsarClientException;

    /** Send the message asynchronously. */
    CompletableFuture<MessageId> sendAsync();

    /** Set message key for routing and compaction. */
    TypedMessageBuilder<T> key(String key);

    /** Set message key as bytes. */
    TypedMessageBuilder<T> keyBytes(byte[] key);

    /** Set ordering key for ordered delivery. */
    TypedMessageBuilder<T> orderingKey(byte[] orderingKey);

    /** Set message value. */
    TypedMessageBuilder<T> value(T value);

    /** Add a single message property. */
    TypedMessageBuilder<T> property(String name, String value);

    /** Set message properties from a map. */
    TypedMessageBuilder<T> properties(Map<String, String> properties);

    /** Set event time timestamp. */
    TypedMessageBuilder<T> eventTime(long timestamp);

    /** Set sequence ID. */
    TypedMessageBuilder<T> sequenceId(long sequenceId);

    /** Set replication clusters. */
    TypedMessageBuilder<T> replicationClusters(List<String> clusters);

    /** Disable replication. */
    TypedMessageBuilder<T> disableReplication();

    /** Set delivery time (delayed message). */
    TypedMessageBuilder<T> deliverAt(long timestamp);

    /** Set delivery delay. */
    TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit);

    /** Load configuration from a map. */
    TypedMessageBuilder<T> loadConf(Map<String, Object> config);
}

Message Builder Examples:
// Each example stores its MessageId in a distinct variable so the snippet compiles as one unit.

// Simple message with key
MessageId keyedMsgId = producer.newMessage()
    .key("user-123")
    .value("User data")
    .send();

// Message with properties and event time
MessageId eventMsgId = producer.newMessage()
    .value("Event data")
    .property("source", "mobile-app")
    .property("version", "1.0")
    .eventTime(System.currentTimeMillis())
    .send();

// Delayed message delivery
MessageId delayedMsgId = producer.newMessage()
    .value("Delayed message")
    .deliverAfter(5, TimeUnit.MINUTES)
    .send();

// Async message with callback
producer.newMessage()
    .value("Async message")
    .sendAsync()
    .thenAccept(messageId -> System.out.println("Sent: " + messageId))
    .exceptionally(ex -> {
        System.err.println("Failed: " + ex.getMessage());
        return null;
    });

Interface for accessing producer statistics and metrics.
/**
 * Read-only view of a producer's statistics.
 */
interface ProducerStats {
    /** @return number of messages sent */
    long getNumMsgsSent();

    /** @return number of bytes sent */
    long getNumBytesSent();

    /** @return number of send failures */
    long getNumSendFailed();

    /** @return number of acknowledgments received */
    long getNumAcksReceived();

    /** @return send rate in messages per second */
    double getSendMsgsRate();

    /** @return send rate in bytes per second */
    double getSendBytesRate();

    /** @return 50th percentile send latency in milliseconds */
    double getSendLatencyMillis50pct();

    /** @return 75th percentile send latency in milliseconds */
    double getSendLatencyMillis75pct();

    /** @return 95th percentile send latency in milliseconds */
    double getSendLatencyMillis95pct();

    /** @return 99th percentile send latency in milliseconds */
    double getSendLatencyMillis99pct();

    /** @return 99.9th percentile send latency in milliseconds */
    double getSendLatencyMillis999pct();

    /** @return maximum send latency in milliseconds */
    double getSendLatencyMillisMax();

    /** @return total messages sent since creation */
    long getTotalMsgsSent();

    /** @return total bytes sent since creation */
    long getTotalBytesSent();

    /** @return total send failures since creation */
    long getTotalSendFailed();

    /** @return total acknowledgments received since creation */
    long getTotalAcksReceived();

    /** @return current pending queue size */
    int getPendingQueueSize();
}

/**
 * Routing modes accepted by {@code ProducerBuilder.messageRoutingMode}.
 */
enum MessageRoutingMode {
    /** Route to a single partition. */
    SinglePartition,

    /** Round-robin across partitions. */
    RoundRobinPartition,

    /** Use custom partitioning logic. */
    CustomPartition
}
/**
 * Hashing schemes accepted by {@code ProducerBuilder.hashingScheme}.
 */
enum HashingScheme {
    /** Java {@code String} hash. */
    JavaStringHash,

    /** Murmur3 32-bit hash. */
    Murmur3_32Hash
}
/**
 * Compression types accepted by {@code ProducerBuilder.compressionType}.
 */
enum CompressionType {
    NONE,
    LZ4,
    ZLIB,
    ZSTD,
    SNAPPY
}
/**
 * Access modes accepted by {@code ProducerBuilder.accessMode}.
 */
enum ProducerAccessMode {
    /** Multiple producers allowed. */
    Shared,

    /** Single producer only. */
    Exclusive,

    /** Wait for exclusive access. */
    WaitForExclusive
}
/**
 * Failure actions accepted by {@code ProducerBuilder.cryptoFailureAction}.
 */
enum ProducerCryptoFailureAction {
    /** Fail the send operation. */
    FAIL,

    /** Send the message unencrypted. */
    SEND
}
/**
 * Custom router accepted by {@code ProducerBuilder.messageRouter}.
 */
interface MessageRouter {
    /**
     * Choose the partition for a message.
     *
     * @param msg the message being routed
     * @param metadata metadata of the topic the message is sent to
     * @return the chosen partition
     */
    int choosePartition(Message<?> msg, TopicMetadata metadata);
}
/**
 * Batch builder accepted by {@code ProducerBuilder.batcherBuilder}.
 */
interface BatcherBuilder {
    /** Build the batch message container. */
    BatchMessageContainer build();
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-client