CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-io-quarkus--quarkus-reactive-routes

REST framework offering the route model to define non-blocking endpoints

Pending
Overview
Eval results
Files

docs/reactive-streaming.md

Reactive Streaming

Support for reactive response types including Uni<T> for single asynchronous values and Multi<T> for streaming data with built-in content-type handling for Server-Sent Events, JSON arrays, and NDJSON.

Capabilities

Reactive Response Types

Support for Mutiny reactive types enabling non-blocking, asynchronous response handling.

/**
 * Reactive route methods can return:
 * - Uni<T>: Single asynchronous value
 * - Multi<T>: Stream of multiple values
 * - T: Synchronous value (traditional)
 */

/**
 * Handles {@code GET /async-data} by returning a {@code Uni} that carries a
 * single, already-known string.
 */
@Route(path = "/async-data", methods = HttpMethod.GET)
public Uni<String> getAsyncData() {
    final String payload = "async result";
    return Uni.createFrom().item(payload);
}

/**
 * Handles {@code GET /stream-data} by returning a {@code Multi} that emits three
 * fixed strings; the route declares {@code text/event-stream} as its content type.
 */
@Route(path = "/stream-data", methods = HttpMethod.GET, produces = "text/event-stream")
public Multi<String> getStreamData() {
    final String[] payloads = {"item1", "item2", "item3"};
    return Multi.createFrom().items(payloads);
}

Usage Examples:

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import java.time.Duration;
import java.time.Instant;

@ApplicationScoped
public class ReactiveExamples {

    /** Emits a fixed greeting once a 100 ms delay has elapsed. */
    @Route(path = "/async-hello", methods = HttpMethod.GET)
    public Uni<String> asyncHello() {
        Uni<String> greeting = Uni.createFrom().item("Hello, Async World!");
        return greeting.onItem().delayIt().by(Duration.ofMillis(100));
    }

    /**
     * Stands in for an asynchronous database lookup: the JSON for the requested
     * user is produced lazily, with the subscription moved to the default worker pool.
     */
    @Route(path = "/users/:id", methods = HttpMethod.GET, produces = "application/json")
    public Uni<String> getUser(@Param("id") String userId) {
        Uni<String> lookup = Uni.createFrom().item(() -> renderUser(userId));
        return lookup.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
    }

    /** Renders the user document; the id is inserted verbatim into the JSON text. */
    private static String renderUser(String userId) {
        return "{\"id\":\"" + userId + "\",\"name\":\"User " + userId + "\"}";
    }

    /** Fails about half of the time and replaces any failure with a fallback item. */
    @Route(path = "/risky-operation", methods = HttpMethod.GET)
    public Uni<String> riskyOperation() {
        Uni<String> attempt = Uni.createFrom().item(() -> succeedOrThrow());
        return attempt.onFailure().recoverWithItem("Recovered from failure");
    }

    /** Throws when the random draw exceeds 0.5, otherwise reports success. */
    private static String succeedOrThrow() {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Random failure");
        }
        return "Success!";
    }

    /** Chains a synchronous mapping, an asynchronous fetch, and a final mapping. */
    @Route(path = "/chained/:id", methods = HttpMethod.GET)
    public Uni<String> chainedOperation(@Param("id") String id) {
        Uni<String> prefixed = Uni.createFrom().item(id)
                .onItem().transform(raw -> "User-" + raw);
        Uni<String> fetched = prefixed.onItem().transformToUni(this::fetchUserData);
        return fetched.onItem().transform(userData -> "Processed: " + userData);
    }

    /** Simulated remote fetch: resolves with a description of the user after 50 ms. */
    private Uni<String> fetchUserData(String userId) {
        Uni<String> data = Uni.createFrom().item("Data for " + userId);
        return data.onItem().delayIt().by(Duration.ofMillis(50));
    }
}

Streaming Content Types

Built-in support for various streaming content types with appropriate HTTP headers and formatting.

/**
 * Content type constants for streaming responses.
 *
 * <p>The examples in this document pass these constants as the {@code produces}
 * value of a {@code @Route}, e.g. {@code produces = ReactiveRoutes.EVENT_STREAM}.
 * Only the constants are summarised here; this is not the full class.
 */
public class ReactiveRoutes {
    /** JSON array streaming; the media type is {@code "application/json"}. */
    public static final String APPLICATION_JSON = "application/json";
    
    /** Server-Sent Events; the media type is {@code "text/event-stream"}. */
    public static final String EVENT_STREAM = "text/event-stream";
    
    /** Newline-delimited JSON; the media type is {@code "application/x-ndjson"}. */
    public static final String ND_JSON = "application/x-ndjson";
    
    /** JSON streaming alias; the media type is {@code "application/stream+json"}. */
    public static final String JSON_STREAM = "application/stream+json";
}

Streaming Examples:

import io.quarkus.vertx.web.ReactiveRoutes;

@ApplicationScoped
public class StreamingExamples {

    /** Page size used when {@code pageSize} is absent, malformed, or not positive. */
    private static final int DEFAULT_PAGE_SIZE = 10;

    /**
     * Server-Sent Events stream: ten events, one per second.
     *
     * <p>Items are bare payloads. For a {@code Multi<String>} produced as
     * {@code text/event-stream}, the Quarkus guide shows the framework writing the
     * {@code data:} / {@code id:} lines and the blank-line terminator itself, so a
     * hand-written {@code "data: ...\n\n"} item would end up nested inside that frame.
     */
    @Route(path = "/events", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
    public Multi<String> streamEvents() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(tick -> "Event " + tick)
                .select().first(10);
    }

    /** NDJSON stream: each item is one complete JSON document. */
    @Route(path = "/ndjson-data", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
    public Multi<String> streamNdjson() {
        return Multi.createFrom().items(
                "{\"id\":1,\"name\":\"Alice\"}",
                "{\"id\":2,\"name\":\"Bob\"}",
                "{\"id\":3,\"name\":\"Charlie\"}"
        );
    }

    /**
     * JSON array stream.
     *
     * <p>NOTE(review): the items are pre-quoted JSON strings. Confirm whether the
     * JSON-array writer quotes {@code String} items itself; if it does, the quotes
     * here are redundant.
     */
    @Route(path = "/json-array", methods = HttpMethod.GET, produces = ReactiveRoutes.APPLICATION_JSON)
    public Multi<String> streamJsonArray() {
        return Multi.createFrom().items("\"item1\"", "\"item2\"", "\"item3\"");
    }

    /**
     * Real-time data feed: one JSON sample every 500 ms, sent as the SSE payload
     * (framing is left to the framework, as in {@link #streamEvents()}).
     */
    @Route(path = "/live-feed", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
    public Multi<String> liveFeed() {
        return Multi.createFrom().ticks().every(Duration.ofMillis(500))
                .onItem().transform(tick -> {
                    double value = Math.random() * 100;
                    // Locale.ROOT keeps '.' as the decimal separator; the default locale
                    // may produce "12,34", which is not a valid JSON number.
                    return String.format(java.util.Locale.ROOT,
                            "{\"timestamp\":%d,\"value\":%.2f}",
                            System.currentTimeMillis(), value);
                });
    }

    /**
     * Paginated data stream: emits {@code pageSize} NDJSON records numbered from 1.
     *
     * @param pageSize optional request parameter; falls back to
     *                 {@value #DEFAULT_PAGE_SIZE} when missing or unusable
     */
    @Route(path = "/paginated-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
    public Multi<String> paginatedStream(@Param("pageSize") Optional<String> pageSize) {
        int size = resolvePageSize(pageSize);

        return Multi.createFrom().range(1, size + 1)
                .onItem().transform(i -> String.format(java.util.Locale.ROOT,
                    "{\"page\":%d,\"data\":\"Item %d\",\"timestamp\":\"%s\"}",
                    i, i, Instant.now().toString()));
    }

    /**
     * Turns the raw request parameter into a usable page size. Client-supplied
     * text must not crash the route, so malformed or non-positive values yield the
     * default, and the result is capped so that {@code size + 1} cannot overflow.
     */
    private static int resolvePageSize(Optional<String> pageSize) {
        if (!pageSize.isPresent()) {
            return DEFAULT_PAGE_SIZE;
        }
        try {
            int requested = Integer.parseInt(pageSize.get());
            if (requested < 1) {
                return DEFAULT_PAGE_SIZE;
            }
            return Math.min(requested, Integer.MAX_VALUE - 1);
        } catch (NumberFormatException ignored) {
            // Not a number: treat it like an absent parameter.
            return DEFAULT_PAGE_SIZE;
        }
    }
}

Server-Sent Events

Advanced Server-Sent Events support with custom event types and IDs.

/**
 * Interface for customizing Server-Sent Event structure.
 *
 * <p>Only {@link #data()} is abstract; {@link #event()} and {@link #id()} have
 * defaults, so an implementation supplies just the parts it wants to control.
 * The dotted name in the declaration below is API-summary notation for the
 * interface nested in {@code ReactiveRoutes}.
 *
 * @param <T> type of the data payload
 */
public interface ReactiveRoutes.ServerSentEvent<T> {
    /**
     * Event type/name (optional).
     *
     * @return the event type, or {@code null} (the default) for no explicit type
     */
    default String event() { return null; }
    
    /**
     * Event data payload.
     *
     * @return the data to send
     */
    T data();
    
    /**
     * Event ID for client-side reconnection.
     *
     * @return the event ID, or {@code -1L} (the default) for auto-generation
     */
    default long id() { return -1L; }
}

SSE Examples:

import io.quarkus.vertx.web.ReactiveRoutes.ServerSentEvent;

/**
 * Server-Sent Events examples that set the event name and id.
 *
 * <p>Every route returns {@code Multi<ServerSentEvent<String>>} rather than
 * pre-formatted strings. For {@code text/event-stream} routes the Quarkus guide
 * shows the framework writing the {@code data:} / {@code id:} frame around each
 * item, so a {@code String} item that already contains {@code "event: ..."} and
 * {@code "data: ..."} lines would be wrapped a second time. {@code ServerSentEvent}
 * is the documented way to choose the event name and id.
 */
@ApplicationScoped
public class SSEExamples {

    /** Id value that leaves id assignment to the framework (the interface default). */
    private static final long AUTO_ID = -1L;

    /** Basic SSE stream: a {@code heartbeat} event every two seconds, id = tick. */
    @Route(path = "/simple-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
    public Multi<ServerSentEvent<String>> simpleSSE() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .onItem().transform(tick -> sse("heartbeat", Instant.now().toString(), tick));
    }

    /** Custom SSE events: 20 ticks, named {@code even} or {@code odd}, id = tick. */
    @Route(path = "/custom-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
    public Multi<ServerSentEvent<String>> customSSE() {
        // sse(...) is declared to return the interface type, so the element type is
        // inferred as ServerSentEvent<String> and matches the method's return type.
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(tick ->
                        sse(tick % 2 == 0 ? "even" : "odd", "Tick number: " + tick, tick))
                .select().first(20);
    }

    /**
     * Mixed event types: a {@code status} event every five seconds merged with a
     * {@code data} event every second.
     *
     * <p>Ids are left to the framework: the two sources have independent tick
     * counters, and {@code id()} is a {@code long}, so prefixed ids such as
     * {@code status-3} cannot be expressed.
     */
    @Route(path = "/mixed-events", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
    public Multi<ServerSentEvent<String>> mixedEvents() {
        Multi<ServerSentEvent<String>> statusEvents = Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onItem().transform(tick -> sse("status", "System OK", AUTO_ID));

        Multi<ServerSentEvent<String>> dataEvents = Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(tick -> sse("data", String.valueOf(Math.random()), AUTO_ID));

        return Multi.createBy().merging().streams(statusEvents, dataEvents);
    }

    /**
     * SSE with error handling: every tenth tick is a {@code milestone}; a failing
     * stream is retried up to three times and then ends with one {@code error} event.
     */
    @Route(path = "/robust-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
    public Multi<ServerSentEvent<String>> robustSSE() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(tick -> {
                    if (tick > 0 && tick % 10 == 0) {
                        return sse("milestone", "Reached tick " + tick, tick);
                    }
                    return sse("tick", String.valueOf(tick), tick);
                })
                .onFailure().retry().atMost(3)
                .onFailure().recoverWithItem(sse("error", "Stream failed", AUTO_ID));
    }

    /**
     * Builds an event with the given name, payload and id.
     *
     * @param name    event name reported by {@code event()}
     * @param payload value reported by {@code data()}
     * @param id      value reported by {@code id()}; pass {@link #AUTO_ID} to leave it unset
     */
    private static ServerSentEvent<String> sse(String name, String payload, long id) {
        return new ServerSentEvent<String>() {
            @Override
            public String event() {
                return name;
            }

            @Override
            public String data() {
                return payload;
            }

            @Override
            public long id() {
                return id;
            }
        };
    }
}

Reactive Data Processing

Complex reactive data processing patterns for real-world applications.

@ApplicationScoped
public class ReactiveDataProcessing {

    /**
     * Transform and filter stream: doubles the numbers 1-100 and keeps the
     * multiples of 3, one NDJSON record each.
     *
     * <p>No {@code onOverflow()} stage is used. {@code range} emits only what the
     * subscriber requests, whereas a bounded overflow buffer requests everything
     * up front and fails the stream once it holds more items than its capacity,
     * which would make the route fail whenever the client reads slowly.
     */
    @Route(path = "/processed-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
    public Multi<String> processedStream() {
        return Multi.createFrom().range(1, 101)  // Numbers 1-100
                .onItem().transform(n -> n * 2)   // Double each number
                .select().where(n -> n % 3 == 0)  // Keep multiples of 3
                .onItem().transform(n -> String.format("{\"value\":%d,\"processed\":true}", n));
    }

    /**
     * Async data transformation: each fruit is upper-cased, delayed by 100 ms and
     * rendered as JSON. The per-item {@code Uni}s are merged, so output order
     * follows completion order rather than input order.
     */
    @Route(path = "/async-transform", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
    public Multi<String> asyncTransform() {
        return Multi.createFrom().items("apple", "banana", "cherry", "date")
                .onItem().transformToUniAndMerge(fruit ->
                    Uni.createFrom().item(fruit)
                        .onItem().transform(f -> f.toUpperCase())
                        .onItem().delayIt().by(Duration.ofMillis(100))
                        .onItem().transform(f -> String.format("{\"fruit\":\"%s\",\"length\":%d}", f, f.length()))
                );
    }

    /**
     * Reactive API aggregation: combines three lookups and assembles a single JSON
     * document once all of them have produced their item.
     *
     * @param userId path parameter; inserted verbatim into the simulated responses,
     *               so real code should build the JSON with a JSON library
     */
    @Route(path = "/aggregated-data/:userId", methods = HttpMethod.GET, produces = "application/json")
    public Uni<String> aggregatedData(@Param("userId") String userId) {
        Uni<String> userInfo = fetchUserInfo(userId);
        Uni<String> userPosts = fetchUserPosts(userId);
        Uni<String> userProfile = fetchUserProfile(userId);

        return Uni.combine().all().unis(userInfo, userPosts, userProfile)
                .asTuple()
                .onItem().transform(tuple -> String.format(
                        "{\"user\":%s,\"posts\":%s,\"profile\":%s}",
                        tuple.getItem1(), tuple.getItem2(), tuple.getItem3()));
    }

    /** Simulated user lookup: resolves after 50 ms. */
    private Uni<String> fetchUserInfo(String userId) {
        return Uni.createFrom().item(String.format("{\"id\":\"%s\",\"name\":\"User %s\"}", userId, userId))
                .onItem().delayIt().by(Duration.ofMillis(50));
    }

    /** Simulated posts lookup: resolves after 75 ms. */
    private Uni<String> fetchUserPosts(String userId) {
        return Uni.createFrom().item(String.format("[{\"id\":1,\"title\":\"Post by %s\"}]", userId))
                .onItem().delayIt().by(Duration.ofMillis(75));
    }

    /** Simulated profile lookup: resolves after 25 ms. */
    private Uni<String> fetchUserProfile(String userId) {
        return Uni.createFrom().item(String.format("{\"bio\":\"Profile of %s\",\"followers\":100}", userId))
                .onItem().delayIt().by(Duration.ofMillis(25));
    }
}

Backpressure and Flow Control

Handling backpressure and flow control in streaming scenarios.

@ApplicationScoped
public class BackpressureExamples {

    /**
     * Fast tick stream with overflow handling: ticks arrive every 10 ms, and any
     * tick the client is not ready for is dropped.
     *
     * <p>Only {@code drop()} is applied. A {@code buffer(100)} placed before
     * {@code drop()} would never fill, because {@code drop()} requests unbounded
     * demand from its upstream, so the buffer stage would be dead code.
     */
    @Route(path = "/buffered-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
    public Multi<String> bufferedStream() {
        return Multi.createFrom().ticks().every(Duration.ofMillis(10))
                .onItem().transform(tick -> String.format("{\"tick\":%d,\"timestamp\":%d}", tick, System.currentTimeMillis()))
                .onOverflow().drop()
                .select().first(1000);
    }

    /**
     * Rate-limited stream: 1000 items, each released only after a 100 ms pause.
     *
     * <p>Items are bare payloads. For {@code text/event-stream} routes the Quarkus
     * guide shows the framework adding the {@code data:} / {@code id:} framing, so
     * it is not written by hand here.
     */
    @Route(path = "/rate-limited", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
    public Multi<String> rateLimitedStream() {
        return Multi.createFrom().range(1, 1001)
                .onItem().transform(i -> "Item " + i)
                .onItem().call(item -> Uni.createFrom().nullItem()
                        .onItem().delayIt().by(Duration.ofMillis(100))); // Rate limit
    }

    /**
     * Chunked processing: groups the numbers 1-1000 into lists of ten and renders
     * each number with its square. Chunks are concatenated rather than merged, so
     * records are emitted in ascending numeric order.
     */
    @Route(path = "/chunked-processing", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
    public Multi<String> chunkedProcessing() {
        return Multi.createFrom().range(1, 1001)
                .group().intoLists().of(10)  // Process in chunks of 10
                .onItem().transformToMultiAndConcatenate(chunk ->
                    Multi.createFrom().iterable(chunk)
                        .onItem().transform(n -> String.format("{\"number\":%d,\"square\":%d}", n, n * n))
                );
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-quarkus--quarkus-reactive-routes

docs

index.md

parameter-injection.md

reactive-streaming.md

request-context.md

request-filtering.md

route-declaration.md

tile.json