CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-scala-2-12

Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.

Pending
Overview
Eval results
Files

docs/async-io.md

Async I/O Operations

AsyncDataStream provides high-performance async operations for enriching streams with external data without blocking stream processing. This is essential for calling external services, databases, or APIs while maintaining streaming performance.

Capabilities

Unordered Async Operations

Async operations whose results may be emitted in a different order than the input elements arrived; dropping the ordering guarantee allows higher throughput.

/**
 * Unordered async I/O entry points: results are not required to follow the input order,
 * which allows higher throughput. Every overload takes a `timeout` plus its `timeUnit`;
 * `capacity` sizes the queue of async operations and falls back to a default when omitted.
 */
object AsyncDataStream {

  /**
   * Runs `asyncFunction` on every element of `input` without preserving input order.
   *
   * @param input         source stream
   * @param asyncFunction AsyncFunction performing the async call for each element
   * @param timeout       time after which a pending async operation times out
   * @param timeUnit      unit of `timeout`
   * @param capacity      capacity of the queue of async operations
   * @return stream of async results, in no guaranteed order
   */
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int): DataStream[OUT]

  /**
   * Same as the overload above, using the default queue capacity.
   *
   * @param input         source stream
   * @param asyncFunction AsyncFunction performing the async call for each element
   * @param timeout       time after which a pending async operation times out
   * @param timeUnit      unit of `timeout`
   * @return stream of async results, in no guaranteed order
   */
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit): DataStream[OUT]

  /**
   * Function-literal form: the async logic is passed in the second parameter list.
   *
   * @param input         source stream
   * @param timeout       time after which a pending async operation times out
   * @param timeUnit      unit of `timeout`
   * @param capacity      capacity of the queue of async operations
   * @param asyncFunction receives an element and the ResultFuture to complete for it
   * @return stream of async results, in no guaranteed order
   */
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int)(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]

  /**
   * Function-literal form using the default queue capacity.
   *
   * @param input         source stream
   * @param timeout       time after which a pending async operation times out
   * @param timeUnit      unit of `timeout`
   * @param asyncFunction receives an element and the ResultFuture to complete for it
   * @return stream of async results, in no guaranteed order
   */
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      timeout: Long,
      timeUnit: TimeUnit)(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]
}

Ordered Async Operations

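Async operations whose results are emitted in the same order as the input elements arrived, preserving stream ordering (typically at some cost in throughput compared with the unordered variant).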

/**
 * Ordered async I/O entry points: results are emitted in the same order as the input
 * elements. Every overload takes a `timeout` plus its `timeUnit`; `capacity` sizes the
 * queue of async operations and falls back to a default when omitted.
 */
object AsyncDataStream {

  /**
   * Runs `asyncFunction` on every element of `input`, keeping results in input order.
   *
   * @param input         source stream
   * @param asyncFunction AsyncFunction performing the async call for each element
   * @param timeout       time after which a pending async operation times out
   * @param timeUnit      unit of `timeout`
   * @param capacity      capacity of the queue of async operations
   * @return stream of async results, in input order
   */
  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int): DataStream[OUT]

  /**
   * Same as the overload above, using the default queue capacity.
   *
   * @param input         source stream
   * @param asyncFunction AsyncFunction performing the async call for each element
   * @param timeout       time after which a pending async operation times out
   * @param timeUnit      unit of `timeout`
   * @return stream of async results, in input order
   */
  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit): DataStream[OUT]

  /**
   * Function-literal form: the async logic is passed in the second parameter list.
   *
   * @param input         source stream
   * @param timeout       time after which a pending async operation times out
   * @param timeUnit      unit of `timeout`
   * @param capacity      capacity of the queue of async operations
   * @param asyncFunction receives an element and the ResultFuture to complete for it
   * @return stream of async results, in input order
   */
  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      timeout: Long,
      timeUnit: TimeUnit,
      capacity: Int)(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]

  /**
   * Function-literal form using the default queue capacity.
   *
   * @param input         source stream
   * @param timeout       time after which a pending async operation times out
   * @param timeUnit      unit of `timeout`
   * @param asyncFunction receives an element and the ResultFuture to complete for it
   * @return stream of async results, in input order
   */
  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      timeout: Long,
      timeUnit: TimeUnit)(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncDataStream, AsyncFunction, ResultFuture}
import java.util.concurrent.TimeUnit
import scala.concurrent.Future
// Future.apply and onComplete require an implicit ExecutionContext. The global pool is fine
// for this CPU-only simulation; blocking clients (JDBC, blocking HTTP) need a dedicated pool.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

case class User(id: String, name: String)
case class UserProfile(id: String, name: String, email: String, department: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User("u1", "Alice"),
  User("u2", "Bob"),
  User("u3", "Charlie")
)

// Using an AsyncFunction implementation
class DatabaseLookupFunction extends AsyncFunction[User, UserProfile] {
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    // Simulated async database lookup
    Future {
      UserProfile(user.id, user.name, s"${user.name.toLowerCase}@company.com", "Engineering")
    }.onComplete {
      // In the Scala API, ResultFuture.complete takes a Scala Iterable
      case Success(profile)   => resultFuture.complete(Iterable(profile))
      case Failure(exception) => resultFuture.completeExceptionally(exception)
    }
  }
}

// Unordered async transformation (higher throughput, output order may differ from input)
val enrichedUsersUnordered = AsyncDataStream.unorderedWait(
  users,
  new DatabaseLookupFunction,
  5000, // 5 second timeout
  TimeUnit.MILLISECONDS
)

// Ordered async transformation (preserves input order)
val enrichedUsersOrdered = AsyncDataStream.orderedWait(
  users,
  new DatabaseLookupFunction,
  5000,
  TimeUnit.MILLISECONDS
)

// Using function syntax: the async logic goes in the second parameter list
val enrichedWithFunction = AsyncDataStream.unorderedWait(
  users,
  5000,
  TimeUnit.MILLISECONDS
) { (user: User, resultFuture: ResultFuture[UserProfile]) =>
  Future {
    UserProfile(user.id, user.name, s"${user.name.toLowerCase}@company.com", "Engineering")
  }.onComplete {
    case Success(profile)   => resultFuture.complete(Iterable(profile))
    case Failure(exception) => resultFuture.completeExceptionally(exception)
  }
}

// Attach sinks (e.g. enrichedUsersOrdered.print()) and call env.execute() to run the job.

AsyncFunction Interface

Base interface for implementing async operations.

trait AsyncFunction[IN, OUT] {
  /**
   * Starts the async operation for one input element.
   *
   * Implement the async logic here: issue the request and complete `resultFuture`
   * from its callback rather than waiting for the response inside this method.
   *
   * @param input        input element to process
   * @param resultFuture ResultFuture to complete with the results (or an error)
   */
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit

  /**
   * Called when the async operation for `input` exceeds the configured timeout.
   *
   * By default the element fails with a `java.util.concurrent.TimeoutException`.
   * Override to emit a fallback result instead, by completing `resultFuture` normally.
   *
   * @param input        input element that timed out
   * @param resultFuture ResultFuture to complete for the timeout case
   */
  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
    resultFuture.completeExceptionally(
      new java.util.concurrent.TimeoutException(s"Async operation timed out for input: $input")
    )
  }
}

RichAsyncFunction

Rich variant of AsyncFunction that adds open/close lifecycle methods and access to the runtime context.

abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT] with RichFunction {

  // ---- lifecycle hooks (no-ops unless overridden) ----------------------------------

  /**
   * Called once per parallel instance; the place to create clients, connections or pools.
   *
   * @param parameters configuration handed to the function
   */
  override def open(parameters: Configuration): Unit = {}

  /** Called when the function shuts down; release whatever open() acquired. */
  override def close(): Unit = {}

  // ---- runtime context -------------------------------------------------------------

  /**
   * Context of the running task, used for configuration and metrics.
   * NOTE(review): Flink's implementation restricts this context for async functions
   * (e.g. keyed state access is unsupported) — confirm for the Flink version in use.
   *
   * @return the RuntimeContext installed by the runtime
   */
  def getRuntimeContext: RuntimeContext

  /**
   * Installs the runtime context; invoked by the Flink runtime, not by user code.
   *
   * @param t runtime context to install
   */
  override def setRuntimeContext(t: RuntimeContext): Unit
}

Usage Examples:

import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.flink.configuration.Configuration
import java.sql.{Connection, DriverManager}
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.{Failure, Success}

/**
 * Looks up each User's profile in a JDBC database without blocking the operator thread.
 *
 * The connection and the thread pool are created in open(), not in the constructor:
 * the function object is serialized when the job is submitted, and neither resource is
 * serializable. Both are released in close().
 */
class DatabaseAsyncFunction extends RichAsyncFunction[User, UserProfile] {
  @transient private var connection: Connection = _
  @transient private var executor: ExecutionContextExecutorService = _

  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "")
    // A java.sql.Connection is not guaranteed to be thread-safe, so every query on this
    // shared connection runs on one dedicated thread. This also keeps the blocking JDBC calls
    // off the operator thread and off the global pool. For concurrent lookups, use a
    // connection pool with an executor of matching size.
    executor = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
  }

  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    implicit val ec: ExecutionContext = executor
    Future(lookupProfile(user)).onComplete {
      case Success(profile)   => resultFuture.complete(Iterable(profile))
      case Failure(exception) => resultFuture.completeExceptionally(exception)
    }
  }

  override def timeout(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    // Custom timeout handling: emit a placeholder profile instead of failing the element
    resultFuture.complete(
      Iterable(UserProfile(user.id, user.name, "unknown@company.com", "Unknown"))
    )
  }

  override def close(): Unit = {
    // Stop the lookup thread (waiting briefly for an in-flight query) before closing the
    // connection it uses; lookups still queued at this point are discarded.
    if (executor != null) {
      executor.shutdownNow()
      executor.awaitTermination(10, TimeUnit.SECONDS)
    }
    if (connection != null) {
      connection.close()
    }
  }

  /** Blocking single-row lookup; closing the statement also closes its ResultSet. */
  private def lookupProfile(user: User): UserProfile = {
    val stmt = connection.prepareStatement("SELECT * FROM user_profiles WHERE id = ?")
    try {
      stmt.setString(1, user.id)
      val rs = stmt.executeQuery()
      if (rs.next()) {
        UserProfile(
          rs.getString("id"),
          rs.getString("name"),
          rs.getString("email"),
          rs.getString("department")
        )
      } else {
        throw new RuntimeException(s"User profile not found for ${user.id}")
      }
    } finally {
      stmt.close()
    }
  }
}

ResultFuture Interface

Interface for completing async operations with results or errors.

trait ResultFuture[T] {
  /**
   * Completes the async operation for one input element with its results.
   *
   * Pass all results for the element in a single call, e.g. `complete(Iterable(result))`,
   * or `complete(Iterable.empty)` to emit nothing for that element.
   *
   * @param result result elements to emit
   */
  def complete(result: Iterable[T]): Unit

  /**
   * Completes the async operation with an exception.
   *
   * @param error exception that occurred during the async operation
   */
  def completeExceptionally(error: Throwable): Unit
}

Retry Strategies

Support for retry logic in async operations.

// ---- Retry strategy interface: whether, and after what delay, to attempt again ----
// NOTE(review): Flink's own AsyncRetryStrategy names the delay method getBackoffTimeMillis
// and also exposes getRetryPredicate(); strategies are presumably passed to
// AsyncDataStream.unorderedWaitWithRetry / orderedWaitWithRetry — verify these signatures
// against the Flink version in use.
trait AsyncRetryStrategy[OUT] {
  /** @return whether another attempt is allowed after `currentAttempts` attempts */
  def canRetry(currentAttempts: Int): Boolean

  /** @return delay before the next attempt (unit not stated here — presumably millis) */
  def getRetryDelay(currentAttempts: Int): Long
}

// ---- Retry predicate interface: which outcomes should trigger a retry ----
trait AsyncRetryPredicate[OUT] {
  /** Test applied to a result; presumably `true` means "retry" — confirm. */
  def resultPredicate(result: OUT): Boolean

  /** Test applied to a failure; presumably `true` means "retry" — confirm. */
  def exceptionPredicate(exception: Throwable): Boolean
}

// ---- Built-in retry strategies ----
object AsyncRetryStrategies {
  /** Fixed delay between attempts, bounded by `maxAttempts`. */
  def fixedDelay(maxAttempts: Int, delay: Long): AsyncRetryStrategy[_]

  /** Delay starting at `initialDelay` and scaled by `multiplier`, bounded by `maxAttempts`. */
  def exponentialBackoff(maxAttempts: Int, initialDelay: Long, multiplier: Double): AsyncRetryStrategy[_]

  /** Strategy that never retries. */
  def noRetry(): AsyncRetryStrategy[_]
}

// ---- Built-in retry predicates ----
object RetryPredicates {
  /** Predicate keyed on the given exception class. */
  def hasException[OUT](exceptionClass: Class[_ <: Throwable]): AsyncRetryPredicate[OUT]

  /** Predicate keyed on the supplied result test. */
  def hasResult[OUT](predicate: OUT => Boolean): AsyncRetryPredicate[OUT]
}

Types

// Time units for timeout specification: the API takes the JDK enum
// java.util.concurrent.TimeUnit (NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS,
// MINUTES, HOURS, DAYS), not a Scala Enumeration, so import it rather than redefining it.
import java.util.concurrent.TimeUnit

// Runtime context for rich functions: information about the task running the function
trait RuntimeContext {
  // -- task identity and parallelism --

  /** Name of the task executing the function. */
  def getTaskName: String

  /** Index of this parallel subtask. */
  def getIndexOfThisSubtask: Int

  /** Total number of parallel subtasks. */
  def getNumberOfParallelSubtasks: Int

  // -- configuration, metrics and class loading --

  /** Execution configuration of the job. */
  def getExecutionConfig: ExecutionConfig

  /** Metric group for registering metrics. */
  def getMetricGroup: MetricGroup

  /** Class loader for user code. */
  def getUserCodeClassLoader: ClassLoader
}

// Rich function base interface: lifecycle hooks plus access to the runtime context
trait RichFunction {
  /** Lifecycle setup hook; receives the function's configuration. */
  def open(parameters: Configuration): Unit

  /** Lifecycle teardown hook. */
  def close(): Unit

  /** Runtime context previously installed through setRuntimeContext. */
  def getRuntimeContext: RuntimeContext

  /** Installs the runtime context. */
  def setRuntimeContext(t: RuntimeContext): Unit
}

// Configuration for open method.
// A minimal string-map stand-in for the accessors listed here. Flink's own
// org.apache.flink.configuration.Configuration is a separate, richer class.
class Configuration extends java.util.HashMap[String, String] {

  /** Value under `key`, or `defaultValue` when the key is absent or mapped to null. */
  def getString(key: String, defaultValue: String): String =
    lookup(key, defaultValue)(identity)

  /** Parsed Int under `key`, or `defaultValue`; throws NumberFormatException if malformed. */
  def getInteger(key: String, defaultValue: Int): Int =
    lookup(key, defaultValue)(_.toInt)

  /** Parsed Boolean under `key` ("true"/"false", any case), or `defaultValue`. */
  def getBoolean(key: String, defaultValue: Boolean): Boolean =
    lookup(key, defaultValue)(_.toBoolean)

  /** Parsed Float under `key`, or `defaultValue`; throws NumberFormatException if malformed. */
  def getFloat(key: String, defaultValue: Float): Float =
    lookup(key, defaultValue)(_.toFloat)

  /** Parsed Double under `key`, or `defaultValue`; throws NumberFormatException if malformed. */
  def getDouble(key: String, defaultValue: Double): Double =
    lookup(key, defaultValue)(_.toDouble)

  /** Parsed Long under `key`, or `defaultValue`; throws NumberFormatException if malformed. */
  def getLong(key: String, defaultValue: Long): Long =
    lookup(key, defaultValue)(_.toLong)

  // Shared lookup: a missing key and a null value both yield the default.
  private def lookup[A](key: String, defaultValue: A)(parse: String => A): A =
    Option(get(key)).fold(defaultValue)(parse)
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12

docs

async-io.md

data-streams.md

execution-environment.md

index.md

keyed-streams.md

processing-functions.md

sinks-output.md

stream-connections.md

window-functions.md

windowing.md

tile.json