or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

collections.md configuration.md data-structures.md index.md messaging.md reactive-async.md synchronization.md

docs/messaging.md

0

# Pub/Sub Messaging

1

2

Redisson provides a comprehensive publish/subscribe messaging system built on Redis pub/sub capabilities. It supports regular topics, pattern-based subscriptions, sharded topics for scalability, and reliable topics with guaranteed delivery.

3

4

## Capabilities

5

6

### Basic Topics

7

8

Standard publish/subscribe topics for real-time messaging between distributed components.

9

10

```java { .api }

11

/**

12

* Get a topic for publish/subscribe messaging

13

* @param name - unique name of the topic

14

* @return RTopic instance

15

*/

16

public RTopic getTopic(String name);

17

public RTopic getTopic(String name, Codec codec);

18

public RTopic getTopic(PlainOptions options);

19

```

20

21

**Topic Interface:**

22

23

```java { .api }

24

public interface RTopic extends RTopicAsync {

25

// Publishing messages

26

long publish(Object message);

27

28

// Subscribing to messages

29

int addListener(MessageListener<Object> listener);

30

// Typed subscription: M is inferred from the Class token, so a lambda listener receives an M
// (with wildcard parameters the lambda's message argument would be typed as Object).
<M> int addListener(Class<M> type, MessageListener<? extends M> listener);

31

32

// Listener management

33

void removeListener(int listenerId);

34

void removeAllListeners();

35

36

// Topic information

37

List<String> getChannelNames();

38

long countListeners();

39

long countSubscribers();

40

}

41

42

// Message listener interface

43

@FunctionalInterface

44

public interface MessageListener<M> {

45

void onMessage(CharSequence channel, M msg);

46

}

47

```

48

49

**Usage Examples:**

50

51

```java

52

import org.redisson.api.*;

53

54

// Get topic

55

RTopic topic = redisson.getTopic("notifications");

56

57

// Subscribe to messages

58

int listenerId = topic.addListener(String.class, (channel, message) -> {

59

System.out.println("Received on " + channel + ": " + message);

60

});

61

62

// Publish messages

63

long subscribersCount = topic.publish("Hello, subscribers!");

64

System.out.println("Message delivered to " + subscribersCount + " subscribers");

65

66

// Publish different message types

67

// RTopic takes no type argument; the payload type is selected by the Class token passed to addListener
RTopic userTopic = redisson.getTopic("user-events");

68

userTopic.addListener(User.class, (channel, user) -> {

69

System.out.println("User event: " + user.getName());

70

});

71

userTopic.publish(new User("Alice", 25));

72

73

// Remove listener when done

74

topic.removeListener(listenerId);

75

76

// Multiple listeners for same topic

77

RTopic eventTopic = redisson.getTopic("events");

78

79

int listener1 = eventTopic.addListener(String.class, (channel, msg) -> {

80

System.out.println("Listener 1: " + msg);

81

});

82

83

int listener2 = eventTopic.addListener(String.class, (channel, msg) -> {

84

System.out.println("Listener 2: " + msg);

85

});

86

87

eventTopic.publish("Event message"); // Both listeners receive this

88

89

// Remove specific listener

90

eventTopic.removeListener(listener1);

91

eventTopic.publish("Another event"); // Only listener2 receives this

92

```

93

94

### Pattern Topics

95

96

Pattern-based subscriptions allowing wildcard matching for dynamic topic subscription.

97

98

```java { .api }

99

/**

100

* Get a pattern topic for wildcard subscriptions

101

* @param pattern - pattern with wildcards (* and ?)

102

* @return RPatternTopic instance

103

*/

104

public RPatternTopic getPatternTopic(String pattern);

105

public RPatternTopic getPatternTopic(String pattern, Codec codec);

106

public RPatternTopic getPatternTopic(PatternTopicOptions options);

107

```

108

109

**Pattern Topic Interface:**

110

111

```java { .api }

112

public interface RPatternTopic extends RPatternTopicAsync {

113

// Subscribe with pattern matching

114

int addListener(PatternMessageListener<Object> listener);

115

// Typed subscription: T is inferred from the Class token, so a lambda listener receives a T
<T> int addListener(Class<T> type, PatternMessageListener<T> listener);

116

117

// Listener management

118

void removeListener(int listenerId);

119

void removeAllListeners();

120

121

// Pattern information

122

List<String> getPatternNames();

123

long countListeners();

124

}

125

126

// Pattern message listener interface

127

@FunctionalInterface

128

public interface PatternMessageListener<M> {

129

void onMessage(CharSequence pattern, CharSequence channel, M msg);

130

}

131

```

132

133

**Usage Examples:**

134

135

```java

136

// Pattern subscription - listen to multiple related topics

137

RPatternTopic patternTopic = redisson.getPatternTopic("user.*");

138

139

int patternListener = patternTopic.addListener(String.class, (pattern, channel, message) -> {

140

System.out.println("Pattern: " + pattern + ", Channel: " + channel + ", Message: " + message);

141

});

142

143

// These will all match the pattern "user.*"

144

RTopic userLoginTopic = redisson.getTopic("user.login");

145

RTopic userLogoutTopic = redisson.getTopic("user.logout");

146

RTopic userUpdateTopic = redisson.getTopic("user.update");

147

148

userLoginTopic.publish("User John logged in"); // Matches pattern

149

userLogoutTopic.publish("User John logged out"); // Matches pattern

150

userUpdateTopic.publish("User John updated profile"); // Matches pattern

151

152

// More specific patterns

153

RPatternTopic orderPattern = redisson.getPatternTopic("order.*.created");

154

orderPattern.addListener(String.class, (pattern, channel, message) -> {

155

System.out.println("New order created: " + message);

156

});

157

158

// Matches: order.electronics.created, order.books.created, etc.

159

redisson.getTopic("order.electronics.created").publish("Order #123 created");

160

redisson.getTopic("order.books.created").publish("Order #456 created");

161

162

// Multiple patterns

163

RPatternTopic alertPattern = redisson.getPatternTopic("alert.*");

164

RPatternTopic errorPattern = redisson.getPatternTopic("error.*");

165

166

alertPattern.addListener(String.class, (pattern, channel, msg) -> {

167

System.out.println("ALERT - " + channel + ": " + msg);

168

});

169

170

errorPattern.addListener(String.class, (pattern, channel, msg) -> {

171

System.err.println("ERROR - " + channel + ": " + msg);

172

});

173

```

174

175

### Sharded Topics

176

177

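Sharded topics are built on Redis sharded pub/sub (Redis 7.0+ in cluster mode). Each topic's channel is assigned to a single shard by hash slot, and its messages are propagated only within that shard instead of being broadcast to every cluster node. Scalability comes from spreading many topics across shards; a single topic is not split across nodes.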

178

179

```java { .api }

180

/**

181

* Get a sharded topic for scalable messaging

182

* @param name - unique name of the sharded topic

183

* @return RShardedTopic instance

184

*/

185

public RShardedTopic getShardedTopic(String name);

186

public RShardedTopic getShardedTopic(String name, Codec codec);

187

public RShardedTopic getShardedTopic(PlainOptions options);

188

```

189

190

**Sharded Topic Interface:**

191

192

```java { .api }

193

public interface RShardedTopic extends RShardedTopicAsync {

194

// Publishing with sharding

195

long publish(Object message);

196

197

// Subscribing with automatic shard distribution

198

int addListener(MessageListener<Object> listener);

199

// Typed subscription: M is inferred from the Class token, so a lambda listener receives an M
<M> int addListener(Class<M> type, MessageListener<? extends M> listener);

200

201

// Listener management

202

void removeListener(int listenerId);

203

void removeAllListeners();

204

205

// Sharding information

206

List<String> getChannelNames();

207

long countListeners();

208

long countSubscribers();

209

}

210

```

211

212

**Usage Examples:**

213

214

```java

215

// Sharded topic for high-throughput messaging

216

RShardedTopic shardedTopic = redisson.getShardedTopic("high-volume-events");

217

218

// Subscribe - the subscription is placed on the shard that owns this channel

219

int shardedListener = shardedTopic.addListener(String.class, (channel, message) -> {

220

System.out.println("Sharded message: " + message + " on " + channel);

221

});

222

223

// Publish - every message on this topic goes through that same shard (sharding is per channel, not per message)

224

for (int i = 0; i < 1000; i++) {

225

shardedTopic.publish("Message " + i);

226

}

227

228

// Multiple listeners on one sharded topic

229

RShardedTopic eventStream = redisson.getShardedTopic("event-stream");

230

231

// Add multiple listeners - all of them subscribe to the same channel

232

for (int i = 0; i < 5; i++) {

233

final int listenerId = i;

234

eventStream.addListener(String.class, (channel, message) -> {

235

System.out.println("Listener " + listenerId + " received: " + message);

236

});

237

}

238

239

// Each message is delivered to every listener (fan-out, not load balancing)

240

for (int i = 0; i < 100; i++) {

241

eventStream.publish("Event " + i);

242

}

243

```

244

245

### Reliable Topics

246

247

Reliable topics provide guaranteed message delivery with acknowledgment and replay capabilities.

248

249

```java { .api }

250

/**

251

* Get a reliable topic with guaranteed delivery

252

* @param name - unique name of the reliable topic

253

* @return RReliableTopic instance

254

*/

255

public RReliableTopic getReliableTopic(String name);

256

public RReliableTopic getReliableTopic(String name, Codec codec);

257

public RReliableTopic getReliableTopic(PlainOptions options);

258

```

259

260

**Reliable Topic Interface:**

261

262

```java { .api }

263

public interface RReliableTopic extends RReliableTopicAsync {

264

// Publishing with confirmation

265

long publish(Object message);

266

267

// Subscribing with acknowledgment

268

String addListener(MessageListener<Object> listener);

269

// Typed subscription: M is inferred from the Class token, so a lambda listener receives an M
<M> String addListener(Class<M> type, MessageListener<M> listener);

270

271

// Listener management with IDs

272

void removeListener(String listenerId);

273

void removeAllListeners();

274

275

// Message acknowledgment and replay

276

long size();

277

List<Object> readAll();

278

void expire(long timeToLive, TimeUnit timeUnit);

279

void expireAt(long timestamp);

280

long remainTimeToLive();

281

}

282

```

283

284

**Usage Examples:**

285

286

```java

287

// Reliable topic ensures message delivery

288

RReliableTopic reliableTopic = redisson.getReliableTopic("critical-events");

289

290

// Subscribe with guaranteed delivery

291

String listenerId = reliableTopic.addListener(String.class, (channel, message) -> {

292

System.out.println("Processing critical event: " + message);

293

// Message is automatically acknowledged after successful processing

294

});

295

296

// Publish critical messages

297

reliableTopic.publish("System alert: High CPU usage");

298

reliableTopic.publish("System alert: Low disk space");

299

300

// Check message queue size

301

long pendingMessages = reliableTopic.size();

302

System.out.println("Pending messages: " + pendingMessages);

303

304

// Read all unprocessed messages (useful for debugging)

305

List<Object> allMessages = reliableTopic.readAll();

306

System.out.println("All messages in queue: " + allMessages);

307

308

// Set expiration for old messages

309

reliableTopic.expire(1, TimeUnit.HOURS); // Messages expire after 1 hour

310

311

// Multiple reliable listeners

312

RReliableTopic orderTopic = redisson.getReliableTopic("order-processing");

313

314

String processor1 = orderTopic.addListener(Order.class, (channel, order) -> {

315

System.out.println("Processor 1 handling order: " + order.getId());

316

processOrder(order);

317

});

318

319

String processor2 = orderTopic.addListener(Order.class, (channel, order) -> {

320

System.out.println("Processor 2 handling order: " + order.getId());

321

processOrder(order);

322

});

323

324

// Orders are reliably delivered to all processors

325

orderTopic.publish(new Order("123", "Product A"));

326

orderTopic.publish(new Order("124", "Product B"));

327

```

328

329

### Async Topic Operations

330

331

All topic operations have async variants returning `RFuture<T>`.

332

333

```java { .api }

334

// Async topic interface

335

public interface RTopicAsync extends RObjectAsync {

336

RFuture<Long> publishAsync(Object message);

337

RFuture<Integer> addListenerAsync(MessageListener<Object> listener);

338

// Typed subscription: M is inferred from the Class token, so a lambda listener receives an M
<M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener);

339

RFuture<Void> removeListenerAsync(int listenerId);

340

RFuture<Void> removeAllListenersAsync();

341

RFuture<Long> countListenersAsync();

342

RFuture<Long> countSubscribersAsync();

343

}

344

345

// Async pattern topic interface

346

public interface RPatternTopicAsync extends RObjectAsync {

347

RFuture<Integer> addListenerAsync(PatternMessageListener<Object> listener);

348

// Typed subscription: T is inferred from the Class token, so a lambda listener receives a T
<T> RFuture<Integer> addListenerAsync(Class<T> type, PatternMessageListener<T> listener);

349

RFuture<Void> removeListenerAsync(int listenerId);

350

RFuture<Void> removeAllListenersAsync();

351

RFuture<Long> countListenersAsync();

352

}

353

354

// Async reliable topic interface

355

public interface RReliableTopicAsync extends RObjectAsync, RExpirableAsync {

356

RFuture<Long> publishAsync(Object message);

357

RFuture<String> addListenerAsync(MessageListener<Object> listener);

358

// Typed subscription: M is inferred from the Class token, so a lambda listener receives an M
<M> RFuture<String> addListenerAsync(Class<M> type, MessageListener<M> listener);

359

RFuture<Void> removeListenerAsync(String listenerId);

360

RFuture<Void> removeAllListenersAsync();

361

RFuture<Long> sizeAsync();

362

RFuture<List<Object>> readAllAsync();

363

}

364

```

365

366

**Async Usage Examples:**

367

368

```java

369

// Async topic operations

370

RTopicAsync asyncTopic = redisson.getTopic("async-events");

371

372

// Async subscribe

373

RFuture<Integer> listenerFuture = asyncTopic.addListenerAsync(String.class, (channel, message) -> {

374

System.out.println("Async message: " + message);

375

});

376

377

listenerFuture.whenComplete((listenerId, error) -> {

378

if (error == null) {

379

System.out.println("Listener added with ID: " + listenerId);

380

381

// Async publish after successful subscription

382

asyncTopic.publishAsync("Hello async world!")

383

.whenComplete((subscribers, publishError) -> {

384

if (publishError == null) {

385

System.out.println("Message sent to " + subscribers + " subscribers");

386

} else {

387

System.err.println("Publish failed: " + publishError.getMessage());

388

}

389

});

390

} else {

391

System.err.println("Failed to add listener: " + error.getMessage());

392

}

393

});

394

395

// Chain async operations

396

// thenCompose is inherited from CompletionStage and returns a plain CompletionStage,
// so the composed chain cannot be assigned to an RFuture.
java.util.concurrent.CompletionStage<Long> chainedOperation = asyncTopic.countSubscribersAsync()

397

.thenCompose(count -> {

398

System.out.println("Current subscribers: " + count);

399

return asyncTopic.publishAsync("Subscriber count: " + count);

400

})

401

.thenCompose(delivered -> {

402

System.out.println("Message delivered to: " + delivered);

403

return asyncTopic.countListenersAsync();

404

});

405

406

chainedOperation.whenComplete((listeners, error) -> {

407

if (error == null) {

408

System.out.println("Total listeners: " + listeners);

409

} else {

410

System.err.println("Operation chain failed: " + error.getMessage());

411

}

412

});

413

```

414

415

### Message Filtering and Transformation

416

417

Filtering and transformation are implemented in application code inside listener callbacks; the examples below show both patterns.

418

419

```java { .api }

420

// Message filtering example

421

public class MessageFiltering {

422

423

public static void setupFilteredTopic(RedissonClient redisson) {

424

RTopic topic = redisson.getTopic("filtered-events");

425

426

// Filter messages by type

427

topic.addListener(String.class, (channel, message) -> {

428

if (message.startsWith("URGENT:")) {

429

handleUrgentMessage(message);

430

}

431

});

432

433

topic.addListener(String.class, (channel, message) -> {

434

if (message.startsWith("INFO:")) {

435

handleInfoMessage(message);

436

}

437

});

438

}

439

440

// Message transformation example

441

public static void setupTransformingTopic(RedissonClient redisson) {

442

// RTopic takes no type argument; the payload type is selected per listener via the Class token
RTopic eventTopic = redisson.getTopic("raw-events");

443

RTopic processedTopic = redisson.getTopic("processed-events");

444

445

// Transform and republish messages

446

eventTopic.addListener(Map.class, (channel, rawEvent) -> {

447

ProcessedEvent processed = transformEvent(rawEvent);

448

processedTopic.publish(processed);

449

});

450

}

451

}

452

453

// Custom message types

454

public class ProcessedEvent {

455

private String id;

456

private String type;

457

private long timestamp;

458

private Map<String, Object> data;

459

460

// constructors, getters, setters...

461

}

462

```

463

464

## Topic Configuration Options

465

466

```java { .api }

467

// Pattern topic options

468

public class PatternTopicOptions extends PlainOptions {

469

private String pattern;

470

471

public PatternTopicOptions pattern(String pattern);

472

public String getPattern();

473

}

474

475

// Topic listener configuration

476

public class TopicListener<M> {

477

private final Class<M> messageClass;

478

private final MessageListener<M> listener;

479

private final boolean removeOnError;

480

481

public TopicListener(Class<M> messageClass, MessageListener<M> listener);

482

public TopicListener(Class<M> messageClass, MessageListener<M> listener, boolean removeOnError);

483

484

public Class<M> getMessageClass();

485

public MessageListener<M> getListener();

486

public boolean isRemoveOnError();

487

}

488

489

// Reliable topic configuration

490

public class ReliableTopicOptions extends PlainOptions {

491

private long watchdogTimeout = 10 * 60000; // 10 minutes

492

private int batchSize = 100;

493

494

public ReliableTopicOptions watchdogTimeout(long watchdogTimeout, TimeUnit timeUnit);

495

public ReliableTopicOptions batchSize(int batchSize);

496

497

public long getWatchdogTimeout();

498

public int getBatchSize();

499

}

500

```

501

502

**Configuration Examples:**

503

504

```java

505

// Configure pattern topic

506

PatternTopicOptions patternOptions = new PatternTopicOptions()

507

.pattern("system.*.alerts")

508

.codec(new JsonJacksonCodec());

509

510

RPatternTopic patternTopic = redisson.getPatternTopic(patternOptions);

511

512

// Configure reliable topic with custom settings

513

ReliableTopicOptions reliableOptions = new ReliableTopicOptions()

514

.watchdogTimeout(30, TimeUnit.MINUTES)

515

.batchSize(50)

516

.codec(new KryoCodec());

517

518

RReliableTopic reliableTopic = redisson.getReliableTopic(reliableOptions);

519

520

// Error handling for listeners

521

RTopic errorHandlingTopic = redisson.getTopic("error-prone-events");

522

523

errorHandlingTopic.addListener(String.class, (channel, message) -> {

524

try {

525

processMessage(message);

526

} catch (Exception e) {

527

System.err.println("Error processing message: " + e.getMessage());

528

// Message processing failed but listener remains active

529

}

530

});

531

```