# Async I/O Operations

`AsyncDataStream` provides utilities for asynchronous I/O in Apache Flink. An async operator does not block the stream-processing thread on each request to an external system. Instead, it keeps several requests in flight at once and emits their results as they complete. This is particularly useful for database lookups, REST API calls, and other I/O-bound operations.

## Capabilities

### Async Data Stream Operations

Create asynchronous operators that issue I/O requests concurrently. `orderedWait` emits results in input order. `unorderedWait` emits each result as soon as its request completes.

```java { .api }
/**
12
* Apply async function with ordered results
13
* @param in - input DataStream
14
* @param func - async function to apply
15
* @param timeout - timeout for async operations
16
* @param timeUnit - time unit for timeout
17
* @return async processed DataStream with ordered results
18
*/
19
static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
20
DataStream<IN> in,
21
AsyncFunction<IN, OUT> func,
22
long timeout,
23
TimeUnit timeUnit
24
);
25
26
/**
27
* Apply async function with unordered results for better performance
28
* @param in - input DataStream
29
* @param func - async function to apply
30
* @param timeout - timeout for async operations
31
* @param timeUnit - time unit for timeout
32
* @return async processed DataStream with unordered results
33
*/
34
static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
35
DataStream<IN> in,
36
AsyncFunction<IN, OUT> func,
37
long timeout,
38
TimeUnit timeUnit
39
);
40
41
/**
42
* Apply async function with ordered results and capacity
43
* @param in - input DataStream
44
* @param func - async function to apply
45
* @param timeout - timeout for async operations
46
* @param timeUnit - time unit for timeout
47
* @param capacity - capacity of async operator
48
* @return async processed DataStream
49
*/
50
static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
51
DataStream<IN> in,
52
AsyncFunction<IN, OUT> func,
53
long timeout,
54
TimeUnit timeUnit,
55
int capacity
56
);
57
58
/**
59
* Apply async function with unordered results and capacity
60
* @param in - input DataStream
61
* @param func - async function to apply
62
* @param timeout - timeout for async operations
63
* @param timeUnit - time unit for timeout
64
* @param capacity - capacity of async operator
65
* @return async processed DataStream
66
*/
67
static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
68
DataStream<IN> in,
69
AsyncFunction<IN, OUT> func,
70
long timeout,
71
TimeUnit timeUnit,
72
int capacity
73
);
```

**Usage Examples:**

```java
// Database lookup example
80
DataStream<String> input = env.fromElements("key1", "key2", "key3");
81
82
// Ordered async processing - maintains event order
83
DataStream<String> orderedResult = AsyncDataStream.orderedWait(
84
input,
85
new DatabaseAsyncFunction(),
86
1000, TimeUnit.MILLISECONDS
87
);
88
89
// Unordered async processing - better performance, no order guarantee
90
DataStream<String> unorderedResult = AsyncDataStream.unorderedWait(
91
input,
92
new DatabaseAsyncFunction(),
93
1000, TimeUnit.MILLISECONDS,
94
100 // capacity
95
);
96
97
// Custom async function for database lookup
98
class DatabaseAsyncFunction implements AsyncFunction<String, String> {
99
private transient DatabaseClient client;
100
101
@Override
102
public void open(Configuration parameters) throws Exception {
103
client = new DatabaseClient();
104
}
105
106
@Override
107
public void asyncInvoke(String key, ResultFuture<String> resultFuture) throws Exception {
108
// Perform async database lookup
109
CompletableFuture<String> future = client.asyncGet(key);
110
111
future.whenComplete((result, throwable) -> {
112
if (throwable != null) {
113
resultFuture.completeExceptionally(throwable);
114
} else {
115
resultFuture.complete(Collections.singletonList(result));
116
}
117
});
118
}
119
}
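```

### Rich Async Function

Extend `RichAsyncFunction` instead of implementing `AsyncFunction` directly when the function needs lifecycle hooks or the runtime context. Lifecycle hooks are `open()`/`close()`, for example to create and release a client on each parallel instance. The runtime context gives access to metrics and subtask information.

```java { .api }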
/**
128
* Rich async function with lifecycle methods and runtime context access
129
*/
130
abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {
131
// Inherits open(), close(), getRuntimeContext() from AbstractRichFunction
132
abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
133
134
// Optional timeout handling
135
void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
136
}
```

**Usage Examples:**

```java
DataStream<UserEvent> events = env.addSource(new UserEventSource());
143
144
DataStream<EnrichedUserEvent> enriched = AsyncDataStream.orderedWait(
145
events,
146
new UserEnrichmentFunction(),
147
2000, TimeUnit.MILLISECONDS
148
);
149
150
class UserEnrichmentFunction extends RichAsyncFunction<UserEvent, EnrichedUserEvent> {
151
private transient UserProfileService profileService;
152
private transient MetricGroup asyncMetrics;
153
154
@Override
155
public void open(Configuration parameters) throws Exception {
156
super.open(parameters);
157
158
// Initialize external service
159
profileService = new UserProfileService();
160
161
// Get metrics for monitoring
162
asyncMetrics = getRuntimeContext()
163
.getMetricGroup()
164
.addGroup("async-enrichment");
165
}
166
167
@Override
168
public void asyncInvoke(UserEvent event, ResultFuture<EnrichedUserEvent> resultFuture) throws Exception {
169
// Async call to external service
170
CompletableFuture<UserProfile> profileFuture = profileService.getUserProfile(event.getUserId());
171
172
profileFuture.whenComplete((profile, throwable) -> {
173
if (throwable != null) {
174
// Handle failure - could emit default value or propagate error
175
resultFuture.complete(Collections.singletonList(
176
new EnrichedUserEvent(event, UserProfile.defaultProfile())
177
));
178
} else {
179
resultFuture.complete(Collections.singletonList(
180
new EnrichedUserEvent(event, profile)
181
));
182
}
183
});
184
}
185
186
@Override
187
public void timeout(UserEvent event, ResultFuture<EnrichedUserEvent> resultFuture) throws Exception {
188
// Handle timeout - provide default enrichment
189
resultFuture.complete(Collections.singletonList(
190
new EnrichedUserEvent(event, UserProfile.timeoutProfile())
191
));
192
}
193
194
@Override
195
public void close() throws Exception {
196
if (profileService != null) {
197
profileService.close();
198
}
199
super.close();
200
}
201
}
```

## Types

### Async Function Interface

```java { .api }
/**
210
* Interface for asynchronous functions
211
* @param <IN> - input type
212
* @param <OUT> - output type
213
*/
214
interface AsyncFunction<IN, OUT> extends Function {
215
/**
216
* Trigger async operation for the given input
217
* @param input - input element
218
* @param resultFuture - future to complete with results
219
*/
220
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
221
222
/**
223
* Optional method to handle timeouts
224
* @param input - input element that timed out
225
* @param resultFuture - future to complete with results or error
226
*/
227
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
228
resultFuture.completeExceptionally(new TimeoutException("Async operation timed out"));
229
}
230
}
231
232
/**
233
* Rich async function with lifecycle methods
234
*/
235
abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {
236
// Inherits lifecycle methods: open(), close(), getRuntimeContext()
237
}
```

### Result Future

```java { .api }
/**
 * Handle through which an {@code AsyncFunction} delivers the outcome of one async request.
 *
 * <p>Complete it exactly once per input element, with either {@link #complete} or
 * {@link #completeExceptionally}. Calls after the first completion are ignored, so any results
 * they carry are lost.
 *
 * @param <OUT> output type
 */
public interface ResultFuture<OUT> {
    /**
     * Completes the async operation with its results.
     *
     * <p>Put all results for the input element into a single collection. An empty collection
     * emits nothing for that element.
     *
     * @param result collection of result elements
     */
    void complete(Collection<OUT> result);

    /**
     * Completes the async operation with an exception, which fails the task.
     *
     * @param error exception that occurred
     */
    void completeExceptionally(Throwable error);
}
```

### AsyncDataStream Utility

```java { .api }
/**
266
* Utility class for creating async operators
267
*/
268
class AsyncDataStream {
269
// Static factory methods for creating async operators
270
static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
271
DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);
272
273
static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
274
DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);
275
276
static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
277
DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity);
278
279
static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
280
DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity);
281
}
282
```