0
# Apache Pulsar Java Client
1
2
The Apache Pulsar Java client library connects applications to Pulsar, a distributed pub-sub messaging platform with flexible messaging models and intuitive client APIs. It provides comprehensive APIs for producing and consuming messages from Pulsar topics, and supports advanced features such as schema validation, message routing, batching, compression, authentication, and transactions.
3
4
## Package Information
5
6
- **Package Name**: org.apache.pulsar:pulsar-client
7
- **Package Type**: maven
8
- **Language**: Java
9
- **Installation**: Add to `pom.xml`:
10
11
```xml
12
<dependency>
13
<groupId>org.apache.pulsar</groupId>
14
<artifactId>pulsar-client</artifactId>
15
<version>4.0.6</version>
16
</dependency>
17
```
18
19
For Gradle:
20
21
```gradle
22
implementation 'org.apache.pulsar:pulsar-client:4.0.6'
23
```
24
25
## Core Imports
26
27
```java
28
import org.apache.pulsar.client.api.*;
29
```
30
31
Specific imports:
32
33
```java
34
import org.apache.pulsar.client.api.PulsarClient;
35
import org.apache.pulsar.client.api.ClientBuilder;
36
import org.apache.pulsar.client.api.Producer;
37
import org.apache.pulsar.client.api.Consumer;
38
import org.apache.pulsar.client.api.Reader;
39
import org.apache.pulsar.client.api.Message;
40
import org.apache.pulsar.client.api.MessageId;
41
import org.apache.pulsar.client.api.Schema;
42
```
43
44
## Basic Usage
45
46
```java
47
import org.apache.pulsar.client.api.*;
48
49
// Create client
50
PulsarClient client = PulsarClient.builder()
51
.serviceUrl("pulsar://localhost:6650")
52
.build();
53
54
// Create producer
55
Producer<String> producer = client.newProducer(Schema.STRING)
56
.topic("my-topic")
57
.create();
58
59
// Send message
60
MessageId msgId = producer.send("Hello Pulsar!");
61
62
// Create consumer
63
Consumer<String> consumer = client.newConsumer(Schema.STRING)
64
.topic("my-topic")
65
.subscriptionName("my-subscription")
66
.subscribe();
67
68
// Receive message
69
Message<String> message = consumer.receive();
70
System.out.println("Received: " + message.getValue());
71
consumer.acknowledge(message);
72
73
// Clean up
74
producer.close();
75
consumer.close();
76
client.close();
77
```
78
79
## Architecture
80
81
The Apache Pulsar Java client is built around several key components:
82
83
- **PulsarClient**: Main entry point for all operations, manages connections and resources
84
- **Producer**: Publishes messages to topics with configurable routing, batching, and compression
85
- **Consumer**: Subscribes to topics with various subscription types and acknowledgment patterns
86
- **Reader**: Low-level interface for manual positioning and reading from topics
87
- **Schema**: Type-safe serialization/deserialization for messages
88
- **Builder Pattern**: Extensive use of builders for flexible configuration
89
- **Async Operations**: CompletableFuture-based asynchronous APIs throughout
90
91
## Capabilities
92
93
### Client Management
94
95
Core client creation, configuration, and lifecycle management. Essential for establishing connections to Pulsar brokers.
96
97
```java { .api }
98
interface PulsarClient extends Closeable {
99
static ClientBuilder builder();
100
void close() throws PulsarClientException;
101
CompletableFuture<Void> closeAsync();
102
boolean isClosed();
103
}
104
105
interface ClientBuilder extends Serializable, Cloneable {
106
PulsarClient build() throws PulsarClientException;
107
ClientBuilder serviceUrl(String serviceUrl);
108
ClientBuilder authentication(Authentication authentication);
109
ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
110
}
111
```
112
113
[Client Management](./client-management.md)
114
115
### Message Production
116
117
Publishing messages to topics with support for batching, compression, encryption, and custom routing strategies.
118
119
```java { .api }
120
interface Producer<T> extends Closeable {
121
MessageId send(T message) throws PulsarClientException;
122
CompletableFuture<MessageId> sendAsync(T message);
123
TypedMessageBuilder<T> newMessage();
124
String getTopic();
125
String getProducerName();
126
}
127
128
interface ProducerBuilder<T> extends Serializable, Cloneable {
129
Producer<T> create() throws PulsarClientException;
130
CompletableFuture<Producer<T>> createAsync();
131
ProducerBuilder<T> topic(String topicName);
132
ProducerBuilder<T> producerName(String producerName);
133
}
134
```
135
136
[Message Production](./message-production.md)
137
138
### Message Consumption
139
140
Subscribing to topics with various subscription types, acknowledgment patterns, and message processing strategies.
141
142
```java { .api }
143
interface Consumer<T> extends Closeable {
144
Message<T> receive() throws PulsarClientException;
145
CompletableFuture<Message<T>> receiveAsync();
146
void acknowledge(Message<?> message) throws PulsarClientException;
147
void acknowledge(MessageId messageId) throws PulsarClientException;
148
CompletableFuture<Void> acknowledgeAsync(Message<?> message);
149
}
150
151
interface ConsumerBuilder<T> extends Serializable, Cloneable {
152
Consumer<T> subscribe() throws PulsarClientException;
153
CompletableFuture<Consumer<T>> subscribeAsync();
154
ConsumerBuilder<T> topic(String... topicNames);
155
ConsumerBuilder<T> subscriptionName(String subscriptionName);
156
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
157
}
158
```
159
160
[Message Consumption](./message-consumption.md)
161
162
### Message Reading
163
164
Low-level message reading with manual positioning for replay scenarios and custom consumption patterns.
165
166
```java { .api }
167
interface Reader<T> extends Closeable {
168
Message<T> readNext() throws PulsarClientException;
169
CompletableFuture<Message<T>> readNextAsync();
170
void seek(MessageId messageId) throws PulsarClientException;
171
CompletableFuture<Void> seekAsync(MessageId messageId);
172
boolean hasMessageAvailable() throws PulsarClientException;
173
}
174
175
interface ReaderBuilder<T> extends Serializable, Cloneable {
176
Reader<T> create() throws PulsarClientException;
177
CompletableFuture<Reader<T>> createAsync();
178
ReaderBuilder<T> topic(String topicName);
179
ReaderBuilder<T> startMessageId(MessageId startMessageId);
180
}
181
```
182
183
[Message Reading](./message-reading.md)
184
185
### Schema and Serialization
186
187
Type-safe message serialization with built-in schemas and support for custom serialization formats.
188
189
```java { .api }
190
interface Schema<T> {
191
byte[] encode(T message);
192
T decode(byte[] bytes);
193
SchemaInfo getSchemaInfo();
194
Schema<T> clone();
195
196
// Built-in schemas
197
static final Schema<byte[]> BYTES;
198
static final Schema<String> STRING;
199
static final Schema<Integer> INT32;
200
static final Schema<Long> INT64;
201
static final Schema<Boolean> BOOL;
202
}
203
```
204
205
[Schema and Serialization](./schema-serialization.md)
206
207
### Authentication and Security
208
209
Authentication mechanisms, TLS configuration, and message encryption for secure messaging.
210
211
```java { .api }
212
interface Authentication extends Serializable, Closeable {
213
String getAuthMethodName();
214
AuthenticationDataProvider getAuthData() throws PulsarClientException;
215
void configure(String encodedAuthParamString);
216
void configure(Map<String, String> authParams);
217
}
218
219
class AuthenticationFactory {
220
static Authentication TLS(String certFilePath, String keyFilePath);
221
static Authentication token(String token);
222
static Authentication create(String authPluginClassName, String encodedAuthParamString);
223
}
224
```
225
226
[Authentication and Security](./authentication-security.md)
227
228
### Transaction Support
229
230
Transactional messaging for exactly-once semantics and multi-topic atomic operations.
231
232
```java { .api }
233
interface TransactionBuilder {
234
CompletableFuture<Transaction> build();
235
TransactionBuilder withTransactionTimeout(long timeout, TimeUnit timeUnit);
236
}
237
238
enum TransactionIsolationLevel {
239
READ_COMMITTED,
240
READ_UNCOMMITTED
241
}
242
```
243
244
[Transaction Support](./transaction-support.md)
245
246
### Table Views
247
248
Real-time key-value view of compacted topics with automatic updates for caching and lookup scenarios.
249
250
```java { .api }
251
interface TableView<T> extends AutoCloseable {
252
int size();
253
boolean isEmpty();
254
boolean containsKey(String key);
255
T get(String key);
256
Set<String> keySet();
257
Collection<T> values();
258
Set<Map.Entry<String, T>> entrySet();
259
void forEach(BiConsumer<String, T> action);
260
CompletableFuture<Void> closeAsync();
261
}
262
263
interface TableViewBuilder<T> {
264
TableView<T> create() throws PulsarClientException;
265
CompletableFuture<TableView<T>> createAsync();
266
TableViewBuilder<T> topic(String topic);
267
TableViewBuilder<T> subscriptionName(String subscriptionName);
268
}
269
```
270
271
## Exception Handling
272
273
```java { .api }
274
class PulsarClientException extends IOException {
275
// Nested exception classes
276
static class AuthenticationException extends PulsarClientException;
277
static class AuthorizationException extends PulsarClientException;
278
static class ConnectException extends PulsarClientException;
279
static class TimeoutException extends PulsarClientException;
280
static class TopicDoesNotExistException extends PulsarClientException;
281
static class ProducerBusyException extends PulsarClientException;
282
static class ConsumerBusyException extends PulsarClientException;
283
static class InvalidMessageException extends PulsarClientException;
284
static class InvalidConfigurationException extends PulsarClientException;
285
static class AlreadyClosedException extends PulsarClientException;
286
static class TopicTerminatedException extends PulsarClientException;
287
static class LookupException extends PulsarClientException;
288
static class TooManyRequestsException extends PulsarClientException;
289
static class BrokerPersistenceException extends PulsarClientException;
290
static class BrokerMetadataException extends PulsarClientException;
291
static class ProducerQueueIsFullError extends PulsarClientException;
292
static class MessageAcknowledgeException extends PulsarClientException;
293
static class ConsumerAssignException extends PulsarClientException;
294
static class TransactionConflictException extends PulsarClientException;
295
static class ProducerFencedException extends PulsarClientException;
296
static class MemoryBufferIsFullError extends PulsarClientException;
297
static class NotAllowedException extends PulsarClientException;
298
}
299
```
300
301
## Core Types
302
303
```java { .api }
304
interface Message<T> {
305
T getValue();
306
byte[] getData();
307
int size();
308
MessageId getMessageId();
309
long getPublishTime();
310
long getEventTime();
311
long getSequenceId();
312
String getProducerName();
313
String getKey();
314
boolean hasKey();
315
byte[] getKeyBytes();
316
boolean hasBase64EncodedKey();
317
boolean hasOrderingKey();
318
byte[] getOrderingKey();
319
String getTopicName();
320
Map<String, String> getProperties();
321
boolean hasProperty(String name);
322
String getProperty(String name);
323
byte[] getSchemaVersion();
324
int getRedeliveryCount();
325
boolean isReplicated();
326
String getReplicatedFrom();
327
boolean hasBrokerPublishTime();
328
Optional<Long> getBrokerPublishTime();
329
boolean hasIndex();
330
Optional<Long> getIndex();
331
Optional<EncryptionContext> getEncryptionCtx();
332
void release();
333
}
334
335
interface MessageId extends Comparable<MessageId>, Serializable {
336
byte[] toByteArray();
337
static MessageId fromByteArray(byte[] data) throws IOException;
338
static MessageId fromByteArrayWithTopic(byte[] data, String topicName) throws IOException;
339
static final MessageId earliest;
340
static final MessageId latest;
341
}
342
343
interface TopicMessageId extends MessageId {
344
String getTopicPartitionName();
345
MessageId getInnerMessageId();
346
long getLedgerId();
347
long getEntryId();
348
int getPartitionIndex();
349
}
350
351
interface TypedMessageBuilder<T> {
352
MessageId send() throws PulsarClientException;
353
CompletableFuture<MessageId> sendAsync();
354
TypedMessageBuilder<T> key(String key);
355
TypedMessageBuilder<T> value(T value);
356
TypedMessageBuilder<T> property(String name, String value);
357
TypedMessageBuilder<T> eventTime(long timestamp);
358
}
359
```
360
361
## Enums and Configuration
362
363
```java { .api }
364
enum SubscriptionType {
365
Exclusive,
366
Shared,
367
Failover,
368
Key_Shared
369
}
370
371
enum SubscriptionMode {
372
Durable,
373
NonDurable
374
}
375
376
enum CompressionType {
377
NONE,
378
LZ4,
379
ZLIB,
380
ZSTD,
381
SNAPPY
382
}
383
384
enum MessageRoutingMode {
385
SinglePartition,
386
RoundRobinPartition,
387
CustomPartition
388
}
389
390
interface Messages<T> extends Iterable<Message<T>>, AutoCloseable {
391
int size();
392
List<T> stream();
393
Iterator<Message<T>> iterator();
394
}
395
396
interface EncryptionContext {
397
Map<String, EncryptionKey> getKeys();
398
byte[] getParam();
399
CompressionType getCompressionType();
400
int getUncompressedMessageSize();
401
402
interface EncryptionKey {
403
String getKeyName();
404
byte[] getKeyValue();
405
Map<String, String> getMetadata();
406
}
407
}
408
```