REST framework offering the route model to define non blocking endpoints
—
Support for reactive response types including Uni<T> for single asynchronous values and Multi<T> for streaming data with built-in content-type handling for Server-Sent Events, JSON arrays, and NDJSON.
Support for Mutiny reactive types enabling non-blocking, asynchronous response handling.
/**
* Reactive route methods can return:
* - Uni<T>: Single asynchronous value
* - Multi<T>: Stream of multiple values
* - T: Synchronous value (traditional)
*/
// Single async response
@Route(path = "/async-data", methods = HttpMethod.GET)
public Uni<String> getAsyncData() {
return Uni.createFrom().item("async result");
}
// Streaming response
@Route(path = "/stream-data", methods = HttpMethod.GET, produces = "text/event-stream")
public Multi<String> getStreamData() {
return Multi.createFrom().items("item1", "item2", "item3");
}Usage Examples:
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import java.time.Duration;
import java.time.Instant;
@ApplicationScoped
public class ReactiveExamples {
// Simple async response
@Route(path = "/async-hello", methods = HttpMethod.GET)
public Uni<String> asyncHello() {
return Uni.createFrom().item("Hello, Async World!")
.onItem().delayIt().by(Duration.ofMillis(100));
}
// Async database operation simulation
@Route(path = "/users/:id", methods = HttpMethod.GET, produces = "application/json")
public Uni<String> getUser(@Param("id") String userId) {
return Uni.createFrom().item(() -> {
// Simulate async database call
return "{\"id\":\"" + userId + "\",\"name\":\"User " + userId + "\"}";
}).runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}
// Error handling with Uni
@Route(path = "/risky-operation", methods = HttpMethod.GET)
public Uni<String> riskyOperation() {
return Uni.createFrom().item(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random failure");
}
return "Success!";
}).onFailure().recoverWithItem("Recovered from failure");
}
// Chained async operations
@Route(path = "/chained/:id", methods = HttpMethod.GET)
public Uni<String> chainedOperation(@Param("id") String id) {
return Uni.createFrom().item(id)
.onItem().transform(userId -> "User-" + userId)
.onItem().transformToUni(userId -> fetchUserData(userId))
.onItem().transform(userData -> "Processed: " + userData);
}
private Uni<String> fetchUserData(String userId) {
return Uni.createFrom().item("Data for " + userId)
.onItem().delayIt().by(Duration.ofMillis(50));
}
}Built-in support for various streaming content types with appropriate HTTP headers and formatting.
/**
* Content type constants for streaming responses
*/
public class ReactiveRoutes {
/** JSON array streaming - "application/json" */
public static final String APPLICATION_JSON = "application/json";
/** Server-Sent Events - "text/event-stream" */
public static final String EVENT_STREAM = "text/event-stream";
/** Newline-delimited JSON - "application/x-ndjson" */
public static final String ND_JSON = "application/x-ndjson";
/** JSON streaming alias - "application/stream+json" */
public static final String JSON_STREAM = "application/stream+json";
}Streaming Examples:
import io.quarkus.vertx.web.ReactiveRoutes;
@ApplicationScoped
public class StreamingExamples {
// Server-Sent Events stream
@Route(path = "/events", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
public Multi<String> streamEvents() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().transform(tick -> "data: Event " + tick + "\n\n")
.select().first(10);
}
// NDJSON stream
@Route(path = "/ndjson-data", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
public Multi<String> streamNdjson() {
return Multi.createFrom().items(
"{\"id\":1,\"name\":\"Alice\"}",
"{\"id\":2,\"name\":\"Bob\"}",
"{\"id\":3,\"name\":\"Charlie\"}"
);
}
// JSON array stream
@Route(path = "/json-array", methods = HttpMethod.GET, produces = ReactiveRoutes.APPLICATION_JSON)
public Multi<String> streamJsonArray() {
return Multi.createFrom().items("\"item1\"", "\"item2\"", "\"item3\"");
}
// Real-time data feed
@Route(path = "/live-feed", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
public Multi<String> liveFeed() {
return Multi.createFrom().ticks().every(Duration.ofMillis(500))
.onItem().transform(tick -> {
double value = Math.random() * 100;
return String.format("data: {\"timestamp\":%d,\"value\":%.2f}\n\n",
System.currentTimeMillis(), value);
});
}
// Paginated data stream
@Route(path = "/paginated-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
public Multi<String> paginatedStream(@Param("pageSize") Optional<String> pageSize) {
int size = pageSize.map(Integer::parseInt).orElse(10);
return Multi.createFrom().range(1, size + 1)
.onItem().transform(i -> String.format(
"{\"page\":%d,\"data\":\"Item %d\",\"timestamp\":\"%s\"}",
i, i, Instant.now().toString()));
}
}Advanced Server-Sent Events support with custom event types and IDs.
/**
* Interface for customizing Server-Sent Event structure
*/
public interface ReactiveRoutes.ServerSentEvent<T> {
/**
* Event type/name (optional)
* @return Event type or null for default
*/
default String event() { return null; }
/**
* Event data payload
* @return The data to send
*/
T data();
/**
* Event ID for client-side reconnection
* @return Event ID or -1L for auto-generation
*/
default long id() { return -1L; }
}SSE Examples:
import io.quarkus.vertx.web.ReactiveRoutes.ServerSentEvent;
@ApplicationScoped
public class SSEExamples {
// Basic SSE stream
@Route(path = "/simple-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
public Multi<String> simpleSSE() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.onItem().transform(tick ->
"event: heartbeat\ndata: " + Instant.now() + "\nid: " + tick + "\n\n");
}
// Custom SSE events
@Route(path = "/custom-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
public Multi<ServerSentEvent<String>> customSSE() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().transform(tick -> new ServerSentEvent<String>() {
@Override
public String event() {
return tick % 2 == 0 ? "even" : "odd";
}
@Override
public String data() {
return "Tick number: " + tick;
}
@Override
public long id() {
return tick;
}
})
.select().first(20);
}
// Mixed event types
@Route(path = "/mixed-events", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
public Multi<String> mixedEvents() {
Multi<String> statusEvents = Multi.createFrom().ticks().every(Duration.ofSeconds(5))
.onItem().transform(tick -> "event: status\ndata: System OK\nid: status-" + tick + "\n\n");
Multi<String> dataEvents = Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().transform(tick -> "event: data\ndata: " + Math.random() + "\nid: data-" + tick + "\n\n");
return Multi.createBy().merging().streams(statusEvents, dataEvents);
}
// SSE with error handling
@Route(path = "/robust-sse", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
public Multi<String> robustSSE() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().transform(tick -> {
if (tick > 0 && tick % 10 == 0) {
return "event: milestone\ndata: Reached tick " + tick + "\nid: " + tick + "\n\n";
}
return "event: tick\ndata: " + tick + "\nid: " + tick + "\n\n";
})
.onFailure().retry().atMost(3)
.onFailure().recoverWithItem("event: error\ndata: Stream failed\n\n");
}
}Complex reactive data processing patterns for real-world applications.
@ApplicationScoped
public class ReactiveDataProcessing {
// Transform and filter stream
@Route(path = "/processed-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
public Multi<String> processedStream() {
return Multi.createFrom().range(1, 101) // Numbers 1-100
.onItem().transform(n -> n * 2) // Double each number
.select().where(n -> n % 3 == 0) // Filter multiples of 3
.onItem().transform(n -> String.format("{\"value\":%d,\"processed\":true}", n))
.onOverflow().buffer(10); // Buffer to handle backpressure
}
// Async data transformation
@Route(path = "/async-transform", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
public Multi<String> asyncTransform() {
return Multi.createFrom().items("apple", "banana", "cherry", "date")
.onItem().transformToUniAndMerge(fruit ->
Uni.createFrom().item(fruit)
.onItem().transform(f -> f.toUpperCase())
.onItem().delayIt().by(Duration.ofMillis(100))
.onItem().transform(f -> String.format("{\"fruit\":\"%s\",\"length\":%d}", f, f.length()))
);
}
// Reactive API aggregation
@Route(path = "/aggregated-data/:userId", methods = HttpMethod.GET, produces = "application/json")
public Uni<String> aggregatedData(@Param("userId") String userId) {
Uni<String> userInfo = fetchUserInfo(userId);
Uni<String> userPosts = fetchUserPosts(userId);
Uni<String> userProfile = fetchUserProfile(userId);
return Uni.combine().all().unis(userInfo, userPosts, userProfile)
.asTuple()
.onItem().transform(tuple -> {
return String.format("{\"user\":%s,\"posts\":%s,\"profile\":%s}",
tuple.getItem1(), tuple.getItem2(), tuple.getItem3());
});
}
private Uni<String> fetchUserInfo(String userId) {
return Uni.createFrom().item(String.format("{\"id\":\"%s\",\"name\":\"User %s\"}", userId, userId))
.onItem().delayIt().by(Duration.ofMillis(50));
}
private Uni<String> fetchUserPosts(String userId) {
return Uni.createFrom().item(String.format("[{\"id\":1,\"title\":\"Post by %s\"}]", userId))
.onItem().delayIt().by(Duration.ofMillis(75));
}
private Uni<String> fetchUserProfile(String userId) {
return Uni.createFrom().item(String.format("{\"bio\":\"Profile of %s\",\"followers\":100}", userId))
.onItem().delayIt().by(Duration.ofMillis(25));
}
}Handling backpressure and flow control in streaming scenarios.
@ApplicationScoped
public class BackpressureExamples {
// Buffered stream with overflow handling
@Route(path = "/buffered-stream", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
public Multi<String> bufferedStream() {
return Multi.createFrom().ticks().every(Duration.ofMillis(10))
.onItem().transform(tick -> String.format("{\"tick\":%d,\"timestamp\":%d}", tick, System.currentTimeMillis()))
.onOverflow().buffer(100)
.onOverflow().drop()
.select().first(1000);
}
// Rate-limited stream
@Route(path = "/rate-limited", methods = HttpMethod.GET, produces = ReactiveRoutes.EVENT_STREAM)
public Multi<String> rateLimitedStream() {
return Multi.createFrom().range(1, 1001)
.onItem().transform(i -> "data: Item " + i + "\n\n")
.onItem().call(item -> Uni.createFrom().nullItem()
.onItem().delayIt().by(Duration.ofMillis(100))); // Rate limit
}
// Chunked processing
@Route(path = "/chunked-processing", methods = HttpMethod.GET, produces = ReactiveRoutes.ND_JSON)
public Multi<String> chunkedProcessing() {
return Multi.createFrom().range(1, 1001)
.group().intoLists().of(10) // Process in chunks of 10
.onItem().transformToMultiAndMerge(chunk ->
Multi.createFrom().iterable(chunk)
.onItem().transform(n -> String.format("{\"number\":%d,\"square\":%d}", n, n * n))
);
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-quarkus--quarkus-reactive-routes