Quarkus gRPC extension that enables implementing and consuming gRPC services with reactive and imperative programming models.
—
Low-level reactive streaming utilities for implementing custom gRPC call patterns with Mutiny integration. These utilities bridge between traditional gRPC StreamObserver patterns and reactive Mutiny types.
Provides server-side call implementations that bridge gRPC StreamObserver patterns with Mutiny reactive types (Uni and Multi).
/**
 * Server-side entry points, one per gRPC call shape (unary, server streaming,
 * client streaming, bidirectional), expressed with Mutiny {@code Uni} and {@code Multi}.
 * This is a signature listing only: no method bodies are shown.
 */
public class ServerCalls {
/**
* Handle unary calls: single request -> single response
*
* @param <I> request type
* @param <O> response type
* @param request the single incoming request
* @param response observer typed on the response type
* @param compression compression name; the usage example passes {@code null} here
*        (presumably meaning "no compression" — TODO confirm)
* @param implementation function turning the request into a {@code Uni} of the response
*/
public static <I, O> void oneToOne(
I request,
StreamObserver<O> response,
String compression,
Function<I, Uni<O>> implementation
);
/**
* Handle server streaming calls: single request -> stream of responses
*
* @param <I> request type
* @param <O> response type
* @param request the single incoming request
* @param response observer typed on the response type
* @param compression compression name; the usage example passes {@code "gzip"} here
* @param implementation function turning the request into a {@code Multi} of responses
*/
public static <I, O> void oneToMany(
I request,
StreamObserver<O> response,
String compression,
Function<I, Multi<O>> implementation
);
/**
* Handle client streaming calls: stream of requests -> single response
*
* @param <I> request type
* @param <O> response type
* @param response observer typed on the response type
* @param implementation function turning a {@code Multi} of requests into a {@code Uni} of the response
* @return an observer typed on the request type; the usage example returns it directly
*         from the generated service method
*/
public static <I, O> StreamObserver<I> manyToOne(
StreamObserver<O> response,
Function<Multi<I>, Uni<O>> implementation
);
/**
* Handle bidirectional streaming calls: stream of requests -> stream of responses
*
* @param <I> request type
* @param <O> response type
* @param response observer typed on the response type
* @param implementation function turning a {@code Multi} of requests into a {@code Multi} of responses
* @return an observer typed on the request type; the usage example returns it directly
*         from the generated service method
*/
public static <I, O> StreamObserver<I> manyToMany(
StreamObserver<O> response,
Function<Multi<I>, Multi<O>> implementation
);
// Development mode utilities
// Setter/getter pair for the StreamCollector used in development mode.
// NOTE(review): how the collector is used by the call methods is not visible in this listing.
public static void setStreamCollector(StreamCollector collector);
public static StreamCollector getStreamCollector();
}Usage Examples:
import io.quarkus.grpc.stubs.ServerCalls;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;
/**
 * Example service: every generated gRPC entry point delegates to the matching
 * {@code ServerCalls} helper, while the business logic lives in private methods
 * that return Mutiny types.
 */
public class CustomGrpcService extends GreetingGrpc.GreetingImplBase {

    /** Unary call; {@code null} is passed as the compression argument. */
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        ServerCalls.oneToOne(request, responseObserver, null, req -> processHello(req));
    }

    /** Server-streaming call; {@code "gzip"} is passed as the compression argument. */
    @Override
    public void sayHelloStream(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        ServerCalls.oneToMany(request, responseObserver, "gzip", req -> processHelloStream(req));
    }

    /** Client-streaming call; the observer returned by {@code ServerCalls} is handed back as-is. */
    @Override
    public StreamObserver<HelloRequest> sayHelloClientStream(StreamObserver<HelloResponse> responseObserver) {
        return ServerCalls.manyToOne(responseObserver, incoming -> processClientStream(incoming));
    }

    /** Bidirectional call; the observer returned by {@code ServerCalls} is handed back as-is. */
    @Override
    public StreamObserver<HelloRequest> sayHelloBidirectional(StreamObserver<HelloResponse> responseObserver) {
        return ServerCalls.manyToMany(responseObserver, incoming -> processBidirectional(incoming));
    }

    /** Produces a single greeting for the request's name. */
    private Uni<HelloResponse> processHello(HelloRequest request) {
        HelloResponse greeting = reply("Hello " + request.getName());
        return Uni.createFrom().item(greeting);
    }

    /** Produces numbered greetings for the integers emitted by {@code range(1, 4)}. */
    private Multi<HelloResponse> processHelloStream(HelloRequest request) {
        return Multi.createFrom().range(1, 4)
                .onItem().transform(i -> reply("Hello " + request.getName() + " #" + i));
    }

    /** Collects all incoming requests into a list and reports how many were received. */
    private Uni<HelloResponse> processClientStream(Multi<HelloRequest> requests) {
        return requests
                .collect().asList()
                .onItem().transform(received -> {
                    int count = received.size();
                    return reply("Received " + count + " messages");
                });
    }

    /** Echoes each incoming request's name back as one response. */
    private Multi<HelloResponse> processBidirectional(Multi<HelloRequest> requests) {
        return requests
                .onItem().transform(incoming -> reply("Echo: " + incoming.getName()));
    }

    /** Builds a response carrying the given message text. */
    private static HelloResponse reply(String message) {
        return HelloResponse.newBuilder()
                .setMessage(message)
                .build();
    }
}
Provides client-side call implementations that convert traditional gRPC patterns into reactive Mutiny types.
/**
 * Client-side entry points, one per gRPC call shape, each returning a Mutiny
 * {@code Uni} or {@code Multi}. This is a signature listing only: no method bodies are shown.
 */
public class ClientCalls {
/**
* Convert unary call to Uni
*
* @param <I> request type
* @param <O> response type
* @param request the single request to send
* @param delegate consumer of the request and a response-typed observer; the usage example
*        passes a stub method reference here
* @return a {@code Uni} of the response type
*/
public static <I, O> Uni<O> oneToOne(
I request,
BiConsumer<I, StreamObserver<O>> delegate
);
/**
* Convert server streaming call to Multi
*
* @param <I> request type
* @param <O> response type
* @param request the single request to send
* @param delegate consumer of the request and a response-typed observer; the usage example
*        passes a stub method reference here
* @return a {@code Multi} of the response type
*/
public static <I, O> Multi<O> oneToMany(
I request,
BiConsumer<I, StreamObserver<O>> delegate
);
/**
* Convert client streaming call to Uni
*
* @param <I> request type
* @param <O> response type
* @param items the stream of requests to send
* @param delegate function from a response-typed observer to a request-typed observer;
*        the usage example passes a stub method reference here
* @return a {@code Uni} of the response type
*/
public static <I, O> Uni<O> manyToOne(
Multi<I> items,
Function<StreamObserver<O>, StreamObserver<I>> delegate
);
/**
* Convert bidirectional streaming call to Multi
*
* @param <I> request type
* @param <O> response type
* @param items the stream of requests to send
* @param delegate function from a response-typed observer to a request-typed observer;
*        the usage example passes a stub method reference here
* @return a {@code Multi} of the response type
*/
public static <I, O> Multi<O> manyToMany(
Multi<I> items,
Function<StreamObserver<O>, StreamObserver<I>> delegate
);
}Usage Examples:
import io.quarkus.grpc.stubs.ClientCalls;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;
/**
 * Example client: wraps a generated {@code GreetingStub} and exposes each call
 * shape as a Mutiny type by routing it through {@code ClientCalls}.
 */
public class CustomGrpcClient {

    private final GreetingGrpc.GreetingStub stub;

    /** Creates the stub on top of the supplied channel. */
    public CustomGrpcClient(Channel channel) {
        stub = GreetingGrpc.newStub(channel);
    }

    /** Unary call exposed as a {@code Uni}. */
    public Uni<HelloResponse> sayHello(HelloRequest request) {
        return ClientCalls.oneToOne(
                request,
                (req, observer) -> stub.sayHello(req, observer));
    }

    /** Server-streaming call exposed as a {@code Multi}. */
    public Multi<HelloResponse> sayHelloStream(HelloRequest request) {
        return ClientCalls.oneToMany(
                request,
                (req, observer) -> stub.sayHelloStream(req, observer));
    }

    /** Client-streaming call exposed as a {@code Uni}. */
    public Uni<HelloResponse> sayHelloClientStream(Multi<HelloRequest> requests) {
        return ClientCalls.manyToOne(
                requests,
                observer -> stub.sayHelloClientStream(observer));
    }

    /** Bidirectional call exposed as a {@code Multi}. */
    public Multi<HelloResponse> sayHelloBidirectional(Multi<HelloRequest> requests) {
        return ClientCalls.manyToMany(
                requests,
                observer -> stub.sayHelloBidirectional(observer));
    }
}
Various StreamObserver implementations for different reactive patterns:
/**
 * {@code StreamObserver} implementation described as a bridge to a {@code UniEmitter}.
 * The body is omitted in this listing.
 *
 * @param <T> type of the observed messages
 */
public class UniStreamObserver<T> implements StreamObserver<T> {
// Bridges StreamObserver to UniEmitter
}
/**
 * {@code StreamObserver} implementation described as a bridge to a {@code MultiEmitter}.
 * The body is omitted in this listing.
 *
 * @param <T> type of the observed messages
 */
public class MultiStreamObserver<T> implements StreamObserver<T> {
// Bridges StreamObserver to MultiEmitter
}
/**
 * {@code StreamObserver} implementation described as specialized for bidirectional streaming.
 * The body is omitted in this listing.
 *
 * @param <T> type of the observed messages
 */
public class ManyToManyObserver<T> implements StreamObserver<T> {
// Specialized observer for bidirectional streaming
}
/**
 * {@code StreamObserver} implementation described as specialized for client streaming
 * with a unary response. The body is omitted in this listing.
 *
 * @param <T> type of the observed messages
 */
public class ManyToOneObserver<T> implements StreamObserver<T> {
// Specialized observer for client streaming to unary response
}Development mode support for collecting and managing stream observers:
/**
 * Registry contract for stream observers: observers can be added and removed.
 * A do-nothing implementation is available as {@link #NO_OP}.
 */
public interface StreamCollector {

    /** Registers the given observer with this collector. */
    void add(StreamObserver<?> streamObserver);

    /** Unregisters the given observer from this collector. */
    void remove(StreamObserver<?> streamObserver);

    /** Collector whose {@code add} and {@code remove} both have empty bodies. */
    StreamCollector NO_OP = new StreamCollector() {
        @Override
        public void add(StreamObserver<?> streamObserver) {
            // intentionally empty
        }

        @Override
        public void remove(StreamObserver<?> streamObserver) {
            // intentionally empty
        }
    };
}
@GrpcService
public class StreamProcessingService implements MutinyService {
    // Example pipeline: validate -> recover from failure -> enrich -> process,
    // with an overflow buffer and a cancellation hook at the end of the chain.

    /**
     * Transforms a stream of raw records into processed records.
     *
     * @param rawDataStream incoming raw records
     * @return processed records; a failure raised upstream of the recovery stage is
     *         replaced by a single "ERROR: ..." record built by {@code createErrorData}
     */
    public Multi<ProcessedData> processDataStream(Multi<RawData> rawDataStream) {
        return rawDataStream
                .onItem().transform(this::validateData)
                .onFailure().recoverWithItem(this::createErrorData)
                .onItem().transformToUniAndConcatenate(this::enrichData)
                .onItem().transform(this::processData)
                .onOverflow().buffer(100)
                .onCancellation().invoke(() -> cleanupResources());
    }

    /**
     * Rejects records that carry no value.
     *
     * @param data record to check
     * @return the same record when it is valid
     * @throws IllegalArgumentException if the value is null or empty
     */
    private RawData validateData(RawData data) {
        // Generated protobuf string getters return "" for an unset field, never null,
        // so a null-only check would never reject anything; test for empty as well.
        String value = data.getValue();
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Value cannot be null or empty");
        }
        return data;
    }

    /** Builds a placeholder record whose value is "ERROR: " followed by the failure message. */
    private RawData createErrorData(Throwable error) {
        return RawData.newBuilder()
                .setValue("ERROR: " + error.getMessage())
                .build();
    }

    /** Enriches a record through the external service, falling back to the original record on failure. */
    private Uni<RawData> enrichData(RawData data) {
        return externalService.enrich(data)
                .onFailure().recoverWithItem(data); // Continue with original on failure
    }

    /** Upper-cases the record's value and stamps the result with the current time in milliseconds. */
    private ProcessedData processData(RawData data) {
        return ProcessedData.newBuilder()
                .setResult(data.getValue().toUpperCase())
                .setTimestamp(System.currentTimeMillis())
                .build();
    }
}
@GrpcService
public class BackpressureService implements MutinyService {
    // Examples of overflow handling on streams whose consumer may be slower than the source.

    /**
     * Streams the result of a large query, pacing emission with a small per-item delay.
     *
     * @param request carries the query to run
     * @return the converted query results
     */
    public Multi<DataResponse> streamLargeDataset(DataRequest request) {
        return databaseService.queryLargeDataset(request.getQuery())
                .onOverflow().buffer(1000) // Buffer up to 1000 items
                // NOTE(review): this is a second, independent overflow stage that drops
                // items the downstream has not requested; it is not a "drop when the
                // buffer above is full" policy. Confirm which single strategy is intended.
                .onOverflow().drop()
                .onItem().transform(this::convertToResponse)
                // Pace emission: each item waits for a 10 ms delayed Uni before moving on.
                .onItem().call(response ->
                        Uni.createFrom().nullItem()
                                .onItem().delayIt().by(Duration.ofMillis(10)));
    }

    /**
     * Emits one generated response per one-second tick and logs each downstream request.
     *
     * @param request the stream request (not read by this method)
     * @return responses generated from the tick values
     */
    public Multi<StreamResponse> controlledStream(StreamRequest request) {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(tick -> generateResponse(tick))
                // MultiOverflow exposes buffer(int); there is no bufferSize(int) method.
                .onOverflow().buffer(50)
                .onRequest().invoke(requested ->
                        log.info("Client requested {} items", requested));
    }
}
@GrpcService
public class ResilientStreamService implements MutinyService {
    // Example of retry and recovery: transient failures are retried per item,
    // permanent failures end the stream, anything else becomes an error item.

    /**
     * Processes the integers emitted by {@code range(1, 1000)} one at a time.
     *
     * @param request the stream request (not read by this method)
     * @return processed items; a {@code PermanentException} completes the stream and
     *         any other failure is replaced by a single error item
     */
    public Multi<DataItem> resilientDataStream(StreamRequest request) {
        return Multi.createFrom().range(1, 1000)
                .onItem().transformToUniAndConcatenate(this::processItem)
                .onFailure(PermanentException.class).recoverWithCompletion()
                .onFailure().recoverWithItem(this::createErrorItem);
    }

    /**
     * Processes one index through the external service.
     *
     * A null result becomes a {@code PermanentException}; an {@code IOException} is
     * wrapped as a {@code TransientException} and retried with back-off, at most 3 times.
     */
    private Uni<DataItem> processItem(int index) {
        return externalService.processItem(index)
                .onItem().ifNull().switchTo(() ->
                        Uni.createFrom().failure(new PermanentException("Null result")))
                .onFailure(IOException.class).transform(TransientException::new)
                // Retry here, per item: a retry placed on the outer Multi re-subscribes to
                // the range source and would replay every item from the start.
                // NOTE(review): assumes the Uni from externalService re-runs the call on
                // re-subscription — confirm.
                .onFailure(TransientException.class).retry()
                .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10))
                .atMost(3);
    }

    /** Builds a placeholder item with index -1 and data "ERROR: " followed by the failure message. */
    private DataItem createErrorItem(Throwable error) {
        return DataItem.newBuilder()
                .setData("ERROR: " + error.getMessage())
                .setIndex(-1)
                .build();
    }
}
@GrpcService
public class CompositeStreamService implements MutinyService {
    // Examples of composing streams: combining two sources and chaining processing stages.

    /**
     * Combines the streams obtained from service A and service B, then enriches
     * each combined element.
     */
    public Multi<CombinedData> combineStreams(CombineRequest request) {
        Multi<DataA> fromA = serviceA.getDataStream(request.getQueryA());
        Multi<DataB> fromB = serviceB.getDataStream(request.getQueryB());

        return Multi.createBy().combining().streams(fromA, fromB)
                .using((a, b) -> combineData(a, b))
                .onItem().transform(combined -> enrichCombinedData(combined));
    }

    /**
     * Runs every input element through three Uni-returning stages, concatenating
     * the results of each stage, and finalizes each result.
     */
    public Multi<ProcessedItem> pipelineProcessing(PipelineRequest request) {
        return inputDataStream(request)
                .onItem().transformToUniAndConcatenate(item -> stage1Processing(item))
                .onItem().transformToUniAndConcatenate(item -> stage2Processing(item))
                .onItem().transformToUniAndConcatenate(item -> stage3Processing(item))
                .onItem().transform(item -> finalizeProcessing(item));
    }

    /** Builds one combined record from both values plus the current time in milliseconds. */
    private CombinedData combineData(DataA a, DataB b) {
        return CombinedData.newBuilder()
                .setValueA(a.getValue())
                .setValueB(b.getValue())
                .setTimestamp(System.currentTimeMillis())
                .build();
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-io-quarkus--quarkus-grpc