or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-operations.md data-stream.md functions.md index.md joining.md keyed-stream.md stream-execution-environment.md windowing.md
tile.json

async-operations.mddocs/

Async Operations

Flink's Async I/O functionality allows you to perform non-blocking calls to external systems (databases, web services, etc.) without blocking the stream processing pipeline. This can significantly improve throughput when dealing with external data enrichment or lookups.

Capabilities

AsyncDataStream

Main entry point for creating asynchronous data streams.

/**
 * AsyncDataStream operations for creating async streams.
 *
 * Main entry point for applying an [[AsyncFunction]] to a [[DataStream]].
 * The `orderedWait` variants maintain the order of results; the `unorderedWait`
 * variants give no order guarantee in exchange for better throughput.
 * Each variant has an overload that additionally takes a `capacity`.
 */
object AsyncDataStream {
  /**
   * Applies `function` asynchronously and maintains the order of results.
   *
   * @param stream   input stream
   * @param function async function to apply
   * @param timeout  maximum time to wait for an async operation, expressed in `timeUnit`
   * @param timeUnit unit of `timeout`
   * @return stream of the async results
   */
  def orderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT], 
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]
  
  /**
   * Same as the four-argument `orderedWait`, with an explicit capacity.
   *
   * @param capacity maximum number of concurrent async operations
   */
  def orderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit,
    capacity: Int
  ): DataStream[OUT]
  
  /**
   * Applies `function` asynchronously without any guarantee on the order of results.
   *
   * @param stream   input stream
   * @param function async function to apply
   * @param timeout  maximum time to wait for an async operation, expressed in `timeUnit`
   * @param timeUnit unit of `timeout`
   * @return stream of the async results
   */
  def unorderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT],
    timeout: Long, 
    timeUnit: TimeUnit
  ): DataStream[OUT]
  
  /**
   * Same as the four-argument `unorderedWait`, with an explicit capacity.
   *
   * @param capacity maximum number of concurrent async operations
   */
  def unorderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit,
    capacity: Int
  ): DataStream[OUT]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.AsyncDataStream
import java.util.concurrent.TimeUnit

// Input record and its enriched counterpart used throughout the examples.
case class User(id: Int, name: String)
case class UserWithEmail(id: Int, name: String, email: String)

// The example needs an execution environment to create the source stream.
val env = StreamExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice"),
  User(2, "Bob"),
  User(3, "Charlie")
)

// Ordered async operation - maintains order of results
val enrichedUsersOrdered = AsyncDataStream.orderedWait(
  users,
  new UserEmailAsyncFunction(),
  5000,  // 5 second timeout
  TimeUnit.MILLISECONDS
)

// Unordered async operation - better throughput, no order guarantee
val enrichedUsersUnordered = AsyncDataStream.unorderedWait(
  users,
  new UserEmailAsyncFunction(),
  5000,  // 5 second timeout
  TimeUnit.MILLISECONDS,
  100    // capacity of 100 concurrent requests
)

AsyncFunction Interface

Core interface for implementing asynchronous functions.

/**
 * AsyncFunction interface for async operations.
 *
 * Implementations start an asynchronous operation in `asyncInvoke` and report its
 * outcome through the supplied [[ResultFuture]].
 *
 * @tparam IN  type of the input elements
 * @tparam OUT type of the produced results
 */
trait AsyncFunction[IN, OUT] {
  /**
   * Starts the async operation for one input element.
   *
   * @param input        the element to process
   * @param resultFuture handle to complete with the results or with an error
   */
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
  
  // Optional timeout handling: unless overridden, the result future is failed
  // with a TimeoutException.
  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
    resultFuture.completeExceptionally(
      new TimeoutException("Async operation timed out")
    )
  }
}

Usage Examples:

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Failure}
import java.util.concurrent.{CompletableFuture, TimeoutException}

// Async function using Scala Future
/**
 * Enriches a [[User]] with an e-mail address obtained from an asynchronous lookup.
 *
 * On success the result future is completed with a single [[UserWithEmail]]; a failed
 * lookup fails the result future; a timeout yields a placeholder address instead of an error.
 */
class UserEmailAsyncFunction extends AsyncFunction[User, UserWithEmail] {

  // Brings `.asJava` into scope (use scala.jdk.CollectionConverters._ on Scala 2.13+).
  import scala.collection.JavaConverters._

  // The function object is serialized when the job is distributed, and an ExecutionContext
  // is not serializable: keep it out of the serialized state and create it lazily on first use.
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  /**
   * Looks up the e-mail of `user` and completes `resultFuture` from the lookup's callback.
   *
   * @param user         the user to enrich
   * @param resultFuture completed with one enriched user, or failed with the lookup error
   */
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    // Simulate async database lookup
    val emailFuture: Future[String] = lookupUserEmail(user.id)

    emailFuture.onComplete {
      case Success(email) =>
        resultFuture.complete(List(UserWithEmail(user.id, user.name, email)).asJava)
      case Failure(exception) =>
        resultFuture.completeExceptionally(exception)
    }
  }

  /**
   * Custom timeout handling: emits the user with a placeholder address rather than failing.
   */
  override def timeout(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    resultFuture.complete(List(UserWithEmail(user.id, user.name, "timeout@example.com")).asJava)
  }

  /** Simulates an async call to an external service that returns the user's e-mail. */
  private def lookupUserEmail(userId: Int): Future[String] =
    Future {
      // Simulated network delay; flagged as blocking so the global pool can compensate.
      scala.concurrent.blocking {
        Thread.sleep(100)
      }
      s"user$userId@example.com"
    }
}

// Async function using Java CompletableFuture
/**
 * Enriches a [[User]] with an e-mail address computed on a Java `CompletableFuture`.
 */
class UserAsyncFunctionJava extends AsyncFunction[User, UserWithEmail] {

  // Brings `.asJava` into scope (use scala.jdk.CollectionConverters._ on Scala 2.13+).
  import scala.collection.JavaConverters._

  /**
   * Builds the enriched user asynchronously and forwards the outcome to `resultFuture`.
   *
   * @param user         the user to enrich
   * @param resultFuture completed with one enriched user, or failed with the future's error
   */
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    val future = CompletableFuture
      .supplyAsync(() => s"user${user.id}@example.com")
      .thenApply(email => UserWithEmail(user.id, user.name, email))

    future.whenComplete((result, exception) => {
      // whenComplete passes a null exception on success; lift it into an Option
      // instead of comparing against null.
      Option(exception) match {
        case Some(error) => resultFuture.completeExceptionally(error)
        case None        => resultFuture.complete(List(result).asJava)
      }
    })
  }
}

RichAsyncFunction

Rich version of AsyncFunction with access to runtime context and lifecycle methods.

/**
 * RichAsyncFunction with lifecycle and context access.
 *
 * Rich version of [[AsyncFunction]]: adds `open`/`close` lifecycle methods and
 * accessors for the runtime context.
 */
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT] {
  // Lifecycle hook for initializing resources; does nothing unless overridden.
  def open(parameters: Configuration): Unit = {}
  // Lifecycle hook for cleaning up resources; does nothing unless overridden.
  def close(): Unit = {}
  // Runtime context accessor and mutator.
  def getRuntimeContext: RuntimeContext
  def setRuntimeContext(context: RuntimeContext): Unit
  // Iteration-specific runtime context accessor.
  def getIterationRuntimeContext: IterationRuntimeContext
}

Usage Examples:

import org.apache.flink.streaming.api.scala.async.RichAsyncFunction
import org.apache.flink.configuration.Configuration
import java.util.concurrent.{ExecutorService, Executors}

/**
 * Looks up a value for each input key on a dedicated thread pool.
 *
 * The thread pool and the connection pool are created in `open` and released in `close`.
 */
class DatabaseAsyncFunction extends RichAsyncFunction[String, String] {

  // Brings `.asJava` into scope (use scala.jdk.CollectionConverters._ on Scala 2.13+).
  import scala.collection.JavaConverters._

  // Runtime-only resources: populated in open(), released in close(), never serialized.
  @transient private var executor: ExecutorService = _
  @transient private var connectionPool: DatabaseConnectionPool = _

  /** Initializes the worker thread pool and the connection pool. */
  override def open(parameters: Configuration): Unit = {
    // Initialize resources
    executor = Executors.newFixedThreadPool(10)
    connectionPool = new DatabaseConnectionPool(
      getRuntimeContext.getExecutionConfig.getGlobalJobParameters
    )
  }

  /** Releases whatever `open` managed to create; safe when `open` never ran. */
  override def close(): Unit = {
    // Cleanup resources
    Option(executor).foreach(_.shutdown())
    Option(connectionPool).foreach(_.close())
  }

  /**
   * Runs the lookup for `input` on the thread pool.
   *
   * @param input        key to look up
   * @param resultFuture completed with the single query result, or failed with the exception
   */
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    executor.submit(new Runnable {
      override def run(): Unit = {
        try {
          val connection = connectionPool.getConnection()
          // The key comes straight from the stream, so it must not be spliced into the SQL
          // text verbatim. Quotes are doubled as a minimal safeguard.
          // NOTE(review): prefer a parameterized query if the connection API offers one.
          val escapedKey = input.replace("'", "''")
          val result = connection.query(s"SELECT value FROM table WHERE key = '$escapedKey'")
          // NOTE(review): the connection is not handed back to the pool here; confirm
          // whether DatabaseConnectionPool requires an explicit release.
          resultFuture.complete(List(result).asJava)
        } catch {
          case ex: Exception => resultFuture.completeExceptionally(ex)
        }
      }
    })
  }
}

ResultFuture Interface

Interface for completing asynchronous operations.

/**
 * ResultFuture interface for completing async operations.
 *
 * An async function calls one of the two methods to report the outcome of an
 * operation: `complete` with the results, or `completeExceptionally` with the failure.
 *
 * NOTE(review): Flink's Scala async API is believed to declare
 * `complete(result: Iterable[OUT])`; confirm which signature matches the Flink
 * version in use before relying on the `java.util.Collection` form shown here.
 */
trait ResultFuture[OUT] {
  // Completes the operation with zero, one or several results.
  def complete(result: java.util.Collection[OUT]): Unit
  // Completes the operation with a failure.
  def completeExceptionally(throwable: Throwable): Unit
}

Usage Examples:

// `.asJava` comes from the collection converters
// (use scala.jdk.CollectionConverters._ on Scala 2.13+).
import scala.collection.JavaConverters._

// Complete with single result
resultFuture.complete(List(result).asJava)

// Complete with multiple results
resultFuture.complete(List(result1, result2, result3).asJava)

// Complete with empty result (filter effect)
resultFuture.complete(List.empty[OUT].asJava)

// Complete with exception
resultFuture.completeExceptionally(new RuntimeException("Async operation failed"))

Async Function Patterns

Common patterns for implementing async functions.

Database Lookup Pattern (the lookup is served through an asynchronous HTTP client):

/**
 * Looks up a value for each key through an asynchronous HTTP request and emits
 * the `(key, value)` pair.
 */
class DatabaseLookupFunction extends AsyncFunction[String, (String, String)] {

  // Brings `.asJava` into scope (use scala.jdk.CollectionConverters._ on Scala 2.13+).
  import scala.collection.JavaConverters._

  // The client must be started before it can execute requests, and it is not serializable,
  // so it is created and started lazily on first use instead of at construction time.
  // NOTE(review): the client is never closed; extend RichAsyncFunction and close it in
  // close() if the function's lifecycle should release it.
  @transient private lazy val httpClient = {
    val client = HttpAsyncClients.createDefault()
    client.start()
    client
  }

  /**
   * Issues the lookup request for `key`.
   *
   * @param key          lookup key; URL-encoded before being placed in the query string
   * @param resultFuture completed with one `(key, value)` pair, or failed on error/cancellation
   */
  override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    // Keys may contain characters that are not valid inside a query string.
    val encodedKey = java.net.URLEncoder.encode(key, "UTF-8")
    val request = new HttpGet(s"http://api.example.com/lookup?key=$encodedKey")

    httpClient.execute(request, new FutureCallback[HttpResponse] {
      override def completed(response: HttpResponse): Unit = {
        // Reading the entity can throw; route that into the result future so the
        // operation does not linger until the timeout fires.
        scala.util.Try(EntityUtils.toString(response.getEntity)) match {
          case scala.util.Success(value) => resultFuture.complete(List((key, value)).asJava)
          case scala.util.Failure(error) => resultFuture.completeExceptionally(error)
        }
      }

      override def failed(ex: Exception): Unit = {
        resultFuture.completeExceptionally(ex)
      }

      override def cancelled(): Unit = {
        resultFuture.completeExceptionally(new RuntimeException("Request cancelled"))
      }
    })
  }
}

Caching Pattern:

import scala.collection.concurrent.TrieMap

/**
 * Async lookup with an in-memory cache in front of it.
 *
 * Cache hits complete the result future immediately; misses trigger the async lookup and
 * store its value on success. Hits and misses are counted in the "cache_hits" and
 * "cache_misses" metrics.
 */
class CachedAsyncFunction extends RichAsyncFunction[String, String] {

  // Brings `.asJava` into scope (use scala.jdk.CollectionConverters._ on Scala 2.13+).
  import scala.collection.JavaConverters._

  // `onComplete` needs an execution context for its callback. It is not serializable,
  // so it is kept out of the serialized function and created lazily on first use.
  @transient private implicit lazy val ec: scala.concurrent.ExecutionContext =
    scala.concurrent.ExecutionContext.global

  // NOTE(review): entries are never evicted, so the cache grows with the number of distinct keys.
  private val cache = new TrieMap[String, String]()

  // Metric handles are runtime-only and registered in open().
  @transient private var cacheHits: Counter = _
  @transient private var cacheMisses: Counter = _

  /** Registers the hit and miss counters. */
  override def open(parameters: Configuration): Unit = {
    cacheHits = getRuntimeContext.getMetricGroup.counter("cache_hits")
    cacheMisses = getRuntimeContext.getMetricGroup.counter("cache_misses")
  }

  /**
   * Serves `input` from the cache when possible, otherwise performs the async lookup.
   *
   * @param input        lookup key
   * @param resultFuture completed with the single cached or looked-up value, or failed
   *                     with the lookup error
   */
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    cache.get(input) match {
      case Some(cachedValue) =>
        cacheHits.inc()
        resultFuture.complete(List(cachedValue).asJava)

      case None =>
        cacheMisses.inc()
        // Perform async lookup
        performAsyncLookup(input).onComplete {
          case Success(value) =>
            cache.put(input, value)
            resultFuture.complete(List(value).asJava)
          case Failure(exception) =>
            resultFuture.completeExceptionally(exception)
        }
    }
  }

  /** Placeholder for the actual async lookup. */
  private def performAsyncLookup(input: String): Future[String] = {
    // Implementation of actual async lookup
    ???
  }
}

Configuration and Tuning

Important configuration parameters for async operations.

/**
 * Configuration parameters for async operations
 */
// Timeout: Maximum time to wait for async operation
// Capacity: Maximum number of concurrent async operations
// Order: Whether to maintain order of results

// All three are chosen per operator through the AsyncDataStream call. ExecutionConfig does
// not appear to expose a `setAsyncTimeout` method (verify against your Flink version), so
// there is no environment-level setting to apply here.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[User] = env.fromElements(User(1, "Alice"))

val tuned = AsyncDataStream.unorderedWait(  // Order: unordered
  input,
  new UserEmailAsyncFunction(),
  10L,               // Timeout: 10 ...
  TimeUnit.SECONDS,  // ... seconds
  100                // Capacity: at most 100 concurrent async operations
)

Types

// Main async types (imported from org.apache.flink.streaming.api.scala.async in the examples)
object AsyncDataStream
trait AsyncFunction[IN, OUT]
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT]
trait ResultFuture[OUT]

// Configuration and context types
// (Configuration is imported from org.apache.flink.configuration in the examples)
class Configuration
trait RuntimeContext
trait IterationRuntimeContext

// Java interop types (imported from java.util.concurrent in the examples)
class TimeUnit
class TimeoutException
class CompletableFuture[T]

// Metric types (used by the caching example for hit/miss counters)
trait Counter
trait MetricGroup