CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-quarkus--quarkus-grpc

Quarkus gRPC extension that enables implementing and consuming gRPC services with reactive and imperative programming models.

Pending
Overview
Eval results
Files

docs/reactive-streaming.md

Reactive Streaming

Low-level reactive streaming utilities for implementing custom gRPC call patterns with Mutiny integration. These utilities bridge between traditional gRPC StreamObserver patterns and reactive Mutiny types.

Capabilities

ServerCalls Class

Provides server-side call implementations that bridge gRPC StreamObserver patterns with Mutiny reactive types (Uni and Multi).

/**
 * Static server-side adapters between gRPC {@code StreamObserver} handlers and
 * Mutiny {@code Uni}/{@code Multi} implementations, one method per gRPC call shape.
 * This is a declaration-only listing: method bodies are not shown here.
 */
public class ServerCalls {
    
    /**
     * Handle unary calls: single request -> single response
     *
     * @param request        the single incoming message
     * @param response       observer for the outgoing response
     * @param compression    compression name for the response; the usage example passes
     *                       {@code "gzip"} or {@code null} (presumably "no compression" — confirm)
     * @param implementation maps the request to a {@code Uni} producing the response
     */
    public static <I, O> void oneToOne(
        I request, 
        StreamObserver<O> response, 
        String compression,
        Function<I, Uni<O>> implementation
    );
    
    /**
     * Handle server streaming calls: single request -> stream of responses
     *
     * @param request        the single incoming message
     * @param response       observer for the outgoing responses
     * @param compression    compression name for the responses; the usage example passes
     *                       {@code "gzip"} or {@code null} (presumably "no compression" — confirm)
     * @param implementation maps the request to a {@code Multi} producing the responses
     */
    public static <I, O> void oneToMany(
        I request, 
        StreamObserver<O> response, 
        String compression,
        Function<I, Multi<O>> implementation
    );
    
    /**
     * Handle client streaming calls: stream of requests -> single response
     *
     * @param response       observer for the outgoing response
     * @param implementation maps the {@code Multi} of requests to a {@code Uni} producing the response
     * @return the observer that accepts the incoming request messages
     */
    public static <I, O> StreamObserver<I> manyToOne(
        StreamObserver<O> response,
        Function<Multi<I>, Uni<O>> implementation
    );
    
    /**
     * Handle bidirectional streaming calls: stream of requests -> stream of responses
     *
     * @param response       observer for the outgoing responses
     * @param implementation maps the {@code Multi} of requests to a {@code Multi} of responses
     * @return the observer that accepts the incoming request messages
     */
    public static <I, O> StreamObserver<I> manyToMany(
        StreamObserver<O> response,
        Function<Multi<I>, Multi<O>> implementation
    );
    
    // Development mode utilities: accessor pair for the StreamCollector used by this class.
    public static void setStreamCollector(StreamCollector collector);
    public static StreamCollector getStreamCollector();
}

Usage Examples:

import io.quarkus.grpc.stubs.ServerCalls;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;

public class CustomGrpcService extends GreetingGrpc.GreetingImplBase {
    
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        ServerCalls.oneToOne(request, responseObserver, null, this::processHello);
    }
    
    @Override
    public void sayHelloStream(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        ServerCalls.oneToMany(request, responseObserver, "gzip", this::processHelloStream);
    }
    
    @Override
    public StreamObserver<HelloRequest> sayHelloClientStream(StreamObserver<HelloResponse> responseObserver) {
        return ServerCalls.manyToOne(responseObserver, this::processClientStream);
    }
    
    @Override
    public StreamObserver<HelloRequest> sayHelloBidirectional(StreamObserver<HelloResponse> responseObserver) {
        return ServerCalls.manyToMany(responseObserver, this::processBidirectional);
    }
    
    private Uni<HelloResponse> processHello(HelloRequest request) {
        return Uni.createFrom().item(
            HelloResponse.newBuilder()
                .setMessage("Hello " + request.getName())
                .build()
        );
    }
    
    private Multi<HelloResponse> processHelloStream(HelloRequest request) {
        return Multi.createFrom().range(1, 4)
            .onItem().transform(i -> 
                HelloResponse.newBuilder()
                    .setMessage("Hello " + request.getName() + " #" + i)
                    .build());
    }
    
    private Uni<HelloResponse> processClientStream(Multi<HelloRequest> requests) {
        return requests
            .collect().asList()
            .onItem().transform(list -> 
                HelloResponse.newBuilder()
                    .setMessage("Received " + list.size() + " messages")
                    .build());
    }
    
    private Multi<HelloResponse> processBidirectional(Multi<HelloRequest> requests) {
        return requests
            .onItem().transform(request ->
                HelloResponse.newBuilder()
                    .setMessage("Echo: " + request.getName())
                    .build());
    }
}

ClientCalls Class

Provides client-side call implementations that convert traditional gRPC patterns into reactive Mutiny types.

/**
 * Static client-side adapters that expose gRPC {@code StreamObserver}-based calls as
 * Mutiny {@code Uni}/{@code Multi} results, one method per gRPC call shape.
 * This is a declaration-only listing: method bodies are not shown here.
 */
public class ClientCalls {
    
    /**
     * Convert unary call to Uni
     *
     * @param request  the message to send
     * @param delegate the underlying call, taking the request and a response observer
     *                 (the usage example passes a stub method reference)
     * @return a {@code Uni} for the single response
     */
    public static <I, O> Uni<O> oneToOne(
        I request, 
        BiConsumer<I, StreamObserver<O>> delegate
    );
    
    /**
     * Convert server streaming call to Multi
     *
     * @param request  the message to send
     * @param delegate the underlying call, taking the request and a response observer
     * @return a {@code Multi} for the stream of responses
     */
    public static <I, O> Multi<O> oneToMany(
        I request, 
        BiConsumer<I, StreamObserver<O>> delegate
    );
    
    /**
     * Convert client streaming call to Uni
     *
     * @param items    the stream of request messages to send
     * @param delegate the underlying call, taking a response observer and returning
     *                 the observer that accepts requests
     * @return a {@code Uni} for the single response
     */
    public static <I, O> Uni<O> manyToOne(
        Multi<I> items, 
        Function<StreamObserver<O>, StreamObserver<I>> delegate
    );
    
    /**
     * Convert bidirectional streaming call to Multi
     *
     * @param items    the stream of request messages to send
     * @param delegate the underlying call, taking a response observer and returning
     *                 the observer that accepts requests
     * @return a {@code Multi} for the stream of responses
     */
    public static <I, O> Multi<O> manyToMany(
        Multi<I> items, 
        Function<StreamObserver<O>, StreamObserver<I>> delegate
    );
}

Usage Examples:

import io.quarkus.grpc.stubs.ClientCalls;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;

/**
 * Example client that wraps the generated {@code GreetingStub} and exposes each call
 * as a Mutiny {@code Uni} or {@code Multi} through {@code ClientCalls}.
 */
public class CustomGrpcClient {

    private final GreetingGrpc.GreetingStub stub;

    /** Creates the stub on the supplied channel. */
    public CustomGrpcClient(Channel channel) {
        this.stub = GreetingGrpc.newStub(channel);
    }

    /** Unary call: one request, one response. */
    public Uni<HelloResponse> sayHello(HelloRequest request) {
        return ClientCalls.oneToOne(
            request,
            (message, observer) -> stub.sayHello(message, observer));
    }

    /** Server-streaming call: one request, many responses. */
    public Multi<HelloResponse> sayHelloStream(HelloRequest request) {
        return ClientCalls.oneToMany(
            request,
            (message, observer) -> stub.sayHelloStream(message, observer));
    }

    /** Client-streaming call: many requests, one response. */
    public Uni<HelloResponse> sayHelloClientStream(Multi<HelloRequest> requests) {
        return ClientCalls.manyToOne(
            requests,
            observer -> stub.sayHelloClientStream(observer));
    }

    /** Bidirectional call: many requests, many responses. */
    public Multi<HelloResponse> sayHelloBidirectional(Multi<HelloRequest> requests) {
        return ClientCalls.manyToMany(
            requests,
            observer -> stub.sayHelloBidirectional(observer));
    }
}

Stream Observer Implementations

Various StreamObserver implementations for different reactive patterns:

/**
 * {@code StreamObserver} that bridges to a Mutiny {@code UniEmitter}.
 * Declaration only; the implementation is not shown in this listing.
 */
public class UniStreamObserver<T> implements StreamObserver<T> {
    // Bridges StreamObserver to UniEmitter
}

/**
 * {@code StreamObserver} that bridges to a Mutiny {@code MultiEmitter}.
 * Declaration only; the implementation is not shown in this listing.
 */
public class MultiStreamObserver<T> implements StreamObserver<T> {
    // Bridges StreamObserver to MultiEmitter
}

/**
 * {@code StreamObserver} specialised for bidirectional streaming calls.
 * Declaration only; the implementation is not shown in this listing.
 */
public class ManyToManyObserver<T> implements StreamObserver<T> {
    // Specialized observer for bidirectional streaming
}

/**
 * {@code StreamObserver} specialised for client streaming calls that end in a single response.
 * Declaration only; the implementation is not shown in this listing.
 */
public class ManyToOneObserver<T> implements StreamObserver<T> {
    // Specialized observer for client streaming to unary response
}

StreamCollector Interface

Development mode support for collecting and managing stream observers:

/**
 * Registry contract for stream observers, used for development mode support.
 */
public interface StreamCollector {

    /** Registers the given observer with this collector. */
    void add(StreamObserver<?> streamObserver);

    /** Unregisters the given observer from this collector. */
    void remove(StreamObserver<?> streamObserver);

    /** Collector whose {@code add} and {@code remove} do nothing. */
    StreamCollector NO_OP = new StreamCollector() {
        @Override
        public void add(StreamObserver<?> streamObserver) {
            // intentionally empty
        }

        @Override
        public void remove(StreamObserver<?> streamObserver) {
            // intentionally empty
        }
    };
}

Advanced Streaming Patterns

Custom Stream Processing

@GrpcService
public class StreamProcessingService implements MutinyService {

    /**
     * Runs each incoming item through validation, enrichment and conversion.
     * A failure in the validated stream is replaced by a single error item, which then
     * continues through the remaining stages.
     */
    public Multi<ProcessedData> processDataStream(Multi<RawData> rawDataStream) {
        // Stage 1: validate, turning a failure into a substitute item.
        Multi<RawData> validated = rawDataStream
            .onItem().transform(this::validateData)
            .onFailure().recoverWithItem(this::createErrorData);

        // Stage 2: enrich one item at a time (order preserved), then convert.
        Multi<ProcessedData> processed = validated
            .onItem().transformToUniAndConcatenate(this::enrichData)
            .onItem().transform(this::processData);

        // Stage 3: buffer up to 100 items for slow consumers and clean up on cancellation.
        return processed
            .onOverflow().buffer(100)
            .onCancellation().invoke(() -> cleanupResources());
    }

    /** Returns the item unchanged, or throws if its value is {@code null}. */
    private RawData validateData(RawData data) {
        if (data.getValue() != null) {
            return data;
        }
        throw new IllegalArgumentException("Value cannot be null");
    }

    /** Builds a substitute item whose value carries the failure message. */
    private RawData createErrorData(Throwable error) {
        String description = "ERROR: " + error.getMessage();
        return RawData.newBuilder()
            .setValue(description)
            .build();
    }

    /** Enriches the item through the external service, falling back to the original item on failure. */
    private Uni<RawData> enrichData(RawData data) {
        Uni<RawData> enriched = externalService.enrich(data);
        return enriched.onFailure().recoverWithItem(data);
    }

    /** Converts an item to its processed form: upper-cased value plus the current time in milliseconds. */
    private ProcessedData processData(RawData data) {
        String upperCased = data.getValue().toUpperCase();
        long now = System.currentTimeMillis();
        return ProcessedData.newBuilder()
            .setResult(upperCased)
            .setTimestamp(now)
            .build();
    }
}

Backpressure Handling

@GrpcService
public class BackpressureService implements MutinyService {

    /**
     * Streams the query results to the client, buffering items ahead of a deliberately
     * paced downstream stage.
     *
     * @param request carries the query to run
     * @return the converted responses, each delayed by 10 ms
     */
    public Multi<DataResponse> streamLargeDataset(DataRequest request) {
        return databaseService.queryLargeDataset(request.getQuery())
            // Hold up to 1000 items while downstream is busy. Each onOverflow() is its own
            // operator, so chaining drop() after buffer() does NOT make the buffer drop
            // items; a full buffer fails the stream. Use onOverflow().drop() instead of
            // buffer() when losing items is acceptable.
            .onOverflow().buffer(1000)
            .onItem().transform(this::convertToResponse)
            // Pace emission: each item waits for this 10 ms Uni before moving on.
            .onItem().call(response ->
                Uni.createFrom().nullItem()
                    .onItem().delayIt().by(Duration.ofMillis(10)));
    }

    /**
     * Emits one generated response per second and logs every demand signal from the client.
     *
     * @param request the stream request (not read by this method)
     * @return the generated responses
     */
    public Multi<StreamResponse> controlledStream(StreamRequest request) {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .onItem().transform(tick -> generateResponse(tick))
            // Bounded buffer of 50 items; buffer(int) is the Mutiny method for this.
            .onOverflow().buffer(50)
            .onRequest().invoke(requested ->
                log.info("Client requested {} items", requested));
    }
}

Error Recovery in Streams

@GrpcService
public class ResilientStreamService implements MutinyService {

    /**
     * Processes the indexes of {@code range(1, 1000)} one at a time and streams the results.
     * Transient failures are retried per item inside {@link #processItem(int)}, so items
     * already sent to the client are never replayed. A {@code PermanentException} ends the
     * stream normally; any other failure is replaced by one final error item.
     *
     * @param request the stream request (not read by this method)
     * @return the processed items
     */
    public Multi<DataItem> resilientDataStream(StreamRequest request) {
        return Multi.createFrom().range(1, 1000)
            .onItem().transformToUniAndConcatenate(this::processItem)
            .onFailure(PermanentException.class).recoverWithCompletion()
            .onFailure().recoverWithItem(this::createErrorItem);
    }

    /**
     * Processes a single index, retrying transient failures up to 3 times with a back-off
     * between 1 and 10 seconds.
     *
     * @param index the item index to process
     * @return the processed item, or a failure once retries are exhausted
     */
    private Uni<DataItem> processItem(int index) {
        // deferred(): every retry re-subscribes, and this makes sure each attempt calls
        // the external service again even if the Uni it returns is not lazy.
        return Uni.createFrom().deferred(() -> externalService.processItem(index))
            .onItem().ifNull().switchTo(() ->
                Uni.createFrom().failure(new PermanentException("Null result")))
            .onFailure(IOException.class).transform(TransientException::new)
            // Retry here rather than on the Multi: a stream-level retry would re-subscribe
            // to the range and emit earlier items a second time.
            .onFailure(TransientException.class).retry()
                .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10))
                .atMost(3);
    }

    /** Builds the error item (index -1) whose data carries the failure message. */
    private DataItem createErrorItem(Throwable error) {
        return DataItem.newBuilder()
            .setData("ERROR: " + error.getMessage())
            .setIndex(-1)
            .build();
    }
}

Stream Composition

@GrpcService
public class CompositeStreamService implements MutinyService {

    /**
     * Pairs up items from two source streams, merges each pair into a {@code CombinedData}
     * and enriches the result.
     */
    public Multi<CombinedData> combineStreams(CombineRequest request) {
        Multi<DataA> left = serviceA.getDataStream(request.getQueryA());
        Multi<DataB> right = serviceB.getDataStream(request.getQueryB());

        Multi<CombinedData> paired = Multi.createBy().combining()
            .streams(left, right)
            .using(this::combineData);

        return paired.onItem().transform(this::enrichCombinedData);
    }

    /**
     * Sends each input item through three asynchronous stages in order, then applies
     * the final synchronous step.
     */
    public Multi<ProcessedItem> pipelineProcessing(PipelineRequest request) {
        var afterStage1 = inputDataStream(request)
            .onItem().transformToUniAndConcatenate(this::stage1Processing);
        var afterStage2 = afterStage1
            .onItem().transformToUniAndConcatenate(this::stage2Processing);
        var afterStage3 = afterStage2
            .onItem().transformToUniAndConcatenate(this::stage3Processing);

        return afterStage3.onItem().transform(this::finalizeProcessing);
    }

    /** Merges one item from each stream and stamps it with the current time in milliseconds. */
    private CombinedData combineData(DataA a, DataB b) {
        long now = System.currentTimeMillis();
        return CombinedData.newBuilder()
            .setValueA(a.getValue())
            .setValueB(b.getValue())
            .setTimestamp(now)
            .build();
    }
}

Performance Considerations

  1. Use appropriate buffer sizes for stream processing
  2. Implement backpressure handling for high-throughput streams
  3. Consider compression for large data transfers
  4. Monitor stream observer lifecycle in development mode
  5. Use appropriate concurrency models for parallel processing
  6. Handle cancellation gracefully to free resources
  7. Implement circuit breakers for external service dependencies

Install with Tessl CLI

npx tessl i tessl/maven-io-quarkus--quarkus-grpc

docs

client-usage.md

configuration.md

exception-handling.md

index.md

interceptors.md

reactive-streaming.md

service-implementation.md

tile.json