or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md, parameter-injection.md, reactive-streaming.md, request-context.md, request-filtering.md, route-declaration.md

reactive-streaming.md (docs/)

0

# Reactive Streaming

1

2

Support for reactive response types including `Uni<T>` for single asynchronous values and `Multi<T>` for streaming data with built-in content-type handling for Server-Sent Events, JSON arrays, and NDJSON.

3

4

## Capabilities

5

6

### Reactive Response Types

7

8

Support for Mutiny reactive types enabling non-blocking, asynchronous response handling.

9

10

```java { .api }

11

/**

12

* Reactive route methods can return:

13

* - Uni<T>: Single asynchronous value

14

* - Multi<T>: Stream of multiple values

15

* - T: Synchronous value (traditional)

16

*/

17

18

// Single async response

19

@Route(path = "/async-data", methods = HttpMethod.GET)

20

public Uni<String> getAsyncData() {

21

return Uni.createFrom().item("async result");

22

}

23

24

// Streaming response

25

@Route(path = "/stream-data", methods = HttpMethod.GET, produces = "text/event-stream")

26

public Multi<String> getStreamData() {

27

return Multi.createFrom().items("item1", "item2", "item3");

28

}

29

```

30

31

**Usage Examples:**

32

33

```java

34

import io.smallrye.mutiny.Uni;

35

import io.smallrye.mutiny.Multi;

36

import io.smallrye.mutiny.infrastructure.Infrastructure;

37

import java.time.Duration;

38

import java.time.Instant;

39

40

@ApplicationScoped

41

public class ReactiveExamples {

42

43

// Simple async response

44

@Route(path = "/async-hello", methods = HttpMethod.GET)

45

public Uni<String> asyncHello() {

46

return Uni.createFrom().item("Hello, Async World!")

47

.onItem().delayIt().by(Duration.ofMillis(100));

48

}

49

50

// Async database operation simulation

51

@Route(path = "/users/:id", methods = HttpMethod.GET, produces = "application/json")

52

public Uni<String> getUser(@Param("id") String userId) {

53

return Uni.createFrom().item(() -> {

54

// Simulate async database call

55

return "{\"id\":\"" + userId + "\",\"name\":\"User " + userId + "\"}";

56

}).runSubscriptionOn(Infrastructure.getDefaultWorkerPool());

57

}

58

59

// Error handling with Uni

60

@Route(path = "/risky-operation", methods = HttpMethod.GET)

61

public Uni<String> riskyOperation() {

62

return Uni.createFrom().item(() -> {

63

if (Math.random() > 0.5) {

64

throw new RuntimeException("Random failure");

65

}

66

return "Success!";

67

}).onFailure().recoverWithItem("Recovered from failure");

68

}

69

70

// Chained async operations

71

@Route(path = "/chained/:id", methods = HttpMethod.GET)

72

public Uni<String> chainedOperation(@Param("id") String id) {

73

return Uni.createFrom().item(id)

74

.onItem().transform(userId -> "User-" + userId)

75

.onItem().transformToUni(userId -> fetchUserData(userId))

76

.onItem().transform(userData -> "Processed: " + userData);

77

}

78

79

private Uni<String> fetchUserData(String userId) {

80

return Uni.createFrom().item("Data for " + userId)

81

.onItem().delayIt().by(Duration.ofMillis(50));

82

}

83

}

84

```

85

86

### Streaming Content Types

87

88

Built-in support for various streaming content types with appropriate HTTP headers and formatting.

89

90

```java { .api }

91

/**
 * Content type constants for streaming responses.
 *
 * <p>Holds constants only, so it is final and cannot be instantiated.
 */
public final class ReactiveRoutes {

    /** JSON array streaming - "application/json" */
    public static final String APPLICATION_JSON = "application/json";

    /** Server-Sent Events - "text/event-stream" */
    public static final String EVENT_STREAM = "text/event-stream";

    /** Newline-delimited JSON - "application/x-ndjson" */
    public static final String ND_JSON = "application/x-ndjson";

    /** JSON streaming alias - "application/stream+json" */
    public static final String JSON_STREAM = "application/stream+json";

    private ReactiveRoutes() {
        // Constants holder: no instances.
    }
}

107

```

108

109

**Streaming Examples:**

110

111

```java

112

import io.quarkus.vertx.web.ReactiveRoutes;

113

114

@ApplicationScoped

115

public class StreamingExamples {

116

117

// Server-Sent Events stream

118

@Route(path = "/events", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)

119

public Multi<String> streamEvents() {

120

return Multi.createFrom().ticks().every(Duration.ofSeconds(1))

121

.onItem().transform(tick -> "data: Event " + tick + "\n\n")

122

.select().first(10);

123

}

124

125

// NDJSON stream

126

@Route(path = "/ndjson-data", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)

127

public Multi<String> streamNdjson() {

128

return Multi.createFrom().items(

129

"{\"id\":1,\"name\":\"Alice\"}",

130

"{\"id\":2,\"name\":\"Bob\"}",

131

"{\"id\":3,\"name\":\"Charlie\"}"

132

);

133

}

134

135

// JSON array stream

136

@Route(path = "/json-array", methods = HttpMethod.GET, produces = ReactiveRoutes.APPLICATION_JSON)

137

public Multi<String> streamJsonArray() {

138

return Multi.createFrom().items("\"item1\"", "\"item2\"", "\"item3\"");

139

}

140

141

// Real-time data feed

142

@Route(path = "/live-feed", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)

143

public Multi<String> liveFeed() {

144

return Multi.createFrom().ticks().every(Duration.ofMillis(500))

145

.onItem().transform(tick -> {

146

double value = Math.random() * 100;

147

return String.format("data: {\"timestamp\":%d,\"value\":%.2f}\n\n",

148

System.currentTimeMillis(), value);

149

});

150

}

151

152

// Paginated data stream

153

@Route(path = "/paginated-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)

154

public Multi<String> paginatedStream(@Param("pageSize") Optional<String> pageSize) {

155

int size = pageSize.map(Integer::parseInt).orElse(10);

156

157

return Multi.createFrom().range(1, size + 1)

158

.onItem().transform(i -> String.format(

159

"{\"page\":%d,\"data\":\"Item %d\",\"timestamp\":\"%s\"}",

160

i, i, Instant.now().toString()));

161

}

162

}

163

```

164

165

### Server-Sent Events

166

167

Advanced Server-Sent Events support with custom event types and IDs.

168

169

```java { .api }

170

/**
 * Interface for customizing Server-Sent Event structure.
 *
 * <p>This type is nested inside {@code ReactiveRoutes} and is imported as
 * {@code io.quarkus.vertx.web.ReactiveRoutes.ServerSentEvent}; it is shown here by its
 * simple name because a qualified name is not legal in a Java declaration.
 *
 * @param <T> type of the event payload
 */
public interface ServerSentEvent<T> {

    /**
     * Event type/name (optional).
     *
     * @return Event type or null for default
     */
    default String event() {
        return null;
    }

    /**
     * Event data payload.
     *
     * @return The data to send
     */
    T data();

    /**
     * Event ID for client-side reconnection.
     *
     * @return Event ID or -1L for auto-generation
     */
    default long id() {
        return -1L;
    }
}

192

```

193

194

**SSE Examples:**

195

196

```java

197

import io.quarkus.vertx.web.ReactiveRoutes.ServerSentEvent;

198

199

@ApplicationScoped

200

public class SSEExamples {

201

202

// Basic SSE stream

203

@Route(path = "/simple-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)

204

public Multi<String> simpleSSE() {

205

return Multi.createFrom().ticks().every(Duration.ofSeconds(2))

206

.onItem().transform(tick ->

207

"event: heartbeat\ndata: " + Instant.now() + "\nid: " + tick + "\n\n");

208

}

209

210

// Custom SSE events

211

@Route(path = "/custom-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)

212

public Multi<ServerSentEvent<String>> customSSE() {

213

return Multi.createFrom().ticks().every(Duration.ofSeconds(1))

214

.onItem().transform(tick -> new ServerSentEvent<String>() {

215

@Override

216

public String event() {

217

return tick % 2 == 0 ? "even" : "odd";

218

}

219

220

@Override

221

public String data() {

222

return "Tick number: " + tick;

223

}

224

225

@Override

226

public long id() {

227

return tick;

228

}

229

})

230

.select().first(20);

231

}

232

233

// Mixed event types

234

@Route(path = "/mixed-events", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)

235

public Multi<String> mixedEvents() {

236

Multi<String> statusEvents = Multi.createFrom().ticks().every(Duration.ofSeconds(5))

237

.onItem().transform(tick -> "event: status\ndata: System OK\nid: status-" + tick + "\n\n");

238

239

Multi<String> dataEvents = Multi.createFrom().ticks().every(Duration.ofSeconds(1))

240

.onItem().transform(tick -> "event: data\ndata: " + Math.random() + "\nid: data-" + tick + "\n\n");

241

242

return Multi.createBy().merging().streams(statusEvents, dataEvents);

243

}

244

245

// SSE with error handling

246

@Route(path = "/robust-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)

247

public Multi<String> robustSSE() {

248

return Multi.createFrom().ticks().every(Duration.ofSeconds(1))

249

.onItem().transform(tick -> {

250

if (tick > 0 && tick % 10 == 0) {

251

return "event: milestone\ndata: Reached tick " + tick + "\nid: " + tick + "\n\n";

252

}

253

return "event: tick\ndata: " + tick + "\nid: " + tick + "\n\n";

254

})

255

.onFailure().retry().atMost(3)

256

.onFailure().recoverWithItem("event: error\ndata: Stream failed\n\n");

257

}

258

}

259

```

260

261

### Reactive Data Processing

262

263

Complex reactive data processing patterns for real-world applications.

264

265

```java

266

@ApplicationScoped

267

public class ReactiveDataProcessing {

268

269

// Transform and filter stream

270

@Route(path = "/processed-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)

271

public Multi<String> processedStream() {

272

return Multi.createFrom().range(1, 101) // Numbers 1-100

273

.onItem().transform(n -> n * 2) // Double each number

274

.select().where(n -> n % 3 == 0) // Filter multiples of 3

275

.onItem().transform(n -> String.format("{\"value\":%d,\"processed\":true}", n))

276

.onOverflow().buffer(10); // Buffer to handle backpressure

277

}

278

279

// Async data transformation

280

@Route(path = "/async-transform", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)

281

public Multi<String> asyncTransform() {

282

return Multi.createFrom().items("apple", "banana", "cherry", "date")

283

.onItem().transformToUniAndMerge(fruit ->

284

Uni.createFrom().item(fruit)

285

.onItem().transform(f -> f.toUpperCase())

286

.onItem().delayIt().by(Duration.ofMillis(100))

287

.onItem().transform(f -> String.format("{\"fruit\":\"%s\",\"length\":%d}", f, f.length()))

288

);

289

}

290

291

// Reactive API aggregation

292

@Route(path = "/aggregated-data/:userId", methods = HttpMethod.GET, produces = "application/json")

293

public Uni<String> aggregatedData(@Param("userId") String userId) {

294

Uni<String> userInfo = fetchUserInfo(userId);

295

Uni<String> userPosts = fetchUserPosts(userId);

296

Uni<String> userProfile = fetchUserProfile(userId);

297

298

return Uni.combine().all().unis(userInfo, userPosts, userProfile)

299

.asTuple()

300

.onItem().transform(tuple -> {

301

return String.format("{\"user\":%s,\"posts\":%s,\"profile\":%s}",

302

tuple.getItem1(), tuple.getItem2(), tuple.getItem3());

303

});

304

}

305

306

private Uni<String> fetchUserInfo(String userId) {

307

return Uni.createFrom().item(String.format("{\"id\":\"%s\",\"name\":\"User %s\"}", userId, userId))

308

.onItem().delayIt().by(Duration.ofMillis(50));

309

}

310

311

private Uni<String> fetchUserPosts(String userId) {

312

return Uni.createFrom().item(String.format("[{\"id\":1,\"title\":\"Post by %s\"}]", userId))

313

.onItem().delayIt().by(Duration.ofMillis(75));

314

}

315

316

private Uni<String> fetchUserProfile(String userId) {

317

return Uni.createFrom().item(String.format("{\"bio\":\"Profile of %s\",\"followers\":100}", userId))

318

.onItem().delayIt().by(Duration.ofMillis(25));

319

}

320

}

321

```

322

323

### Backpressure and Flow Control

324

325

Handling backpressure and flow control in streaming scenarios.

326

327

```java

328

@ApplicationScoped

329

public class BackpressureExamples {

330

331

// Buffered stream with overflow handling

332

@Route(path = "/buffered-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)

333

public Multi<String> bufferedStream() {

334

return Multi.createFrom().ticks().every(Duration.ofMillis(10))

335

.onItem().transform(tick -> String.format("{\"tick\":%d,\"timestamp\":%d}", tick, System.currentTimeMillis()))

336

.onOverflow().buffer(100)

337

.onOverflow().drop()

338

.select().first(1000);

339

}

340

341

// Rate-limited stream

342

@Route(path = "/rate-limited", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)

343

public Multi<String> rateLimitedStream() {

344

return Multi.createFrom().range(1, 1001)

345

.onItem().transform(i -> "data: Item " + i + "\n\n")

346

.onItem().call(item -> Uni.createFrom().nullItem()

347

.onItem().delayIt().by(Duration.ofMillis(100))); // Rate limit

348

}

349

350

// Chunked processing

351

@Route(path = "/chunked-processing", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)

352

public Multi<String> chunkedProcessing() {

353

return Multi.createFrom().range(1, 1001)

354

.group().intoLists().of(10) // Process in chunks of 10

355

.onItem().transformToMultiAndMerge(chunk ->

356

Multi.createFrom().iterable(chunk)

357

.onItem().transform(n -> String.format("{\"number\":%d,\"square\":%d}", n, n * n))

358

);

359

}

360

}

361

```