Apache Flink streaming examples demonstrating various stream processing patterns and use cases
—
Non-blocking external system integration with configurable parallelism and error handling. Demonstrates async functions, thread pool management, and ordered/unordered async processing patterns.
Comprehensive async I/O example with configurable parameters for simulating external system interactions.
/**
 * Example illustrating asynchronous I/O operations with external systems.
 * Supports ordered/unordered processing, checkpointing, and error simulation.
 */
public class AsyncIOExample {
    /**
     * Entry point of the example.
     *
     * @param args command-line arguments for configuration
     */
    public static void main(String[] args) throws Exception;
}
Usage Example:
# Run with default configuration
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.async.AsyncIOExample
# Run with custom configuration
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.async.AsyncIOExample \
  --fsStatePath /tmp/checkpoints \
  --checkpointMode exactly_once \
  --maxCount 50000 \
  --sleepFactor 200 \
  --failRatio 0.01 \
  --waitMode ordered \
  --eventType EventTime \
  --timeout 5000
Scala implementation of async I/O patterns using functional programming constructs.
/**
* Scala version of the async I/O example.
*/
object AsyncIOExample {
/**
* Entry point of the Scala example.
* @param args Command line arguments
*/
def main(args: Array[String]): Unit;
}
/**
 * Sample {@link RichAsyncFunction} that simulates requests to an external system.
 *
 * <p>Each request runs on a thread pool shared by all instances of this class. It sleeps for a
 * random time below {@code sleepFactor} milliseconds. With probability {@code failRatio} it then
 * completes with an exception; otherwise it emits {@code "key-" + (input % 10)}.
 *
 * <p>The pool is reference-counted under the class lock: the first {@link #open} creates it and
 * the last {@link #close} shuts it down.
 */
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
    /** Pool shared by all instances; created and destroyed via {@link #counter}. */
    private static ExecutorService executorService;

    /** Number of currently opened instances; guarded by {@code SampleAsyncFunction.class}. */
    private static int counter = 0;

    /** Upper bound, in milliseconds, of the simulated request latency. */
    private final long sleepFactor;

    /** Probability (0.0 to 1.0) that a simulated request fails. */
    private final float failRatio;

    /** Milliseconds to wait for the pool to terminate before forcing shutdown. */
    private final long shutdownWaitTS;

    /**
     * @param sleepFactor upper bound, in milliseconds, of the simulated request latency
     * @param failRatio probability (0.0 to 1.0) that a simulated request fails
     * @param shutdownWaitTS milliseconds to wait for the pool to terminate on the last close
     */
    SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
        this.sleepFactor = sleepFactor;
        this.failRatio = failRatio;
        this.shutdownWaitTS = shutdownWaitTS;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        synchronized (SampleAsyncFunction.class) {
            if (counter == 0) {
                // First opened instance creates the shared pool.
                executorService = Executors.newFixedThreadPool(30);
            }
            ++counter;
        }
    }

    @Override
    public void asyncInvoke(final Integer input, final AsyncCollector<String> collector)
            throws Exception {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                // Per-thread generator: avoids sharing one Random across all pool threads.
                ThreadLocalRandom rnd = ThreadLocalRandom.current();
                try {
                    // Simulate async operation delay
                    long sleep = (long) (rnd.nextFloat() * sleepFactor);
                    Thread.sleep(sleep);
                    // Simulate occasional failures
                    if (rnd.nextFloat() < failRatio) {
                        collector.collect(new Exception("Simulated async error"));
                    } else {
                        collector.collect(Collections.singletonList("key-" + (input % 10)));
                    }
                } catch (InterruptedException e) {
                    // Complete the request with an empty result, then restore the interrupt
                    // flag so the pool thread can observe it and terminate.
                    collector.collect(new ArrayList<String>(0));
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    @Override
    public void close() throws Exception {
        super.close();
        synchronized (SampleAsyncFunction.class) {
            --counter;
            if (counter == 0) {
                // Last instance: shut the shared pool down, forcing it if it does not drain in time.
                executorService.shutdown();
                try {
                    if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
                        executorService.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    // Do not leave pool threads running when the wait is interrupted;
                    // still propagate the interruption to the caller.
                    executorService.shutdownNow();
                    throw e;
                }
            }
        }
    }
}
// Create async function
// One function instance, passed to both the ordered and the unordered operator below.
AsyncFunction<Integer, String> function = new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);
// Ordered async processing (maintains element order)
// `timeout` is interpreted in TimeUnit.MILLISECONDS; 20 is the operator's queue capacity.
DataStream<String> orderedResult = AsyncDataStream.orderedWait(
inputStream,
function,
timeout,
TimeUnit.MILLISECONDS,
20 // Queue capacity
).setParallelism(taskNum);
// Unordered async processing (higher throughput)
// Same timeout, time unit and queue capacity as the ordered variant.
DataStream<String> unorderedResult = AsyncDataStream.unorderedWait(
inputStream,
function,
timeout,
TimeUnit.MILLISECONDS,
20 // Queue capacity
).setParallelism(taskNum);
/**
 * Checkpointed source that emits consecutive integers starting at 0, pausing 10 ms between
 * elements.
 *
 * <p>The next value to emit is the operator state. It is returned by {@link #snapshotState} and
 * reinstated by {@link #restoreState}. Each emission and its state update run while holding the
 * checkpoint lock.
 */
private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
    private volatile boolean isRunning = true;

    /** Total number of elements to emit, or {@code -1} for an unbounded stream. */
    private final int maxCount;

    /** Next value to emit; this is the checkpointed state. */
    private int start = 0;

    /** Creates a source that emits no elements (same as {@code maxCount == 0}); kept for compatibility. */
    SimpleSource() {
        this(0);
    }

    /**
     * @param maxCount total number of elements to emit, or {@code -1} to emit indefinitely
     */
    SimpleSource(int maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
        return Collections.singletonList(start);
    }

    @Override
    public void restoreState(List<Integer> state) throws Exception {
        // The source keeps a single value; if several are handed back, the last one wins.
        for (Integer i : state) {
            this.start = i;
        }
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while ((start < maxCount || maxCount == -1) && isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(start);
                ++start;
                // Only an unbounded source wraps around. A bounded source with
                // maxCount == Integer.MAX_VALUE must stop here instead of restarting at 0.
                if (maxCount == -1 && start == Integer.MAX_VALUE) {
                    start = 0;
                }
            }
            Thread.sleep(10L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
// State backend configuration
if (statePath != null) {
env.setStateBackend(new FsStateBackend(statePath));
}
// Checkpointing mode
if (EXACTLY_ONCE_MODE.equals(cpMode)) {
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
} else {
env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
}// Event time processing
if (EVENT_TIME.equals(timeType)) {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
} else if (INGESTION_TIME.equals(timeType)) {
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
}
// Processing time is default
/**
 * Creates the sample async function.
 *
 * @param sleepFactor upper bound, in milliseconds, of the simulated async delay
 * @param failRatio probability (0.0 to 1.0) that a request completes with an exception
 * @param shutdownWaitTS milliseconds to wait for the executor to terminate on shutdown
 */
SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS);
--fsStatePath: File system path for checkpoint state
--checkpointMode: exactly_once or at_least_once
--maxCount: Maximum number of elements emitted by the source (-1 for infinite)
--sleepFactor: Maximum simulated async delay (milliseconds)
--failRatio: Probability that a simulated async request fails (0.0 to 1.0)
--waitMode: ordered or unordered async processing
--waitOperatorParallelism: Parallelism of the async wait operator
--eventType: EventTime, IngestionTime, or ProcessingTime
--shutdownWaitTS: Thread pool shutdown timeout (milliseconds)
--timeout: Timeout for async operations (milliseconds)
--fsStatePath /tmp/flink-checkpoints
--checkpointMode exactly_once
--maxCount 100000
--sleepFactor 100
--failRatio 0.001
--waitMode ordered
--waitOperatorParallelism 4
--eventType EventTime
--shutdownWaitTS 20000
--timeout 10000
/**
 * Forwards {@code input} to {@code externalSystemCall} and completes {@code collector} with
 * either the single result value or the failure.
 */
@Override
public void asyncInvoke(final Integer input, final AsyncCollector<String> collector)
        throws Exception {
    try {
        // Perform async operation
        CompletableFuture<String> result = externalSystemCall(input);
        result.whenComplete((value, exception) -> {
            if (exception != null) {
                // Failures from dependent stages arrive wrapped in a CompletionException;
                // report the underlying cause instead of the wrapper.
                Throwable cause = (exception instanceof CompletionException && exception.getCause() != null)
                        ? exception.getCause()
                        : exception;
                collector.collect(cause);
            } else {
                collector.collect(Collections.singletonList(value));
            }
        });
    } catch (Exception e) {
        // Synchronous failure, e.g. externalSystemCall threw before returning a future.
        collector.collect(e);
    }
}
// Set timeout for async operations
// Ordered wait: 5000 ms (5 second) timeout per async request, queue capacity of 100.
AsyncDataStream.orderedWait(inputStream, asyncFunction, 5000L, TimeUnit.MILLISECONDS, 100);
@Override
public void open(Configuration parameters) throws Exception {
    // Run the base-class setup, then register this instance with the shared pool.
    super.open(parameters);
    synchronized (SampleAsyncFunction.class) {
        if (counter == 0) {
            // Create shared thread pool for the first opened instance
            executorService = Executors.newFixedThreadPool(30);
        }
        ++counter;
    }
}
/**
 * Unregisters this instance; the last one to close shuts the shared pool down, waiting up to
 * {@code shutdownWaitTS} milliseconds before forcing termination.
 */
@Override
public void close() throws Exception {
    super.close();
    synchronized (SampleAsyncFunction.class) {
        --counter;
        if (counter == 0) {
            // Shutdown thread pool when last instance closes
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                // Do not leak pool threads if the wait is interrupted; still propagate it.
                executorService.shutdownNow();
                throw e;
            }
        }
    }
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>1.3.3</version>
</dependency>
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10