0
# Asynchronous I/O Examples
1
2
These examples show non-blocking integration with external systems, with configurable parallelism and error handling. They demonstrate async functions, thread pool management, and ordered and unordered async processing patterns.
3
4
## Capabilities
5
6
### AsyncIOExample (Java)
7
8
Comprehensive async I/O example with configurable parameters for simulating external system interactions.
9
10
```java { .api }
11
/**
12
* Example illustrating asynchronous I/O operations with external systems
13
* Supports ordered/unordered processing, checkpointing, and error simulation
14
* @param args Command line arguments for configuration
15
*/
16
public class AsyncIOExample {
17
public static void main(String[] args) throws Exception;
18
}
19
```
20
21
**Usage Example:**
22
23
```bash
24
# Run with default configuration
25
java -cp flink-examples-streaming_2.10-1.3.3.jar \
26
org.apache.flink.streaming.examples.async.AsyncIOExample
27
28
# Run with custom configuration
29
java -cp flink-examples-streaming_2.10-1.3.3.jar \
30
org.apache.flink.streaming.examples.async.AsyncIOExample \
31
--fsStatePath /tmp/checkpoints \
32
--checkpointMode exactly_once \
33
--maxCount 50000 \
34
--sleepFactor 200 \
35
--failRatio 0.01 \
36
--waitMode ordered \
37
--eventType EventTime \
38
--timeout 5000
39
```
40
41
### AsyncIOExample (Scala)
42
43
Scala implementation of async I/O patterns using functional programming constructs.
44
45
```scala { .api }
46
/**
47
* Scala version of async I/O example
48
* @param args Command line arguments
49
*/
50
object AsyncIOExample {
51
def main(args: Array[String]): Unit;
52
}
53
```
54
55
## Key Async Patterns
56
57
### Async Function Implementation
58
59
```java
60
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
61
private static ExecutorService executorService;
62
63
@Override
64
public void open(Configuration parameters) throws Exception {
65
super.open(parameters);
66
synchronized (SampleAsyncFunction.class) {
67
if (counter == 0) {
68
executorService = Executors.newFixedThreadPool(30);
69
}
70
++counter;
71
}
72
}
73
74
@Override
75
public void asyncInvoke(final Integer input, final AsyncCollector<String> collector)
76
throws Exception {
77
executorService.submit(new Runnable() {
78
@Override
79
public void run() {
80
try {
81
// Simulate async operation delay
82
long sleep = (long) (random.nextFloat() * sleepFactor);
83
Thread.sleep(sleep);
84
85
// Simulate occasional failures
86
if (random.nextFloat() < failRatio) {
87
collector.collect(new Exception("Simulated async error"));
88
} else {
89
collector.collect(Collections.singletonList("key-" + (input % 10)));
90
}
91
} catch (InterruptedException e) {
92
collector.collect(new ArrayList<String>(0));
93
}
94
}
95
});
96
}
97
98
@Override
99
public void close() throws Exception {
100
synchronized (SampleAsyncFunction.class) {
101
--counter;
102
if (counter == 0) {
103
executorService.shutdown();
104
if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
105
executorService.shutdownNow();
106
}
107
}
108
}
109
}
110
}
111
```
112
113
### Ordered vs Unordered Async Processing
114
115
```java
116
// Create async function
117
AsyncFunction<Integer, String> function = new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);
118
119
// Ordered async processing (maintains element order)
120
DataStream<String> orderedResult = AsyncDataStream.orderedWait(
121
inputStream,
122
function,
123
timeout,
124
TimeUnit.MILLISECONDS,
125
20 // Queue capacity
126
).setParallelism(taskNum);
127
128
// Unordered async processing (higher throughput)
129
DataStream<String> unorderedResult = AsyncDataStream.unorderedWait(
130
inputStream,
131
function,
132
timeout,
133
TimeUnit.MILLISECONDS,
134
20 // Queue capacity
135
).setParallelism(taskNum);
136
```
137
138
### Checkpointed Source Function
139
140
```java
141
private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
142
private volatile boolean isRunning = true;
143
private int counter = 0;
144
private int start = 0;
145
146
@Override
147
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
148
return Collections.singletonList(start);
149
}
150
151
@Override
152
public void restoreState(List<Integer> state) throws Exception {
153
for (Integer i : state) {
154
this.start = i;
155
}
156
}
157
158
@Override
159
public void run(SourceContext<Integer> ctx) throws Exception {
160
while ((start < counter || counter == -1) && isRunning) {
161
synchronized (ctx.getCheckpointLock()) {
162
ctx.collect(start);
163
++start;
164
if (start == Integer.MAX_VALUE) {
165
start = 0; // Loop back to 0
166
}
167
}
168
Thread.sleep(10L);
169
}
170
}
171
172
@Override
173
public void cancel() {
174
isRunning = false;
175
}
176
}
177
```
178
179
## Configuration Options
180
181
### Checkpointing Configuration
182
183
```java
184
// State backend configuration
185
if (statePath != null) {
186
env.setStateBackend(new FsStateBackend(statePath));
187
}
188
189
// Checkpointing mode
190
if (EXACTLY_ONCE_MODE.equals(cpMode)) {
191
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
192
} else {
193
env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
194
}
195
```
196
197
### Time Characteristics
198
199
```java
200
// Event time processing
201
if (EVENT_TIME.equals(timeType)) {
202
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
203
} else if (INGESTION_TIME.equals(timeType)) {
204
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
205
}
206
// Processing time is default
207
```
208
209
### Async Function Parameters
210
211
```java { .api }
212
/**
213
* Async function constructor parameters
214
* @param sleepFactor Maximum sleep time for simulating async delay
215
* @param failRatio Probability of generating exceptions (0.0 to 1.0)
216
* @param shutdownWaitTS Milliseconds to wait for executor shutdown
217
*/
218
SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS);
219
```
220
221
## Command Line Parameters
222
223
### Supported Parameters (all optional)
224
- `--fsStatePath`: File system path where checkpoint state is stored; the file-system state backend is only configured when this is set
225
- `--checkpointMode`: `exactly_once` or `at_least_once`
226
- `--maxCount`: Maximum number of elements from source (-1 for infinite)
227
- `--sleepFactor`: Upper bound, in milliseconds, of the random delay used to simulate each async operation
228
- `--failRatio`: Probability that a simulated async operation completes with an exception (0.0 to 1.0)
229
- `--waitMode`: `ordered` or `unordered` async processing
230
- `--waitOperatorParallelism`: Parallelism for async wait operator
231
- `--eventType`: Time characteristic of the job: `EventTime`, `IngestionTime`, or `ProcessingTime` (the default)
232
- `--shutdownWaitTS`: Thread pool shutdown timeout (milliseconds)
233
- `--timeout`: Timeout for async operations (milliseconds)
234
235
### Parameter Examples
236
```bash
237
--fsStatePath /tmp/flink-checkpoints
238
--checkpointMode exactly_once
239
--maxCount 100000
240
--sleepFactor 100
241
--failRatio 0.001
242
--waitMode ordered
243
--waitOperatorParallelism 4
244
--eventType EventTime
245
--shutdownWaitTS 20000
246
--timeout 10000
247
```
248
249
## Error Handling Patterns
250
251
### Exception Handling in Async Functions
252
```java
253
@Override
254
public void asyncInvoke(final Integer input, final AsyncCollector<String> collector)
255
throws Exception {
256
try {
257
// Perform async operation
258
CompletableFuture<String> result = externalSystemCall(input);
259
result.whenComplete((value, exception) -> {
260
if (exception != null) {
261
collector.collect(exception);
262
} else {
263
collector.collect(Collections.singletonList(value));
264
}
265
});
266
} catch (Exception e) {
267
collector.collect(e);
268
}
269
}
270
```
271
272
### Timeout Configuration
273
```java
274
// Set timeout for async operations
275
AsyncDataStream.orderedWait(
276
inputStream,
277
asyncFunction,
278
5000L, // 5 second timeout
279
TimeUnit.MILLISECONDS,
280
100 // Queue capacity
281
);
282
```
283
284
## Thread Pool Management
285
286
### Thread Pool Lifecycle
287
```java
288
@Override
289
public void open(Configuration parameters) throws Exception {
290
synchronized (SampleAsyncFunction.class) {
291
if (counter == 0) {
292
// Create shared thread pool
293
executorService = Executors.newFixedThreadPool(30);
294
}
295
++counter;
296
}
297
}
298
299
@Override
300
public void close() throws Exception {
301
synchronized (SampleAsyncFunction.class) {
302
--counter;
303
if (counter == 0) {
304
// Shutdown thread pool when last instance closes
305
executorService.shutdown();
306
if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
307
executorService.shutdownNow();
308
}
309
}
310
}
311
}
312
```
313
314
## Dependencies
315
316
```xml
317
<dependency>
318
<groupId>org.apache.flink</groupId>
319
<artifactId>flink-streaming-java_2.10</artifactId>
320
<version>1.3.3</version>
321
</dependency>
322
323
<dependency>
324
<groupId>org.apache.flink</groupId>
325
<artifactId>flink-runtime_2.10</artifactId>
326
<version>1.3.3</version>
327
</dependency>
328
```
329
330
## Required Imports
331
332
```java
333
import org.apache.flink.api.common.functions.FlatMapFunction;
334
import org.apache.flink.api.java.tuple.Tuple2;
335
import org.apache.flink.configuration.Configuration;
336
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
337
import org.apache.flink.streaming.api.CheckpointingMode;
338
import org.apache.flink.streaming.api.TimeCharacteristic;
339
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
340
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
341
import org.apache.flink.streaming.api.datastream.DataStream;
342
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
343
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
344
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
345
import org.apache.flink.streaming.api.functions.source.SourceFunction;
346
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
347
```