0
# Reactive Streaming
1
2
Support for reactive response types including `Uni<T>` for single asynchronous values and `Multi<T>` for streaming data with built-in content-type handling for Server-Sent Events, JSON arrays, and NDJSON.
3
4
## Capabilities
5
6
### Reactive Response Types
7
8
Support for Mutiny reactive types enabling non-blocking, asynchronous response handling.
9
10
```java { .api }
11
/**
12
* Reactive route methods can return:
13
* - Uni<T>: Single asynchronous value
14
* - Multi<T>: Stream of multiple values
15
* - T: Synchronous value (traditional)
16
*/
17
18
// Single async response
19
@Route(path = "/async-data", methods = HttpMethod.GET)
20
public Uni<String> getAsyncData() {
21
return Uni.createFrom().item("async result");
22
}
23
24
// Streaming response
25
@Route(path = "/stream-data", methods = HttpMethod.GET, produces = "text/event-stream")
26
public Multi<String> getStreamData() {
27
return Multi.createFrom().items("item1", "item2", "item3");
28
}
29
```
30
31
**Usage Examples:**
32
33
```java
34
import io.smallrye.mutiny.Uni;
35
import io.smallrye.mutiny.Multi;
36
import io.smallrye.mutiny.infrastructure.Infrastructure;
37
import java.time.Duration;
38
import java.time.Instant;
39
40
@ApplicationScoped
41
public class ReactiveExamples {
42
43
// Simple async response
44
@Route(path = "/async-hello", methods = HttpMethod.GET)
45
public Uni<String> asyncHello() {
46
return Uni.createFrom().item("Hello, Async World!")
47
.onItem().delayIt().by(Duration.ofMillis(100));
48
}
49
50
// Async database operation simulation
51
@Route(path = "/users/:id", methods = HttpMethod.GET, produces = "application/json")
52
public Uni<String> getUser(@Param("id") String userId) {
53
return Uni.createFrom().item(() -> {
54
// Simulate async database call
55
return "{\"id\":\"" + userId + "\",\"name\":\"User " + userId + "\"}";
56
}).runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
57
}
58
59
// Error handling with Uni
60
@Route(path = "/risky-operation", methods = HttpMethod.GET)
61
public Uni<String> riskyOperation() {
62
return Uni.createFrom().item(() -> {
63
if (Math.random() > 0.5) {
64
throw new RuntimeException("Random failure");
65
}
66
return "Success!";
67
}).onFailure().recoverWithItem("Recovered from failure");
68
}
69
70
// Chained async operations
71
@Route(path = "/chained/:id", methods = HttpMethod.GET)
72
public Uni<String> chainedOperation(@Param("id") String id) {
73
return Uni.createFrom().item(id)
74
.onItem().transform(userId -> "User-" + userId)
75
.onItem().transformToUni(userId -> fetchUserData(userId))
76
.onItem().transform(userData -> "Processed: " + userData);
77
}
78
79
private Uni<String> fetchUserData(String userId) {
80
return Uni.createFrom().item("Data for " + userId)
81
.onItem().delayIt().by(Duration.ofMillis(50));
82
}
83
}
84
```
85
86
### Streaming Content Types
87
88
Built-in support for various streaming content types with appropriate HTTP headers and formatting.
89
90
```java { .api }
91
/**
92
* Content type constants for streaming responses
93
*/
94
public class ReactiveRoutes {
95
/** JSON array streaming - "application/json" */
96
public static final String APPLICATION_JSON = "application/json";
97
98
/** Server-Sent Events - "text/event-stream" */
99
public static final String EVENT_STREAM = "text/event-stream";
100
101
/** Newline-delimited JSON - "application/x-ndjson" */
102
public static final String ND_JSON = "application/x-ndjson";
103
104
/** JSON streaming alias - "application/stream+json" */
105
public static final String JSON_STREAM = "application/stream+json";
106
}
107
```
108
109
**Streaming Examples:**
110
111
```java
112
import io.quarkus.vertx.web.ReactiveRoutes;
113
114
@ApplicationScoped
115
public class StreamingExamples {
116
117
// Server-Sent Events stream
118
@Route(path = "/events", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
119
public Multi<String> streamEvents() {
120
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
121
.onItem().transform(tick -> "data: Event " + tick + "\n\n")
122
.select().first(10);
123
}
124
125
// NDJSON stream
126
@Route(path = "/ndjson-data", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
127
public Multi<String> streamNdjson() {
128
return Multi.createFrom().items(
129
"{\"id\":1,\"name\":\"Alice\"}",
130
"{\"id\":2,\"name\":\"Bob\"}",
131
"{\"id\":3,\"name\":\"Charlie\"}"
132
);
133
}
134
135
// JSON array stream
136
@Route(path = "/json-array", methods = HttpMethod.GET, produces = ReactiveRoutes.APPLICATION_JSON)
137
public Multi<String> streamJsonArray() {
138
return Multi.createFrom().items("\"item1\"", "\"item2\"", "\"item3\"");
139
}
140
141
// Real-time data feed
142
@Route(path = "/live-feed", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
143
public Multi<String> liveFeed() {
144
return Multi.createFrom().ticks().every(Duration.ofMillis(500))
145
.onItem().transform(tick -> {
146
double value = Math.random() * 100;
147
return String.format("data: {\"timestamp\":%d,\"value\":%.2f}\n\n",
148
System.currentTimeMillis(), value);
149
});
150
}
151
152
// Paginated data stream
153
@Route(path = "/paginated-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
154
public Multi<String> paginatedStream(@Param("pageSize") Optional<String> pageSize) {
155
int size = pageSize.map(Integer::parseInt).orElse(10);
156
157
return Multi.createFrom().range(1, size + 1)
158
.onItem().transform(i -> String.format(
159
"{\"page\":%d,\"data\":\"Item %d\",\"timestamp\":\"%s\"}",
160
i, i, Instant.now().toString()));
161
}
162
}
163
```
164
165
### Server-Sent Events
166
167
Advanced Server-Sent Events support with custom event types and IDs.
168
169
```java { .api }
170
/**
171
* Interface for customizing Server-Sent Event structure
172
*/
173
public interface ReactiveRoutes.ServerSentEvent<T> {
174
/**
175
* Event type/name (optional)
176
* @return Event type or null for default
177
*/
178
default String event() { return null; }
179
180
/**
181
* Event data payload
182
* @return The data to send
183
*/
184
T data();
185
186
/**
187
* Event ID for client-side reconnection
188
* @return Event ID or -1L for auto-generation
189
*/
190
default long id() { return -1L; }
191
}
192
```
193
194
**SSE Examples:**
195
196
```java
197
import io.quarkus.vertx.web.ReactiveRoutes.ServerSentEvent;
198
199
@ApplicationScoped
200
public class SSEExamples {
201
202
// Basic SSE stream
203
@Route(path = "/simple-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
204
public Multi<String> simpleSSE() {
205
return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
206
.onItem().transform(tick ->
207
"event: heartbeat\ndata: " + Instant.now() + "\nid: " + tick + "\n\n");
208
}
209
210
// Custom SSE events
211
@Route(path = "/custom-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
212
public Multi<ServerSentEvent<String>> customSSE() {
213
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
214
.onItem().transform(tick -> new ServerSentEvent<String>() {
215
@Override
216
public String event() {
217
return tick % 2 == 0 ? "even" : "odd";
218
}
219
220
@Override
221
public String data() {
222
return "Tick number: " + tick;
223
}
224
225
@Override
226
public long id() {
227
return tick;
228
}
229
})
230
.select().first(20);
231
}
232
233
// Mixed event types
234
@Route(path = "/mixed-events", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
235
public Multi<String> mixedEvents() {
236
Multi<String> statusEvents = Multi.createFrom().ticks().every(Duration.ofSeconds(5))
237
.onItem().transform(tick -> "event: status\ndata: System OK\nid: status-" + tick + "\n\n");
238
239
Multi<String> dataEvents = Multi.createFrom().ticks().every(Duration.ofSeconds(1))
240
.onItem().transform(tick -> "event: data\ndata: " + Math.random() + "\nid: data-" + tick + "\n\n");
241
242
return Multi.createBy().merging().streams(statusEvents, dataEvents);
243
}
244
245
// SSE with error handling
246
@Route(path = "/robust-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
247
public Multi<String> robustSSE() {
248
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
249
.onItem().transform(tick -> {
250
if (tick > 0 && tick % 10 == 0) {
251
return "event: milestone\ndata: Reached tick " + tick + "\nid: " + tick + "\n\n";
252
}
253
return "event: tick\ndata: " + tick + "\nid: " + tick + "\n\n";
254
})
255
.onFailure().retry().atMost(3)
256
.onFailure().recoverWithItem("event: error\ndata: Stream failed\n\n");
257
}
258
}
259
```
260
261
### Reactive Data Processing
262
263
Complex reactive data processing patterns for real-world applications.
264
265
```java
266
@ApplicationScoped
267
public class ReactiveDataProcessing {
268
269
// Transform and filter stream
270
@Route(path = "/processed-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
271
public Multi<String> processedStream() {
272
return Multi.createFrom().range(1, 101) // Numbers 1-100
273
.onItem().transform(n -> n * 2) // Double each number
274
.select().where(n -> n % 3 == 0) // Filter multiples of 3
275
.onItem().transform(n -> String.format("{\"value\":%d,\"processed\":true}", n))
276
.onOverflow().buffer(10); // Buffer to handle backpressure
277
}
278
279
// Async data transformation
280
@Route(path = "/async-transform", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
281
public Multi<String> asyncTransform() {
282
return Multi.createFrom().items("apple", "banana", "cherry", "date")
283
.onItem().transformToUniAndMerge(fruit ->
284
Uni.createFrom().item(fruit)
285
.onItem().transform(f -> f.toUpperCase())
286
.onItem().delayIt().by(Duration.ofMillis(100))
287
.onItem().transform(f -> String.format("{\"fruit\":\"%s\",\"length\":%d}", f, f.length()))
288
);
289
}
290
291
// Reactive API aggregation
292
@Route(path = "/aggregated-data/:userId", methods = HttpMethod.GET, produces = "application/json")
293
public Uni<String> aggregatedData(@Param("userId") String userId) {
294
Uni<String> userInfo = fetchUserInfo(userId);
295
Uni<String> userPosts = fetchUserPosts(userId);
296
Uni<String> userProfile = fetchUserProfile(userId);
297
298
return Uni.combine().all().unis(userInfo, userPosts, userProfile)
299
.asTuple()
300
.onItem().transform(tuple -> {
301
return String.format("{\"user\":%s,\"posts\":%s,\"profile\":%s}",
302
tuple.getItem1(), tuple.getItem2(), tuple.getItem3());
303
});
304
}
305
306
private Uni<String> fetchUserInfo(String userId) {
307
return Uni.createFrom().item(String.format("{\"id\":\"%s\",\"name\":\"User %s\"}", userId, userId))
308
.onItem().delayIt().by(Duration.ofMillis(50));
309
}
310
311
private Uni<String> fetchUserPosts(String userId) {
312
return Uni.createFrom().item(String.format("[{\"id\":1,\"title\":\"Post by %s\"}]", userId))
313
.onItem().delayIt().by(Duration.ofMillis(75));
314
}
315
316
private Uni<String> fetchUserProfile(String userId) {
317
return Uni.createFrom().item(String.format("{\"bio\":\"Profile of %s\",\"followers\":100}", userId))
318
.onItem().delayIt().by(Duration.ofMillis(25));
319
}
320
}
321
```
322
323
### Backpressure and Flow Control
324
325
Handling backpressure and flow control in streaming scenarios.
326
327
```java
328
@ApplicationScoped
329
public class BackpressureExamples {
330
331
// Buffered stream with overflow handling
332
@Route(path = "/buffered-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
333
public Multi<String> bufferedStream() {
334
return Multi.createFrom().ticks().every(Duration.ofMillis(10))
335
.onItem().transform(tick -> String.format("{\"tick\":%d,\"timestamp\":%d}", tick, System.currentTimeMillis()))
336
.onOverflow().buffer(100)
337
.onOverflow().drop()
338
.select().first(1000);
339
}
340
341
// Rate-limited stream
342
@Route(path = "/rate-limited", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
343
public Multi<String> rateLimitedStream() {
344
return Multi.createFrom().range(1, 1001)
345
.onItem().transform(i -> "data: Item " + i + "\n\n")
346
.onItem().call(item -> Uni.createFrom().nullItem()
347
.onItem().delayIt().by(Duration.ofMillis(100))); // Rate limit
348
}
349
350
// Chunked processing
351
@Route(path = "/chunked-processing", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
352
public Multi<String> chunkedProcessing() {
353
return Multi.createFrom().range(1, 1001)
354
.group().intoLists().of(10) // Process in chunks of 10
355
.onItem().transformToMultiAndMerge(chunk ->
356
Multi.createFrom().iterable(chunk)
357
.onItem().transform(n -> String.format("{\"number\":%d,\"square\":%d}", n, n * n))
358
);
359
}
360
}
361
```