or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-usage.md, configuration.md, exception-handling.md, index.md, interceptors.md, reactive-streaming.md, service-implementation.md

docs/reactive-streaming.md

0

# Reactive Streaming

1

2

Low-level reactive streaming utilities for implementing custom gRPC call patterns with Mutiny integration. These utilities bridge gRPC's callback-based `StreamObserver` API and Mutiny's reactive types (`Uni` for a single item, `Multi` for a stream of items).

3

4

## Capabilities

5

6

### ServerCalls Class

7

8

Provides server-side call implementations that bridge gRPC StreamObserver patterns with Mutiny reactive types (`Uni` and `Multi`).

9

10

```java { .api }

11

public class ServerCalls {

12

13

/**

14

* Handle unary calls: single request -> single response

15

*/

16

public static <I, O> void oneToOne(

17

I request,

18

StreamObserver<O> response,

19

String compression,

20

Function<I, Uni<O>> implementation

21

);

22

23

/**

24

* Handle server streaming calls: single request -> stream of responses

25

*/

26

public static <I, O> void oneToMany(

27

I request,

28

StreamObserver<O> response,

29

String compression,

30

Function<I, Multi<O>> implementation

31

);

32

33

/**

34

* Handle client streaming calls: stream of requests -> single response

35

*/

36

public static <I, O> StreamObserver<I> manyToOne(

37

StreamObserver<O> response,

38

Function<Multi<I>, Uni<O>> implementation

39

);

40

41

/**

42

* Handle bidirectional streaming calls: stream of requests -> stream of responses

43

*/

44

public static <I, O> StreamObserver<I> manyToMany(

45

StreamObserver<O> response,

46

Function<Multi<I>, Multi<O>> implementation

47

);

48

49

// Development mode utilities

50

public static void setStreamCollector(StreamCollector collector);

51

public static StreamCollector getStreamCollector();

52

}

53

```

54

55

**Usage Examples:**

56

57

```java

58

import io.quarkus.grpc.stubs.ServerCalls;

59

import io.smallrye.mutiny.Uni;

60

import io.smallrye.mutiny.Multi;

61

62

public class CustomGrpcService extends GreetingGrpc.GreetingImplBase {

63

64

@Override

65

public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {

66

ServerCalls.oneToOne(request, responseObserver, null, this::processHello);

67

}

68

69

@Override

70

public void sayHelloStream(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {

71

ServerCalls.oneToMany(request, responseObserver, "gzip", this::processHelloStream);

72

}

73

74

@Override

75

public StreamObserver<HelloRequest> sayHelloClientStream(StreamObserver<HelloResponse> responseObserver) {

76

return ServerCalls.manyToOne(responseObserver, this::processClientStream);

77

}

78

79

@Override

80

public StreamObserver<HelloRequest> sayHelloBidirectional(StreamObserver<HelloResponse> responseObserver) {

81

return ServerCalls.manyToMany(responseObserver, this::processBidirectional);

82

}

83

84

private Uni<HelloResponse> processHello(HelloRequest request) {

85

return Uni.createFrom().item(

86

HelloResponse.newBuilder()

87

.setMessage("Hello " + request.getName())

88

.build()

89

);

90

}

91

92

private Multi<HelloResponse> processHelloStream(HelloRequest request) {

93

return Multi.createFrom().range(1, 4)

94

.onItem().transform(i ->

95

HelloResponse.newBuilder()

96

.setMessage("Hello " + request.getName() + " #" + i)

97

.build());

98

}

99

100

private Uni<HelloResponse> processClientStream(Multi<HelloRequest> requests) {

101

return requests

102

.collect().asList()

103

.onItem().transform(list ->

104

HelloResponse.newBuilder()

105

.setMessage("Received " + list.size() + " messages")

106

.build());

107

}

108

109

private Multi<HelloResponse> processBidirectional(Multi<HelloRequest> requests) {

110

return requests

111

.onItem().transform(request ->

112

HelloResponse.newBuilder()

113

.setMessage("Echo: " + request.getName())

114

.build());

115

}

116

}

117

```

118

119

### ClientCalls Class

120

121

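Provides client-side call implementations that adapt gRPC's `StreamObserver`-based stub methods to Mutiny reactive types: calls with a single response return a `Uni`, and calls with a streamed response return a `Multi`.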

122

123

```java { .api }

124

public class ClientCalls {

125

126

/**

127

* Convert unary call to Uni

128

*/

129

public static <I, O> Uni<O> oneToOne(

130

I request,

131

BiConsumer<I, StreamObserver<O>> delegate

132

);

133

134

/**

135

* Convert server streaming call to Multi

136

*/

137

public static <I, O> Multi<O> oneToMany(

138

I request,

139

BiConsumer<I, StreamObserver<O>> delegate

140

);

141

142

/**

143

* Convert client streaming call to Uni

144

*/

145

public static <I, O> Uni<O> manyToOne(

146

Multi<I> items,

147

Function<StreamObserver<O>, StreamObserver<I>> delegate

148

);

149

150

/**

151

* Convert bidirectional streaming call to Multi

152

*/

153

public static <I, O> Multi<O> manyToMany(

154

Multi<I> items,

155

Function<StreamObserver<O>, StreamObserver<I>> delegate

156

);

157

}

158

```

159

160

**Usage Examples:**

161

162

```java

163

import io.quarkus.grpc.stubs.ClientCalls;

164

import io.smallrye.mutiny.Uni;

165

import io.smallrye.mutiny.Multi;

166

167

public class CustomGrpcClient {

168

169

private final GreetingGrpc.GreetingStub stub;

170

171

public CustomGrpcClient(Channel channel) {

172

this.stub = GreetingGrpc.newStub(channel);

173

}

174

175

public Uni<HelloResponse> sayHello(HelloRequest request) {

176

return ClientCalls.oneToOne(request, stub::sayHello);

177

}

178

179

public Multi<HelloResponse> sayHelloStream(HelloRequest request) {

180

return ClientCalls.oneToMany(request, stub::sayHelloStream);

181

}

182

183

public Uni<HelloResponse> sayHelloClientStream(Multi<HelloRequest> requests) {

184

return ClientCalls.manyToOne(requests, stub::sayHelloClientStream);

185

}

186

187

public Multi<HelloResponse> sayHelloBidirectional(Multi<HelloRequest> requests) {

188

return ClientCalls.manyToMany(requests, stub::sayHelloBidirectional);

189

}

190

}

191

```

192

193

### Stream Observer Implementations

194

195

Various StreamObserver implementations for different reactive patterns:

196

197

```java { .api }

198

public class UniStreamObserver<T> implements StreamObserver<T> {

199

// Bridges StreamObserver to UniEmitter

200

}

201

202

public class MultiStreamObserver<T> implements StreamObserver<T> {

203

// Bridges StreamObserver to MultiEmitter

204

}

205

206

public class ManyToManyObserver<T> implements StreamObserver<T> {

207

// Specialized observer for bidirectional streaming

208

}

209

210

public class ManyToOneObserver<T> implements StreamObserver<T> {

211

// Specialized observer for client streaming to unary response

212

}

213

```

214

215

### StreamCollector Interface

216

217

Development mode support for collecting and managing stream observers:

218

219

```java { .api }

220

public interface StreamCollector {

221

StreamCollector NO_OP = new StreamCollector() {

222

@Override

223

public void add(StreamObserver<?> streamObserver) {}

224

225

@Override

226

public void remove(StreamObserver<?> streamObserver) {}

227

};

228

229

void add(StreamObserver<?> streamObserver);

230

void remove(StreamObserver<?> streamObserver);

231

}

232

```

233

234

## Advanced Streaming Patterns

235

236

### Custom Stream Processing

237

238

```java

239

@GrpcService

240

public class StreamProcessingService implements MutinyService {

241

242

public Multi<ProcessedData> processDataStream(Multi<RawData> rawDataStream) {

243

return rawDataStream

244

.onItem().transform(this::validateData)

245

.onFailure().recoverWithItem(this::createErrorData)

246

.onItem().transformToUniAndConcatenate(this::enrichData)

247

.onItem().transform(this::processData)

248

.onOverflow().buffer(100)

249

.onCancellation().invoke(() -> cleanupResources());

250

}

251

252

private RawData validateData(RawData data) {

253

if (data.getValue() == null) {

254

throw new IllegalArgumentException("Value cannot be null");

255

}

256

return data;

257

}

258

259

private RawData createErrorData(Throwable error) {

260

return RawData.newBuilder()

261

.setValue("ERROR: " + error.getMessage())

262

.build();

263

}

264

265

private Uni<RawData> enrichData(RawData data) {

266

return externalService.enrich(data)

267

.onFailure().recoverWithItem(data); // Continue with original on failure

268

}

269

270

private ProcessedData processData(RawData data) {

271

return ProcessedData.newBuilder()

272

.setResult(data.getValue().toUpperCase())

273

.setTimestamp(System.currentTimeMillis())

274

.build();

275

}

276

}

277

```

278

279

### Backpressure Handling

280

281

```java

282

@GrpcService

283

public class BackpressureService implements MutinyService {

284

285

public Multi<DataResponse> streamLargeDataset(DataRequest request) {

286

return databaseService.queryLargeDataset(request.getQuery())

287

.onOverflow().buffer(1000) // Buffer up to 1000 items

288

.onOverflow().drop() // Drop items if buffer is full

289

.onItem().transform(this::convertToResponse)

290

.onItem().call(response -> {

291

// Apply backpressure based on response processing time

292

return Uni.createFrom().nullItem()

293

.onItem().delayIt().by(Duration.ofMillis(10));

294

});

295

}

296

297

public Multi<StreamResponse> controlledStream(StreamRequest request) {

298

return Multi.createFrom().ticks().every(Duration.ofSeconds(1))

299

.onItem().transform(tick -> generateResponse(tick))

300

.onOverflow().buffer(50)

301

.onRequest().invoke(requested ->

302

log.info("Client requested {} items", requested));

303

}

304

}

305

```

306

307

### Error Recovery in Streams

308

309

```java

310

@GrpcService

311

public class ResilientStreamService implements MutinyService {

312

313

public Multi<DataItem> resilientDataStream(StreamRequest request) {

314

return Multi.createFrom().range(1, 1000)

315

.onItem().transformToUniAndConcatenate(this::processItem)

316

.onFailure(TransientException.class).retry()

317

.withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10))

318

.atMost(3)

319

.onFailure(PermanentException.class).recoverWithCompletion()

320

.onFailure().recoverWithItem(this::createErrorItem);

321

}

322

323

private Uni<DataItem> processItem(int index) {

324

return externalService.processItem(index)

325

.onItem().ifNull().switchTo(() ->

326

Uni.createFrom().failure(new PermanentException("Null result")))

327

.onFailure(IOException.class).transform(TransientException::new);

328

}

329

330

private DataItem createErrorItem(Throwable error) {

331

return DataItem.newBuilder()

332

.setData("ERROR: " + error.getMessage())

333

.setIndex(-1)

334

.build();

335

}

336

}

337

```

338

339

### Stream Composition

340

341

```java

342

@GrpcService

343

public class CompositeStreamService implements MutinyService {

344

345

public Multi<CombinedData> combineStreams(CombineRequest request) {

346

Multi<DataA> streamA = serviceA.getDataStream(request.getQueryA());

347

Multi<DataB> streamB = serviceB.getDataStream(request.getQueryB());

348

349

return Multi.createBy().combining().streams(streamA, streamB)

350

.using(this::combineData)

351

.onItem().transform(this::enrichCombinedData);

352

}

353

354

public Multi<ProcessedItem> pipelineProcessing(PipelineRequest request) {

355

return inputDataStream(request)

356

.onItem().transformToUniAndConcatenate(this::stage1Processing)

357

.onItem().transformToUniAndConcatenate(this::stage2Processing)

358

.onItem().transformToUniAndConcatenate(this::stage3Processing)

359

.onItem().transform(this::finalizeProcessing);

360

}

361

362

private CombinedData combineData(DataA a, DataB b) {

363

return CombinedData.newBuilder()

364

.setValueA(a.getValue())

365

.setValueB(b.getValue())

366

.setTimestamp(System.currentTimeMillis())

367

.build();

368

}

369

}

370

```

371

372

## Performance Considerations

373

374

1. **Use appropriate buffer sizes** for stream processing

375

2. **Implement backpressure handling** for high-throughput streams

376

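3. **Consider compression** for large data transfers, e.g. by passing a compression name such as `"gzip"` as the `compression` argument of `ServerCalls.oneToOne` or `ServerCalls.oneToMany`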

377

4. **Monitor stream observer lifecycle** in development mode by registering a `StreamCollector` with `ServerCalls.setStreamCollector`

378

5. **Use appropriate concurrency models** for parallel processing

379

6. **Handle cancellation gracefully** to free resources

380

7. **Implement circuit breakers** for external service dependencies