or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md, client-side-caching.md, clustering.md, commands-operations.md, connection-management.md, core-clients.md, exceptions.md, index.md, modules.md, parameters.md, pubsub.md, transactions-pipelining.md

docs/pubsub.md

0

# Pub/Sub Messaging

1

2

This document covers Redis publish/subscribe messaging functionality, including channel subscriptions, pattern matching, message handling, and both text and binary message support.

3

4

## Core Pub/Sub Classes

5

6

### JedisPubSub

7

8

Abstract class for handling Redis pub/sub messages with string-based channels and messages.

9

10

```java { .api }

11

public abstract class JedisPubSub {

12

/**

13

* Called when a message is received on a subscribed channel

14

* @param channel Channel name that received the message

15

* @param message Message content

16

*/

17

public abstract void onMessage(String channel, String message);

18

19

/**

20

* Called when a message is received matching a pattern subscription

21

* @param pattern Pattern that matched

22

* @param channel Actual channel name

23

* @param message Message content

24

*/

25

public abstract void onPMessage(String pattern, String channel, String message);

26

27

/**

28

* Called when successfully subscribed to a channel

29

* @param channel Channel name

30

* @param subscribedChannels Total number of subscribed channels

31

*/

32

public abstract void onSubscribe(String channel, int subscribedChannels);

33

34

/**

35

* Called when successfully subscribed to a pattern

36

* @param pattern Pattern subscribed to

37

* @param subscribedChannels Total number of subscribed patterns

38

*/

39

public abstract void onPSubscribe(String pattern, int subscribedChannels);

40

41

/**

42

* Called when unsubscribed from a channel

43

* @param channel Channel name

44

* @param subscribedChannels Remaining subscribed channels

45

*/

46

public abstract void onUnsubscribe(String channel, int subscribedChannels);

47

48

/**

49

* Called when unsubscribed from a pattern

50

* @param pattern Pattern unsubscribed from

51

* @param subscribedChannels Remaining subscribed patterns

52

*/

53

public abstract void onPUnsubscribe(String pattern, int subscribedChannels);

54

55

/**

56

* Subscribe to channels

57

* @param channels Channel names to subscribe to

58

*/

59

public void subscribe(String... channels);

60

61

/**

62

* Subscribe to patterns

63

* @param patterns Pattern strings to subscribe to

64

*/

65

public void psubscribe(String... patterns);

66

67

/**

68

* Unsubscribe from channels

69

* @param channels Channel names to unsubscribe from (empty = all)

70

*/

71

public void unsubscribe(String... channels);

72

73

/**

74

* Unsubscribe from patterns

75

* @param patterns Pattern strings to unsubscribe from (empty = all)

76

*/

77

public void punsubscribe(String... patterns);

78

79

/**

80

* Check if subscribed to any channels or patterns

81

* @return true if subscribed

82

*/

83

public boolean isSubscribed();

84

85

/**

86

* Get number of subscribed channels

87

* @return Number of channel subscriptions

88

*/

89

public int getSubscribedChannels();

90

91

/**

92

* Get number of subscribed patterns

93

* @return Number of pattern subscriptions

94

*/

95

public int getSubscribedPatterns();

96

}

97

```

98

99

#### Usage Example

100

101

```java

102

class MessageHandler extends JedisPubSub {

103

@Override

104

public void onMessage(String channel, String message) {

105

System.out.println("Received on " + channel + ": " + message);

106

107

// Handle different message types

108

if (channel.equals("notifications")) {

109

handleNotification(message);

110

} else if (channel.equals("events")) {

111

handleEvent(message);

112

}

113

}

114

115

@Override

116

public void onPMessage(String pattern, String channel, String message) {

117

System.out.println("Pattern " + pattern + " matched " + channel + ": " + message);

118

}

119

120

@Override

121

public void onSubscribe(String channel, int subscribedChannels) {

122

System.out.println("Subscribed to " + channel +

123

" (total subscriptions: " + subscribedChannels + ")");

124

}

125

126

@Override

127

public void onPSubscribe(String pattern, int subscribedChannels) {

128

System.out.println("Subscribed to pattern " + pattern);

129

}

130

131

@Override

132

public void onUnsubscribe(String channel, int subscribedChannels) {

133

System.out.println("Unsubscribed from " + channel);

134

}

135

136

@Override

137

public void onPUnsubscribe(String pattern, int subscribedChannels) {

138

System.out.println("Unsubscribed from pattern " + pattern);

139

}

140

141

private void handleNotification(String message) {

142

// Process notification

143

}

144

145

private void handleEvent(String message) {

146

// Process event

147

}

148

}

149

150

// Usage

151

Jedis subscriberJedis = new Jedis("localhost", 6379);

152

MessageHandler handler = new MessageHandler();

153

154

// Subscribe in separate thread (blocking operation)

155

new Thread(() -> {

156

try {

157

subscriberJedis.subscribe(handler, "notifications", "events", "alerts");

158

} catch (Exception e) {

159

e.printStackTrace();

160

} finally {

161

subscriberJedis.close();

162

}

163

}).start();

164

165

// Publish messages from another connection

166

Jedis publisherJedis = new Jedis("localhost", 6379);

167

publisherJedis.publish("notifications", "System maintenance scheduled");

168

publisherJedis.publish("events", "User login: user123");

169

publisherJedis.close();

170

171

// Unsubscribe when done

172

handler.unsubscribe("alerts"); // Unsubscribe from specific channel

173

// handler.unsubscribe(); // Unsubscribe from all channels

174

```

175

176

### BinaryJedisPubSub

177

178

Abstract class for handling pub/sub messages with binary data support.

179

180

```java { .api }

181

public abstract class BinaryJedisPubSub {

182

/**

183

* Called when a binary message is received on a subscribed channel

184

* @param channel Channel name as bytes

185

* @param message Binary message content

186

*/

187

public abstract void onMessage(byte[] channel, byte[] message);

188

189

/**

190

* Called when a binary message matches a pattern subscription

191

* @param pattern Pattern as bytes

192

* @param channel Channel name as bytes

193

* @param message Binary message content

194

*/

195

public abstract void onPMessage(byte[] pattern, byte[] channel, byte[] message);

196

197

/**

198

* Called when successfully subscribed to a binary channel

199

* @param channel Channel name as bytes

200

* @param subscribedChannels Total subscribed channels

201

*/

202

public abstract void onSubscribe(byte[] channel, int subscribedChannels);

203

204

/**

205

* Called when successfully subscribed to a binary pattern

206

* @param pattern Pattern as bytes

207

* @param subscribedChannels Total subscribed patterns

208

*/

209

public abstract void onPSubscribe(byte[] pattern, int subscribedChannels);

210

211

/**

212

* Called when unsubscribed from a binary channel

213

* @param channel Channel name as bytes

214

* @param subscribedChannels Remaining subscribed channels

215

*/

216

public abstract void onUnsubscribe(byte[] channel, int subscribedChannels);

217

218

/**

219

* Called when unsubscribed from a binary pattern

220

* @param pattern Pattern as bytes

221

* @param subscribedChannels Remaining subscribed patterns

222

*/

223

public abstract void onPUnsubscribe(byte[] pattern, int subscribedChannels);

224

225

/**

226

* Subscribe to binary channels

227

* @param channels Channel names as byte arrays

228

*/

229

public void subscribe(byte[]... channels);

230

231

/**

232

* Subscribe to binary patterns

233

* @param patterns Pattern byte arrays

234

*/

235

public void psubscribe(byte[]... patterns);

236

237

/**

238

* Unsubscribe from binary channels

239

* @param channels Channel names as byte arrays

240

*/

241

public void unsubscribe(byte[]... channels);

242

243

/**

244

* Unsubscribe from binary patterns

245

* @param patterns Pattern byte arrays

246

*/

247

public void punsubscribe(byte[]... patterns);

248

249

/**

250

* Check if subscribed to any channels or patterns

251

* @return true if subscribed

252

*/

253

public boolean isSubscribed();

254

}

255

```

256

257

#### Usage Example

258

259

```java

260

class BinaryMessageHandler extends BinaryJedisPubSub {

261

@Override

262

public void onMessage(byte[] channel, byte[] message) {

263

String channelStr = new String(channel, StandardCharsets.UTF_8);

264

265

if (channelStr.equals("image_data")) {

266

handleImageData(message);

267

} else if (channelStr.equals("file_uploads")) {

268

handleFileUpload(message);

269

}

270

}

271

272

@Override

273

public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {

274

// Handle pattern-matched binary messages

275

String patternStr = new String(pattern, StandardCharsets.UTF_8);

276

String channelStr = new String(channel, StandardCharsets.UTF_8);

277

System.out.println("Binary pattern " + patternStr + " matched " + channelStr);

278

}

279

280

@Override

281

public void onSubscribe(byte[] channel, int subscribedChannels) {

282

String channelStr = new String(channel, StandardCharsets.UTF_8);

283

System.out.println("Subscribed to binary channel: " + channelStr);

284

}

285

286

// ... other callback implementations

287

288

private void handleImageData(byte[] imageBytes) {

289

// Process binary image data

290

System.out.println("Received image data: " + imageBytes.length + " bytes");

291

}

292

293

private void handleFileUpload(byte[] fileData) {

294

// Process binary file upload

295

System.out.println("Received file upload: " + fileData.length + " bytes");

296

}

297

}

298

299

// Usage

300

Jedis subscriberJedis = new Jedis("localhost", 6379);

301

BinaryMessageHandler binaryHandler = new BinaryMessageHandler();

302

303

new Thread(() -> {

304

try {

305

subscriberJedis.subscribe(binaryHandler,

306

"image_data".getBytes(),

307

"file_uploads".getBytes());

308

} finally {

309

subscriberJedis.close();

310

}

311

}).start();

312

313

// Publish binary data

314

Jedis publisherJedis = new Jedis("localhost", 6379);

315

byte[] imageData = loadImageFile();

316

publisherJedis.publish("image_data".getBytes(), imageData);

317

publisherJedis.close();

318

```

319

320

## Pub/Sub Commands

321

322

### Publishing Messages

323

324

Core publishing commands available on any Jedis client.

325

326

```java { .api }

327

public interface JedisCommands {

328

/**

329

* Publish message to channel

330

* @param channel Channel name

331

* @param message Message content

332

* @return Number of clients that received the message

333

*/

334

Long publish(String channel, String message);

335

336

/**

337

* Publish binary message to channel

338

* @param channel Channel name as bytes

339

* @param message Binary message content

340

* @return Number of clients that received the message

341

*/

342

Long publish(byte[] channel, byte[] message);

343

344

/**

345

* Get number of subscribers for channels

346

* @param channels Channel names

347

* @return List of subscriber counts for each channel

348

*/

349

List<Long> pubsubNumSub(String... channels);

350

351

/**

352

* Get number of subscriptions to patterns

353

* @return Number of pattern subscriptions

354

*/

355

Long pubsubNumPat();

356

357

/**

358

* Get number of subscribers for binary channels

359

* @param channels Channel names as bytes

360

* @return List of subscriber counts

361

*/

362

List<Long> pubsubNumSub(byte[]... channels);

363

364

/**

365

* List active channels

366

* @return List of channels with at least one subscriber

367

*/

368

List<String> pubsubChannels();

369

370

/**

371

* List active channels matching pattern

372

* @param pattern Channel pattern

373

* @return List of matching channels with subscribers

374

*/

375

List<String> pubsubChannels(String pattern);

376

377

/**

378

* Get detailed pubsub information for sharded channels

379

* @param channels Shard channel names

380

* @return List of shard channel subscriber counts

381

*/

382

List<Long> pubsubShardNumSub(String... channels);

383

384

/**

385

* List active shard channels

386

* @return List of shard channels with subscribers

387

*/

388

List<String> pubsubShardChannels();

389

390

/**

391

* List active shard channels matching pattern

392

* @param pattern Shard channel pattern

393

* @return List of matching shard channels

394

*/

395

List<String> pubsubShardChannels(String pattern);

396

}

397

```

398

399

### Subscription Management

400

401

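Methods for changing the set of subscriptions while a handler is already subscribed; call them on the handler instance. `JedisPubSub` takes `String...` arguments as shown below, and `BinaryJedisPubSub` provides the same four methods with `byte[]...` arguments.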

402

403

```java { .api }

404

// Available in JedisPubSub and BinaryJedisPubSub

405

public void subscribe(String... channels); // Subscribe to channels

406

public void psubscribe(String... patterns); // Subscribe to patterns

407

public void unsubscribe(String... channels); // Unsubscribe from channels

408

public void punsubscribe(String... patterns); // Unsubscribe from patterns

409

```

410

411

#### Usage Example

412

413

```java

414

// Publisher

415

Jedis publisher = new Jedis("localhost", 6379);

416

417

// Publish to different channels

418

Long subscribers1 = publisher.publish("news", "Breaking news update");

419

Long subscribers2 = publisher.publish("chat:room1", "Hello everyone!");

420

Long subscribers3 = publisher.publish("alerts", "System maintenance in 1 hour");

421

422

System.out.println("News channel has " + subscribers1 + " subscribers");

423

424

// Check channel activity

425

List<String> activeChannels = publisher.pubsubChannels();

426

System.out.println("Active channels: " + activeChannels);

427

428

// Get subscriber counts

429

List<Long> counts = publisher.pubsubNumSub("news", "chat:room1", "alerts");

430

System.out.println("Subscriber counts: " + counts);

431

432

// Get pattern subscription count

433

Long patternSubs = publisher.pubsubNumPat();

434

System.out.println("Pattern subscriptions: " + patternSubs);

435

436

publisher.close();

437

```

438

439

## Advanced Pub/Sub Features

440

441

### Sharded Pub/Sub

442

443

Redis 7.0+ feature for sharded pub/sub that scales across cluster nodes.

444

445

```java { .api }

446

public interface JedisCommands {

447

/**

448

* Publish message to sharded channel

449

* @param shardChannel Shard channel name

450

* @param message Message content

451

* @return Number of clients that received the message

452

*/

453

Long spublish(String shardChannel, String message);

454

455

/**

456

* Subscribe to sharded channels

457

* @param jedisPubSub Message handler

458

* @param shardChannels Shard channel names

459

*/

460

void ssubscribe(JedisPubSub jedisPubSub, String... shardChannels);

461

462

/**

463

* Subscribe to binary sharded channels

464

* @param jedisPubSub Binary message handler

465

* @param shardChannels Shard channel names as bytes

466

*/

467

void ssubscribe(BinaryJedisPubSub jedisPubSub, byte[]... shardChannels);

468

}

469

```

470

471

### Pattern Subscriptions

472

473

Subscribe to channels using glob-style patterns.

474

475

```java { .api }

476

// Pattern examples:

477

// "news.*" - matches "news.sports", "news.politics", etc.

478

// "chat:*" - matches "chat:room1", "chat:room2", etc.

479

// "log:?:*" - matches "log:1:debug", "log:2:error", etc.

480

// "*" - matches all channels

481

482

class PatternSubscriber extends JedisPubSub {

483

@Override

484

public void onPMessage(String pattern, String channel, String message) {

485

System.out.println("Pattern: " + pattern);

486

System.out.println("Channel: " + channel);

487

System.out.println("Message: " + message);

488

489

// Route based on pattern

490

switch (pattern) {

491

case "news.*":

492

handleNewsMessage(channel, message);

493

break;

494

case "chat:*":

495

handleChatMessage(channel, message);

496

break;

497

case "error:*":

498

handleErrorMessage(channel, message);

499

break;

500

}

501

}

502

503

@Override

504

public void onMessage(String channel, String message) {

505

// Handle direct channel subscriptions

506

}

507

508

// ... other required methods

509

}

510

511

// Usage

512

PatternSubscriber subscriber = new PatternSubscriber();

513

Jedis jedis = new Jedis("localhost", 6379);

514

515

// Subscribe to patterns (blocking)

516

jedis.psubscribe(subscriber, "news.*", "chat:*", "error:*");

517

```

518

519

### Multi-Channel Publishing

520

521

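Publishing the same event, or a set of related messages, to several channels. Patterns are only used when subscribing; each target channel is named explicitly and gets its own `publish` call.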

522

523

```java { .api }

524

// Publish to multiple related channels

525

Jedis publisher = new Jedis("localhost", 6379);

526

527

String eventData = "User logged in: user123";

528

529

// Publish to multiple channels for different consumers

530

publisher.publish("events", eventData); // General events

531

publisher.publish("user:events", eventData); // User-specific events

532

publisher.publish("audit:login", eventData); // Audit trail

533

publisher.publish("analytics:user", eventData); // Analytics

534

535

// Publish different messages to related channels

536

String baseChannel = "game:room1:";

537

publisher.publish(baseChannel + "chat", "Player joined the game");

538

publisher.publish(baseChannel + "state", "Game started");

539

publisher.publish(baseChannel + "score", "Score updated");

540

```

541

542

### Connection Management for Pub/Sub

543

544

Proper connection handling for pub/sub operations.

545

546

```java { .api }

547

public class PubSubManager {

548

private final Jedis subscriberJedis;

549

private final Jedis publisherJedis;

550

private final ExecutorService executor;

551

private volatile boolean running = true;

552

553

public PubSubManager() {

554

this.subscriberJedis = new Jedis("localhost", 6379);

555

this.publisherJedis = new Jedis("localhost", 6379);

556

this.executor = Executors.newCachedThreadPool();

557

}

558

559

public void startSubscribing(JedisPubSub pubSub, String... channels) {

560

executor.submit(() -> {

561

try {

562

subscriberJedis.subscribe(pubSub, channels);

563

} catch (Exception e) {

564

System.err.println("Subscription error: " + e.getMessage());

565

}

566

});

567

}

568

569

public void startPatternSubscribing(JedisPubSub pubSub, String... patterns) {

570

executor.submit(() -> {

571

try {

572

subscriberJedis.psubscribe(pubSub, patterns);

573

} catch (Exception e) {

574

System.err.println("Pattern subscription error: " + e.getMessage());

575

}

576

});

577

}

578

579

public Long publish(String channel, String message) {

580

return publisherJedis.publish(channel, message);

581

}

582

583

public void shutdown() {

584

running = false;

585

executor.shutdown();

586

587

try {

588

if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {

589

executor.shutdownNow();

590

}

591

} catch (InterruptedException e) {

592

executor.shutdownNow();

593

Thread.currentThread().interrupt();

594

}

595

596

subscriberJedis.close();

597

publisherJedis.close();

598

}

599

}

600

```

601

602

### Message Processing Patterns

603

604

Common patterns for processing pub/sub messages.

605

606

```java { .api }

607

// 1. Message routing based on channel

608

class ChannelRouter extends JedisPubSub {

609

private final Map<String, MessageProcessor> processors = new HashMap<>();

610

611

public void registerProcessor(String channel, MessageProcessor processor) {

612

processors.put(channel, processor);

613

}

614

615

@Override

616

public void onMessage(String channel, String message) {

617

MessageProcessor processor = processors.get(channel);

618

if (processor != null) {

619

processor.process(channel, message);

620

} else {

621

System.out.println("No processor for channel: " + channel);

622

}

623

}

624

625

// ... other required methods

626

}

627

628

// 2. JSON message deserialization

629

class JsonMessageHandler extends JedisPubSub {

630

private final ObjectMapper objectMapper = new ObjectMapper();

631

632

@Override

633

public void onMessage(String channel, String message) {

634

try {

635

switch (channel) {

636

case "user_events":

637

UserEvent event = objectMapper.readValue(message, UserEvent.class);

638

handleUserEvent(event);

639

break;

640

case "system_alerts":

641

SystemAlert alert = objectMapper.readValue(message, SystemAlert.class);

642

handleSystemAlert(alert);

643

break;

644

}

645

} catch (Exception e) {

646

System.err.println("Failed to parse message: " + e.getMessage());

647

}

648

}

649

650

// ... handler methods

651

}

652

653

// 3. Batch message processing

654

class BatchMessageProcessor extends JedisPubSub {

655

private final List<String> messageBuffer = new ArrayList<>();

656

private final int batchSize = 100;

657

private final ScheduledExecutorService scheduler =

658

Executors.newSingleThreadScheduledExecutor();

659

660

public BatchMessageProcessor() {

661

// Process batches every 5 seconds

662

scheduler.scheduleAtFixedRate(this::processBatch, 5, 5, TimeUnit.SECONDS);

663

}

664

665

@Override

666

public synchronized void onMessage(String channel, String message) {

667

messageBuffer.add(channel + ":" + message);

668

669

if (messageBuffer.size() >= batchSize) {

670

processBatch();

671

}

672

}

673

674

private synchronized void processBatch() {

675

if (!messageBuffer.isEmpty()) {

676

List<String> batch = new ArrayList<>(messageBuffer);

677

messageBuffer.clear();

678

679

// Process batch in background

680

CompletableFuture.runAsync(() -> {

681

processBatchAsync(batch);

682

});

683

}

684

}

685

686

private void processBatchAsync(List<String> messages) {

687

// Batch processing logic

688

System.out.println("Processing batch of " + messages.size() + " messages");

689

}

690

}

691

```

692

693

### Error Handling and Resilience

694

695

Best practices for robust pub/sub implementations.

696

697

```java { .api }

698

class ResilientSubscriber extends JedisPubSub {

699

private final AtomicBoolean connected = new AtomicBoolean(false);

700

private final int maxRetries = 3;

701

private volatile boolean shouldReconnect = true;

702

703

@Override

704

public void onSubscribe(String channel, int subscribedChannels) {

705

connected.set(true);

706

System.out.println("Successfully subscribed to " + channel);

707

}

708

709

@Override

710

public void onMessage(String channel, String message) {

711

try {

712

processMessage(channel, message);

713

} catch (Exception e) {

714

System.err.println("Error processing message from " + channel + ": " + e.getMessage());

715

// Could implement dead letter queue or retry logic here

716

}

717

}

718

719

public void handleConnectionLoss() {

720

connected.set(false);

721

722

if (shouldReconnect) {

723

reconnectWithBackoff();

724

}

725

}

726

727

private void reconnectWithBackoff() {

728

int attempt = 0;

729

while (attempt < maxRetries && shouldReconnect) {

730

try {

731

Thread.sleep(Math.min(1000 * (1 << attempt), 30000)); // Exponential backoff

732

733

Jedis jedis = new Jedis("localhost", 6379);

734

jedis.subscribe(this, "channel1", "channel2");

735

736

break; // Success

737

} catch (Exception e) {

738

attempt++;

739

System.err.println("Reconnection attempt " + attempt + " failed: " + e.getMessage());

740

}

741

}

742

}

743

744

public void shutdown() {

745

shouldReconnect = false;

746

unsubscribe(); // Unsubscribe from all channels

747

}

748

749

private void processMessage(String channel, String message) {

750

// Message processing logic with error handling

751

}

752

}

753

```

754

755

Redis pub/sub provides a powerful messaging system for real-time communication. Jedis offers comprehensive support for both simple and advanced pub/sub scenarios with proper error handling and connection management.