Apache Flink Streaming Java API: the core library for building stream-processing applications in Java. It provides the DataStream API, windowing operations, state management, event-time processing, and fault-tolerant stream processing.
—
AsyncDataStream provides utilities for asynchronous I/O operations in Apache Flink, enabling efficient integration with external systems without blocking stream processing. This is particularly useful for database lookups, REST API calls, and other I/O-bound operations.
Create asynchronous processing operators that can handle concurrent I/O operations.
/**
 * Applies an {@link AsyncFunction} to every element of a stream and emits the
 * results in the same order in which the corresponding inputs arrived.
 *
 * @param in       the stream whose elements are passed to {@code func}
 * @param func     the asynchronous function invoked for each element
 * @param timeout  timeout for the async operations
 * @param timeUnit the unit in which {@code timeout} is expressed
 * @return the stream of asynchronously produced results, in input order
 */
static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
        DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);
/**
 * Applies an {@link AsyncFunction} to every element of a stream without any
 * ordering guarantee between inputs and outputs. Dropping the ordering
 * requirement is intended to give better performance than {@code orderedWait}.
 *
 * @param in       the stream whose elements are passed to {@code func}
 * @param func     the asynchronous function invoked for each element
 * @param timeout  timeout for the async operations
 * @param timeUnit the unit in which {@code timeout} is expressed
 * @return the stream of asynchronously produced results, in no guaranteed order
 */
static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
        DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);
/**
 * Applies an async function with ordered results and a bounded number of
 * concurrent async requests.
 *
 * @param in       input DataStream
 * @param func     async function to apply to each element
 * @param timeout  timeout for async operations
 * @param timeUnit time unit for {@code timeout}
 * @param capacity maximum number of async requests that may be in progress at
 *                 the same time; once it is reached, the operator applies
 *                 backpressure instead of issuing further requests
 * @return async processed DataStream whose results are emitted in the same
 *         order as their inputs (the same guarantee as the overload without
 *         {@code capacity})
 */
static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
        DataStream<IN> in,
        AsyncFunction<IN, OUT> func,
        long timeout,
        TimeUnit timeUnit,
        int capacity);
/**
 * Applies an async function with unordered results and a bounded number of
 * concurrent async requests.
 *
 * @param in       input DataStream
 * @param func     async function to apply to each element
 * @param timeout  timeout for async operations
 * @param timeUnit time unit for {@code timeout}
 * @param capacity maximum number of async requests that may be in progress at
 *                 the same time; once it is reached, the operator applies
 *                 backpressure instead of issuing further requests
 * @return async processed DataStream with no ordering guarantee between
 *         inputs and results (the same guarantee as the overload without
 *         {@code capacity})
 */
static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
        DataStream<IN> in,
        AsyncFunction<IN, OUT> func,
        long timeout,
        TimeUnit timeUnit,
        int capacity);
Usage Examples:
// Database lookup example: the same keys processed once in order and once unordered.
DataStream<String> input = env.fromElements("key1", "key2", "key3");
long lookupTimeoutMs = 1000;

// Ordered: results are emitted in the same order as the input keys.
DataStream<String> orderedResult = AsyncDataStream.orderedWait(
        input, new DatabaseAsyncFunction(), lookupTimeoutMs, TimeUnit.MILLISECONDS);

// Unordered: no order guarantee, in exchange for better performance.
// The extra argument is the operator capacity.
int capacity = 100;
DataStream<String> unorderedResult = AsyncDataStream.unorderedWait(
        input, new DatabaseAsyncFunction(), lookupTimeoutMs, TimeUnit.MILLISECONDS, capacity);
// Custom async function for database lookup.
// It extends RichAsyncFunction rather than implementing AsyncFunction directly:
// AsyncFunction declares no open(Configuration) method, so the @Override below
// only compiles against a rich function, which provides the lifecycle hooks.
class DatabaseAsyncFunction extends RichAsyncFunction<String, String> {
    // transient: the client is created in open() instead of being serialized with the function
    private transient DatabaseClient client;

    /** Creates the database client when the function is opened. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new DatabaseClient();
    }

    /**
     * Starts an asynchronous lookup of {@code key}. When the lookup future
     * completes, {@code resultFuture} is completed with the single looked-up
     * value, or exceptionally with the lookup's error.
     */
    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) throws Exception {
        // Perform async database lookup
        CompletableFuture<String> future = client.asyncGet(key);
        future.whenComplete((result, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
            } else {
                resultFuture.complete(Collections.singletonList(result));
            }
        });
    }

    // NOTE(review): if DatabaseClient holds connections or threads, release it in close().
}
Use RichAsyncFunction for async operations with access to runtime context and lifecycle methods.
/**
 * Rich async function with lifecycle methods and runtime context access.
 *
 * <p>{@code open()}, {@code close()} and {@code getRuntimeContext()} are
 * inherited from {@link AbstractRichFunction}. Timeout handling is inherited
 * from {@link AsyncFunction}, whose default {@code timeout} completes the
 * result future exceptionally with a {@code TimeoutException}.
 */
abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {

    /**
     * Starts the asynchronous operation for {@code input}.
     *
     * <p>Declared {@code public}: it implements the public interface method,
     * and Java does not allow an override with weaker access.
     *
     * @param input        input element
     * @param resultFuture future to complete with the results or an error
     */
    @Override
    public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    // Optional timeout handling: override timeout(IN, ResultFuture<OUT>), which is
    // declared as a default method on AsyncFunction, to customize timeout behavior.
}
Usage Examples:
DataStream<UserEvent> events = env.addSource(new UserEventSource());

// Enrich every event asynchronously, keeping the output in input order.
long enrichmentTimeoutMs = 2000;
DataStream<EnrichedUserEvent> enriched = AsyncDataStream.orderedWait(
        events, new UserEnrichmentFunction(), enrichmentTimeoutMs, TimeUnit.MILLISECONDS);
/**
 * Enriches each {@link UserEvent} with the user's profile, fetched
 * asynchronously from a {@code UserProfileService}.
 *
 * <p>If the profile lookup completes exceptionally, the event is emitted with
 * {@code UserProfile.defaultProfile()}. If it times out, the event is emitted
 * with {@code UserProfile.timeoutProfile()}.
 */
class UserEnrichmentFunction extends RichAsyncFunction<UserEvent, EnrichedUserEvent> {
    private transient UserProfileService profileService;
    // NOTE(review): this group is created in open(), but no metrics are registered on it yet.
    private transient MetricGroup asyncMetrics;

    /** Creates the profile service and the "async-enrichment" metric group. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Initialize external service
        profileService = new UserProfileService();
        // Metric group for monitoring the async enrichment
        asyncMetrics = getRuntimeContext()
                .getMetricGroup()
                .addGroup("async-enrichment");
    }

    /**
     * Starts an asynchronous profile lookup for the event's user and emits
     * exactly one enriched event when the lookup completes.
     */
    @Override
    public void asyncInvoke(UserEvent event, ResultFuture<EnrichedUserEvent> resultFuture) throws Exception {
        // Async call to external service
        CompletableFuture<UserProfile> profileFuture = profileService.getUserProfile(event.getUserId());
        profileFuture.whenComplete((profile, throwable) -> {
            if (throwable != null) {
                // Best-effort fallback: emit a default profile instead of failing the job.
                // Use resultFuture.completeExceptionally(throwable) to propagate the error instead.
                resultFuture.complete(Collections.singletonList(
                        new EnrichedUserEvent(event, UserProfile.defaultProfile())));
            } else {
                resultFuture.complete(Collections.singletonList(
                        new EnrichedUserEvent(event, profile)));
            }
        });
    }

    /** Emits the event with a timeout profile instead of failing on timeout. */
    @Override
    public void timeout(UserEvent event, ResultFuture<EnrichedUserEvent> resultFuture) throws Exception {
        resultFuture.complete(Collections.singletonList(
                new EnrichedUserEvent(event, UserProfile.timeoutProfile())));
    }

    /**
     * Closes the profile service. {@code super.close()} runs in a finally
     * block, so base-class cleanup still happens if the service fails to close.
     */
    @Override
    public void close() throws Exception {
        try {
            if (profileService != null) {
                profileService.close();
            }
        } finally {
            super.close();
        }
    }
}
/**
 * Interface for asynchronous functions.
 *
 * @param <IN>  input type
 * @param <OUT> output type
 */
interface AsyncFunction<IN, OUT> extends Function {

    /**
     * Starts the asynchronous work for one input element. The outcome is
     * reported through {@code resultFuture}.
     *
     * @param input        the element to process
     * @param resultFuture completed with the results of the operation
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * Handles an element whose async operation timed out. This default
     * implementation completes {@code resultFuture} exceptionally with a
     * {@link TimeoutException}; implementations may override it.
     *
     * @param input        the element whose operation timed out
     * @param resultFuture completed with results or an error
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        TimeoutException timedOut = new TimeoutException("Async operation timed out");
        resultFuture.completeExceptionally(timedOut);
    }
}
/**
 * Base class for async functions that need lifecycle hooks or the runtime
 * context in addition to the {@link AsyncFunction} contract.
 */
abstract class RichAsyncFunction<IN, OUT>
        extends AbstractRichFunction
        implements AsyncFunction<IN, OUT> {
    // open(), close() and getRuntimeContext() come from AbstractRichFunction;
    // asyncInvoke(...) is left for concrete subclasses to implement.
}
/**
 * Handle through which an asynchronous operation publishes its outcome.
 *
 * @param <OUT> type of the result elements
 */
interface ResultFuture<OUT> {

    /**
     * Finishes the async operation successfully.
     *
     * @param result the result elements produced by the operation
     */
    void complete(Collection<OUT> result);

    /**
     * Finishes the async operation with a failure.
     *
     * @param error the exception that occurred
     */
    void completeExceptionally(Throwable error);
}
/**
 * Entry point for attaching asynchronous operators to a {@code DataStream}.
 */
class AsyncDataStream {

    // Ordered variants: results are emitted in input order.
    static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);

    static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity);

    // Unordered variants: no ordering guarantee between inputs and results.
    static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);

    static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity);
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11