or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

annotation-handling.mdcore-messaging.mdindex.mdmessage-conversion.mdmessaging-templates.mdrsocket-integration.mdstomp-websocket.md

rsocket-integration.mddocs/

0

# RSocket Integration

1

2

RSocket protocol support for reactive, binary messaging with backpressure handling, request-response patterns, and reactive streams integration.

3

4

## Capabilities

5

6

### RSocket Requester Interface

7

8

Main interface for RSocket client interactions with fluent API design.

9

10

```java { .api }

11

/**

12

* A contract for executing RSocket requests.

13

*/

14

public interface RSocketRequester {

15

16

/**

17

* Begin to specify a new request with the route to a handler method.

18

*/

19

RequestSpec route(String route, Object... routeVars);

20

21

/**

22

* Begin to specify a new request with a formatted route.

23

*/

24

RequestSpec route(String route);

25

26

/**

27

* Access to the underlying RSocket for more advanced scenarios.

28

*/

29

RSocket rsocket();

30

31

/**

32

* Return the data MimeType selected for the underlying RSocket at connection time.

33

*/

34

MimeType dataMimeType();

35

36

/**

37

* Return the metadata MimeType selected for the underlying RSocket at connection time.

38

*/

39

MimeType metadataMimeType();

40

41

/**

42

* Specification for a request including the route and data.

43

*/

44

interface RequestSpec {

45

46

/**

47

* Provide request data.

48

*/

49

ResponseSpec data(Object data);

50

51

/**

52

* Provide request data with a specific MimeType.

53

*/

54

ResponseSpec data(Object data, MimeType mimeType);

55

56

/**

57

* Provide request data as Publisher.

58

*/

59

ResponseSpec data(Publisher<?> data, Class<?> dataType);

60

61

/**

62

* Provide request data as Publisher with MimeType.

63

*/

64

ResponseSpec data(Publisher<?> data, ParameterizedTypeReference<?> dataTypeRef);

65

66

/**

67

* Add metadata.

68

*/

69

RequestSpec metadata(Object metadata, MimeType mimeType);

70

71

/**

72

* Proceed without data for requests that don't need data.

73

*/

74

ResponseSpec noData();

75

}

76

77

/**

78

* Specification for the expected RSocket response.

79

*/

80

interface ResponseSpec {

81

82

/**

83

* Perform a Fire-and-forget interaction model.

84

*/

85

Mono<Void> send();

86

87

/**

88

* Perform a Request-Response interaction model.

89

*/

90

<T> Mono<T> retrieveMono(Class<T> dataType);

91

92

/**

93

* Perform a Request-Response interaction model with parameterized type.

94

*/

95

<T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef);

96

97

/**

98

* Perform a Request-Stream interaction model.

99

*/

100

<T> Flux<T> retrieveFlux(Class<T> dataType);

101

102

/**

103

* Perform a Request-Stream interaction model with parameterized type.

104

*/

105

<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);

106

}

107

}

108

```

109

110

### RSocket Strategies

111

112

Configuration interface for RSocket messaging strategies.

113

114

```java { .api }

115

/**

116

* Access to strategies for use by RSocket messaging.

117

*/

118

public interface RSocketStrategies {

119

120

/**

121

* Return the configured Encoder instances.

122

*/

123

List<Encoder<?>> encoders();

124

125

/**

126

* Return the configured Decoder instances.

127

*/

128

List<Decoder<?>> decoders();

129

130

/**

131

* Return the configured ReactiveAdapterRegistry.

132

*/

133

ReactiveAdapterRegistry reactiveAdapterRegistry();

134

135

/**

136

* Return a builder to create a new RSocketStrategies instance.

137

*/

138

static Builder builder() {

139

return new DefaultRSocketStrategiesBuilder();

140

}

141

142

/**

143

* Builder to create RSocketStrategies.

144

*/

145

interface Builder {

146

147

/**

148

* Append to the list of encoders to use for serializing Objects to the payload of RSocket frames.

149

*/

150

Builder encoder(Encoder<?>... encoder);

151

152

/**

153

* Append to the list of decoders to use for de-serializing the payload of RSocket frames.

154

*/

155

Builder decoder(Decoder<?>... decoder);

156

157

/**

158

* Configure the ReactiveAdapterRegistry to use.

159

*/

160

Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry);

161

162

/**

163

* Configure Jackson for JSON encoding and decoding.

164

*/

165

Builder jackson2JsonEncoder(Jackson2JsonEncoder encoder);

166

167

/**

168

* Configure Jackson for JSON encoding and decoding.

169

*/

170

Builder jackson2JsonDecoder(Jackson2JsonDecoder decoder);

171

172

/**

173

* Build the RSocketStrategies instance.

174

*/

175

RSocketStrategies build();

176

}

177

}

178

```

179

180

### Metadata Extraction

181

182

Interface and registry for extracting metadata from RSocket frames.

183

184

```java { .api }

185

/**

186

* Strategy to extract a map of values from MetadataAndPayload frame metadata.

187

*/

188

public interface MetadataExtractor {

189

190

/**

191

* Extract a map of values from the metadata of a MetadataAndPayload frame.

192

*/

193

Map<String, Object> extract(Payload payload, MimeType metadataMimeType);

194

}

195

196

/**

197

* Registry of MetadataExtractor instances mapped by route.

198

*/

199

public interface MetadataExtractorRegistry {

200

201

/**

202

* Register a MetadataExtractor for a specific mime type.

203

*/

204

void metadataToExtract(MimeType mimeType, Class<?> targetType, String name);

205

206

/**

207

* Register a MetadataExtractor for a specific mime type with BiConsumer.

208

*/

209

void metadataToExtract(MimeType mimeType, Class<?> targetType, BiConsumer<Object, Map<String, Object>> consumer);

210

211

/**

212

* Register a custom MetadataExtractor for a mime type.

213

*/

214

void metadataToExtract(MimeType mimeType, MetadataExtractor extractor);

215

}

216

217

/**

218

* Default implementation of MetadataExtractor.

219

*/

220

public class DefaultMetadataExtractor implements MetadataExtractor, MetadataExtractorRegistry {

221

222

public DefaultMetadataExtractor();

223

224

public DefaultMetadataExtractor(Decoder<?>... decoders);

225

226

/**

227

* Configure the mime type for routing metadata.

228

*/

229

public void metadataToExtract(MimeType mimeType, Class<?> targetType, String name);

230

231

/**

232

* Configure the mime type for routing metadata with BiConsumer.

233

*/

234

public void metadataToExtract(MimeType mimeType, Class<?> targetType, BiConsumer<Object, Map<String, Object>> consumer);

235

}

236

```

237

238

### RSocket Message Handler

239

240

Main message handler for processing RSocket requests.

241

242

```java { .api }

243

/**

244

* MessageHandler for RSocket requests.

245

*/

246

public class RSocketMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean {

247

248

public RSocketMessageHandler();

249

250

/**

251

* Configure the RSocketStrategies to use for access to encoders, decoders, and a reactive adapter registry.

252

*/

253

public void setRSocketStrategies(RSocketStrategies rsocketStrategies);

254

255

/**

256

* Return the configured RSocketStrategies.

257

*/

258

public RSocketStrategies getRSocketStrategies();

259

260

/**

261

* Set the MetadataExtractor to extract route and other metadata.

262

*/

263

public void setMetadataExtractor(MetadataExtractor metadataExtractor);

264

265

/**

266

* Return the configured MetadataExtractor.

267

*/

268

public MetadataExtractor getMetadataExtractor();

269

270

/**

271

* Configure the registry for adapting various reactive types.

272

*/

273

public void setReactiveAdapterRegistry(ReactiveAdapterRegistry registry);

274

275

/**

276

* Return the configured ReactiveAdapterRegistry.

277

*/

278

public ReactiveAdapterRegistry getReactiveAdapterRegistry();

279

280

/**

281

* Set the HandlerMethodArgumentResolver instances to use.

282

*/

283

public void setArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers);

284

285

/**

286

* Return the configured argument resolvers.

287

*/

288

public List<HandlerMethodArgumentResolver> getArgumentResolvers();

289

290

/**

291

* Set the HandlerMethodReturnValueHandler instances to use.

292

*/

293

public void setReturnValueHandlers(List<HandlerMethodReturnValueHandler> handlers);

294

295

/**

296

* Return the configured return value handlers.

297

*/

298

public List<HandlerMethodReturnValueHandler> getReturnValueHandlers();

299

}

300

```

301

302

### RSocket Request Mapping

303

304

Information about RSocket request mappings.

305

306

```java { .api }

307

/**

308

* Request mapping information for RSocket requests.

309

*/

310

public final class RSocketRequestMappingInfo implements RequestMappingInfo {

311

312

/**

313

* Return the mapping paths.

314

*/

315

@Override

316

public Set<String> getPatterns();

317

318

/**

319

* Return a new instance with the given mapping paths.

320

*/

321

public RSocketRequestMappingInfo paths(String... paths);

322

323

/**

324

* Builder for creating RSocketRequestMappingInfo instances.

325

*/

326

public static Builder paths(String... paths) {

327

return new Builder().paths(paths);

328

}

329

330

/**

331

* Builder class for RSocketRequestMappingInfo.

332

*/

333

public static class Builder {

334

335

/**

336

* Set the path mapping.

337

*/

338

public Builder paths(String... paths);

339

340

/**

341

* Build the RSocketRequestMappingInfo.

342

*/

343

public RSocketRequestMappingInfo build();

344

}

345

}

346

```

347

348

**Usage Examples:**

349

350

```java

351

import org.springframework.messaging.rsocket.RSocketRequester;

352

import org.springframework.messaging.rsocket.RSocketStrategies;

353

import org.springframework.messaging.rsocket.annotation.ConnectMapping;

354

import org.springframework.messaging.handler.annotation.MessageMapping;

355

import org.springframework.messaging.handler.annotation.Payload;

356

import reactor.core.publisher.Flux;

357

import reactor.core.publisher.Mono;

358

359

// RSocket client setup

360

RSocketRequester requester = RSocketRequester.builder()

361

.rsocketStrategies(RSocketStrategies.builder()

362

.encoder(new Jackson2JsonEncoder())

363

.decoder(new Jackson2JsonDecoder())

364

.build())

365

.tcp("localhost", 7000);

366

367

// Request-Response interaction

368

Mono<String> response = requester

369

.route("greeting")

370

.data("Hello")

371

.retrieveMono(String.class);

372

373

// Request-Stream interaction

374

Flux<String> stream = requester

375

.route("greetings")

376

.data("Hello")

377

.retrieveFlux(String.class);

378

379

// Fire-and-Forget interaction

380

Mono<Void> fireAndForget = requester

381

.route("log")

382

.data("Important message")

383

.send();

384

385

// RSocket server handler

386

@Controller

387

public class RSocketController {

388

389

// Handle connection setup

390

@ConnectMapping("setup")

391

public void handleConnection(@Payload ConnectionSetupPayload setup) {

392

System.out.println("Client connected with setup: " + setup);

393

}

394

395

// Request-Response

396

@MessageMapping("greeting")

397

public Mono<String> greeting(@Payload String name) {

398

return Mono.just("Hello, " + name + "!");

399

}

400

401

// Request-Stream

402

@MessageMapping("greetings")

403

public Flux<String> greetings(@Payload String name) {

404

return Flux.interval(Duration.ofSeconds(1))

405

.map(i -> "Hello " + name + " #" + i);

406

}

407

408

// Fire-and-Forget

409

@MessageMapping("log")

410

public Mono<Void> log(@Payload String message) {

411

System.out.println("Log: " + message);

412

return Mono.empty();

413

}

414

415

// Request-Channel (bidirectional streaming)

416

@MessageMapping("chat")

417

public Flux<ChatMessage> chat(Flux<ChatMessage> messages) {

418

return messages

419

.doOnNext(msg -> System.out.println("Received: " + msg))

420

.map(msg -> new ChatMessage("Echo: " + msg.getContent()));

421

}

422

}

423

424

// Custom metadata extraction

425

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();

426

extractor.metadataToExtract(MimeTypeUtils.APPLICATION_JSON, User.class, "user");

427

428

// RSocket strategies configuration

429

RSocketStrategies strategies = RSocketStrategies.builder()

430

.encoder(new Jackson2JsonEncoder())

431

.decoder(new Jackson2JsonDecoder())

432

.encoder(new Jackson2CborEncoder())

433

.decoder(new Jackson2CborDecoder())

434

.build();

435

```