# Async I/O Operations

`AsyncDataStream` provides high-performance async operations for enriching streams with external data without blocking stream processing. This is essential for calling external services, databases, or APIs while maintaining streaming performance.
3
4
## Capabilities
5
6
### Unordered Async Operations

Async operations whose results are emitted as soon as they complete, so the output order does not need to match the input order (higher throughput).
9
10
```scala { .api }
11
object AsyncDataStream {

  /**
   * Apply an async transformation with unordered results using an AsyncFunction.
   *
   * @param input         Input DataStream to transform
   * @param asyncFunction AsyncFunction implementation
   * @param timeout       Timeout for async operations
   * @param timeUnit      Time unit for timeout
   * @param capacity      Queue capacity for async operations
   * @return DataStream with async transformation results
   */
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int
  ): DataStream[OUT]

  /**
   * Apply an async transformation with unordered results (default capacity).
   *
   * @param input         Input DataStream to transform
   * @param asyncFunction AsyncFunction implementation
   * @param timeout       Timeout for async operations
   * @param timeUnit      Time unit for timeout
   * @return DataStream with async transformation results
   */
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]

  /**
   * Apply an unordered async transformation using function syntax (with capacity).
   * The function is passed in a second parameter list so it can be written as a
   * trailing block.
   *
   * @param input         Input DataStream to transform
   * @param timeout       Timeout for async operations
   * @param timeUnit      Time unit for timeout
   * @param capacity      Queue capacity for async operations
   * @param asyncFunction Function taking input and ResultFuture
   * @return DataStream with async transformation results
   */
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int
  )(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]

  /**
   * Apply an unordered async transformation using function syntax (default capacity).
   *
   * @param input         Input DataStream to transform
   * @param timeout       Timeout for async operations
   * @param timeUnit      Time unit for timeout
   * @param asyncFunction Function taking input and ResultFuture
   * @return DataStream with async transformation results
   */
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      timeout: Long,
      timeUnit: TimeUnit
  )(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]
}
74
```
75
76
### Ordered Async Operations

Async operations whose results are emitted in the same order as their inputs; a finished result is held back until every earlier result has been emitted (preserves stream ordering).
79
80
```scala { .api }
81
object AsyncDataStream {

  /**
   * Apply an async transformation with ordered results using an AsyncFunction.
   *
   * @param input         Input DataStream to transform
   * @param asyncFunction AsyncFunction implementation
   * @param timeout       Timeout for async operations
   * @param timeUnit      Time unit for timeout
   * @param capacity      Queue capacity for async operations
   * @return DataStream with async transformation results (ordered)
   */
  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int
  ): DataStream[OUT]

  /**
   * Apply an async transformation with ordered results (default capacity).
   *
   * @param input         Input DataStream to transform
   * @param asyncFunction AsyncFunction implementation
   * @param timeout       Timeout for async operations
   * @param timeUnit      Time unit for timeout
   * @return DataStream with async transformation results (ordered)
   */
  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]

  /**
   * Apply an ordered async transformation using function syntax (with capacity).
   * The function is passed in a second parameter list so it can be written as a
   * trailing block.
   *
   * @param input         Input DataStream to transform
   * @param timeout       Timeout for async operations
   * @param timeUnit      Time unit for timeout
   * @param capacity      Queue capacity for async operations
   * @param asyncFunction Function taking input and ResultFuture
   * @return DataStream with async transformation results (ordered)
   */
  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int
  )(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]

  /**
   * Apply an ordered async transformation using function syntax (default capacity).
   *
   * @param input         Input DataStream to transform
   * @param timeout       Timeout for async operations
   * @param timeUnit      Time unit for timeout
   * @param asyncFunction Function taking input and ResultFuture
   * @return DataStream with async transformation results (ordered)
   */
  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      timeout: Long,
      timeUnit: TimeUnit
  )(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]
}
144
```
145
146
**Usage Examples:**
147
148
```scala
149
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncDataStream, AsyncFunction, ResultFuture}

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

case class User(id: String, name: String)
case class UserProfile(id: String, name: String, email: String, department: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User("u1", "Alice"),
  User("u2", "Bob"),
  User("u3", "Charlie")
)

// Using AsyncFunction implementation
class DatabaseLookupFunction extends AsyncFunction[User, UserProfile] {

  /** Starts the lookup on another thread and reports its outcome through `resultFuture`. */
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    // Future {} and onComplete both need an ExecutionContext; it is declared locally so
    // the function object itself carries no executor field that would be serialized.
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Simulate async database call
    Future {
      // Database lookup simulation
      UserProfile(user.id, user.name, s"${user.name.toLowerCase}@company.com", "Engineering")
    }.onComplete {
      // The Scala ResultFuture takes a Scala Iterable of results, not a java.util.List.
      case Success(profile)   => resultFuture.complete(Iterable(profile))
      case Failure(exception) => resultFuture.completeExceptionally(exception)
    }
  }
}

// Unordered async transformation (higher throughput)
val enrichedUsersUnordered: DataStream[UserProfile] = AsyncDataStream.unorderedWait(
  users,
  new DatabaseLookupFunction,
  5000, // 5 second timeout
  TimeUnit.MILLISECONDS
)

// Ordered async transformation (preserves order)
val enrichedUsersOrdered: DataStream[UserProfile] = AsyncDataStream.orderedWait(
  users,
  new DatabaseLookupFunction,
  5000,
  TimeUnit.MILLISECONDS
)

// Using function syntax
val enrichedWithFunction: DataStream[UserProfile] = AsyncDataStream.unorderedWait(
  users,
  5000,
  TimeUnit.MILLISECONDS
) { (user: User, resultFuture: ResultFuture[UserProfile]) =>
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Async operation using function syntax
  Future {
    UserProfile(user.id, user.name, s"${user.name.toLowerCase}@company.com", "Engineering")
  }.onComplete {
    case Success(profile)   => resultFuture.complete(Iterable(profile))
    case Failure(exception) => resultFuture.completeExceptionally(exception)
  }
}
209
```
210
211
### AsyncFunction Interface
212
213
Base interface for implementing async operations.
214
215
```scala { .api }
216
trait AsyncFunction[IN, OUT] {

  /**
   * Async invocation method - implement async logic here.
   *
   * @param input        Input element to process
   * @param resultFuture ResultFuture to complete with results
   */
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit

  /**
   * Optional timeout handling method. The default implementation fails the
   * operation with a `java.util.concurrent.TimeoutException`; override it to
   * complete `resultFuture` with a fallback value instead.
   *
   * @param input        Input element that timed out
   * @param resultFuture ResultFuture to complete for timeout case
   */
  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
    resultFuture.completeExceptionally(
      new java.util.concurrent.TimeoutException("Async function call has timed out.")
    )
  }
}
235
```
236
237
### RichAsyncFunction
238
239
Rich version of AsyncFunction with access to runtime context.
240
241
```scala { .api }
242
// NOTE(review): Flink appears to declare this class as
// `extends AbstractRichFunction with AsyncFunction[IN, OUT]` - confirm the exact
// hierarchy against the Flink version in use.
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT] with RichFunction {

  /**
   * Get the runtime context.
   *
   * @return RuntimeContext for accessing configuration and metrics
   */
  def getRuntimeContext: RuntimeContext

  /**
   * Open method called once per parallel instance. Does nothing by default.
   *
   * @param parameters Configuration parameters
   */
  override def open(parameters: Configuration): Unit = {}

  /**
   * Close method called when function is shut down. Does nothing by default.
   */
  override def close(): Unit = {}

  /**
   * Set runtime context (called by Flink runtime).
   *
   * @param t Runtime context
   */
  override def setRuntimeContext(t: RuntimeContext): Unit
}
266
```
267
268
**Usage Examples:**
269
270
```scala
271
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

import java.sql.{Connection, DriverManager}
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.{Failure, Success}

class DatabaseAsyncFunction extends RichAsyncFunction[User, UserProfile] {
  // Both resources are created in open(); they are marked transient so they are
  // never part of the serialized function object.
  @transient private var connection: Connection = _
  @transient private var dbExecutor: ExecutionContextExecutorService = _

  override def open(parameters: Configuration): Unit = {
    // Initialize database connection
    connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "")
    // One worker thread keeps the blocking JDBC calls off shared thread pools and
    // ensures the single connection is never used by two lookups at the same time.
    // Setup connection pool, initialize client, etc.
    dbExecutor = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
  }

  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    implicit val ec: ExecutionContext = dbExecutor

    Future(lookupProfile(user)).onComplete {
      // The Scala ResultFuture takes a Scala Iterable of results, not a java.util.List.
      case Success(profile)   => resultFuture.complete(Iterable(profile))
      case Failure(exception) => resultFuture.completeExceptionally(exception)
    }
  }

  /** Runs the blocking lookup; the statement and result set are closed on every path. */
  private def lookupProfile(user: User): UserProfile = {
    // Use the connection for database lookup
    val stmt = connection.prepareStatement("SELECT * FROM user_profiles WHERE id = ?")
    try {
      stmt.setString(1, user.id)
      val rs = stmt.executeQuery()
      try {
        if (rs.next()) {
          UserProfile(
            rs.getString("id"),
            rs.getString("name"),
            rs.getString("email"),
            rs.getString("department")
          )
        } else {
          throw new RuntimeException(s"User profile not found for ${user.id}")
        }
      } finally {
        rs.close()
      }
    } finally {
      stmt.close()
    }
  }

  override def timeout(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    // Custom timeout handling: emit a placeholder profile instead of failing the job
    resultFuture.complete(Iterable(
      UserProfile(user.id, user.name, "unknown@company.com", "Unknown")
    ))
  }

  override def close(): Unit = {
    // Stop accepting new lookups before releasing the connection they depend on.
    if (dbExecutor != null) {
      dbExecutor.shutdown()
    }
    if (connection != null) {
      connection.close()
    }
  }
}
320
```
321
322
### ResultFuture Interface
323
324
Interface for completing async operations with results or errors.
325
326
```scala { .api }
327
trait ResultFuture[T] {

  /**
   * Complete the async operation with a collection of results.
   * The Scala API takes a Scala `Iterable`; for a single result pass `Iterable(value)`.
   *
   * @param result Collection of result elements
   */
  def complete(result: Iterable[T]): Unit

  /**
   * Complete the async operation with an exception.
   *
   * @param error Exception that occurred during async operation
   */
  def completeExceptionally(error: Throwable): Unit
}
348
```
349
350
## Retry Strategies

Support for retry logic in async operations: a retry strategy decides whether, and after what delay, an async call is attempted again.
353
354
```scala { .api }
355
// Retry strategy interface
trait AsyncRetryStrategy[OUT] extends Serializable {

  /** Whether another attempt may be made after `currentAttempts` attempts. */
  def canRetry(currentAttempts: Int): Boolean

  /** Delay, in milliseconds, before the next attempt. */
  def getBackoffTimeMillis(currentAttempts: Int): Long

  /** The predicate that decides which results or exceptions trigger a retry. */
  def getRetryPredicate(): AsyncRetryPredicate[OUT]
}
360
361
// Retry predicate interface
// NOTE(review): member shapes follow Flink's Scala AsyncRetryPredicate as best recalled -
// confirm against the Flink version in use.
trait AsyncRetryPredicate[OUT] {

  /** Optional predicate over the emitted results; a retry is triggered when it matches. */
  def resultPredicate: Option[java.util.function.Predicate[java.util.Collection[OUT]]]

  /** Optional predicate over the thrown exception; a retry is triggered when it matches. */
  def exceptionPredicate: Option[java.util.function.Predicate[Throwable]]
}
366
367
// Built-in retry strategies
// NOTE(review): Flink appears to expose its built-in strategies as Java builders
// (e.g. AsyncRetryStrategies.FixedDelayRetryStrategyBuilder) rather than the
// factory methods listed here - confirm these names before relying on them.
object AsyncRetryStrategies {
  def fixedDelay(maxAttempts: Int, delay: Long): AsyncRetryStrategy[_]
  def exponentialBackoff(maxAttempts: Int, initialDelay: Long, multiplier: Double): AsyncRetryStrategy[_]
  def noRetry(): AsyncRetryStrategy[_]
}
373
374
// Built-in retry predicates
// NOTE(review): Flink's RetryPredicates appears to provide predicate constants and
// helpers under different names - confirm these signatures before relying on them.
object RetryPredicates {
  def hasException[OUT](exceptionClass: Class[_ <: Throwable]): AsyncRetryPredicate[OUT]
  def hasResult[OUT](predicate: OUT => Boolean): AsyncRetryPredicate[OUT]
}
379
```
380
381
## Types
382
383
```scala { .api }
384
// Time units for timeout specification.
// Timeouts use the JDK enum java.util.concurrent.TimeUnit (the type imported by the
// usage examples), not a Scala Enumeration.
// Constants: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS
import java.util.concurrent.TimeUnit
388
389
// Runtime context for rich functions
// NOTE(review): presumably a subset of Flink's RuntimeContext members - confirm.
trait RuntimeContext {
  def getTaskName: String
  def getMetricGroup: MetricGroup
  def getNumberOfParallelSubtasks: Int
  def getIndexOfThisSubtask: Int
  def getExecutionConfig: ExecutionConfig
  def getUserCodeClassLoader: ClassLoader
}
398
399
// Rich function base interface: lifecycle hooks plus access to the runtime context
trait RichFunction {
  def open(parameters: Configuration): Unit
  def close(): Unit
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(t: RuntimeContext): Unit
}
406
407
// Configuration for open method.
// Flink's Configuration is its own key/value container, not a java.util.HashMap;
// values are read through the typed getters below, each returning `defaultValue`
// when the key is absent.
class Configuration {
  def getString(key: String, defaultValue: String): String
  def getInteger(key: String, defaultValue: Int): Int
  def getBoolean(key: String, defaultValue: Boolean): Boolean
  def getFloat(key: String, defaultValue: Float): Float
  def getDouble(key: String, defaultValue: Double): Double
  def getLong(key: String, defaultValue: Long): Long
}
416
```