or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/io.quarkus/quarkus-resteasy-reactive@3.15.x

docs

index.md
tile.json

tessl/maven-io-quarkus--quarkus-resteasy-reactive

tessl install tessl/maven-io-quarkus--quarkus-resteasy-reactive@3.15.0

A Jakarta REST implementation utilizing build time processing and Vert.x for high-performance REST endpoints with reactive capabilities in cloud-native environments.

streaming.mddocs/reference/

Streaming and Reactive

Quarkus REST provides native support for reactive streaming with Server-Sent Events (SSE), chunked responses, and reactive types integration through Mutiny's Multi and Uni.

Import

import org.jboss.resteasy.reactive.RestMulti;
import org.jboss.resteasy.reactive.RestStreamElementType;
import org.jboss.resteasy.reactive.RestSseElementType;
import org.jboss.resteasy.reactive.server.core.StreamingOutputStream;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.ws.rs.core.MediaType;

RestMulti<T>

Wrapper for Multi<T> that allows setting HTTP status codes and headers for streaming responses.

public abstract class RestMulti<T> extends Multi<T> {
    // Factory method - returns a Builder for configuration
    public static <T> Builder<T> fromMultiData(Multi<T> multi);

    // Factory methods - return RestMulti directly (no builder)
    public static <T, R> RestMulti<R> fromUniResponse(
        Uni<T> uni,
        Function<T, Multi<R>> dataExtractor
    );

    public static <T, R> RestMulti<R> fromUniResponse(
        Uni<T> uni,
        Function<T, Multi<R>> dataExtractor,
        Function<T, Map<String, List<String>>> headersExtractor
    );

    public static <T, R> RestMulti<R> fromUniResponse(
        Uni<T> uni,
        Function<T, Multi<R>> dataExtractor,
        Function<T, Map<String, List<String>>> headersExtractor,
        Function<T, Integer> statusExtractor
    );

    // Access response metadata
    public abstract Integer getStatus();
    public abstract Map<String, List<String>> getHeaders();
}

RestMulti.Builder<T>

Builder for configuring RestMulti instances created via fromMultiData().

public static class Builder<T> {
    // Configure HTTP response metadata
    public Builder<T> status(int status);
    public Builder<T> header(String name, String value);

    // Configure streaming behavior
    public Builder<T> withDemand(long demand);
    public Builder<T> encodeAsJsonArray(boolean encodeAsJsonArray);

    // Build the RestMulti
    public RestMulti<T> build();
}

fromMultiData

Creates a RestMulti from a Multi data stream with custom status and headers.

Usage:

@GET
@Path("/stream")
@Produces(MediaType.APPLICATION_JSON)
public Multi<Item> stream() {
    Multi<Item> items = Multi.createFrom().items(
        new Item("1", "First"),
        new Item("2", "Second"),
        new Item("3", "Third")
    );

    return RestMulti.fromMultiData(items)
        .status(200)
        .header("X-Stream-Type", "items")
        .header("Cache-Control", "no-cache")
        .build();
}

fromUniResponse

Creates a RestMulti from a Uni<T> with a function to extract the streaming data. Returns RestMulti directly (not a builder).

Three overloaded versions:

  1. fromUniResponse(uni, dataExtractor) - Basic version
  2. fromUniResponse(uni, dataExtractor, headersExtractor) - With custom headers
  3. fromUniResponse(uni, dataExtractor, headersExtractor, statusExtractor) - With custom headers and status

Usage:

@GET
@Path("/user/{id}/events")
public Multi<Event> streamUserEvents(@RestPath String id) {
    return RestMulti.fromUniResponse(
        userService.findByIdAsync(id),  // Uni<User>
        user -> eventService.streamEventsForUser(user.getId())  // Function<User, Multi<Event>>
    );
}

// With custom headers and status
@GET
@Path("/user/{id}/feed")
public Multi<FeedItem> streamUserFeed(@RestPath String id) {
    return RestMulti.fromUniResponse(
        userService.findByIdAsync(id),  // Uni<User>
        user -> feedService.streamFeed(user.getId()),  // Data extractor
        user -> {  // Headers extractor
            Map<String, List<String>> headers = new HashMap<>();
            headers.put("X-User-Id", List.of(user.getId()));
            headers.put("X-User-Name", List.of(user.getName()));
            return headers;
        },
        user -> 200  // Status extractor
    );
}

withDemand

Sets the number of items to process concurrently.

Usage:

@GET
@Path("/stream")
public Multi<Item> stream() {
    return RestMulti.fromMultiData(itemService.streamAll())
        .withDemand(10)  // Process 10 items concurrently
        .build();
}

encodeAsJsonArray

Controls whether JSON items are encoded as a single array (true) or as separate JSON objects (false).

Usage:

@GET
@Path("/stream")
@Produces(MediaType.APPLICATION_JSON)
public Multi<Item> stream() {
    return RestMulti.fromMultiData(itemService.streamAll())
        .encodeAsJsonArray(false)  // Send as separate JSON objects
        .build();
}

@RestStreamElementType

Specifies the MIME type for individual elements in a stream (overrides @Produces for element serialization).

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RestStreamElementType {
    String value();  // MIME type for stream elements
}

Usage:

@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Item> streamItems() {
    // Each Item serialized as JSON in SSE events
    return itemService.streamAll();
}

@GET
@Path("/logs")
@Produces(MediaType.TEXT_PLAIN)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> streamLogs() {
    return logService.tailLogs();
}

@RestSseElementType (Deprecated)

Legacy annotation for SSE element type. Use @RestStreamElementType instead.

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Deprecated
public @interface RestSseElementType {
    String value();  // MIME type for SSE elements
}

StreamingOutputStream

Marker class indicating streaming context, used internally for type resolution.

public class StreamingOutputStream extends OutputStream {
    // Marker type for streaming context
}

Streaming Patterns

Server-Sent Events (SSE)

@GET
@Path("/events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> streamEvents() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(tick -> "Event " + tick);
}

@GET
@Path("/notifications")
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Notification> streamNotifications() {
    return notificationService.streamForCurrentUser();
}

Chunked JSON Streaming

// Separate JSON objects (NDJSON format)
@GET
@Path("/items")
@Produces(MediaType.APPLICATION_JSON)
public Multi<Item> streamItems() {
    return RestMulti.fromMultiData(itemService.streamAll())
        .encodeAsJsonArray(false)
        .build();
}

// JSON array
@GET
@Path("/items/array")
@Produces(MediaType.APPLICATION_JSON)
public Multi<Item> streamItemsArray() {
    return RestMulti.fromMultiData(itemService.streamAll())
        .encodeAsJsonArray(true)
        .build();
}

Text Streaming

@GET
@Path("/logs")
@Produces(MediaType.TEXT_PLAIN)
public Multi<String> tailLogs() {
    return logService.streamLogs();
}

Reactive Responses

@GET
@Path("/{id}")
public Uni<Item> getAsync(@RestPath String id) {
    return itemService.findByIdAsync(id);
}

@GET
@Path("/search")
public Multi<Item> search(@RestQuery String q) {
    return searchService.searchAsync(q);
}

@POST
public Uni<RestResponse<Item>> createAsync(Item item) {
    return itemService.createAsync(item)
        .map(created -> RestResponse.status(Status.CREATED, created));
}

Backpressure and Flow Control

@GET
@Path("/stream")
public Multi<Item> streamWithBackpressure() {
    return RestMulti.fromMultiData(
        Multi.createFrom().items(/* large dataset */)
            .onOverflow().buffer(100)  // Buffer up to 100 items
            .onItem().call(item -> processAsync(item))
    )
    .withDemand(10)  // Process 10 items concurrently
    .build();
}

Conditional Streaming

@GET
@Path("/user/{id}/events")
public Multi<Event> streamUserEvents(@RestPath String id) {
    return RestMulti.fromUniResponse(
        userService.findByIdAsync(id),
        user -> eventService.streamEventsForUser(user.getId()),
        user -> Map.of("X-User-Id", List.of(id)),  // Custom headers
        user -> 200  // Custom status
    );
}

Error Handling in Streams

@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Item> streamWithErrorHandling() {
    return itemService.streamAll()
        .onFailure().retry().atMost(3)
        .onFailure().recoverWithItem(() -> new Item("error", "Failed"));
}

Integration with Mutiny

Quarkus REST natively integrates with Mutiny reactive types:

Uni<T>

Represents a single asynchronous value:

// Async resource method
@GET
@Path("/{id}")
public Uni<Item> get(@RestPath String id) {
    return itemService.findByIdAsync(id);
}

// Async with RestResponse
@GET
@Path("/{id}")
public Uni<RestResponse<Item>> getWithStatus(@RestPath String id) {
    return itemService.findByIdAsync(id)
        .map(item -> item != null
            ? RestResponse.ok(item)
            : RestResponse.notFound()
        );
}

// Chaining async operations
@POST
public Uni<Item> create(Item item) {
    return itemService.createAsync(item)
        .chain(created -> auditService.logCreationAsync(created))
        .map(created -> created);
}

Multi<T>

Represents a stream of values:

// Streaming items
@GET
@Path("/stream")
@Produces(MediaType.APPLICATION_JSON)
public Multi<Item> stream() {
    return itemService.streamAll();
}

// Transforming streams
@GET
@Path("/stream/transformed")
public Multi<ItemDTO> streamTransformed() {
    return itemService.streamAll()
        .map(item -> new ItemDTO(item));
}

// Filtering streams
@GET
@Path("/stream/active")
public Multi<Item> streamActive() {
    return itemService.streamAll()
        .filter(item -> item.isActive());
}

Media Types

Common media types for streaming:

  • SERVER_SENT_EVENTS (text/event-stream) - Server-Sent Events
  • APPLICATION_JSON - JSON streaming
  • TEXT_PLAIN - Text streaming
  • APPLICATION_NDJSON - Newline-delimited JSON
  • APPLICATION_OCTET_STREAM - Binary streaming