0
# Reactive Streaming
1
2
Low-level reactive streaming utilities for implementing custom gRPC call patterns with Mutiny integration. These utilities bridge between traditional gRPC StreamObserver patterns and reactive Mutiny types.
3
4
## Capabilities
5
6
### ServerCalls Class
7
8
Provides server-side call implementations that bridge gRPC StreamObserver patterns with Mutiny reactive types (`Uni` and `Multi`).
9
10
```java { .api }
11
public class ServerCalls {
12
13
/**
14
* Handle unary calls: single request -> single response
15
*/
16
public static <I, O> void oneToOne(
17
I request,
18
StreamObserver<O> response,
19
String compression,
20
Function<I, Uni<O>> implementation
21
);
22
23
/**
24
* Handle server streaming calls: single request -> stream of responses
25
*/
26
public static <I, O> void oneToMany(
27
I request,
28
StreamObserver<O> response,
29
String compression,
30
Function<I, Multi<O>> implementation
31
);
32
33
/**
34
* Handle client streaming calls: stream of requests -> single response
35
*/
36
public static <I, O> StreamObserver<I> manyToOne(
37
StreamObserver<O> response,
38
Function<Multi<I>, Uni<O>> implementation
39
);
40
41
/**
42
* Handle bidirectional streaming calls: stream of requests -> stream of responses
43
*/
44
public static <I, O> StreamObserver<I> manyToMany(
45
StreamObserver<O> response,
46
Function<Multi<I>, Multi<O>> implementation
47
);
48
49
// Development mode utilities
50
public static void setStreamCollector(StreamCollector collector);
51
public static StreamCollector getStreamCollector();
52
}
53
```
54
55
**Usage Examples:**
56
57
```java
58
import io.quarkus.grpc.stubs.ServerCalls;
59
import io.smallrye.mutiny.Uni;
60
import io.smallrye.mutiny.Multi;
61
62
public class CustomGrpcService extends GreetingGrpc.GreetingImplBase {
63
64
@Override
65
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
66
ServerCalls.oneToOne(request, responseObserver, null, this::processHello);
67
}
68
69
@Override
70
public void sayHelloStream(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
71
ServerCalls.oneToMany(request, responseObserver, "gzip", this::processHelloStream);
72
}
73
74
@Override
75
public StreamObserver<HelloRequest> sayHelloClientStream(StreamObserver<HelloResponse> responseObserver) {
76
return ServerCalls.manyToOne(responseObserver, this::processClientStream);
77
}
78
79
@Override
80
public StreamObserver<HelloRequest> sayHelloBidirectional(StreamObserver<HelloResponse> responseObserver) {
81
return ServerCalls.manyToMany(responseObserver, this::processBidirectional);
82
}
83
84
private Uni<HelloResponse> processHello(HelloRequest request) {
85
return Uni.createFrom().item(
86
HelloResponse.newBuilder()
87
.setMessage("Hello " + request.getName())
88
.build()
89
);
90
}
91
92
private Multi<HelloResponse> processHelloStream(HelloRequest request) {
93
return Multi.createFrom().range(1, 4)
94
.onItem().transform(i ->
95
HelloResponse.newBuilder()
96
.setMessage("Hello " + request.getName() + " #" + i)
97
.build());
98
}
99
100
private Uni<HelloResponse> processClientStream(Multi<HelloRequest> requests) {
101
return requests
102
.collect().asList()
103
.onItem().transform(list ->
104
HelloResponse.newBuilder()
105
.setMessage("Received " + list.size() + " messages")
106
.build());
107
}
108
109
private Multi<HelloResponse> processBidirectional(Multi<HelloRequest> requests) {
110
return requests
111
.onItem().transform(request ->
112
HelloResponse.newBuilder()
113
.setMessage("Echo: " + request.getName())
114
.build());
115
}
116
}
117
```
118
119
### ClientCalls Class
120
121
Provides client-side call implementations that convert traditional gRPC patterns into reactive Mutiny types.
122
123
```java { .api }
124
public class ClientCalls {
125
126
/**
127
* Convert unary call to Uni
128
*/
129
public static <I, O> Uni<O> oneToOne(
130
I request,
131
BiConsumer<I, StreamObserver<O>> delegate
132
);
133
134
/**
135
* Convert server streaming call to Multi
136
*/
137
public static <I, O> Multi<O> oneToMany(
138
I request,
139
BiConsumer<I, StreamObserver<O>> delegate
140
);
141
142
/**
143
* Convert client streaming call to Uni
144
*/
145
public static <I, O> Uni<O> manyToOne(
146
Multi<I> items,
147
Function<StreamObserver<O>, StreamObserver<I>> delegate
148
);
149
150
/**
151
* Convert bidirectional streaming call to Multi
152
*/
153
public static <I, O> Multi<O> manyToMany(
154
Multi<I> items,
155
Function<StreamObserver<O>, StreamObserver<I>> delegate
156
);
157
}
158
```
159
160
**Usage Examples:**
161
162
```java
163
import io.quarkus.grpc.stubs.ClientCalls;
164
import io.smallrye.mutiny.Uni;
165
import io.smallrye.mutiny.Multi;
166
167
public class CustomGrpcClient {
168
169
private final GreetingGrpc.GreetingStub stub;
170
171
public CustomGrpcClient(Channel channel) {
172
this.stub = GreetingGrpc.newStub(channel);
173
}
174
175
public Uni<HelloResponse> sayHello(HelloRequest request) {
176
return ClientCalls.oneToOne(request, stub::sayHello);
177
}
178
179
public Multi<HelloResponse> sayHelloStream(HelloRequest request) {
180
return ClientCalls.oneToMany(request, stub::sayHelloStream);
181
}
182
183
public Uni<HelloResponse> sayHelloClientStream(Multi<HelloRequest> requests) {
184
return ClientCalls.manyToOne(requests, stub::sayHelloClientStream);
185
}
186
187
public Multi<HelloResponse> sayHelloBidirectional(Multi<HelloRequest> requests) {
188
return ClientCalls.manyToMany(requests, stub::sayHelloBidirectional);
189
}
190
}
191
```
192
193
### Stream Observer Implementations
194
195
Various StreamObserver implementations for different reactive patterns:
196
197
```java { .api }
198
public class UniStreamObserver<T> implements StreamObserver<T> {
199
// Bridges StreamObserver to UniEmitter
200
}
201
202
public class MultiStreamObserver<T> implements StreamObserver<T> {
203
// Bridges StreamObserver to MultiEmitter
204
}
205
206
public class ManyToManyObserver<T> implements StreamObserver<T> {
207
// Specialized observer for bidirectional streaming
208
}
209
210
public class ManyToOneObserver<T> implements StreamObserver<T> {
211
// Specialized observer for client streaming to unary response
212
}
213
```
214
215
### StreamCollector Interface
216
217
Development mode support for collecting and managing stream observers:
218
219
```java { .api }
220
public interface StreamCollector {
221
StreamCollector NO_OP = new StreamCollector() {
222
@Override
223
public void add(StreamObserver<?> streamObserver) {}
224
225
@Override
226
public void remove(StreamObserver<?> streamObserver) {}
227
};
228
229
void add(StreamObserver<?> streamObserver);
230
void remove(StreamObserver<?> streamObserver);
231
}
232
```
233
234
## Advanced Streaming Patterns
235
236
### Custom Stream Processing
237
238
```java
239
@GrpcService
240
public class StreamProcessingService implements MutinyService {
241
242
public Multi<ProcessedData> processDataStream(Multi<RawData> rawDataStream) {
243
return rawDataStream
244
.onItem().transform(this::validateData)
245
.onFailure().recoverWithItem(this::createErrorData)
246
.onItem().transformToUniAndConcatenate(this::enrichData)
247
.onItem().transform(this::processData)
248
.onOverflow().buffer(100)
249
.onCancellation().invoke(() -> cleanupResources());
250
}
251
252
private RawData validateData(RawData data) {
253
if (data.getValue() == null) {
254
throw new IllegalArgumentException("Value cannot be null");
255
}
256
return data;
257
}
258
259
private RawData createErrorData(Throwable error) {
260
return RawData.newBuilder()
261
.setValue("ERROR: " + error.getMessage())
262
.build();
263
}
264
265
private Uni<RawData> enrichData(RawData data) {
266
return externalService.enrich(data)
267
.onFailure().recoverWithItem(data); // Continue with original on failure
268
}
269
270
private ProcessedData processData(RawData data) {
271
return ProcessedData.newBuilder()
272
.setResult(data.getValue().toUpperCase())
273
.setTimestamp(System.currentTimeMillis())
274
.build();
275
}
276
}
277
```
278
279
### Backpressure Handling
280
281
```java
282
@GrpcService
283
public class BackpressureService implements MutinyService {
284
285
public Multi<DataResponse> streamLargeDataset(DataRequest request) {
286
return databaseService.queryLargeDataset(request.getQuery())
287
.onOverflow().buffer(1000) // Buffer up to 1000 items
288
.onOverflow().drop() // Drop items if buffer is full
289
.onItem().transform(this::convertToResponse)
290
.onItem().call(response -> {
291
// Apply backpressure based on response processing time
292
return Uni.createFrom().nullItem()
293
.onItem().delayIt().by(Duration.ofMillis(10));
294
});
295
}
296
297
public Multi<StreamResponse> controlledStream(StreamRequest request) {
298
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
299
.onItem().transform(tick -> generateResponse(tick))
300
.onOverflow().bufferSize(50)
301
.onRequest().invoke(requested ->
302
log.info("Client requested {} items", requested));
303
}
304
}
305
```
306
307
### Error Recovery in Streams
308
309
```java
310
@GrpcService
311
public class ResilientStreamService implements MutinyService {
312
313
public Multi<DataItem> resilientDataStream(StreamRequest request) {
314
return Multi.createFrom().range(1, 1000)
315
.onItem().transformToUniAndConcatenate(this::processItem)
316
.onFailure(TransientException.class).retry()
317
.withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10))
318
.atMost(3)
319
.onFailure(PermanentException.class).recoverWithCompletion()
320
.onFailure().recoverWithItem(this::createErrorItem);
321
}
322
323
private Uni<DataItem> processItem(int index) {
324
return externalService.processItem(index)
325
.onItem().ifNull().switchTo(() ->
326
Uni.createFrom().failure(new PermanentException("Null result")))
327
.onFailure(IOException.class).transform(TransientException::new);
328
}
329
330
private DataItem createErrorItem(Throwable error) {
331
return DataItem.newBuilder()
332
.setData("ERROR: " + error.getMessage())
333
.setIndex(-1)
334
.build();
335
}
336
}
337
```
338
339
### Stream Composition
340
341
```java
342
@GrpcService
343
public class CompositeStreamService implements MutinyService {
344
345
public Multi<CombinedData> combineStreams(CombineRequest request) {
346
Multi<DataA> streamA = serviceA.getDataStream(request.getQueryA());
347
Multi<DataB> streamB = serviceB.getDataStream(request.getQueryB());
348
349
return Multi.createBy().combining().streams(streamA, streamB)
350
.using(this::combineData)
351
.onItem().transform(this::enrichCombinedData);
352
}
353
354
public Multi<ProcessedItem> pipelineProcessing(PipelineRequest request) {
355
return inputDataStream(request)
356
.onItem().transformToUniAndConcatenate(this::stage1Processing)
357
.onItem().transformToUniAndConcatenate(this::stage2Processing)
358
.onItem().transformToUniAndConcatenate(this::stage3Processing)
359
.onItem().transform(this::finalizeProcessing);
360
}
361
362
private CombinedData combineData(DataA a, DataB b) {
363
return CombinedData.newBuilder()
364
.setValueA(a.getValue())
365
.setValueB(b.getValue())
366
.setTimestamp(System.currentTimeMillis())
367
.build();
368
}
369
}
370
```
371
372
## Performance Considerations
373
374
1. **Use appropriate buffer sizes** for stream processing
375
2. **Implement backpressure handling** for high-throughput streams
376
3. **Consider compression** for large data transfers
377
4. **Monitor stream observer lifecycle** in development mode
378
5. **Use appropriate concurrency models** for parallel processing
379
6. **Handle cancellation gracefully** to free resources
380
7. **Implement circuit breakers** for external service dependencies