CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases

Pending
Overview
Eval results
Files

docs/async.md

Asynchronous I/O Examples

Non-blocking external system integration with configurable parallelism and error handling. Demonstrates async functions, thread pool management, and ordered/unordered async processing patterns.

Capabilities

AsyncIOExample (Java)

Comprehensive async I/O example with configurable parameters for simulating external system interactions.

/**
 * Example illustrating asynchronous I/O operations with external systems.
 * Supports ordered/unordered processing, checkpointing, and error simulation.
 */
public class AsyncIOExample {
    /**
     * Entry point of the example; configuration is read from the command line.
     *
     * @param args Command line arguments for configuration
     * @throws Exception declared by the signature; this stub shows no body, so
     *                   the concrete failure causes are not visible here
     */
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run with default configuration
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.async.AsyncIOExample

# Run with custom configuration
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.async.AsyncIOExample \
  --fsStatePath /tmp/checkpoints \
  --checkpointMode exactly_once \
  --maxCount 50000 \
  --sleepFactor 200 \
  --failRatio 0.01 \
  --waitMode ordered \
  --eventType EventTime \
  --timeout 5000

AsyncIOExample (Scala)

Scala implementation of async I/O patterns using functional programming constructs.

/**
 * Scala version of the async I/O example.
 */
object AsyncIOExample {
    /**
     * Entry point of the example.
     *
     * @param args Command line arguments
     */
    def main(args: Array[String]): Unit;
}

Key Async Patterns

Async Function Implementation

/**
 * Async function that simulates a slow, occasionally failing external lookup.
 *
 * <p>Each invocation is handed to a fixed thread pool that is shared by all
 * instances of this class. The pool is created when the first instance is
 * opened and shut down when the last instance is closed; the instance count
 * and the pool are both guarded by the class lock.
 */
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
    /** Worker pool shared by all instances; guarded by the class lock. */
    private static ExecutorService executorService;

    /** Random source used by the worker tasks; created together with the pool. */
    private static Random random;

    /**
     * Number of currently open instances. Static because it reference-counts
     * the static pool; guarded by the class lock.
     */
    private static int counter;

    /** Upper bound of the simulated delay passed to {@code Thread.sleep}. */
    private final long sleepFactor;

    /** Probability (0.0 to 1.0) of completing with a simulated exception. */
    private final float failRatio;

    /** Milliseconds to wait for pool termination before forcing shutdown. */
    private final long shutdownWaitTS;

    /**
     * @param sleepFactor upper bound of the simulated delay in milliseconds
     * @param failRatio probability (0.0 to 1.0) of a simulated failure
     * @param shutdownWaitTS milliseconds to wait for the pool to terminate
     */
    SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
        this.sleepFactor = sleepFactor;
        this.failRatio = failRatio;
        this.shutdownWaitTS = shutdownWaitTS;
    }

    /**
     * Registers this instance and creates the shared pool if it is the first one.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        synchronized (SampleAsyncFunction.class) {
            if (counter == 0) {
                executorService = Executors.newFixedThreadPool(30);
                random = new Random();
            }
            ++counter;
        }
    }

    /**
     * Submits the simulated lookup to the shared pool and returns immediately;
     * the result (or the simulated error) is reported through {@code collector}.
     */
    @Override
    public void asyncInvoke(final Integer input, final AsyncCollector<String> collector)
        throws Exception {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // Simulate async operation delay
                    long sleep = (long) (random.nextFloat() * sleepFactor);
                    Thread.sleep(sleep);

                    // Simulate occasional failures
                    if (random.nextFloat() < failRatio) {
                        collector.collect(new Exception("Simulated async error"));
                    } else {
                        collector.collect(Collections.singletonList("key-" + (input % 10)));
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the pool thread, then
                    // complete the request with an empty result.
                    Thread.currentThread().interrupt();
                    collector.collect(new ArrayList<String>(0));
                }
            }
        });
    }

    /**
     * Unregisters this instance and shuts the shared pool down when it was the
     * last one, forcing termination if the pool does not stop in time.
     */
    @Override
    public void close() throws Exception {
        super.close();
        synchronized (SampleAsyncFunction.class) {
            --counter;
            if (counter == 0) {
                executorService.shutdown();
                try {
                    if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
                        executorService.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    // Interrupted while waiting: do not leave workers running.
                    executorService.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

Ordered vs Unordered Async Processing

// Build the async function once; both variants below use the same instance
AsyncFunction<Integer, String> function =
    new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);

// Queue capacity shared by both variants
final int queueCapacity = 20;

// Ordered async processing (maintains element order)
DataStream<String> orderedResult = AsyncDataStream
    .orderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, queueCapacity)
    .setParallelism(taskNum);

// Unordered async processing (higher throughput)
DataStream<String> unorderedResult = AsyncDataStream
    .unorderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, queueCapacity)
    .setParallelism(taskNum);

Checkpointed Source Function

/**
 * Checkpointed source that emits consecutive integers.
 *
 * <p>The next value to emit is the only checkpointed state, so after a restore
 * the source continues from the value recorded in the snapshot.
 */
private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
    /** Cleared by {@link #cancel()}; volatile so the emit loop sees the write. */
    private volatile boolean isRunning = true;

    /** Exclusive upper bound of the emitted values; -1 means no bound. */
    private int counter = 0;

    /** Next value to emit; this is the checkpointed state. */
    private int start = 0;

    /**
     * Keeps the no-argument construction path available. The limit stays at 0,
     * so {@link #run} returns without emitting anything.
     */
    SimpleSource() {
    }

    /**
     * @param maxNum number of elements to emit, or -1 to emit without bound
     */
    SimpleSource(int maxNum) {
        this.counter = maxNum;
    }

    /** Snapshots the next value to emit. */
    @Override
    public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
        return Collections.singletonList(start);
    }

    /** Restores the next value to emit; if several entries are given the last one wins. */
    @Override
    public void restoreState(List<Integer> state) throws Exception {
        for (Integer i : state) {
            this.start = i;
        }
    }

    /**
     * Emits one value roughly every 10 ms until the limit is reached or the
     * source is cancelled.
     */
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while ((start < counter || counter == -1) && isRunning) {
            // Emit and advance under the checkpoint lock so a snapshot never
            // observes an emitted element without the matching position update.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(start);
                ++start;
                if (start == Integer.MAX_VALUE) {
                    start = 0; // Loop back to 0
                }
            }
            // Sleep outside the lock so checkpoints are not blocked meanwhile.
            Thread.sleep(10L);
        }
    }

    /** Requests the emit loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

Configuration Options

Checkpointing Configuration

// State backend configuration
if (statePath != null) {
    env.setStateBackend(new FsStateBackend(statePath));
}

// Checkpointing mode
if (EXACTLY_ONCE_MODE.equals(cpMode)) {
    env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
} else {
    env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
}

Time Characteristics

// Select the stream time characteristic from the configured time type.
// The constant is the receiver of equals(), so a null timeType simply
// matches neither branch (assuming the constants themselves are non-null).
if (EVENT_TIME.equals(timeType)) {
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
} else if (INGESTION_TIME.equals(timeType)) {
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
}
// Any other value leaves the environment untouched; processing time is the default

Async Function Parameters

/**
 * Async function constructor parameters
 * @param sleepFactor Upper bound of the simulated delay: each invocation sleeps
 *                    for a random fraction of this value, in milliseconds
 * @param failRatio Probability (0.0 to 1.0) that an invocation completes with
 *                  a simulated exception instead of a result
 * @param shutdownWaitTS Milliseconds to wait for the executor to terminate
 *                       before a forced shutdown
 */
SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS);

Command Line Parameters

Supported Parameters (all optional; defaults apply when omitted)

  • --fsStatePath: File system path for checkpointing state
  • --checkpointMode: exactly_once or at_least_once
  • --maxCount: Maximum number of elements from source (-1 for infinite)
  • --sleepFactor: Async operation delay simulation factor
  • --failRatio: Error simulation ratio (0.0 to 1.0)
  • --waitMode: ordered or unordered async processing
  • --waitOperatorParallelism: Parallelism for async wait operator
  • --eventType: EventTime, IngestionTime, or ProcessingTime
  • --shutdownWaitTS: Thread pool shutdown timeout (milliseconds)
  • --timeout: Timeout for async operations (milliseconds)

Parameter Examples

--fsStatePath /tmp/flink-checkpoints
--checkpointMode exactly_once
--maxCount 100000
--sleepFactor 100
--failRatio 0.001
--waitMode ordered
--waitOperatorParallelism 4
--eventType EventTime
--shutdownWaitTS 20000
--timeout 10000

Error Handling Patterns

Exception Handling in Async Functions

/**
 * Starts the external call and forwards its outcome to the collector:
 * a failed future is reported as an error, a successful one as a
 * single-element result. A failure to start the call is reported the same way.
 */
@Override
public void asyncInvoke(final Integer input, final AsyncCollector<String> collector) 
    throws Exception {
    try {
        // Kick off the async operation
        final CompletableFuture<String> pending = externalSystemCall(input);
        pending.whenComplete((response, failure) -> {
            if (failure == null) {
                collector.collect(Collections.singletonList(response));
            } else {
                collector.collect(failure);
            }
        });
    } catch (Exception e) {
        collector.collect(e);
    }
}

Timeout Configuration

// Set timeout for async operations
AsyncDataStream.orderedWait(
    inputStream,
    asyncFunction,
    5000L,                    // 5 second timeout
    TimeUnit.MILLISECONDS,
    100                       // Queue capacity
);

Thread Pool Management

Thread Pool Lifecycle

/**
 * Registers this instance and creates the shared thread pool when it is the
 * first one to be opened. The instance count and the pool are guarded by the
 * class lock.
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    synchronized (SampleAsyncFunction.class) {
        if (counter == 0) {
            // Create shared thread pool
            executorService = Executors.newFixedThreadPool(30);
        }
        ++counter;
    }
}

/**
 * Unregisters this instance and, when it was the last one, shuts the shared
 * thread pool down, forcing termination if it does not stop within
 * {@code shutdownWaitTS} milliseconds or the wait is interrupted.
 */
@Override
public void close() throws Exception {
    super.close();
    synchronized (SampleAsyncFunction.class) {
        --counter;
        if (counter == 0) {
            // Shutdown thread pool when last instance closes
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: do not leave workers running.
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
}

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Required Imports

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10

docs

async.md

external-systems.md

index.md

iteration.md

joins.md

machine-learning.md

side-output.md

socket.md

utilities.md

windowing.md

wordcount.md

tile.json