docs
Support for asynchronous request processing using Callable, WebAsyncTask, DeferredResult, ResponseBodyEmitter, Server-Sent Events (SSE), and StreamingResponseBody for improved scalability and real-time communication capabilities.
Enables asynchronous request processing where the result is set at a later time, possibly from a different thread.
/**
 * An alternative to returning a {@code Callable} for asynchronous request processing.
 * The application hands back a DeferredResult and populates it later by setting its
 * result, possibly from a separate thread.
 *
 * @param <T> the result type
 */
public class DeferredResult<T> {

    /**
     * Create a DeferredResult with no explicit timeout.
     */
    public DeferredResult() {}

    /**
     * Create a DeferredResult with a timeout value.
     * When no timeout is given, the default configured in the MVC Java Config or the
     * MVC namespace applies; if that is not configured either, the timeout falls back
     * to the default of the underlying server.
     *
     * @param timeoutValue timeout value in milliseconds
     */
    public DeferredResult(Long timeoutValue) {}

    /**
     * Create a DeferredResult with a timeout value and a fallback result that is
     * used if the timeout elapses.
     *
     * @param timeoutValue timeout value in milliseconds
     * @param timeoutResult the result to use in case of timeout
     */
    public DeferredResult(Long timeoutValue, Object timeoutResult) {}

    /**
     * Tell whether this DeferredResult can no longer be used, either because a
     * result was already set or because the underlying request expired.
     *
     * @return {@code true} if the result was previously set or the request expired
     */
    public final boolean isSetOrExpired() {}

    /**
     * Tell whether a result has been set on this DeferredResult.
     *
     * @return whether a result has been set
     */
    public boolean hasResult() {}

    /**
     * Return the result, or {@code null} if none was set. Because {@code null} is
     * itself a legal result, call {@link #hasResult()} first to distinguish
     * "no result yet" from "result is null".
     *
     * @return the result
     */
    public Object getResult() {}

    /**
     * Set the value for the DeferredResult and handle it.
     *
     * @param result the value to set
     * @return {@code true} if the result was set and passed on for handling;
     *         {@code false} if the result was already set or the async request expired
     */
    public boolean setResult(T result) {}

    /**
     * Set an error value for the DeferredResult and handle it.
     * An {@code Exception} or {@code Throwable} value is processed as if a handler
     * had raised that exception.
     *
     * @param result the error result value
     * @return {@code true} if the result was set to the error value and passed on
     *         for handling; {@code false} if the result was already set or the
     *         async request expired
     */
    public boolean setErrorResult(Object result) {}

    /**
     * Register code to invoke when the async request times out.
     * The callback is called from a container thread when the timeout occurs.
     *
     * @param callback the callback to invoke
     */
    public void onTimeout(Runnable callback) {}

    /**
     * Register code to invoke when an error occurred during the async request.
     * The callback is called from a container thread when an error occurs while
     * processing an async request.
     *
     * @param callback a consumer that accepts the throwable
     */
    public void onError(Consumer<Throwable> callback) {}

    /**
     * Register code to invoke when the async request completes.
     * The callback is called from a container thread when the request completes
     * for any reason, including timeout and network error.
     *
     * @param callback the callback to invoke
     */
    public void onCompletion(Runnable callback) {}

    /**
     * Provide the handler that will process the result value.
     *
     * @param resultHandler the handler
     */
    public final void setResultHandler(DeferredResultHandler resultHandler) {}
}

Usage Example:
/**
 * Long-polling example: a client parks a request under an id via
 * {@code /subscribe/{id}} and another caller resolves it via {@code /notify/{id}}.
 */
@RestController
@RequestMapping("/api/async")
public class AsyncController {

    /** Pending requests keyed by subscriber id. */
    private final ConcurrentHashMap<String, DeferredResult<String>> requests = new ConcurrentHashMap<>();

    /**
     * Park the request until it is notified, times out (30 s, yielding "Timeout"),
     * or fails.
     *
     * @param id the subscriber id used as the lookup key
     * @return the pending result
     */
    @GetMapping("/subscribe/{id}")
    public DeferredResult<String> subscribe(@PathVariable String id) {
        DeferredResult<String> deferredResult = new DeferredResult<>(30000L, "Timeout");

        deferredResult.onTimeout(() ->
                log.warn("Request {} timed out", id));
        // Two-argument remove: drop the entry only if it still maps to THIS result, so a
        // finishing request cannot evict a newer subscription stored under the same id.
        deferredResult.onCompletion(() ->
                requests.remove(id, deferredResult));
        deferredResult.onError((throwable) ->
                log.error("Error processing request {}", id, throwable));

        requests.put(id, deferredResult);
        return deferredResult;
    }

    /**
     * Resolve the pending request registered under {@code id} with the given message.
     *
     * @param id the subscriber id
     * @param message the value to deliver to the waiting subscriber
     * @return 200 if the message was handed to a waiting subscriber, 404 otherwise
     */
    @PostMapping("/notify/{id}")
    public ResponseEntity<String> notifySubscriber(
            @PathVariable String id,
            @RequestBody String message) {
        DeferredResult<String> deferredResult = requests.get(id);
        // setResult reports whether the value was actually accepted (false when already
        // set or expired), so rely on it instead of a separate isSetOrExpired() check
        // that could go stale before the set happens.
        if (deferredResult != null && deferredResult.setResult(message)) {
            return ResponseEntity.ok("Notification sent");
        }
        return ResponseEntity.notFound().build();
    }

    // Example with error handling
    /**
     * Start background processing and complete the response from its callbacks.
     *
     * @param id the id of the item to process
     * @return the pending result (60 s timeout)
     */
    @GetMapping("/process/{id}")
    public DeferredResult<ProcessResult> processAsync(@PathVariable Long id) {
        DeferredResult<ProcessResult> deferredResult = new DeferredResult<>(60000L);
        asyncService.processAsync(id, result -> {
            deferredResult.setResult(result);
        }, error -> {
            deferredResult.setErrorResult(error);
        });
        return deferredResult;
    }
}

Base class for emitting multiple objects in an HTTP response.
/**
 * A controller method return value type for asynchronous request processing in
 * which one or more objects are written to the response.
 *
 * Where DeferredResult produces a single result, a ResponseBodyEmitter can send
 * multiple objects, each written with a compatible HttpMessageConverter.
 */
public class ResponseBodyEmitter {

    /**
     * Create a new ResponseBodyEmitter instance.
     */
    public ResponseBodyEmitter() {}

    /**
     * Create a ResponseBodyEmitter with a custom timeout value.
     * When no timeout is given, the default configured in the MVC Java Config or
     * the MVC namespace applies; if that is not set either, the timeout falls back
     * to the default of the underlying server.
     *
     * @param timeout timeout value in milliseconds
     */
    public ResponseBodyEmitter(Long timeout) {}

    /**
     * Return the configured timeout value, if any.
     *
     * @return the timeout value
     */
    public Long getTimeout() {}

    /**
     * Register a callback that is invoked when the async request times out.
     * The callback may call completeWithError(Throwable) to complete processing.
     *
     * @param callback the callback to invoke
     */
    public void onTimeout(Runnable callback) {}

    /**
     * Register a callback that is invoked when an error occurs during async
     * processing.
     *
     * @param callback a consumer that accepts the throwable
     */
    public void onError(Consumer<Throwable> callback) {}

    /**
     * Register a callback that is invoked when the async request completes for any
     * reason, including timeout and network error. Useful for resource cleanup.
     *
     * @param callback the callback to invoke
     */
    public void onCompletion(Runnable callback) {}

    /**
     * Write the given object to the response using a compatible
     * HttpMessageConverter.
     *
     * @param object the object to write
     * @throws IOException in case of I/O errors
     * @throws IllegalStateException wraps any other errors
     */
    public void send(Object object) throws IOException {}

    /**
     * Write the given object to the response, using the specified media type hint.
     *
     * @param object the object to write
     * @param mediaType a MediaType hint for selecting an HttpMessageConverter
     * @throws IOException in case of I/O errors
     * @throws IllegalStateException wraps any other errors
     */
    public void send(Object object, MediaType mediaType) throws IOException {}

    /**
     * Complete request processing by dispatching into the servlet container, where
     * Spring MVC is invoked once more and finishes the request processing lifecycle.
     *
     * NOTE: the application calls this method to complete request processing.
     * Do not call it after container-related events such as an error while the
     * container is writing the response or after the servlet container calls
     * complete().
     */
    public void complete() {}

    /**
     * Complete request processing with an error.
     * A dispatch is made into the app server, where Spring MVC passes the exception
     * to its exception handling mechanism.
     *
     * @param ex the exception to pass on for exception handling
     */
    public void completeWithError(Throwable ex) {}
}

Usage Example:
/**
 * Examples of streaming several objects over one response with ResponseBodyEmitter.
 */
@RestController
@RequestMapping("/api/stream")
public class StreamingController {

    /**
     * Emit the numbers 0..9, one per second, from a background task.
     *
     * @return the emitter the background task writes to
     */
    @GetMapping("/numbers")
    public ResponseBodyEmitter streamNumbers() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        executor.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    emitter.send(i);
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the thread's owner can still observe
                // the interruption after this task returns.
                Thread.currentThread().interrupt();
                emitter.completeWithError(e);
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }

    /**
     * Stream log lines as plain text until the source finishes or the 5-minute
     * timeout elapses.
     *
     * @return the emitter the log stream writes to
     */
    @GetMapping("/logs")
    public ResponseBodyEmitter streamLogs() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter(300000L); // 5 minutes
        emitter.onTimeout(() -> {
            log.info("Streaming timed out");
        });
        emitter.onCompletion(() -> {
            log.info("Streaming completed");
        });
        logStreamingService.streamLogs(
                // Named "line" rather than "log" so it does not shadow the logger used above.
                line -> {
                    try {
                        emitter.send(line, MediaType.TEXT_PLAIN);
                    } catch (IOException e) {
                        emitter.completeWithError(e);
                    }
                },
                () -> emitter.complete()
        );
        return emitter;
    }
}

Specialized ResponseBodyEmitter for Server-Sent Events (SSE).
/**
 * A specialization of ResponseBodyEmitter for sending Server-Sent Events.
 */
public class SseEmitter extends ResponseBodyEmitter {

    /**
     * Create a new SseEmitter instance.
     */
    public SseEmitter() {}

    /**
     * Create a SseEmitter with a custom timeout value.
     *
     * @param timeout the timeout value in milliseconds
     */
    public SseEmitter(Long timeout) {}

    /**
     * Send the object formatted as a single SSE "data" line. It's equivalent to:
     * SseEmitter.event().data(Object).
     *
     * @param object the object to write
     * @throws IOException in case of I/O errors
     * @throws IllegalStateException wraps any other errors
     */
    @Override
    public void send(Object object) throws IOException {}

    /**
     * Send the object formatted as a single SSE "data" line. It's equivalent to:
     * SseEmitter.event().data(Object, MediaType).
     *
     * @param object the object to write
     * @param mediaType a MediaType hint for selecting an HttpMessageConverter
     * @throws IOException in case of I/O errors
     * @throws IllegalStateException wraps any other errors
     */
    @Override
    public void send(Object object, MediaType mediaType) throws IOException {}

    /**
     * Send an SSE event prepared with the given builder.
     *
     * @param builder a builder for an SSE formatted event
     * @throws IOException in case of I/O errors
     */
    public void send(SseEventBuilder builder) throws IOException {}

    /**
     * Return a builder for an SSE event.
     *
     * @return a builder for SSE event
     */
    public static SseEventBuilder event() {}

    /**
     * A builder for an SSE event.
     */
    public interface SseEventBuilder {

        /**
         * Add an SSE "id" line.
         *
         * @param id the event id
         * @return this builder
         */
        SseEventBuilder id(String id);

        /**
         * Add an SSE "event" line.
         *
         * @param name the event name
         * @return this builder
         */
        SseEventBuilder name(String name);

        /**
         * Add an SSE "retry" line.
         *
         * @param reconnectTime the reconnect time in milliseconds
         * @return this builder
         */
        SseEventBuilder reconnectTime(long reconnectTime);

        /**
         * Add an SSE "comment" line.
         *
         * @param comment the comment
         * @return this builder
         */
        SseEventBuilder comment(String comment);

        /**
         * Add an SSE "data" line for the given object.
         *
         * @param object the object to write as data
         * @return this builder
         */
        SseEventBuilder data(Object object);

        /**
         * Add an SSE "data" line for the given object and the media type to use for
         * formatting the object via an HttpMessageConverter.
         *
         * @param object the object to write as data
         * @param mediaType the media type
         * @return this builder
         */
        SseEventBuilder data(Object object, MediaType mediaType);

        /**
         * Build the SSE event.
         *
         * @return a set of data to be sent
         */
        Set<DataWithMediaType> build();
    }
}

Usage Example:
/**
 * Examples of pushing Server-Sent Events to clients with SseEmitter.
 */
@RestController
@RequestMapping("/api/events")
public class SseController {

    /**
     * Stream notifications as "notification" events carrying the notification id.
     *
     * @return the emitter notifications are written to
     */
    @GetMapping(path = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamNotifications() {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        emitter.onTimeout(() -> emitter.complete());
        // NOTE(review): the listener registered below is a lambda, but the emitter is what
        // gets passed to removeListener — confirm the service can match the two; otherwise
        // keep a reference to the listener and remove that instead.
        emitter.onCompletion(() -> notificationService.removeListener(emitter));

        notificationService.addListener(notification -> {
            try {
                SseEmitter.SseEventBuilder event = SseEmitter.event()
                        .id(notification.getId())
                        .name("notification")
                        .data(notification);
                emitter.send(event);
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }

    /**
     * Stream "price-update" events for one symbol, asking clients to reconnect
     * after one second.
     *
     * @param symbol the stock symbol to subscribe to
     * @return the emitter price updates are written to
     */
    @GetMapping("/stock-prices")
    public SseEmitter streamStockPrices(@RequestParam String symbol) {
        SseEmitter emitter = new SseEmitter();

        stockService.subscribe(symbol, price -> {
            try {
                SseEmitter.SseEventBuilder event = SseEmitter.event()
                        .name("price-update")
                        .data(price)
                        .reconnectTime(1000L);
                emitter.send(event);
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });

        emitter.onCompletion(() -> stockService.unsubscribe(symbol));
        return emitter;
    }

    /**
     * Stream progress updates for a task and complete the stream once the task
     * reports completion.
     *
     * @param taskId the id of the task to follow
     * @return the emitter progress updates are written to
     */
    @GetMapping("/progress/{taskId}")
    public SseEmitter trackProgress(@PathVariable String taskId) {
        SseEmitter emitter = new SseEmitter(600000L); // 10 minutes

        taskService.trackProgress(taskId, progress -> {
            try {
                SseEmitter.SseEventBuilder event = SseEmitter.event()
                        .id(String.valueOf(progress.getPercentage()))
                        .data(progress)
                        .comment("Progress update");
                emitter.send(event);
                // Close the stream only after the final update has been sent.
                if (progress.isComplete()) {
                    emitter.complete();
                }
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }
}

Simple approach to asynchronous request processing where the controller returns a Callable.
/**
 * A task that produces a result and is allowed to throw an exception.
 * This is the standard Java interface; a controller can return one for async
 * request processing.
 *
 * @param <V> the result type
 */
@FunctionalInterface
public interface Callable<V> {

    /**
     * Compute a result, or throw an exception if that is not possible.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Usage Example:
/**
 * Examples of controller methods that return a Callable for async processing.
 */
@RestController
@RequestMapping("/api/async")
public class CallableController {

    /**
     * Simulate a five-second task.
     *
     * @return a callable yielding a fixed completion message
     */
    @GetMapping("/long-task")
    public Callable<String> executeLongRunningTask() {
        Callable<String> task = () -> {
            Thread.sleep(5000); // Simulate long task
            return "Task completed";
        };
        return task;
    }

    /**
     * Generate the report with the given id.
     *
     * @param id the report id
     * @return a callable yielding the generated report
     */
    @GetMapping("/report/{id}")
    public Callable<Report> generateReport(@PathVariable Long id) {
        // This runs in a separate thread
        return () -> reportService.generateReport(id);
    }

    /**
     * Process the submitted request body.
     *
     * @param request the data to process
     * @return a callable yielding the processing result
     */
    @PostMapping("/process")
    public Callable<ProcessResult> processData(@RequestBody ProcessRequest request) {
        // Long-running processing
        return () -> processingService.process(request);
    }
}

Provides additional options for async request processing beyond Callable.
/**
 * Holder for a Callable, a timeout value, and a task executor.
 *
 * @param <V> the value type
 */
public class WebAsyncTask<V> {

    /**
     * Create a WebAsyncTask that wraps the given Callable.
     *
     * @param callable the callable for concurrent handling
     */
    public WebAsyncTask(Callable<V> callable) {}

    /**
     * Create a WebAsyncTask from a timeout value and a Callable.
     *
     * @param timeout a timeout value in milliseconds
     * @param callable the callable for concurrent handling
     */
    public WebAsyncTask(long timeout, Callable<V> callable) {}

    /**
     * Create a WebAsyncTask from a timeout value, an executor name, and a Callable.
     *
     * @param timeout a timeout value in milliseconds
     * @param executorName the name of an AsyncTaskExecutor bean
     * @param callable the callable for concurrent handling
     */
    public WebAsyncTask(Long timeout, String executorName, Callable<V> callable) {}

    /**
     * Create a WebAsyncTask from a timeout value, an executor, and a Callable.
     *
     * @param timeout a timeout value in milliseconds
     * @param executor the executor to use for concurrent handling
     * @param callable the callable for concurrent handling
     */
    public WebAsyncTask(Long timeout, AsyncTaskExecutor executor, Callable<V> callable) {}

    /**
     * Return the Callable for concurrent handling (never {@code null}).
     *
     * @return the callable
     */
    public Callable<?> getCallable() {}

    /**
     * Return the timeout value in milliseconds, or {@code null} if not set.
     *
     * @return the timeout value
     */
    public Long getTimeout() {}

    /**
     * Return the AsyncTaskExecutor to use for concurrent handling, or {@code null}.
     *
     * @return the executor
     */
    public AsyncTaskExecutor getExecutor() {}

    /**
     * Register code to invoke when the async request times out.
     *
     * @param callback the callable to invoke on timeout
     */
    public void onTimeout(Callable<V> callback) {}

    /**
     * Register code to invoke when an error occurred during async processing.
     * The callback is a Callable and therefore takes no argument; the error itself
     * is not passed to it.
     *
     * @param callback the callable to invoke on error
     */
    public void onError(Callable<V> callback) {}

    /**
     * Register code to invoke when the async request completes.
     *
     * @param runnable the runnable to invoke on completion
     */
    public void onCompletion(Runnable runnable) {}
}

Usage Example:
/**
 * Examples of WebAsyncTask with a custom timeout and a custom executor.
 */
@RestController
@RequestMapping("/api/tasks")
public class WebAsyncTaskController {

    @Autowired
    private AsyncTaskExecutor taskExecutor;

    /**
     * Run a two-second task under a five-second timeout, with a fallback value
     * supplied by the timeout callback.
     *
     * @return the configured async task
     */
    @GetMapping("/custom-timeout")
    public WebAsyncTask<String> taskWithTimeout() {
        WebAsyncTask<String> asyncTask = new WebAsyncTask<>(5000L, () -> {
            Thread.sleep(2000);
            return "Task completed";
        });

        asyncTask.onTimeout(() -> "Request timed out");
        asyncTask.onCompletion(() -> log.info("Task completed"));
        return asyncTask;
    }

    /**
     * Run a heavy computation on the injected executor under a 30-second timeout,
     * with a fallback result supplied by the error callback.
     *
     * @return the configured async task
     */
    @GetMapping("/custom-executor")
    public WebAsyncTask<ProcessResult> taskWithCustomExecutor() {
        Callable<ProcessResult> computation = () -> processingService.heavyComputation();
        WebAsyncTask<ProcessResult> asyncTask =
                new WebAsyncTask<>(30000L, taskExecutor, computation);

        asyncTask.onError(() -> {
            log.error("Error during processing");
            return new ProcessResult("error");
        });
        return asyncTask;
    }
}

Raw streaming output for direct control over the response output stream.
/**
 * A controller method return value type for asynchronous request processing in
 * which the application writes directly to the response OutputStream without
 * holding up the Servlet container thread.
 */
@FunctionalInterface
public interface StreamingResponseBody {

    /**
     * Callback that writes the response body.
     *
     * @param outputStream the stream to write to
     * @throws IOException in case of I/O errors
     */
    void writeTo(OutputStream outputStream) throws IOException;
}