0
# Message Production
1
2
Publishing messages to topics with support for batching, compression, encryption, custom routing strategies, and advanced delivery options.
3
4
## Capabilities
5
6
### Producer Interface
7
8
Core interface for publishing messages to Pulsar topics.
9
10
```java { .api }
11
/**
12
* Interface for producing messages to topics
13
* Thread-safe and can be used concurrently from multiple threads
14
*/
15
interface Producer<T> extends Closeable {
16
/** Get producer name */
17
String getProducerName();
18
19
/** Get topic name */
20
String getTopic();
21
22
/** Send message synchronously */
23
MessageId send(T message) throws PulsarClientException;
24
25
/** Send message asynchronously */
26
CompletableFuture<MessageId> sendAsync(T message);
27
28
/** Create a typed message builder for advanced message options */
29
TypedMessageBuilder<T> newMessage();
30
31
/** Create a typed message builder with different schema */
32
<V> TypedMessageBuilder<V> newMessage(Schema<V> schema);
33
34
/** Create a typed message builder for transactional messages */
35
TypedMessageBuilder<T> newMessage(Transaction txn);
36
37
/** Get last sequence ID sent by this producer */
38
long getLastSequenceId();
39
40
/** Get number of partitions for the topic */
41
int getNumOfPartitions();
42
43
/** Get producer statistics */
44
ProducerStats getStats();
45
46
/** Check if producer is connected to broker */
47
boolean isConnected();
48
49
/** Get timestamp of last disconnection */
50
long getLastDisconnectedTimestamp();
51
52
/** Flush all pending messages synchronously */
53
void flush() throws PulsarClientException;
54
55
/** Flush all pending messages asynchronously */
56
CompletableFuture<Void> flushAsync();
57
58
/** Close producer */
59
void close() throws PulsarClientException;
60
61
/** Close producer asynchronously */
62
CompletableFuture<Void> closeAsync();
63
}
64
```
65
66
**Usage Examples:**
67
68
```java
69
import org.apache.pulsar.client.api.*;
70
71
// Simple message sending
72
Producer<String> producer = client.newProducer(Schema.STRING)
73
.topic("my-topic")
74
.create();
75
76
MessageId msgId = producer.send("Hello World");
77
78
// Asynchronous sending
79
CompletableFuture<MessageId> future = producer.sendAsync("Async message");
80
future.thenAccept(messageId -> {
81
System.out.println("Message sent: " + messageId);
82
}).exceptionally(throwable -> {
83
System.err.println("Failed to send: " + throwable.getMessage());
84
return null;
85
});
86
87
// Flush pending messages
88
producer.flush();
89
90
// Get statistics
91
ProducerStats stats = producer.getStats();
92
System.out.println("Messages sent: " + stats.getNumMsgsSent());
93
```
94
95
### ProducerBuilder Configuration
96
97
Builder interface for configuring and creating Producer instances.
98
99
```java { .api }
100
/**
101
* Builder for configuring and creating Producer instances
102
*/
103
interface ProducerBuilder<T> extends Serializable, Cloneable {
104
/** Create the producer synchronously */
105
Producer<T> create() throws PulsarClientException;
106
107
/** Create the producer asynchronously */
108
CompletableFuture<Producer<T>> createAsync();
109
110
/** Clone the builder */
111
ProducerBuilder<T> clone();
112
113
/** Set topic name (required) */
114
ProducerBuilder<T> topic(String topicName);
115
116
/** Set producer name (optional, auto-generated if not set) */
117
ProducerBuilder<T> producerName(String producerName);
118
119
/** Set send timeout (default: 30 seconds) */
120
ProducerBuilder<T> sendTimeout(int sendTimeout, TimeUnit unit);
121
122
/** Set max pending messages (default: 1000) */
123
ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);
124
125
/** Set max pending messages across partitions */
126
ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
127
128
/** Block if queue is full (default: false) */
129
ProducerBuilder<T> blockIfQueueFull(boolean blockIfQueueFull);
130
131
/** Set message routing mode */
132
ProducerBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode);
133
134
/** Set hashing scheme for message routing */
135
ProducerBuilder<T> hashingScheme(HashingScheme hashingScheme);
136
137
/** Set custom message router */
138
ProducerBuilder<T> messageRouter(MessageRouter messageRouter);
139
140
/** Set compression type */
141
ProducerBuilder<T> compressionType(CompressionType compressionType);
142
143
/** Enable message batching (default: true) */
144
ProducerBuilder<T> enableBatching(boolean enableBatching);
145
146
/** Set batching max messages (default: 1000) */
147
ProducerBuilder<T> batchingMaxMessages(int batchingMaxMessages);
148
149
/** Set batching max publish delay */
150
ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit);
151
152
/** Set batching max bytes */
153
ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes);
154
155
/** Set batching partition switch frequency */
156
ProducerBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(int frequency);
157
158
/** Set batch builder */
159
ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder);
160
161
/** Set initial sequence ID */
162
ProducerBuilder<T> initialSequenceId(long initialSequenceId);
163
164
/** Add property */
165
ProducerBuilder<T> property(String key, String value);
166
167
/** Set properties */
168
ProducerBuilder<T> properties(Map<String, String> properties);
169
170
/** Add producer interceptor */
171
ProducerBuilder<T> intercept(ProducerInterceptor<T> interceptor);
172
173
/** Enable chunking for large messages */
174
ProducerBuilder<T> enableChunking(boolean enableChunking);
175
176
/** Set chunk max message size */
177
ProducerBuilder<T> chunkMaxMessageSize(int chunkMaxMessageSize);
178
179
/** Set producer access mode */
180
ProducerBuilder<T> accessMode(ProducerAccessMode accessMode);
181
182
/** Enable lazy start of the per-partition producers of a partitioned topic */
183
ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean enableLazyStartPartitionedProducers);
184
185
/** Enable multi-schema support */
186
ProducerBuilder<T> enableMultiSchema(boolean enableMultiSchema);
187
}
188
```
189
190
### Encryption Configuration
191
192
Configure message encryption for producers.
193
194
```java { .api }
195
interface ProducerBuilder<T> {
196
/** Set crypto key reader */
197
ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
198
199
/** Add encryption key */
200
ProducerBuilder<T> addEncryptionKey(String key);
201
202
/** Set default crypto key reader using public key path */
203
ProducerBuilder<T> defaultCryptoKeyReader(String publicKeyPath);
204
205
/** Set default crypto key reader using a map of key names to public keys */
206
ProducerBuilder<T> defaultCryptoKeyReader(Map<String, String> publicKeys);
207
208
/** Set crypto failure action */
209
ProducerBuilder<T> cryptoFailureAction(ProducerCryptoFailureAction action);
210
}
211
```
212
213
**Producer Configuration Examples:**
214
215
```java
216
// Basic producer configuration
217
Producer<String> producer = client.newProducer(Schema.STRING)
218
.topic("my-topic")
219
.producerName("my-producer")
220
.sendTimeout(60, TimeUnit.SECONDS)
221
.compressionType(CompressionType.LZ4)
222
.create();
223
224
// Batching configuration
225
Producer<byte[]> producer = client.newProducer()
226
.topic("batch-topic")
227
.enableBatching(true)
228
.batchingMaxMessages(100)
229
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
230
.batchingMaxBytes(1024 * 1024)
231
.create();
232
233
// Custom routing
234
Producer<String> producer = client.newProducer(Schema.STRING)
235
.topic("partitioned-topic")
236
.messageRoutingMode(MessageRoutingMode.CustomPartition)
237
.messageRouter(new CustomMessageRouter())
238
.create();
239
240
// Encryption configuration
241
Producer<String> producer = client.newProducer(Schema.STRING)
242
.topic("encrypted-topic")
243
.addEncryptionKey("my-key")
244
.cryptoKeyReader(new MyCryptoKeyReader())
245
.cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
246
.create();
247
```
248
249
### TypedMessageBuilder
250
251
Builder interface for creating messages with advanced options.
252
253
```java { .api }
254
/**
255
* Builder for creating typed messages with advanced properties
256
*/
257
interface TypedMessageBuilder<T> {
258
/** Send message synchronously and return MessageId */
259
MessageId send() throws PulsarClientException;
260
261
/** Send message asynchronously */
262
CompletableFuture<MessageId> sendAsync();
263
264
/** Set message key for routing and compaction */
265
TypedMessageBuilder<T> key(String key);
266
267
/** Set message key as bytes */
268
TypedMessageBuilder<T> keyBytes(byte[] key);
269
270
/** Set ordering key for ordered delivery */
271
TypedMessageBuilder<T> orderingKey(byte[] orderingKey);
272
273
/** Set message value */
274
TypedMessageBuilder<T> value(T value);
275
276
/** Add message property */
277
TypedMessageBuilder<T> property(String name, String value);
278
279
/** Set message properties */
280
TypedMessageBuilder<T> properties(Map<String, String> properties);
281
282
/** Set event time timestamp */
283
TypedMessageBuilder<T> eventTime(long timestamp);
284
285
/** Set sequence ID */
286
TypedMessageBuilder<T> sequenceId(long sequenceId);
287
288
/** Set replication clusters */
289
TypedMessageBuilder<T> replicationClusters(List<String> clusters);
290
291
/** Disable replication */
292
TypedMessageBuilder<T> disableReplication();
293
294
/** Set delivery time (delayed message) */
295
TypedMessageBuilder<T> deliverAt(long timestamp);
296
297
/** Set delivery delay */
298
TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit);
299
300
/** Load configuration from map */
301
TypedMessageBuilder<T> loadConf(Map<String, Object> config);
302
}
303
```
304
305
**Message Builder Examples:**
306
307
```java
308
// Simple message with key
309
MessageId msgId = producer.newMessage()
310
.key("user-123")
311
.value("User data")
312
.send();
313
314
// Message with properties and event time
315
MessageId msgId = producer.newMessage()
316
.value("Event data")
317
.property("source", "mobile-app")
318
.property("version", "1.0")
319
.eventTime(System.currentTimeMillis())
320
.send();
321
322
// Delayed message delivery
323
MessageId msgId = producer.newMessage()
324
.value("Delayed message")
325
.deliverAfter(5, TimeUnit.MINUTES)
326
.send();
327
328
// Async message with callback
329
producer.newMessage()
330
.value("Async message")
331
.sendAsync()
332
.thenAccept(messageId -> System.out.println("Sent: " + messageId))
333
.exceptionally(ex -> {
334
System.err.println("Failed: " + ex.getMessage());
335
return null;
336
});
337
```
338
339
### Producer Statistics
340
341
Interface for accessing producer statistics and metrics.
342
343
```java { .api }
344
/**
345
* Producer statistics interface
346
*/
347
interface ProducerStats {
348
/** Number of messages sent in the last stats interval */
349
long getNumMsgsSent();
350
351
/** Number of bytes sent in the last stats interval */
352
long getNumBytesSent();
353
354
/** Number of send failures in the last stats interval */
355
long getNumSendFailed();
356
357
/** Number of acknowledgments received in the last stats interval */
358
long getNumAcksReceived();
359
360
/** Send rate in messages per second */
361
double getSendMsgsRate();
362
363
/** Send rate in bytes per second */
364
double getSendBytesRate();
365
366
/** 50th percentile send latency in milliseconds */
367
double getSendLatencyMillis50pct();
368
369
/** 75th percentile send latency in milliseconds */
370
double getSendLatencyMillis75pct();
371
372
/** 95th percentile send latency in milliseconds */
373
double getSendLatencyMillis95pct();
374
375
/** 99th percentile send latency in milliseconds */
376
double getSendLatencyMillis99pct();
377
378
/** 99.9th percentile send latency in milliseconds */
379
double getSendLatencyMillis999pct();
380
381
/** Maximum send latency in milliseconds */
382
double getSendLatencyMillisMax();
383
384
/** Total messages sent since creation */
385
long getTotalMsgsSent();
386
387
/** Total bytes sent since creation */
388
long getTotalBytesSent();
389
390
/** Total send failures since creation */
391
long getTotalSendFailed();
392
393
/** Total acknowledgments received since creation */
394
long getTotalAcksReceived();
395
396
/** Current pending queue size */
397
int getPendingQueueSize();
398
}
399
```
400
401
## Supporting Types and Enums
402
403
```java { .api }
404
enum MessageRoutingMode {
405
/** Route to single partition */
406
SinglePartition,
407
/** Round-robin across partitions */
408
RoundRobinPartition,
409
/** Use custom partitioning logic */
410
CustomPartition
411
}
412
413
enum HashingScheme {
414
/** Java String hash */
415
JavaStringHash,
416
/** Murmur3 32-bit hash */
417
Murmur3_32Hash
418
}
419
420
enum CompressionType {
421
NONE,
422
LZ4,
423
ZLIB,
424
ZSTD,
425
SNAPPY
426
}
427
428
enum ProducerAccessMode {
429
/** Multiple producers allowed */
430
Shared,
431
/** Single producer only */
432
Exclusive,
433
/** Wait for exclusive access */
434
WaitForExclusive
435
}
436
437
enum ProducerCryptoFailureAction {
438
/** Fail the send operation */
439
FAIL,
440
/** Send message unencrypted */
441
SEND
442
}
443
444
interface MessageRouter {
445
/** Choose partition for message */
446
int choosePartition(Message<?> msg, TopicMetadata metadata);
447
}
448
449
interface BatcherBuilder {
450
/** Build batch message container */
451
BatchMessageContainer build();
452
}
453
```