or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

auto-configuration.md · client-configuration.md · codec-configuration.md · index.md · message-handling.md · server-configuration.md

docs/message-handling.md

0

# Message Handling

1

2

Spring messaging integration enabling @MessageMapping annotations and bidirectional communication patterns in RSocket controllers.

3

4

## Capabilities

5

6

### RSocket Message Handler

7

8

Core message handler that processes @MessageMapping annotations and integrates with Spring's messaging infrastructure.

9

10

```java { .api }

11

/**

12

* Message handler for RSocket @MessageMapping support

13

*/

14

class RSocketMessageHandler extends MessagingMethodMessageHandler {

15

16

/** Set RSocket strategies for codec configuration */

17

void setRSocketStrategies(RSocketStrategies strategies);

18

19

/** Get configured RSocket strategies */

20

RSocketStrategies getRSocketStrategies();

21

22

/** Set default data MIME type */

23

void setDefaultDataMimeType(MimeType defaultDataMimeType);

24

25

/** Set default metadata MIME type */

26

void setDefaultMetadataMimeType(MimeType defaultMetadataMimeType);

27

28

/** Configure routing matcher for message routing */

29

void setRouteMatcher(RouteMatcher routeMatcher);

30

31

/** Create socket acceptor for server integration */

32

SocketAcceptor responder();

33

}

34

```

35

36

### Message Mapping Annotations

37

38

Core annotations for defining RSocket message handlers.

39

40

```java { .api }

41

/**
 * Marks a method as an RSocket message handler. May also be placed on the
 * controller class to declare a route prefix shared by its handler methods.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface MessageMapping {
    /** Route patterns this handler responds to */
    String[] value() default {};
}

50

51

/**
 * Extracts values from route variables
 */
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface DestinationVariable {
    /** Variable name to extract */
    String value() default "";
}

60

61

/**

62

* Accesses message headers/metadata

63

*/

64

@Target(ElementType.PARAMETER)

65

@Retention(RetentionPolicy.RUNTIME)

66

@interface Header {

67

/** Header name to access */

68

String value() default "";

69

/** Whether header is required */

70

boolean required() default true;

71

/** Default value if header missing */

72

String defaultValue() default ValueConstants.DEFAULT_NONE;

73

}

74

75

/**
 * Accesses all message headers
 */
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface Headers {
}

82

```

83

84

### Controller Implementation Patterns

85

86

Different patterns for implementing RSocket message handlers.

87

88

```java { .api }

89

/**

90

* Basic RSocket controller with message mappings

91

*/

92

@Controller

93

public class RSocketController {

94

95

// Request-Response pattern

96

@MessageMapping("user.get")

97

public Mono<User> getUser(String userId) {

98

return userService.findById(userId);

99

}

100

101

// Fire-and-Forget pattern

102

@MessageMapping("user.update")

103

public Mono<Void> updateUser(User user) {

104

return userService.save(user).then();

105

}

106

107

// Request-Stream pattern

108

@MessageMapping("notifications.stream")

109

public Flux<Notification> streamNotifications(String userId) {

110

return notificationService.streamForUser(userId);

111

}

112

113

// Request-Channel pattern (bidirectional streaming)

114

@MessageMapping("chat")

115

public Flux<ChatMessage> handleChat(Flux<ChatMessage> inbound) {

116

return chatService.handleConversation(inbound);

117

}

118

}

119

```

120

121

**Advanced Controller Examples:**

122

123

```java

124

@Controller

125

public class AdvancedRSocketController {

126

127

// Route variables and headers

128

@MessageMapping("user.{id}.profile")

129

public Mono<Profile> getUserProfile(

130

@DestinationVariable String id,

131

@Header("user-role") String userRole,

132

@Headers Map<String, Object> headers

133

) {

134

return profileService.getProfile(id, userRole, headers);

135

}

136

137

// Complex data types

138

@MessageMapping("search")

139

public Flux<SearchResult> search(SearchRequest request) {

140

return searchService.search(request);

141

}

142

143

// Exception handling

144

@MessageMapping("risky-operation")

145

public Mono<String> riskyOperation(String input) {

146

return processService.process(input)

147

.onErrorResume(ValidationException.class,

148

ex -> Mono.error(new RSocketException(0x201, ex.getMessage())));

149

}

150

151

// Requester injection for bidirectional communication

152

@MessageMapping("interactive")

153

public Mono<Void> interactive(String data, RSocketRequester requester) {

154

return requester.route("callback")

155

.data("processed: " + data)

156

.send();

157

}

158

}

159

```

160

161

### Message Handler Customization

162

163

Customize message handler behavior through the customizer interface.

164

165

```java { .api }

166

/**

167

* Callback interface for customizing RSocketMessageHandler

168

*/

169

@FunctionalInterface

170

interface RSocketMessageHandlerCustomizer {

171

172

/**

173

* Customize the message handler

174

* @param messageHandler Handler to customize

175

*/

176

void customize(RSocketMessageHandler messageHandler);

177

}

178

```

179

180

**Customization Examples:**

181

182

```java

183

@Configuration

184

public class MessageHandlerConfiguration {

185

186

// Configure default MIME types

187

@Bean

188

public RSocketMessageHandlerCustomizer mimeTypeCustomizer() {

189

return messageHandler -> {

190

messageHandler.setDefaultDataMimeType(MimeTypeUtils.APPLICATION_CBOR);

191

messageHandler.setDefaultMetadataMimeType(MimeTypeUtils.parseMediaType("message/x.rsocket.routing.v0"));

192

};

193

}

194

195

// Configure routing

196

@Bean

197

public RSocketMessageHandlerCustomizer routingCustomizer() {

198

return messageHandler -> {

199

PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();

200

matcher.setCaseSensitive(false);

201

messageHandler.setRouteMatcher(matcher);

202

};

203

}

204

205

// Add argument resolvers

206

@Bean

207

public RSocketMessageHandlerCustomizer argumentResolverCustomizer() {

208

return messageHandler -> {

209

messageHandler.getArgumentResolverConfigurer()

210

.addCustomResolver(new CustomArgumentResolver());

211

};

212

}

213

214

// Configure encoding

215

@Bean

216

public RSocketMessageHandlerCustomizer encodingCustomizer() {

217

return messageHandler -> {

218

messageHandler.setEncoderConfigurer(configurer -> {

219

configurer.defaultCodecs().maxInMemorySize(1024 * 1024); // 1MB

220

});

221

};

222

}

223

}

224

```

225

226

### Error Handling

227

228

Comprehensive error handling patterns for RSocket message handlers.

229

230

```java

231

@Controller

232

@MessageExceptionHandler

233

public class RSocketErrorController {

234

235

// Global error handler

236

@MessageExceptionHandler

237

public Mono<ErrorResponse> handleValidationError(ValidationException ex) {

238

return Mono.just(new ErrorResponse("VALIDATION_ERROR", ex.getMessage()));

239

}

240

241

// Specific error handler

242

@MessageExceptionHandler(DataIntegrityViolationException.class)

243

public Mono<ErrorResponse> handleDataError(DataIntegrityViolationException ex) {

244

return Mono.just(new ErrorResponse("DATA_ERROR", "Data constraint violation"));

245

}

246

247

// RSocket-specific error handler

248

@MessageExceptionHandler(RSocketException.class)

249

public Mono<Void> handleRSocketError(RSocketException ex) {

250

log.error("RSocket error: code={}, message={}", ex.errorCode(), ex.getMessage());

251

return Mono.empty();

252

}

253

}

254

255

// Custom error response
class ErrorResponse {
    private final String code;
    private final String message;
    private final Instant timestamp;

    /**
     * Creates an error response stamped with the current time.
     *
     * @param code    machine-readable error code, e.g. "VALIDATION_ERROR"
     * @param message human-readable description of the error
     */
    public ErrorResponse(String code, String message) {
        this.code = code;
        this.message = message;
        this.timestamp = Instant.now();
    }

    /** @return the machine-readable error code */
    public String getCode() {
        return code;
    }

    /** @return the human-readable error description */
    public String getMessage() {
        return message;
    }

    /** @return the instant at which this response was created */
    public Instant getTimestamp() {
        return timestamp;
    }
}

269

```

270

271

### Security Integration

272

273

Security patterns for RSocket message handlers.

274

275

```java

276

@Controller

277

@PreAuthorize("hasRole('USER')")

278

public class SecureRSocketController {

279

280

// Method-level security

281

@MessageMapping("admin.users")

282

@PreAuthorize("hasRole('ADMIN')")

283

public Flux<User> getAllUsers() {

284

return userService.findAll();

285

}

286

287

// Principal injection

288

@MessageMapping("profile")

289

public Mono<Profile> getCurrentProfile(Principal principal) {

290

return profileService.getProfile(principal.getName());

291

}

292

293

// Authentication context

294

@MessageMapping("secure-data")

295

public Mono<SecureData> getSecureData() {

296

return ReactiveSecurityContextHolder.getContext()

297

.map(SecurityContext::getAuthentication)

298

.cast(JwtAuthenticationToken.class)

299

.flatMap(auth -> secureDataService.getData(auth.getToken()));

300

}

301

}

302

```

303

304

### Metadata and Headers

305

306

Working with RSocket metadata and custom headers.

307

308

```java

309

@Controller

310

public class MetadataController {

311

312

// Composite metadata handling

313

@MessageMapping("metadata-example")

314

public Mono<String> handleMetadata(

315

String data,

316

@Header("custom-header") String customValue,

317

@Header(value = "optional-header", required = false) Optional<String> optionalValue

318

) {

319

return Mono.just("Processed: " + data + " with " + customValue);

320

}

321

322

// Manual metadata extraction

323

@MessageMapping("raw-metadata")

324

public Mono<String> handleRawMetadata(String data, @Headers Map<String, Object> headers) {

325

String traceId = (String) headers.get("trace-id");

326

String userId = (String) headers.get("user-id");

327

328

return processWithContext(data, traceId, userId);

329

}

330

331

// Setting response metadata

332

@MessageMapping("with-response-metadata")

333

public Mono<ResponseEntity<String>> responseWithMetadata(String data) {

334

return Mono.just(

335

ResponseEntity.ok()

336

.header("response-time", Instant.now().toString())

337

.header("server-id", "server-1")

338

.body("Processed: " + data)

339

);

340

}

341

}

342

```

343

344

### Testing Message Handlers

345

346

Testing patterns for RSocket message handlers.

347

348

```java

349

@SpringBootTest

350

class RSocketMessageHandlerTest {

351

352

@Autowired

353

private RSocketRequester.Builder requesterBuilder;

354

355

@LocalServerPort

356

private int port;

357

358

private RSocketRequester requester;

359

360

@BeforeEach

361

void setup() {

362

requester = requesterBuilder

363

.tcp("localhost", port)

364

.block();

365

}

366

367

@Test

368

void testRequestResponse() {

369

StepVerifier.create(

370

requester.route("user.get")

371

.data("123")

372

.retrieveMono(User.class)

373

)

374

.assertNext(user -> {

375

assertThat(user.getId()).isEqualTo("123");

376

assertThat(user.getName()).isNotBlank();

377

})

378

.verifyComplete();

379

}

380

381

@Test

382

void testRequestStream() {

383

StepVerifier.create(

384

requester.route("notifications.stream")

385

.data("user123")

386

.retrieveFlux(Notification.class)

387

.take(3)

388

)

389

.expectNextCount(3)

390

.verifyComplete();

391

}

392

393

@Test

394

void testErrorHandling() {

395

StepVerifier.create(

396

requester.route("invalid-operation")

397

.data("bad-data")

398

.retrieveMono(String.class)

399

)

400

.expectError(RSocketException.class)

401

.verify();

402

}

403

404

@AfterEach

405

void cleanup() {

406

if (requester != null) {

407

requester.dispose();

408

}

409

}

410

}

411

```

412

413

## Interaction Patterns

414

415

### Request-Response

416

Single request, single response pattern.

417

418

```java

419

@MessageMapping("echo")

420

public Mono<String> echo(String message) {

421

return Mono.just("Echo: " + message);

422

}

423

```

424

425

### Fire-and-Forget

426

Send data without expecting a response.

427

428

```java

429

@MessageMapping("log")

430

public Mono<Void> logMessage(LogEntry entry) {

431

return logService.save(entry).then();

432

}

433

```

434

435

### Request-Stream

436

Single request, stream of responses.

437

438

```java

439

@MessageMapping("events")

440

public Flux<Event> streamEvents(String topic) {

441

return eventService.streamEvents(topic);

442

}

443

```

444

445

### Request-Channel

446

Bidirectional streaming of data.

447

448

```java

449

@MessageMapping("process-stream")

450

public Flux<ProcessedData> processStream(Flux<RawData> dataStream) {

451

return dataStream

452

.map(this::process)

453

.filter(Objects::nonNull);

454

}

455

```