Apache Pulsar Java client library for distributed pub-sub messaging platform
npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-client@4.0.0Apache Pulsar Java client library for distributed pub-sub messaging platform with flexible messaging models and intuitive client APIs. Provides comprehensive APIs for producing and consuming messages from Pulsar topics, supporting advanced features like schema validation, message routing, batching, compression, authentication, and transactions.
pom.xml:<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>4.0.6</version>
</dependency>For Gradle:
implementation 'org.apache.pulsar:pulsar-client:4.0.6'import org.apache.pulsar.client.api.*;Specific imports:
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;import org.apache.pulsar.client.api.*;
// Create client
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Create producer
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
// Send message
MessageId msgId = producer.send("Hello Pulsar!");
// Create consumer
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
// Receive message
Message<String> message = consumer.receive();
System.out.println("Received: " + message.getValue());
consumer.acknowledge(message);
// Clean up
producer.close();
consumer.close();
client.close();Apache Pulsar Java client is built around several key components:
Core client creation, configuration, and lifecycle management. Essential for establishing connections to Pulsar brokers.
interface PulsarClient extends Closeable {
static ClientBuilder builder();
void close() throws PulsarClientException;
CompletableFuture<Void> closeAsync();
boolean isClosed();
}
interface ClientBuilder extends Serializable, Cloneable {
PulsarClient build() throws PulsarClientException;
ClientBuilder serviceUrl(String serviceUrl);
ClientBuilder authentication(Authentication authentication);
ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
}Publishing messages to topics with support for batching, compression, encryption, and custom routing strategies.
interface Producer<T> extends Closeable {
MessageId send(T message) throws PulsarClientException;
CompletableFuture<MessageId> sendAsync(T message);
TypedMessageBuilder<T> newMessage();
String getTopic();
String getProducerName();
}
interface ProducerBuilder<T> extends Serializable, Cloneable {
Producer<T> create() throws PulsarClientException;
CompletableFuture<Producer<T>> createAsync();
ProducerBuilder<T> topic(String topicName);
ProducerBuilder<T> producerName(String producerName);
}Subscribing to topics with various subscription types, acknowledgment patterns, and message processing strategies.
interface Consumer<T> extends Closeable {
Message<T> receive() throws PulsarClientException;
CompletableFuture<Message<T>> receiveAsync();
void acknowledge(Message<?> message) throws PulsarClientException;
void acknowledge(MessageId messageId) throws PulsarClientException;
CompletableFuture<Void> acknowledgeAsync(Message<?> message);
}
interface ConsumerBuilder<T> extends Serializable, Cloneable {
Consumer<T> subscribe() throws PulsarClientException;
CompletableFuture<Consumer<T>> subscribeAsync();
ConsumerBuilder<T> topic(String... topicNames);
ConsumerBuilder<T> subscriptionName(String subscriptionName);
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
}Low-level message reading with manual positioning for replay scenarios and custom consumption patterns.
interface Reader<T> extends Closeable {
Message<T> readNext() throws PulsarClientException;
CompletableFuture<Message<T>> readNextAsync();
void seek(MessageId messageId) throws PulsarClientException;
CompletableFuture<Void> seekAsync(MessageId messageId);
boolean hasMessageAvailable() throws PulsarClientException;
}
interface ReaderBuilder<T> extends Serializable, Cloneable {
Reader<T> create() throws PulsarClientException;
CompletableFuture<Reader<T>> createAsync();
ReaderBuilder<T> topic(String topicName);
ReaderBuilder<T> startMessageId(MessageId startMessageId);
}Type-safe message serialization with built-in schemas and support for custom serialization formats.
interface Schema<T> {
byte[] encode(T message);
T decode(byte[] bytes);
SchemaInfo getSchemaInfo();
Schema<T> clone();
// Built-in schemas
static final Schema<byte[]> BYTES;
static final Schema<String> STRING;
static final Schema<Integer> INT32;
static final Schema<Long> INT64;
static final Schema<Boolean> BOOL;
}Authentication mechanisms, TLS configuration, and message encryption for secure messaging.
interface Authentication extends Serializable, Closeable {
String getAuthMethodName();
AuthenticationDataProvider getAuthData() throws PulsarClientException;
void configure(String encodedAuthParamString);
void configure(Map<String, String> authParams);
}
class AuthenticationFactory {
static Authentication TLS(String certFilePath, String keyFilePath);
static Authentication token(String token);
static Authentication create(String authPluginClassName, String encodedAuthParamString);
}Transactional messaging for exactly-once semantics and multi-topic atomic operations.
interface TransactionBuilder {
CompletableFuture<Transaction> build();
TransactionBuilder withTransactionTimeout(long timeout, TimeUnit timeUnit);
}
enum TransactionIsolationLevel {
READ_COMMITTED,
READ_UNCOMMITTED
}Real-time key-value view of compacted topics with automatic updates for caching and lookup scenarios.
interface TableView<T> extends AutoCloseable {
int size();
boolean isEmpty();
boolean containsKey(String key);
T get(String key);
Set<String> keySet();
Collection<T> values();
Set<Map.Entry<String, T>> entrySet();
void forEach(BiConsumer<String, T> action);
CompletableFuture<Void> closeAsync();
}
interface TableViewBuilder<T> {
TableView<T> create() throws PulsarClientException;
CompletableFuture<TableView<T>> createAsync();
TableViewBuilder<T> topic(String topic);
TableViewBuilder<T> subscriptionName(String subscriptionName);
}class PulsarClientException extends Exception {
// Nested exception classes
static class AuthenticationException extends PulsarClientException;
static class AuthorizationException extends PulsarClientException;
static class ConnectException extends PulsarClientException;
static class TimeoutException extends PulsarClientException;
static class TopicDoesNotExistException extends PulsarClientException;
static class ProducerBusyException extends PulsarClientException;
static class ConsumerBusyException extends PulsarClientException;
static class InvalidMessageException extends PulsarClientException;
static class InvalidConfigurationException extends PulsarClientException;
static class AlreadyClosedException extends PulsarClientException;
static class TopicTerminatedException extends PulsarClientException;
static class LookupException extends PulsarClientException;
static class TooManyRequestsException extends PulsarClientException;
static class BrokerPersistenceException extends PulsarClientException;
static class BrokerMetadataException extends PulsarClientException;
static class ProducerQueueIsFullError extends PulsarClientException;
static class MessageAcknowledgeException extends PulsarClientException;
static class ConsumerAssignException extends PulsarClientException;
static class TransactionConflictException extends PulsarClientException;
static class ProducerFencedException extends PulsarClientException;
static class MemoryBufferIsFullError extends PulsarClientException;
static class NotAllowedException extends PulsarClientException;
}interface Message<T> {
T getValue();
byte[] getData();
int size();
MessageId getMessageId();
long getPublishTime();
long getEventTime();
long getSequenceId();
String getProducerName();
String getKey();
boolean hasKey();
byte[] getKeyBytes();
boolean hasBase64EncodedKey();
boolean hasOrderingKey();
byte[] getOrderingKey();
String getTopicName();
Map<String, String> getProperties();
boolean hasProperty(String name);
String getProperty(String name);
byte[] getSchemaVersion();
int getRedeliveryCount();
boolean isReplicated();
String getReplicatedFrom();
boolean hasBrokerPublishTime();
Optional<Long> getBrokerPublishTime();
boolean hasIndex();
Optional<Long> getIndex();
Optional<EncryptionContext> getEncryptionCtx();
void release();
}
interface MessageId extends Comparable<MessageId>, Serializable {
byte[] toByteArray();
static MessageId fromByteArray(byte[] data) throws IOException;
static MessageId fromByteArrayWithTopic(byte[] data, String topicName) throws IOException;
static final MessageId earliest;
static final MessageId latest;
}
interface TopicMessageId extends MessageId {
String getTopicPartitionName();
MessageId getInnerMessageId();
long getLedgerId();
long getEntryId();
int getPartitionIndex();
}
interface TypedMessageBuilder<T> {
MessageId send() throws PulsarClientException;
CompletableFuture<MessageId> sendAsync();
TypedMessageBuilder<T> key(String key);
TypedMessageBuilder<T> value(T value);
TypedMessageBuilder<T> property(String name, String value);
TypedMessageBuilder<T> eventTime(long timestamp);
}enum SubscriptionType {
Exclusive,
Shared,
Failover,
Key_Shared
}
enum SubscriptionMode {
Durable,
NonDurable
}
enum CompressionType {
NONE,
LZ4,
ZLIB,
ZSTD,
SNAPPY
}
enum MessageRoutingMode {
SinglePartition,
RoundRobinPartition,
CustomPartition
}
interface Messages<T> extends Iterable<Message<T>>, AutoCloseable {
int size();
List<T> stream();
Iterator<Message<T>> iterator();
}
interface EncryptionContext {
Map<String, EncryptionKey> getKeys();
byte[] getParam();
CompressionType getCompressionType();
int getUncompressedMessageSize();
interface EncryptionKey {
String getKeyName();
byte[] getKeyValue();
Map<String, String> getMetadata();
}
}