Flink's Async I/O API lets an operator send requests to external systems (databases, web services, etc.) asynchronously. The operator keeps processing further records while those requests are in flight instead of waiting on each call. This can significantly improve throughput for external data enrichment and lookups.
`AsyncDataStream` is the main entry point for creating asynchronous data streams.
/**
* AsyncDataStream operations for creating async streams
*
* API summary (signatures only; method bodies are omitted here).
*
* Every overload applies `function` asynchronously to the elements of `stream`:
* - orderedWait emits results in the same order as the input elements.
* - unorderedWait does not guarantee result order, which can give better throughput.
*
* `timeout` together with `timeUnit` is the maximum time to wait for one async operation.
* `capacity` is the maximum number of concurrent async operations; the overloads without it
* use a default capacity whose value is not shown here.
*/
object AsyncDataStream {
// Ordered results; default capacity.
def orderedWait[IN, OUT](
stream: DataStream[IN],
function: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit
): DataStream[OUT]
// Ordered results; at most `capacity` async operations in flight.
def orderedWait[IN, OUT](
stream: DataStream[IN],
function: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit,
capacity: Int
): DataStream[OUT]
// No ordering guarantee; default capacity.
def unorderedWait[IN, OUT](
stream: DataStream[IN],
function: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit
): DataStream[OUT]
// No ordering guarantee; at most `capacity` async operations in flight.
def unorderedWait[IN, OUT](
stream: DataStream[IN],
function: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit,
capacity: Int
): DataStream[OUT]
}
Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.AsyncDataStream
import java.util.concurrent.TimeUnit
case class User(id: Int, name: String)
case class UserWithEmail(id: Int, name: String, email: String)

// Execution environment the example streams are created from.
val env = StreamExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice"),
  User(2, "Bob"),
  User(3, "Charlie")
)

// Ordered async operation - maintains order of results
val enrichedUsersOrdered = AsyncDataStream.orderedWait(
  users,
  new UserEmailAsyncFunction(),
  5000, // 5 second timeout
  TimeUnit.MILLISECONDS
)

// Unordered async operation - better throughput, no order guarantee
val enrichedUsersUnordered = AsyncDataStream.unorderedWait(
  users,
  new UserEmailAsyncFunction(),
  5000, // 5 second timeout
  TimeUnit.MILLISECONDS,
  100 // capacity of 100 concurrent requests
)
Core interface for implementing asynchronous functions.
/**
 * Contract for an asynchronous function used with AsyncDataStream.
 *
 * `asyncInvoke` starts the work for one input element. The implementation later reports
 * the outcome through `ResultFuture.complete` or `ResultFuture.completeExceptionally`.
 */
trait AsyncFunction[IN, OUT] {

  /** Begins asynchronous processing of `input`; the outcome is reported via `resultFuture`. */
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit

  /**
   * Hook invoked when processing of `input` timed out. Overriding it is optional:
   * by default `resultFuture` is failed with a TimeoutException.
   */
  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
    val timedOut = new TimeoutException("Async operation timed out")
    resultFuture.completeExceptionally(timedOut)
  }
}
Usage Examples:
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Failure}
import java.util.concurrent.{CompletableFuture, TimeoutException}
// Async function using Scala Future
class UserEmailAsyncFunction extends AsyncFunction[User, UserWithEmail] {
  // `.asJava` below needs a converter import that this snippet's imports do not provide.
  import scala.collection.JavaConverters._

  /**
   * Execution context for the lookup future and its callback.
   * `@transient lazy` keeps it out of the serialized function instance; it is re-created
   * on first use after deserialization.
   */
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  /** Looks up the user's email asynchronously and emits one enriched record, or fails. */
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    // Simulate async database lookup
    val emailFuture: Future[String] = lookupUserEmail(user.id)
    emailFuture.onComplete {
      case Success(email) =>
        resultFuture.complete(List(UserWithEmail(user.id, user.name, email)).asJava)
      case Failure(exception) =>
        resultFuture.completeExceptionally(exception)
    }
  }

  /** Custom timeout handling: emit a placeholder address instead of completing exceptionally. */
  override def timeout(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit =
    resultFuture.complete(List(UserWithEmail(user.id, user.name, "timeout@example.com")).asJava)

  /** Simulated async call to an external service. */
  private def lookupUserEmail(userId: Int): Future[String] =
    Future {
      // Thread.sleep blocks a pool thread; `blocking` lets the global pool compensate for it.
      scala.concurrent.blocking {
        Thread.sleep(100) // Simulate network delay
      }
      s"user$userId@example.com"
    }
}
// Async function using Java CompletableFuture
class UserAsyncFunctionJava extends AsyncFunction[User, UserWithEmail] {
  // `.asJava` below needs a converter import that this snippet's imports do not provide.
  import scala.collection.JavaConverters._
  import java.util.concurrent.CompletionException

  /**
   * Builds the enriched record on a CompletableFuture and bridges its outcome into
   * `resultFuture`: one element on success, the failure cause otherwise.
   */
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    val future = CompletableFuture
      .supplyAsync(() => s"user${user.id}@example.com")
      .thenApply((email: String) => UserWithEmail(user.id, user.name, email))

    future.whenComplete((result: UserWithEmail, error: Throwable) => {
      if (error == null) {
        resultFuture.complete(List(result).asJava)
      } else {
        // A failure raised in an earlier stage reaches whenComplete wrapped in a
        // CompletionException; report the underlying cause rather than the wrapper.
        val cause = error match {
          case wrapped: CompletionException if wrapped.getCause != null => wrapped.getCause
          case other => other
        }
        resultFuture.completeExceptionally(cause)
      }
    })
  }
}
Rich version of AsyncFunction with access to runtime context and lifecycle methods.
/**
 * Rich variant of AsyncFunction: adds lifecycle hooks (`open` / `close`) and access to
 * the runtime context.
 */
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT] {

  // Runtime context accessors.
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(context: RuntimeContext): Unit
  def getIterationRuntimeContext: IterationRuntimeContext

  /** Lifecycle hook for acquiring resources; does nothing unless overridden. */
  def open(parameters: Configuration): Unit = ()

  /** Lifecycle hook for releasing resources; does nothing unless overridden. */
  def close(): Unit = ()
}
Usage Examples:
import org.apache.flink.streaming.api.scala.async.RichAsyncFunction
import org.apache.flink.configuration.Configuration
import java.util.concurrent.{ExecutorService, Executors}
class DatabaseAsyncFunction extends RichAsyncFunction[String, String] {
  // `.asJava` below needs a converter import that this snippet's imports do not provide.
  import scala.collection.JavaConverters._
  import scala.util.control.NonFatal

  // Both resources are created in open() and released in close().
  private var executor: ExecutorService = _
  private var connectionPool: DatabaseConnectionPool = _

  /** Initialize resources: a worker thread pool and the database connection pool. */
  override def open(parameters: Configuration): Unit = {
    executor = Executors.newFixedThreadPool(10)
    connectionPool = new DatabaseConnectionPool(
      getRuntimeContext.getExecutionConfig.getGlobalJobParameters
    )
  }

  /** Cleanup resources; the null checks cover an open() that did not complete. */
  override def close(): Unit = {
    if (executor != null) {
      executor.shutdown()
    }
    if (connectionPool != null) {
      connectionPool.close()
    }
  }

  /**
   * Runs the blocking lookup for `input` on the worker pool. Completes `resultFuture` with
   * the single value returned, or exceptionally if the lookup throws.
   */
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    // execute() instead of submit(): submit() would capture anything escaping run() in a
    // Future nobody reads, silently leaving resultFuture incomplete.
    executor.execute(new Runnable {
      override def run(): Unit = {
        try {
          val connection = connectionPool.getConnection()
          // NOTE(review): the connection is never returned to the pool; confirm the pool API
          // and release it (e.g. in a finally block).
          // `input` comes from the stream, so it must not be spliced into SQL unescaped.
          // Prefer a bind-parameter / prepared-statement API if the client offers one.
          val result = connection.query(s"SELECT value FROM table WHERE key = ${sqlStringLiteral(input)}")
          resultFuture.complete(List(result).asJava)
        } catch {
          case NonFatal(ex) => resultFuture.completeExceptionally(ex)
        }
      }
    })
  }

  /** Renders `value` as a single-quoted SQL string literal, doubling embedded quotes. */
  private def sqlStringLiteral(value: String): String =
    "'" + value.replace("'", "''") + "'"
}
Interface for completing asynchronous operations.
/**
* ResultFuture interface for completing async operations
*
* An AsyncFunction reports the outcome of an asyncInvoke call through this handle.
*/
trait ResultFuture[OUT] {
// Completes with the given results: each element is emitted, and an empty collection emits nothing.
def complete(result: java.util.Collection[OUT]): Unit
// Completes with an error instead of results.
def completeExceptionally(throwable: Throwable): Unit
}
Usage Examples:
// Fragments, not a standalone program: `resultFuture` is assumed to be the ResultFuture[OUT]
// handed to asyncInvoke, and `result`, `result1`..`result3` values of type OUT.
// `.asJava` assumes a Scala-to-Java collection converter import (e.g. scala.collection.JavaConverters._).
// Complete with single result
resultFuture.complete(List(result).asJava)
// Complete with multiple results
resultFuture.complete(List(result1, result2, result3).asJava)
// Complete with empty result (filter effect)
resultFuture.complete(List.empty[OUT].asJava)
// Complete with exception
resultFuture.completeExceptionally(new RuntimeException("Async operation failed"))
Common patterns for implementing async functions.
External Lookup Pattern (HTTP service):
class DatabaseLookupFunction extends AsyncFunction[String, (String, String)] {
  // `.asJava` below needs a converter import that this snippet's imports do not provide.
  import scala.collection.JavaConverters._
  import scala.util.control.NonFatal

  /**
   * Async HTTP client, built on first use.
   * `@transient lazy` keeps it out of the serialized function instance. The client must be
   * started before execute() accepts requests.
   * NOTE(review): a plain AsyncFunction has no close() hook, so the client is never closed;
   * extend RichAsyncFunction and close it in close() if that matters for your job.
   */
  @transient private lazy val httpClient = {
    val client = HttpAsyncClients.createDefault()
    client.start()
    client
  }

  /** Looks up `key` over HTTP and emits `(key, responseBody)`, or fails the result. */
  override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    // Encode the key so arbitrary stream values cannot alter the query string.
    val encodedKey = java.net.URLEncoder.encode(key, "UTF-8")
    val request = new HttpGet(s"http://api.example.com/lookup?key=$encodedKey")
    httpClient.execute(request, new FutureCallback[HttpResponse] {
      override def completed(response: HttpResponse): Unit = {
        // Reading the body can throw. If that escaped the callback, resultFuture would never be
        // completed, and the element would only be released by the timeout.
        try {
          val value = EntityUtils.toString(response.getEntity)
          resultFuture.complete(List((key, value)).asJava)
        } catch {
          case NonFatal(ex) => resultFuture.completeExceptionally(ex)
        }
      }

      override def failed(ex: Exception): Unit =
        resultFuture.completeExceptionally(ex)

      override def cancelled(): Unit =
        resultFuture.completeExceptionally(new RuntimeException("Request cancelled"))
    })
  }
}
Caching Pattern:
import scala.collection.concurrent.TrieMap
class CachedAsyncFunction extends RichAsyncFunction[String, String] {
  // Names used below that this snippet's imports do not provide.
  import scala.collection.JavaConverters._
  import scala.concurrent.{ExecutionContext, Future}
  import scala.util.{Failure, Success}

  // Per-instance cache with no eviction, so it grows with the number of distinct inputs.
  private val cache = new TrieMap[String, String]()
  private var cacheHits: Counter = _
  private var cacheMisses: Counter = _

  /**
   * ExecutionContext required by `onComplete` (none was in scope before).
   * The callback only updates the cache and completes `resultFuture`, so it runs directly on
   * the thread that completes the lookup future. `@transient lazy` keeps it out of the
   * serialized function instance.
   */
  @transient private implicit lazy val callbackContext: ExecutionContext =
    ExecutionContext.fromExecutor(new java.util.concurrent.Executor {
      override def execute(task: Runnable): Unit = task.run()
    })

  /** Registers the cache hit/miss counters. */
  override def open(parameters: Configuration): Unit = {
    cacheHits = getRuntimeContext.getMetricGroup.counter("cache_hits")
    cacheMisses = getRuntimeContext.getMetricGroup.counter("cache_misses")
  }

  /** Serves `input` from the cache when present; otherwise looks it up and caches the value. */
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    cache.get(input) match {
      case Some(cachedValue) =>
        cacheHits.inc()
        resultFuture.complete(List(cachedValue).asJava)
      case None =>
        cacheMisses.inc()
        // Perform async lookup
        performAsyncLookup(input).onComplete {
          case Success(value) =>
            cache.put(input, value)
            resultFuture.complete(List(value).asJava)
          case Failure(exception) =>
            resultFuture.completeExceptionally(exception)
        }
    }
  }

  /** Implementation of the actual async lookup (left unimplemented in this example). */
  private def performAsyncLookup(input: String): Future[String] = {
    ???
  }
}
Important configuration parameters for async operations.
/**
 * Configuration parameters for async operations
 *
 * Timeout, capacity and result ordering are chosen per async operator, through the
 * arguments of AsyncDataStream.orderedWait / AsyncDataStream.unorderedWait. ExecutionConfig
 * has no environment-wide async timeout setting.
 */
// Timeout: Maximum time to wait for async operation (`timeout` + `timeUnit` arguments)
// Capacity: Maximum number of concurrent async operations (`capacity` argument)
// Order: Whether to maintain order of results (orderedWait vs. unorderedWait)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val keys: DataStream[String] = env.fromElements("a", "b", "c")
val lookedUp: DataStream[String] = AsyncDataStream.unorderedWait(
  keys,
  new CachedAsyncFunction(),
  10, TimeUnit.SECONDS, // timeout
  100                   // capacity
)
// Main async types
// Declarations only; members are listed in the sections above.
object AsyncDataStream // orderedWait / unorderedWait
trait AsyncFunction[IN, OUT] // asyncInvoke + optional timeout
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT] // adds open/close and runtime context
trait ResultFuture[OUT] // complete / completeExceptionally
// Configuration and context types
class Configuration // parameter of RichAsyncFunction.open
trait RuntimeContext // returned by RichAsyncFunction.getRuntimeContext
trait IterationRuntimeContext // returned by RichAsyncFunction.getIterationRuntimeContext
// Java interop types
class TimeUnit // java.util.concurrent.TimeUnit (a Java enum)
class TimeoutException // java.util.concurrent.TimeoutException
class CompletableFuture[T] // java.util.concurrent.CompletableFuture
// Metric types
trait Counter // e.g. the cache_hits / cache_misses counters in CachedAsyncFunction
trait MetricGroup // presumably the type returned by getRuntimeContext.getMetricGroup