CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-java-2-11

Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities

Pending
Overview
Eval results
Files

docs/async-io.md

Async I/O Operations

AsyncDataStream provides utilities for asynchronous I/O operations in Apache Flink, enabling efficient integration with external systems without blocking stream processing. This is particularly useful for database lookups, REST API calls, and other I/O-bound operations.

Capabilities

Async Data Stream Operations

Create asynchronous processing operators that can handle concurrent I/O operations.

/**
 * Applies an async function to every element of {@code in} and emits the results in the
 * same order as the corresponding input elements.
 *
 * @param in the input stream
 * @param func the async function invoked once per input element
 * @param timeout how long a single element's async operation may run before
 *     {@code func.timeout(...)} is called for that element
 * @param timeUnit the unit of {@code timeout}
 * @param <IN> input element type
 * @param <OUT> output element type
 * @return the stream of async results, in input order
 */
static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit
);

/**
 * Applies an async function to every element of {@code in} and emits each result as soon
 * as its operation completes. Output order is not guaranteed to match input order; in
 * exchange this variant is described as the faster one.
 *
 * @param in the input stream
 * @param func the async function invoked once per input element
 * @param timeout how long a single element's async operation may run before
 *     {@code func.timeout(...)} is called for that element
 * @param timeUnit the unit of {@code timeout}
 * @param <IN> input element type
 * @param <OUT> output element type
 * @return the stream of async results, in completion order
 */
static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit
);

/**
 * Same as the four-argument {@code orderedWait}, with an explicit operator capacity.
 * Results are emitted in input order.
 *
 * @param in the input stream
 * @param func the async function invoked once per input element
 * @param timeout how long a single element's async operation may run before
 *     {@code func.timeout(...)} is called for that element
 * @param timeUnit the unit of {@code timeout}
 * @param capacity capacity of the async operator
 *     (NOTE(review): presumably the maximum number of in-flight async requests; confirm
 *     against the Flink Async I/O documentation)
 * @param <IN> input element type
 * @param <OUT> output element type
 * @return the stream of async results, in input order
 */
static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit,
    int capacity
);

/**
 * Same as the four-argument {@code unorderedWait}, with an explicit operator capacity.
 * Results are emitted in completion order, with no ordering guarantee.
 *
 * @param in the input stream
 * @param func the async function invoked once per input element
 * @param timeout how long a single element's async operation may run before
 *     {@code func.timeout(...)} is called for that element
 * @param timeUnit the unit of {@code timeout}
 * @param capacity capacity of the async operator
 *     (NOTE(review): presumably the maximum number of in-flight async requests; confirm
 *     against the Flink Async I/O documentation)
 * @param <IN> input element type
 * @param <OUT> output element type
 * @return the stream of async results, in completion order
 */
static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit,
    int capacity
);

Usage Examples:

// Database lookup example: the same keys looked up once in order, once out of order
DataStream<String> input = env.fromElements("key1", "key2", "key3");
long lookupTimeoutMillis = 1000;

// Ordered async processing - maintains event order
DataStream<String> orderedResult = AsyncDataStream.orderedWait(
    input, new DatabaseAsyncFunction(), lookupTimeoutMillis, TimeUnit.MILLISECONDS);

// Unordered async processing - better performance, no order guarantee
int capacity = 100;
DataStream<String> unorderedResult = AsyncDataStream.unorderedWait(
    input, new DatabaseAsyncFunction(), lookupTimeoutMillis, TimeUnit.MILLISECONDS, capacity);

// Custom async function for database lookup.
// It extends RichAsyncFunction rather than implementing AsyncFunction directly: the plain
// AsyncFunction interface has no open()/close(), so "@Override open(...)" would not compile
// there, and the client could never be created or released.
class DatabaseAsyncFunction extends RichAsyncFunction<String, String> {
    private transient DatabaseClient client;

    /** Creates the database client once, before any element is processed. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new DatabaseClient();
    }

    /**
     * Starts an async lookup for {@code key} and completes {@code resultFuture} from the
     * lookup's callback: a single-element result on success, or the failure cause otherwise.
     */
    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) throws Exception {
        // Perform async database lookup
        CompletableFuture<String> future = client.asyncGet(key);

        future.whenComplete((result, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
            } else {
                resultFuture.complete(Collections.singletonList(result));
            }
        });
    }

    /**
     * Releases the database client.
     * NOTE(review): assumes DatabaseClient exposes close(), like UserProfileService in the
     * enrichment example; adapt to the real client's shutdown method.
     */
    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
        super.close();
    }
}

Rich Async Function

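Use RichAsyncFunction for async operations that need access to the runtime context or to the open()/close() lifecycle methods. The plain AsyncFunction interface declares neither. A function that must set up or tear down a resource, such as a database or service client, should therefore extend RichAsyncFunction.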

/**
 * Rich async function with lifecycle methods and runtime context access
 */
abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {
    // Inherits open(), close(), getRuntimeContext() from AbstractRichFunction
    abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
    
    // Optional timeout handling
    void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
}

Usage Examples:

// Enrich each user event with its profile, keeping the original event order
long enrichmentTimeoutMillis = 2000;
DataStream<UserEvent> events = env.addSource(new UserEventSource());

DataStream<EnrichedUserEvent> enriched = AsyncDataStream.orderedWait(
    events, new UserEnrichmentFunction(), enrichmentTimeoutMillis, TimeUnit.MILLISECONDS);

/**
 * Enriches each UserEvent with the user's profile, fetched asynchronously from a
 * UserProfileService.
 *
 * <p>A failed lookup falls back to {@code UserProfile.defaultProfile()}. A timed-out lookup
 * falls back to {@code UserProfile.timeoutProfile()}. Each completion path emits a single
 * enriched event.
 */
class UserEnrichmentFunction extends RichAsyncFunction<UserEvent, EnrichedUserEvent> {
    private transient UserProfileService profileService;
    private transient MetricGroup asyncMetrics;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // External service client, created when the function is opened
        profileService = new UserProfileService();

        // Sub-group under the function's metric group for async-enrichment monitoring
        MetricGroup functionMetrics = getRuntimeContext().getMetricGroup();
        asyncMetrics = functionMetrics.addGroup("async-enrichment");
    }

    @Override
    public void asyncInvoke(UserEvent event, ResultFuture<EnrichedUserEvent> resultFuture) throws Exception {
        CompletableFuture<UserProfile> lookup = profileService.getUserProfile(event.getUserId());

        lookup.whenComplete((profile, throwable) -> {
            // On failure, degrade to the default profile instead of propagating the error
            UserProfile resolved = (throwable == null) ? profile : UserProfile.defaultProfile();
            resultFuture.complete(Collections.singletonList(new EnrichedUserEvent(event, resolved)));
        });
    }

    @Override
    public void timeout(UserEvent event, ResultFuture<EnrichedUserEvent> resultFuture) throws Exception {
        // Still emit the event, enriched with the dedicated timeout profile
        EnrichedUserEvent fallback = new EnrichedUserEvent(event, UserProfile.timeoutProfile());
        resultFuture.complete(Collections.singletonList(fallback));
    }

    @Override
    public void close() throws Exception {
        // Release the external service before the base-class teardown
        UserProfileService service = profileService;
        if (service != null) {
            service.close();
        }
        super.close();
    }
}

Types

Async Function Interface

/**
 * User function that starts an asynchronous operation per input element and reports the
 * outcome through a {@link ResultFuture}.
 *
 * @param <IN> type of the input elements
 * @param <OUT> type of the emitted results
 */
interface AsyncFunction<IN, OUT> extends Function {
    /**
     * Starts the asynchronous operation for one input element.
     *
     * @param input the input element
     * @param resultFuture future to complete with the results, or with an error, once the
     *     operation finishes
     * @throws Exception propagated from the implementation
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * Called for an input element whose operation timed out. The default implementation
     * fails that element with a {@link TimeoutException}. Override it to emit a fallback
     * result instead.
     *
     * @param input the input element that timed out
     * @param resultFuture future to complete with results or an error
     * @throws Exception propagated from an overriding implementation
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        TimeoutException timedOut = new TimeoutException("Async operation timed out");
        resultFuture.completeExceptionally(timedOut);
    }
}

/**
 * Rich async function with lifecycle methods.
 *
 * <p>Combines {@link AsyncFunction} with the {@link AbstractRichFunction} lifecycle
 * ({@code open()}, {@code close()}) and {@code getRuntimeContext()}. Extend this instead of
 * implementing AsyncFunction directly when the function needs setup or teardown.
 */
abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {
    // Inherits lifecycle methods: open(), close(), getRuntimeContext()
    // asyncInvoke() stays abstract (declared by AsyncFunction); timeout() uses AsyncFunction's
    // default implementation unless a subclass overrides it.
}

Result Future

/**
 * Handle through which an async function reports the outcome of one input element's
 * asynchronous operation.
 *
 * <p>The examples in this document complete it from a {@code CompletableFuture} callback,
 * i.e. possibly outside the thread that called {@code asyncInvoke}.
 * NOTE(review): presumably only the first completion per element takes effect; confirm
 * against the Flink Javadoc.
 *
 * @param <OUT> output type
 */
interface ResultFuture<OUT> {
    /**
     * Completes the async operation successfully.
     *
     * @param result the result elements to emit for this input, e.g. a singleton list
     */
    void complete(Collection<OUT> result);
    
    /**
     * Completes the async operation with a failure.
     *
     * @param error the exception that occurred (the default {@code AsyncFunction.timeout}
     *     passes a {@code TimeoutException})
     */
    void completeExceptionally(Throwable error);
}

AsyncDataStream Utility

/**
 * Utility class for creating async operators.
 *
 * <p>Each factory wraps an {@link AsyncFunction} around an input stream. The {@code ordered*}
 * variants emit results in input order. The {@code unordered*} variants emit them as they
 * complete. {@code timeout}/{@code timeUnit} bound each element's operation. The overloads
 * with {@code capacity} additionally set the operator's capacity.
 */
class AsyncDataStream {
    // Static factory methods for creating async operators

    // Results emitted in input order
    static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
        DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);
    
    // Results emitted in completion order (no ordering guarantee)
    static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
        DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);
        
    // Input order, with explicit operator capacity
    static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
        DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity);
        
    // Completion order, with explicit operator capacity
    static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
        DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11

docs

async-io.md

checkpointing.md

datastream-transformations.md

execution-environment.md

index.md

keyed-streams-state.md

process-functions.md

sources-sinks.md

time-watermarks.md

windowing.md

tile.json