or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actuator.md, configuration.md, core-messaging.md, index.md, listeners.md, streams.md

docs/streams.md

0

# Stream Support

1

2

Auto-configuration for RabbitMQ Streams, providing high-throughput persistent messaging with replay capabilities for building event-driven architectures and streaming data pipelines.

3

4

## Capabilities

5

6

### RabbitMQ Streams Overview

7

8

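RabbitMQ Streams are a persistent, append-only log structure, accessed over a dedicated stream protocol, designed for high-throughput scenarios where message replay and persistence are important. Unlike traditional AMQP queues, consuming from a stream is non-destructive: messages keep their order and remain in the stream after being read, so consumers can start reading from any point in the stream.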

9

10

```java { .api }

11

/**

12

* Stream auto-configuration

13

*/

14

@Configuration(proxyBeanMethods = false)

15

@ConditionalOnClass(RabbitStreamTemplate.class)

16

public class RabbitStreamConfiguration {

17

18

/** Configure stream template */

19

@Bean

20

@ConditionalOnMissingBean

21

public RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties);

22

23

/** Create stream template */

24

@Bean

25

@ConditionalOnMissingBean

26

public RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment,

27

RabbitStreamTemplateConfigurer configurer);

28

}

29

```

30

31

### Stream Template

32

33

Primary interface for producing messages to RabbitMQ streams with support for message routing and producer confirmation.

34

35

```java { .api }

36

/**

37

* Template for RabbitMQ Stream operations

38

*/

39

public class RabbitStreamTemplate implements RabbitStreamOperations {

40

41

/** Send message to stream */

42

public MessageBuilder send(Message message);

43

44

/** Send message with routing key */

45

public MessageBuilder send(Message message, String routingKey);

46

47

/** Convert and send object */

48

public MessageBuilder convertAndSend(Object message);

49

50

/** Convert and send with routing key */

51

public MessageBuilder convertAndSend(Object message, String routingKey);

52

53

/** Convert and send with properties */

54

public MessageBuilder convertAndSend(Object message, MessageProperties properties);

55

56

/** Message builder for fluent API */

57

public interface MessageBuilder {

58

/** Set routing key */

59

MessageBuilder to(String routingKey);

60

61

/** Set message properties */

62

MessageBuilder withProperties(MessageProperties properties);

63

64

/** Send synchronously */

65

void send();

66

67

/** Send asynchronously */

68

CompletableFuture<Void> sendAsync();

69

}

70

}

71

```

72

73

**Stream Producer Example:**

74

75

```java

76

import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;

77

import org.springframework.beans.factory.annotation.Autowired;

78

import org.springframework.stereotype.Service;

79

80

@Service

81

public class StreamProducerService {

82

83

@Autowired

84

private RabbitStreamTemplate streamTemplate;

85

86

public void publishEvent(String streamName, Object event) {

87

// Send to stream with routing key

88

streamTemplate.convertAndSend(event, streamName);

89

}

90

91

public CompletableFuture<Void> publishEventAsync(String streamName, Object event) {

92

// Asynchronous send

93

return streamTemplate.convertAndSend(event)

94

.to(streamName)

95

.sendAsync();

96

}

97

98

public void publishWithCustomProperties(String streamName, Object event, String correlationId) {

99

MessageProperties properties = new MessageProperties();

100

properties.setCorrelationId(correlationId);

101

properties.setTimestamp(new Date());

102

103

streamTemplate.convertAndSend(event, properties)

104

.to(streamName)

105

.send();

106

}

107

}

108

```

109

110

### Stream Consumers

111

112

Consumer configuration for reading messages from RabbitMQ streams with offset management and replay capabilities.

113

114

```java { .api }

115

/**

116

* Stream listener annotation

117

*/

118

@Target({ElementType.METHOD})

119

@Retention(RetentionPolicy.RUNTIME)

120

public @interface RabbitStreamListener {

121

122

/** Stream name(s) to consume from */

123

String[] queues() default {};

124

125

/** Consumer group for offset management */

126

String group() default "";

127

128

/** Offset specification (first, last, timestamp, offset) */

129

String offset() default "";

130

131

/** Container factory */

132

String containerFactory() default "";

133

134

/** Auto startup */

135

String autoStartup() default "";

136

137

/** Concurrency */

138

String concurrency() default "";

139

}

140

141

/**

142

* Stream message context for manual offset management

143

*/

144

public class StreamMessageContext {

145

146

/** Get message offset */

147

public long getOffset();

148

149

/** Get stream name */

150

public String getStream();

151

152

/** Get timestamp */

153

public long getTimestamp();

154

155

/** Manual acknowledgment */

156

public void ack();

157

}

158

```

159

160

**Stream Consumer Examples:**

161

162

```java

163

import org.springframework.amqp.rabbit.annotation.RabbitStreamListener;

164

import org.springframework.stereotype.Component;

165

166

@Component

167

public class StreamConsumers {

168

169

// Basic stream consumer

170

@RabbitStreamListener(queues = "events.stream")

171

public void handleStreamEvent(String message) {

172

System.out.println("Received stream message: " + message);

173

}

174

175

// Consumer with group (for offset management)

176

@RabbitStreamListener(queues = "user.events", group = "analytics-service")

177

public void handleUserEvents(UserEvent event) {

178

// Process user event

179

analyticsService.processUserEvent(event);

180

}

181

182

// Consumer starting from beginning

183

@RabbitStreamListener(queues = "audit.stream", offset = "first")

184

public void replayAuditEvents(AuditEvent event) {

185

// Replay all audit events from beginning

186

auditProcessor.reprocess(event);

187

}

188

189

// Consumer starting from specific timestamp

190

@RabbitStreamListener(queues = "transactions.stream",

191

offset = "timestamp:2023-01-01T00:00:00Z")

192

public void processTransactionsFrom(TransactionEvent transaction) {

193

// Process transactions from specific date

194

transactionProcessor.process(transaction);

195

}

196

197

// Manual offset management

198

@RabbitStreamListener(queues = "critical.stream")

199

public void handleCriticalEvents(String message, StreamMessageContext context) {

200

try {

201

processCriticalMessage(message);

202

// Manually acknowledge after successful processing

203

context.ack();

204

} catch (Exception e) {

205

// Don't ack on error - the offset is not confirmed; the message stays in the stream and can be re-read (streams do not redeliver like queues)

206

log.error("Failed to process message at offset {}", context.getOffset(), e);

207

}

208

}

209

}

210

```

211

212

### Stream Configuration Properties

213

214

Configuration properties specific to RabbitMQ Streams under the `spring.rabbitmq.stream` prefix.

215

216

```java { .api }

217

/**

218

* Stream-specific configuration properties

219

*/

220

public static class Stream {

221

222

/** Stream host (defaults to main RabbitMQ host) */

223

private String host;

224

225

/** Stream port (default: 5552) */

226

private int port = 5552;

227

228

/** Stream username */

229

private String username;

230

231

/** Stream password */

232

private String password;

233

234

/** Name of the stream */

235

private String name;

236

237

/** Stream environment configuration */

238

private final Environment environment = new Environment();

239

240

/**

241

* Stream environment configuration

242

*/

243

public static class Environment {

244

245

/** Maximum frame size */

246

private DataSize maxFrameSize;

247

248

/** Heartbeat interval */

249

private Duration heartbeat;

250

251

/** Connection timeout */

252

private Duration connectionTimeout;

253

254

/** Recovery back-off delay */

255

private Duration recoveryBackOffDelay;

256

257

/** Topology recovery enabled */

258

private Boolean topologyRecovery;

259

}

260

}

261

```

262

263

**Stream Configuration Example:**

264

265

```yaml

266

spring:

267

rabbitmq:

268

# Main RabbitMQ connection

269

host: rabbitmq.example.com

270

username: myapp

271

password: ${RABBITMQ_PASSWORD}

272

273

# Stream-specific configuration

274

stream:

275

host: ${spring.rabbitmq.host} # Use same host

276

port: 5552

277

username: ${spring.rabbitmq.username}

278

password: ${spring.rabbitmq.password}

279

environment:

280

max-frame-size: 1MB

281

heartbeat: 60s

282

connection-timeout: 30s

283

recovery-back-off-delay: 5s

284

topology-recovery: true

285

```

286

287

### Stream Container Factory

288

289

Factory configuration for stream listener containers with performance tuning options.

290

291

```java { .api }

292

/**

293

* Stream container factory configuration

294

*/

295

public static class StreamContainer {

296

297

/** Auto startup */

298

private boolean autoStartup = true;

299

300

/** Native listener */

301

private boolean nativeListener;

302

303

/** Retry configuration */

304

private final ListenerRetry retry = new ListenerRetry();

305

}

306

307

/**

308

* Stream listener container factory

309

*/

310

@Bean

311

@ConditionalOnMissingBean

312

public StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(

313

Environment rabbitStreamEnvironment,

314

StreamRabbitListenerContainerFactoryConfigurer configurer) {

315

316

StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory();

317

configurer.configure(factory, rabbitStreamEnvironment);

318

return factory;

319

}

320

```

321

322

### Stream Administrative Operations

323

324

Administrative operations for managing streams, including creation, deletion, and metadata queries.

325

326

```java { .api }

327

/**

328

* Stream administrative operations

329

*/

330

public interface StreamAdmin {

331

332

/** Create a stream */

333

void createStream(String name, StreamCreationOptions options);

334

335

/** Delete a stream */

336

void deleteStream(String name);

337

338

/** Check if stream exists */

339

boolean streamExists(String name);

340

341

/** Get stream metadata */

342

StreamMetadata getStreamMetadata(String name);

343

344

/** Get stream statistics */

345

StreamStatistics getStreamStatistics(String name);

346

}

347

348

/**

349

* Stream creation options

350

*/

351

public class StreamCreationOptions {

352

353

/** Maximum age of messages */

354

private Duration maxAge;

355

356

/** Maximum size of stream */

357

private DataSize maxLength;

358

359

/** Maximum segment size */

360

private DataSize maxSegmentSize;

361

362

/** Leader locator strategy */

363

private String leaderLocator;

364

365

/** Initial cluster size */

366

private int initialClusterSize;

367

}

368

```

369

370

**Stream Administration Example:**

371

372

```java

373

import org.springframework.context.annotation.Bean;

374

import org.springframework.context.annotation.Configuration;

375

376

@Configuration

377

public class StreamAdminConfig {

378

379

@Bean

380

public StreamAdmin streamAdmin(Environment environment) {

381

return new StreamAdmin(environment);

382

}

383

384

@PostConstruct

385

public void setupStreams() {

386

// Create streams with retention policies

387

StreamCreationOptions options = new StreamCreationOptions()

388

.maxAge(Duration.ofDays(7)) // Keep messages for 7 days

389

.maxLength(DataSize.ofGigabytes(10)) // Max 10GB

390

.maxSegmentSize(DataSize.ofMegabytes(500)); // 500MB segments

391

392

if (!streamAdmin.streamExists("events.stream")) {

393

streamAdmin.createStream("events.stream", options);

394

}

395

396

if (!streamAdmin.streamExists("audit.stream")) {

397

// Audit stream with longer retention

398

StreamCreationOptions auditOptions = new StreamCreationOptions()

399

.maxAge(Duration.ofDays(365)) // Keep for 1 year

400

.maxLength(DataSize.ofGigabytes(100));

401

streamAdmin.createStream("audit.stream", auditOptions);

402

}

403

}

404

}

405

```

406

407

### Stream Performance Considerations

408

409

Configuration and best practices for optimal stream performance.

410

411

```java { .api }

412

/**

413

* Stream performance configuration

414

*/

415

@Configuration

416

public class StreamPerformanceConfig {

417

418

@Bean

419

@Primary

420

public RabbitStreamTemplate optimizedStreamTemplate(Environment environment) {

421

RabbitStreamTemplate template = new RabbitStreamTemplate(environment);

422

423

// Configure for high throughput

424

template.setProducerCustomizer(producer -> {

425

producer.batchSize(100); // Batch messages

426

producer.batchPublishingDelay(Duration.ofMillis(10)); // Small delay for batching

427

producer.compression(Compression.GZIP); // Compress messages (com.rabbitmq.stream.compression.Compression)

428

});

429

430

return template;

431

}

432

433

@Bean

434

public StreamRabbitListenerContainerFactory highThroughputContainerFactory(Environment environment) {

435

StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory();

436

factory.setEnvironment(environment);

437

438

// Configure for high throughput consumption

439

factory.setConsumerCustomizer(consumer -> {

440

consumer.offset(OffsetSpecification.last()); // Start from latest

441

consumer.manualTrackingStrategy(); // Manual offset tracking

442

});

443

444

return factory;

445

}

446

}

447

```

448

449

### Stream Environment Customization

450

451

Spring Boot provides interfaces for customizing the RabbitMQ Stream environment.

452

453

```java { .api }

454

/**

455

* Callback interface for customizing the Stream EnvironmentBuilder

456

*/

457

@FunctionalInterface

458

public interface EnvironmentBuilderCustomizer {

459

460

/** Customize the EnvironmentBuilder */

461

void customize(EnvironmentBuilder builder);

462

}

463

464

/**

465

* Callback interface for customizing Stream producers

466

*/

467

@FunctionalInterface

468

public interface ProducerCustomizer {

469

470

/** Customize a stream producer */

471

void customize(Producer producer);

472

}

473

474

/**

475

* Callback interface for customizing Stream consumers

476

*/

477

@FunctionalInterface

478

public interface ConsumerCustomizer {

479

480

/** Customize a stream consumer */

481

void customize(Consumer consumer);

482

}

483

```

484

485

**Environment Customization Example:**

486

487

```java

488

import org.springframework.boot.autoconfigure.amqp.EnvironmentBuilderCustomizer;

489

import org.springframework.rabbit.stream.producer.ProducerCustomizer;

490

491

@Configuration

492

public class StreamEnvironmentConfig {

493

494

@Bean

495

public EnvironmentBuilderCustomizer environmentBuilderCustomizer() {

496

return builder -> {

497

builder.lazyInitialization(true)

498

.addressResolver(address -> address) // Custom address resolution

499

.recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixedWithInitialDelay(

500

Duration.ofSeconds(1), Duration.ofSeconds(10)));

501

};

502

}

503

504

@Bean

505

public ProducerCustomizer producerCustomizer() {

506

return producer -> {

507

producer.batchSize(50)

508

.batchPublishingDelay(Duration.ofMillis(100))

509

.maxUnconfirmedMessages(1000);

510

};

511

}

512

}

513

```