0
# Async Operations
1
2
Flink's Async I/O API lets an operator send requests to external systems (databases, web services, etc.) and keep processing further records while those requests are in flight, instead of waiting for each response in turn. Overlapping the wait times of many requests can significantly improve throughput for external data enrichment or lookups.
3
4
## Capabilities
5
6
### AsyncDataStream
7
8
Main entry point for applying an `AsyncFunction` to a `DataStream`; each method returns a new stream containing the asynchronous results.
9
10
```scala { .api }
11
/**
 * AsyncDataStream operations for creating async streams.
 *
 * Each method applies an [[AsyncFunction]] to every record of `input` and
 * returns the stream of results:
 *  - `orderedWait` emits results in the same order as the input records;
 *  - `unorderedWait` emits results as soon as they complete (order is only
 *    kept relative to watermarks), which gives better throughput.
 *
 * Parameters shared by all overloads:
 *  - `input`: stream whose records are passed to the async function
 *  - `asyncFunction`: issues one asynchronous request per input record
 *  - `timeout` / `timeUnit`: maximum time a single request may take before
 *    `AsyncFunction.timeout` is called for it
 *  - `capacity`: maximum number of requests in flight at the same time;
 *    the overloads without it use a default capacity
 *
 * The `TypeInformation[OUT]` context bound is normally satisfied by
 * `import org.apache.flink.streaming.api.scala._`.
 */
object AsyncDataStream {

  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]

  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int
  ): DataStream[OUT]

  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]

  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int
  ): DataStream[OUT]
}
45
```
46
47
**Usage Examples:**
48
49
```scala
50
// AsyncDataStream lives in org.apache.flink.streaming.api.scala (not in its
// `async` sub-package), so this wildcard import already brings it into scope,
// together with StreamExecutionEnvironment and the implicit TypeInformation.
import org.apache.flink.streaming.api.scala._
import java.util.concurrent.TimeUnit

case class User(id: Int, name: String)
case class UserWithEmail(id: Int, name: String, email: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice"),
  User(2, "Bob"),
  User(3, "Charlie")
)

// Ordered async operation - maintains order of results
val enrichedUsersOrdered = AsyncDataStream.orderedWait(
  users,
  new UserEmailAsyncFunction(),
  5000, // 5 second timeout
  TimeUnit.MILLISECONDS
)

// Unordered async operation - better throughput, no order guarantee
val enrichedUsersUnordered = AsyncDataStream.unorderedWait(
  users,
  new UserEmailAsyncFunction(),
  5000, // 5 second timeout
  TimeUnit.MILLISECONDS,
  100 // capacity of 100 concurrent requests
)
79
```
80
81
### AsyncFunction Interface
82
83
Core interface for implementing asynchronous functions.
84
85
```scala { .api }
86
/**
 * Contract for a function that answers each input record asynchronously.
 *
 * `asyncInvoke` starts the request and hands the outcome back through
 * `resultFuture`. `timeout` is invoked instead when the request does not
 * finish within the operator's timeout; the default implementation fails
 * the request with a TimeoutException.
 */
trait AsyncFunction[IN, OUT] {

  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit

  // Override to emit a fallback value instead of failing the request.
  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
    val timedOut = new TimeoutException("Async operation timed out")
    resultFuture.completeExceptionally(timedOut)
  }
}
99
```
100
101
**Usage Examples:**
102
103
```scala
104
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
105
import scala.concurrent.{ExecutionContext, Future}
106
import scala.util.{Success, Failure}
107
import java.util.concurrent.{CompletableFuture, TimeoutException}
108
109
// Async function using Scala Future
110
/**
 * Enriches a User with an e-mail address obtained from a (simulated)
 * asynchronous lookup that returns a Scala Future.
 *
 * On timeout it emits a placeholder address instead of failing.
 */
class UserEmailAsyncFunction extends AsyncFunction[User, UserWithEmail] {

  // The function instance is serialized when the job is submitted.
  // `@transient lazy` keeps the execution context out of the serialized
  // form; it is looked up on first use instead.
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    // Simulate async database lookup
    val emailFuture: Future[String] = lookupUserEmail(user.id)

    emailFuture.onComplete {
      case Success(email) =>
        // The Scala ResultFuture takes a Scala Iterable of output records.
        resultFuture.complete(Iterable(UserWithEmail(user.id, user.name, email)))
      case Failure(exception) =>
        resultFuture.completeExceptionally(exception)
    }
  }

  override def timeout(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    // Custom timeout handling: emit a placeholder instead of failing the request.
    resultFuture.complete(Iterable(UserWithEmail(user.id, user.name, "timeout@example.com")))
  }

  private def lookupUserEmail(userId: Int): Future[String] = {
    // Simulate async call to external service
    Future {
      // `blocking` tells the global pool that this task parks its thread,
      // so the pool can compensate instead of being starved.
      scala.concurrent.blocking {
        Thread.sleep(100) // Simulate network delay
      }
      s"user$userId@example.com"
    }
  }
}
139
140
// Async function using Java CompletableFuture
141
/**
 * Enriches a User with a generated e-mail address using a Java
 * CompletableFuture pipeline instead of a Scala Future.
 */
class UserAsyncFunctionJava extends AsyncFunction[User, UserWithEmail] {

  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    val future = CompletableFuture
      .supplyAsync(() => s"user${user.id}@example.com")
      .thenApply((email: String) => UserWithEmail(user.id, user.name, email))

    future.whenComplete((result: UserWithEmail, exception: Throwable) => {
      if (exception == null) {
        // The Scala ResultFuture takes a Scala Iterable of output records.
        resultFuture.complete(Iterable(result))
      } else {
        // Failures from an earlier stage arrive wrapped in a CompletionException;
        // report the original cause rather than the wrapper.
        val cause = exception match {
          case wrapped: java.util.concurrent.CompletionException if wrapped.getCause != null =>
            wrapped.getCause
          case other => other
        }
        resultFuture.completeExceptionally(cause)
      }
    })
  }
}
157
```
158
159
### RichAsyncFunction
160
161
Variant of `AsyncFunction` that adds the `open`/`close` lifecycle methods and access to the runtime context (for example, to register metrics or read job parameters).
162
163
```scala { .api }
164
/**
 * Async function with lifecycle hooks and runtime-context access.
 *
 * `open` and `close` are no-ops by default. Override them to acquire and
 * release per-instance resources. The runtime context accessors expose job-
 * and task-level information such as the metric group.
 */
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT] {

  // Lifecycle hooks
  def open(parameters: Configuration): Unit = ()
  def close(): Unit = ()

  // Runtime context
  def setRuntimeContext(context: RuntimeContext): Unit
  def getRuntimeContext: RuntimeContext
  def getIterationRuntimeContext: IterationRuntimeContext
}
174
```
175
176
**Usage Examples:**
177
178
```scala
179
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.flink.configuration.Configuration
import java.util.concurrent.{ExecutorService, Executors}
import scala.util.control.NonFatal

/**
 * Looks up each input key in an external database on a dedicated thread pool.
 *
 * `DatabaseConnectionPool` and its connection API are illustrative placeholders
 * for whatever database client is actually used.
 */
class DatabaseAsyncFunction extends RichAsyncFunction[String, String] {

  // Created in open() rather than in the constructor, so both are still null
  // when the function instance is serialized.
  private var executor: ExecutorService = _
  private var connectionPool: DatabaseConnectionPool = _

  override def open(parameters: Configuration): Unit = {
    // Initialize resources
    executor = Executors.newFixedThreadPool(10)
    connectionPool = new DatabaseConnectionPool(
      getRuntimeContext.getExecutionConfig.getGlobalJobParameters
    )
  }

  override def close(): Unit = {
    // Cleanup resources
    if (executor != null) {
      executor.shutdown()
    }
    if (connectionPool != null) {
      connectionPool.close()
    }
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    executor.execute(() => {
      try {
        val connection = connectionPool.getConnection()
        try {
          // Bind `input` as a parameter: interpolating it into the SQL text
          // would allow SQL injection.
          val result = connection.query("SELECT value FROM table WHERE key = ?", input)
          resultFuture.complete(Iterable(result))
        } finally {
          // Release the connection even when the query fails.
          // NOTE(review): assumes close() returns a pooled connection to the pool (JDBC-style).
          connection.close()
        }
      } catch {
        // Any non-fatal failure must complete the future; otherwise the
        // request would only be resolved by the operator timeout.
        case NonFatal(ex) => resultFuture.completeExceptionally(ex)
      }
    })
  }
}
220
```
221
222
### ResultFuture Interface
223
224
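Interface for completing asynchronous operations. Complete it exactly once per `asyncInvoke` call, either with a (possibly empty) collection of results or with an exception.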
225
226
```scala { .api }
227
/**
 * Handle through which an async function hands back its results.
 *
 * Complete it exactly once per `asyncInvoke` call: either with the
 * (possibly empty) collection of output records, or exceptionally.
 */
trait ResultFuture[OUT] {

  /** Emits every element of `result` as an output record; an empty collection emits nothing. */
  def complete(result: Iterable[OUT]): Unit

  /** Fails the current async request with `throwable`. */
  def completeExceptionally(throwable: Throwable): Unit
}
234
```
235
236
**Usage Examples:**
237
238
```scala
239
// The calls below are alternatives: complete a given ResultFuture exactly once.

// Complete with single result
resultFuture.complete(Iterable(result))

// Complete with multiple results
resultFuture.complete(Iterable(result1, result2, result3))

// Complete with empty result (filter effect)
resultFuture.complete(Iterable.empty[OUT])

// Complete with exception
resultFuture.completeExceptionally(new RuntimeException("Async operation failed"))
250
```
251
252
### Async Function Patterns
253
254
Common patterns for implementing async functions.
255
256
**External Service Lookup Pattern (HTTP):**
257
258
```scala
259
// Required imports (Apache HttpAsyncClient 4.x):
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.http.HttpResponse
import org.apache.http.client.methods.HttpGet
import org.apache.http.concurrent.FutureCallback
import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClients}
import org.apache.http.util.EntityUtils
import java.net.URLEncoder
import scala.util.control.NonFatal

/**
 * Enriches each key with the body returned by an HTTP lookup service.
 *
 * The HTTP client is created and started in `open` and closed in `close`.
 * It must not be serialized together with the function, and an async client
 * that has not been started rejects every request.
 */
class DatabaseLookupFunction extends RichAsyncFunction[String, (String, String)] {

  @transient private var httpClient: CloseableHttpAsyncClient = _

  override def open(parameters: Configuration): Unit = {
    httpClient = HttpAsyncClients.createDefault()
    httpClient.start()
  }

  override def close(): Unit = {
    if (httpClient != null) {
      httpClient.close()
    }
  }

  override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    // Encode the key so characters such as '&', '=' or spaces cannot alter the query string.
    val encodedKey = URLEncoder.encode(key, "UTF-8")
    val request = new HttpGet(s"http://api.example.com/lookup?key=$encodedKey")

    httpClient.execute(request, new FutureCallback[HttpResponse] {
      override def completed(response: HttpResponse): Unit = {
        // Nothing else completes resultFuture if an exception escapes this
        // callback, so report failures explicitly.
        try {
          val status = response.getStatusLine.getStatusCode
          if (status >= 200 && status < 300) {
            val value = EntityUtils.toString(response.getEntity)
            resultFuture.complete(Iterable((key, value)))
          } else {
            resultFuture.completeExceptionally(
              new RuntimeException(s"Lookup for key '$key' failed with HTTP status $status")
            )
          }
        } catch {
          case NonFatal(ex) => resultFuture.completeExceptionally(ex)
        }
      }

      override def failed(ex: Exception): Unit = {
        resultFuture.completeExceptionally(ex)
      }

      override def cancelled(): Unit = {
        resultFuture.completeExceptionally(new RuntimeException("Request cancelled"))
      }
    })
  }
}
282
```
283
284
**Caching Pattern:**
285
286
```scala
287
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

/**
 * Serves repeated keys from an in-memory cache and issues an async lookup
 * only on a cache miss. Hits and misses are reported as metrics.
 *
 * The cache belongs to this function instance and is unbounded (no
 * eviction). For large key spaces, replace it with a size- or time-bounded cache.
 */
class CachedAsyncFunction extends RichAsyncFunction[String, String] {

  // Thread-safe map: it is written from Future callbacks running on pool threads.
  private val cache = new TrieMap[String, String]()
  private var cacheHits: Counter = _
  private var cacheMisses: Counter = _

  // Runs the onComplete callbacks. `@transient lazy` keeps it out of the
  // serialized function and creates it on first use instead.
  @transient private implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def open(parameters: Configuration): Unit = {
    cacheHits = getRuntimeContext.getMetricGroup.counter("cache_hits")
    cacheMisses = getRuntimeContext.getMetricGroup.counter("cache_misses")
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    cache.get(input) match {
      case Some(cachedValue) =>
        cacheHits.inc()
        resultFuture.complete(Iterable(cachedValue))

      case None =>
        cacheMisses.inc()
        // Perform async lookup
        performAsyncLookup(input).onComplete {
          case Success(value) =>
            cache.put(input, value)
            resultFuture.complete(Iterable(value))
          case Failure(exception) =>
            resultFuture.completeExceptionally(exception)
        }
    }
  }

  private def performAsyncLookup(input: String): Future[String] = {
    // Implementation of actual async lookup
    ???
  }
}
324
```
325
326
### Configuration and Tuning
327
328
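The key parameters of an async operation (timeout, capacity, and result ordering) are passed as arguments to `AsyncDataStream.orderedWait` / `unorderedWait`.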
329
330
```scala { .api }
331
/**
 * Configuration parameters for async operations.
 *
 * All three settings are chosen per operator, through the arguments of
 * AsyncDataStream.orderedWait / unorderedWait. The execution environment
 * has no global async-timeout setting.
 */
// Timeout:  maximum time to wait for one async operation before AsyncFunction.timeout is called
// Capacity: maximum number of async operations in flight at the same time
// Order:    orderedWait keeps input order; unorderedWait emits results as they complete

val enrichedUsers = AsyncDataStream.unorderedWait(
  users,                        // input stream
  new UserEmailAsyncFunction(), // async function
  10, TimeUnit.SECONDS,         // timeout
  100                           // capacity
)
341
```
342
343
## Types
344
345
```scala { .api }
346
// Core async API
object AsyncDataStream
trait AsyncFunction[IN, OUT]
trait ResultFuture[OUT]
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT]

// Lifecycle and runtime context used by RichAsyncFunction
class Configuration // org.apache.flink.configuration.Configuration
trait RuntimeContext
trait IterationRuntimeContext

// Metrics, registered through the runtime context's metric group
trait MetricGroup
trait Counter

// JDK types from java.util.concurrent used in signatures and examples
class TimeUnit
class TimeoutException
class CompletableFuture[T]
365
```