or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdclient-management.mdindex.mdmessage-consumption.mdmessage-production.mdmessage-reading.mdschema-serialization.mdtransaction-support.md

message-production.mddocs/

0

# Message Production

1

2

Publishing messages to topics with support for batching, compression, encryption, custom routing strategies, and advanced delivery options.

3

4

## Capabilities

5

6

### Producer Interface

7

8

Core interface for publishing messages to Pulsar topics.

9

10

```java { .api }

11

/**

12

* Interface for producing messages to topics

13

* Thread-safe and can be used concurrently from multiple threads

14

*/

15

interface Producer<T> extends Closeable {

16

/** Get producer name */

17

String getProducerName();

18

19

/** Get topic name */

20

String getTopic();

21

22

/** Send message synchronously */

23

MessageId send(T message) throws PulsarClientException;

24

25

/** Send message asynchronously */

26

CompletableFuture<MessageId> sendAsync(T message);

27

28

/** Create a typed message builder for advanced message options */

29

TypedMessageBuilder<T> newMessage();

30

31

/** Create a typed message builder with different schema */

32

<V> TypedMessageBuilder<V> newMessage(Schema<V> schema);

33

34

/** Create a typed message builder for transactional messages */

35

TypedMessageBuilder<T> newMessage(Transaction txn);

36

37

/** Get last sequence ID sent by this producer */

38

long getLastSequenceId();

39

40

/** Get number of partitions for the topic */

41

int getNumOfPartitions();

42

43

/** Get producer statistics */

44

ProducerStats getStats();

45

46

/** Check if producer is connected to broker */

47

boolean isConnected();

48

49

/** Get timestamp of last disconnection */

50

long getLastDisconnectedTimestamp();

51

52

/** Flush all pending messages synchronously */

53

void flush() throws PulsarClientException;

54

55

/** Flush all pending messages asynchronously */

56

CompletableFuture<Void> flushAsync();

57

58

/** Close producer */

59

void close() throws PulsarClientException;

60

61

/** Close producer asynchronously */

62

CompletableFuture<Void> closeAsync();

63

}

64

```

65

66

**Usage Examples:**

67

68

```java

69

import org.apache.pulsar.client.api.*;

70

71

// Simple message sending

72

Producer<String> producer = client.newProducer(Schema.STRING)

73

.topic("my-topic")

74

.create();

75

76

MessageId msgId = producer.send("Hello World");

77

78

// Asynchronous sending

79

CompletableFuture<MessageId> future = producer.sendAsync("Async message");

80

future.thenAccept(messageId -> {

81

System.out.println("Message sent: " + messageId);

82

}).exceptionally(throwable -> {

83

System.err.println("Failed to send: " + throwable.getMessage());

84

return null;

85

});

86

87

// Flush pending messages

88

producer.flush();

89

90

// Get statistics

91

ProducerStats stats = producer.getStats();

92

System.out.println("Messages sent: " + stats.getNumMsgsSent());

93

```

94

95

### ProducerBuilder Configuration

96

97

Builder interface for configuring and creating Producer instances.

98

99

```java { .api }

100

/**

101

* Builder for configuring and creating Producer instances

102

*/

103

interface ProducerBuilder<T> extends Serializable, Cloneable {

104

/** Create the producer synchronously */

105

Producer<T> create() throws PulsarClientException;

106

107

/** Create the producer asynchronously */

108

CompletableFuture<Producer<T>> createAsync();

109

110

/** Clone the builder */

111

ProducerBuilder<T> clone();

112

113

/** Set topic name (required) */

114

ProducerBuilder<T> topic(String topicName);

115

116

/** Set producer name (optional, auto-generated if not set) */

117

ProducerBuilder<T> producerName(String producerName);

118

119

/** Set send timeout (default: 30 seconds) */

120

ProducerBuilder<T> sendTimeout(int sendTimeout, TimeUnit unit);

121

122

/** Set max pending messages (default: 1000) */

123

ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);

124

125

/** Set max pending messages across partitions */

126

ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);

127

128

/** Block if queue is full (default: false) */

129

ProducerBuilder<T> blockIfQueueFull(boolean blockIfQueueFull);

130

131

/** Set message routing mode */

132

ProducerBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode);

133

134

/** Set hashing scheme for message routing */

135

ProducerBuilder<T> hashingScheme(HashingScheme hashingScheme);

136

137

/** Set custom message router */

138

ProducerBuilder<T> messageRouter(MessageRouter messageRouter);

139

140

/** Set compression type */

141

ProducerBuilder<T> compressionType(CompressionType compressionType);

142

143

/** Enable message batching (default: true) */

144

ProducerBuilder<T> enableBatching(boolean enableBatching);

145

146

/** Set batching max messages (default: 1000) */

147

ProducerBuilder<T> batchingMaxMessages(int batchingMaxMessages);

148

149

/** Set batching max publish delay */

150

ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit);

151

152

/** Set batching max bytes */

153

ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes);

154

155

/** Set batching partition switch frequency */

156

ProducerBuilder<T> batchingPartitionSwitchFrequencyByPublishDelay(int frequency);

157

158

/** Set batch builder */

159

ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder);

160

161

/** Set initial sequence ID */

162

ProducerBuilder<T> initialSequenceId(long initialSequenceId);

163

164

/** Add property */

165

ProducerBuilder<T> property(String key, String value);

166

167

/** Set properties */

168

ProducerBuilder<T> properties(Map<String, String> properties);

169

170

/** Add producer interceptor */

171

ProducerBuilder<T> intercept(ProducerInterceptor<T> interceptor);

172

173

/** Enable chunking for large messages */

174

ProducerBuilder<T> enableChunking(boolean enableChunking);

175

176

/** Set chunk max message size */

177

ProducerBuilder<T> chunkMaxMessageSize(int chunkMaxMessageSize);

178

179

/** Set producer access mode */

180

ProducerBuilder<T> accessMode(ProducerAccessMode accessMode);

181

182

/** Enable lazy start of producers */

183

ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean enableLazyStartPartitionedProducers);

184

185

/** Enable multi-schema support */

186

ProducerBuilder<T> enableMultiSchema(boolean enableMultiSchema);

187

}

188

```

189

190

### Encryption Configuration

191

192

Configure message encryption for producers.

193

194

```java { .api }

195

interface ProducerBuilder<T> {

196

/** Set crypto key reader */

197

ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

198

199

/** Add encryption key */

200

ProducerBuilder<T> addEncryptionKey(String key);

201

202

/** Set default crypto key reader using public key path */

203

ProducerBuilder<T> defaultCryptoKeyReader(String publicKeyPath);

204

205

/** Set default crypto key reader using key store */

206

ProducerBuilder<T> defaultCryptoKeyReader(Map<String, String> publicKeys);

207

208

/** Set crypto failure action */

209

ProducerBuilder<T> cryptoFailureAction(ProducerCryptoFailureAction action);

210

}

211

```

212

213

**Producer Configuration Examples:**

214

215

```java

216

// Basic producer configuration

217

Producer<String> producer = client.newProducer(Schema.STRING)

218

.topic("my-topic")

219

.producerName("my-producer")

220

.sendTimeout(60, TimeUnit.SECONDS)

221

.compressionType(CompressionType.LZ4)

222

.create();

223

224

// Batching configuration

225

Producer<byte[]> producer = client.newProducer()

226

.topic("batch-topic")

227

.enableBatching(true)

228

.batchingMaxMessages(100)

229

.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)

230

.batchingMaxBytes(1024 * 1024)

231

.create();

232

233

// Custom routing

234

Producer<String> producer = client.newProducer(Schema.STRING)

235

.topic("partitioned-topic")

236

.messageRoutingMode(MessageRoutingMode.CustomPartition)

237

.messageRouter(new CustomMessageRouter())

238

.create();

239

240

// Encryption configuration

241

Producer<String> producer = client.newProducer(Schema.STRING)

242

.topic("encrypted-topic")

243

.addEncryptionKey("my-key")

244

.cryptoKeyReader(new MyCryptoKeyReader())

245

.cryptoFailureAction(ProducerCryptoFailureAction.FAIL)

246

.create();

247

```

248

249

### TypedMessageBuilder

250

251

Builder interface for creating messages with advanced options.

252

253

```java { .api }

254

/**

255

* Builder for creating typed messages with advanced properties

256

*/

257

interface TypedMessageBuilder<T> {

258

/** Send message synchronously and return MessageId */

259

MessageId send() throws PulsarClientException;

260

261

/** Send message asynchronously */

262

CompletableFuture<MessageId> sendAsync();

263

264

/** Set message key for routing and compaction */

265

TypedMessageBuilder<T> key(String key);

266

267

/** Set message key as bytes */

268

TypedMessageBuilder<T> keyBytes(byte[] key);

269

270

/** Set ordering key for ordered delivery */

271

TypedMessageBuilder<T> orderingKey(byte[] orderingKey);

272

273

/** Set message value */

274

TypedMessageBuilder<T> value(T value);

275

276

/** Add message property */

277

TypedMessageBuilder<T> property(String name, String value);

278

279

/** Set message properties */

280

TypedMessageBuilder<T> properties(Map<String, String> properties);

281

282

/** Set event time timestamp */

283

TypedMessageBuilder<T> eventTime(long timestamp);

284

285

/** Set sequence ID */

286

TypedMessageBuilder<T> sequenceId(long sequenceId);

287

288

/** Set replication clusters */

289

TypedMessageBuilder<T> replicationClusters(List<String> clusters);

290

291

/** Disable replication */

292

TypedMessageBuilder<T> disableReplication();

293

294

/** Set delivery time (delayed message) */

295

TypedMessageBuilder<T> deliverAt(long timestamp);

296

297

/** Set delivery delay */

298

TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit);

299

300

/** Load configuration from map */

301

TypedMessageBuilder<T> loadConf(Map<String, Object> config);

302

}

303

```

304

305

**Message Builder Examples:**

306

307

```java

308

// Simple message with key

309

MessageId msgId = producer.newMessage()

310

.key("user-123")

311

.value("User data")

312

.send();

313

314

// Message with properties and event time

315

MessageId msgId = producer.newMessage()

316

.value("Event data")

317

.property("source", "mobile-app")

318

.property("version", "1.0")

319

.eventTime(System.currentTimeMillis())

320

.send();

321

322

// Delayed message delivery

323

MessageId msgId = producer.newMessage()

324

.value("Delayed message")

325

.deliverAfter(5, TimeUnit.MINUTES)

326

.send();

327

328

// Async message with callback

329

producer.newMessage()

330

.value("Async message")

331

.sendAsync()

332

.thenAccept(messageId -> System.out.println("Sent: " + messageId))

333

.exceptionally(ex -> {

334

System.err.println("Failed: " + ex.getMessage());

335

return null;

336

});

337

```

338

339

### Producer Statistics

340

341

Interface for accessing producer statistics and metrics.

342

343

```java { .api }

344

/**

345

* Producer statistics interface

346

*/

347

interface ProducerStats {

348

/** Number of messages sent */

349

long getNumMsgsSent();

350

351

/** Number of bytes sent */

352

long getNumBytesSent();

353

354

/** Number of send failures */

355

long getNumSendFailed();

356

357

/** Number of acknowledgments received */

358

long getNumAcksReceived();

359

360

/** Send rate in messages per second */

361

double getSendMsgsRate();

362

363

/** Send rate in bytes per second */

364

double getSendBytesRate();

365

366

/** 50th percentile send latency in milliseconds */

367

double getSendLatencyMillis50pct();

368

369

/** 75th percentile send latency in milliseconds */

370

double getSendLatencyMillis75pct();

371

372

/** 95th percentile send latency in milliseconds */

373

double getSendLatencyMillis95pct();

374

375

/** 99th percentile send latency in milliseconds */

376

double getSendLatencyMillis99pct();

377

378

/** 99.9th percentile send latency in milliseconds */

379

double getSendLatencyMillis999pct();

380

381

/** Maximum send latency in milliseconds */

382

double getSendLatencyMillisMax();

383

384

/** Total messages sent since creation */

385

long getTotalMsgsSent();

386

387

/** Total bytes sent since creation */

388

long getTotalBytesSent();

389

390

/** Total send failures since creation */

391

long getTotalSendFailed();

392

393

/** Total acknowledgments received since creation */

394

long getTotalAcksReceived();

395

396

/** Current pending queue size */

397

int getPendingQueueSize();

398

}

399

```

400

401

## Supporting Types and Enums

402

403

```java { .api }

404

enum MessageRoutingMode {

405

/** Route to single partition */

406

SinglePartition,

407

/** Round-robin across partitions */

408

RoundRobinPartition,

409

/** Use custom partitioning logic */

410

CustomPartition

411

}

412

413

enum HashingScheme {

414

/** Java String hash */

415

JavaStringHash,

416

/** Murmur3 32-bit hash */

417

Murmur3_32Hash

418

}

419

420

enum CompressionType {

421

NONE,

422

LZ4,

423

ZLIB,

424

ZSTD,

425

SNAPPY

426

}

427

428

enum ProducerAccessMode {

429

/** Multiple producers allowed */

430

Shared,

431

/** Single producer only */

432

Exclusive,

433

/** Wait for exclusive access */

434

WaitForExclusive

435

}

436

437

enum ProducerCryptoFailureAction {

438

/** Fail the send operation */

439

FAIL,

440

/** Send message unencrypted */

441

SEND

442

}

443

444

interface MessageRouter {

445

/** Choose partition for message */

446

int choosePartition(Message<?> msg, TopicMetadata metadata);

447

}

448

449

interface BatcherBuilder {

450

/** Build batch message container */

451

BatchMessageContainer build();

452

}

453

```