or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

group-pattern-management.mdindex.mdpattern-definition.mdpattern-processing.mdpattern-stream-creation.mdtimeout-handling.md
tile.json

timeout-handling.mddocs/

Timeout Handling

Handle partial pattern matches that timeout with side outputs for comprehensive event processing, allowing separation of successful matches from timed-out partial matches.

Capabilities

Select with Timeout (Side Output)

Process patterns with timeout handling using side outputs for timed-out partial matches.

class PatternStream[T] {
  /**
   * Select with timeout using side output (recommended approach)
   * @param outputTag OutputTag for timeout events
   * @param patternTimeoutFunction Function to process timed-out patterns
   * @param patternSelectFunction Function to process successful matches
   * @tparam L Type of timeout result
   * @tparam R Type of success result
   * @return DataStream with successful matches, timeouts available via side output
   */
  def select[L: TypeInformation, R: TypeInformation](
    outputTag: OutputTag[L],
    patternTimeoutFunction: PatternTimeoutFunction[T, L],
    patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[R]
  
  /**
   * Select with timeout using Scala functions and side output
   * @param outputTag OutputTag for timeout events  
   * @param patternTimeoutFunction Scala function for timeouts
   * @param patternSelectFunction Scala function for matches
   * @tparam L Type of timeout result
   * @tparam R Type of success result
   * @return DataStream with successful matches
   */
  def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
    patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L
  )(
    patternSelectFunction: Map[String, Iterable[T]] => R
  ): DataStream[R]
}

Usage Examples:

import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import java.util.{List => JList, Map => JMap}
import java.time.Duration

case class LoginEvent(userId: String, timestamp: Long, sessionId: String)
case class CompletedSession(userId: String, sessionId: String, startTime: Long, endTime: Long)
case class IncompleteSession(userId: String, sessionId: String, startTime: Long, timeoutTime: Long)

// Pattern with time window
val sessionPattern = Pattern.begin[LoginEvent]("login")
  .where(_.sessionId.nonEmpty)
  .followedBy("logout")
  .where(event => event.sessionId.nonEmpty)
  .within(Duration.ofMinutes(30)) // Sessions must complete within 30 minutes

// Define timeout output tag
val timeoutTag = OutputTag[IncompleteSession]("session-timeouts")

// Scala function approach
val sessionResults = CEP.pattern(loginStream, sessionPattern)
  .select(timeoutTag)(
    // Timeout handler - receives partial pattern and timeout timestamp
    (pattern: Map[String, Iterable[LoginEvent]], timeoutTimestamp: Long) => {
      val loginEvent = pattern("login").head
      IncompleteSession(
        loginEvent.userId,
        loginEvent.sessionId, 
        loginEvent.timestamp,
        timeoutTimestamp
      )
    }
  )(
    // Success handler - receives complete pattern
    (pattern: Map[String, Iterable[LoginEvent]]) => {
      val loginEvent = pattern("login").head
      val logoutEvent = pattern("logout").head
      CompletedSession(
        loginEvent.userId,
        loginEvent.sessionId,
        loginEvent.timestamp,
        logoutEvent.timestamp
      )
    }
  )

// Process main results and timeouts separately
sessionResults.print("Completed sessions: ")
sessionResults.getSideOutput(timeoutTag).print("Timed-out sessions: ")

// Java interface approach
val timeoutFunction = new PatternTimeoutFunction[LoginEvent, String] {
  override def timeout(pattern: JMap[String, JList[LoginEvent]], timeoutTimestamp: Long): String = {
    val loginEvent = pattern.get("login").get(0)
    s"Session timeout: ${loginEvent.userId} at $timeoutTimestamp"
  }
}

val selectFunction = new PatternSelectFunction[LoginEvent, String] {
  override def select(pattern: JMap[String, JList[LoginEvent]]): String = {
    val loginEvent = pattern.get("login").get(0)
    val logoutEvent = pattern.get("logout").get(0)
    s"Completed session: ${loginEvent.userId} duration ${logoutEvent.timestamp - loginEvent.timestamp}ms"
  }
}

val javaTimeoutTag = OutputTag[String]("java-timeouts")
val javaResults = CEP.pattern(loginStream, sessionPattern)
  .select(javaTimeoutTag, timeoutFunction, selectFunction)

Flat Select with Timeout (Side Output)

Process patterns with timeout handling that can emit multiple results per match.

class PatternStream[T] {
  /**
   * Flat select with timeout using side output
   * @param outputTag OutputTag for timeout events
   * @param patternFlatTimeoutFunction Function to process timed-out patterns
   * @param patternFlatSelectFunction Function to process successful matches
   * @tparam L Type of timeout result
   * @tparam R Type of success result
   * @return DataStream with successful matches
   */
  def flatSelect[L: TypeInformation, R: TypeInformation](
    outputTag: OutputTag[L],
    patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
    patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
  ): DataStream[R]
  
  /**
   * Flat select with timeout using Scala functions and side output
   * @param outputTag OutputTag for timeout events
   * @param patternFlatTimeoutFunction Scala function for timeouts
   * @param patternFlatSelectFunction Scala function for matches  
   * @tparam L Type of timeout result
   * @tparam R Type of success result
   * @return DataStream with successful matches
   */
  def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
    patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit
  )(
    patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit
  ): DataStream[R]
}

Usage Examples:

import org.apache.flink.util.Collector
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}

case class TransactionEvent(accountId: String, amount: Double, location: String, timestamp: Long)
case class SuspiciousActivity(accountId: String, reason: String, amount: Double, locations: Set[String])
case class TimeoutAlert(accountId: String, partialAmount: Double, duration: Long)

// Pattern for detecting suspicious transaction sequences
val suspiciousPattern = Pattern.begin[TransactionEvent]("transactions")
  .where(_.amount > 1000)
  .oneOrMore
  .within(Duration.ofMinutes(10))

val timeoutTag = OutputTag[TimeoutAlert]("suspicious-timeouts")

// Scala flat select with timeout
val suspiciousResults = CEP.pattern(transactionStream, suspiciousPattern)
  .flatSelect(timeoutTag)(
    // Timeout handler - can emit multiple timeout alerts
    (pattern: Map[String, Iterable[TransactionEvent]], timeoutTimestamp: Long, out: Collector[TimeoutAlert]) => {
      val transactions = pattern("transactions")
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      val firstTransaction = transactions.minBy(_.timestamp)
      val duration = timeoutTimestamp - firstTransaction.timestamp
      
      // Emit timeout alert
      out.collect(TimeoutAlert(accountId, totalAmount, duration))
      
      // Additional timeout processing if needed
      if (totalAmount > 5000) {
        out.collect(TimeoutAlert(s"$accountId-high-value", totalAmount, duration))
      }
    }
  )(
    // Success handler - can emit multiple suspicious activity reports
    (pattern: Map[String, Iterable[TransactionEvent]], out: Collector[SuspiciousActivity]) => {
      val transactions = pattern("transactions")
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      val locations = transactions.map(_.location).toSet
      
      // Emit volume-based alert
      if (totalAmount > 10000) {
        out.collect(SuspiciousActivity(accountId, "high-volume", totalAmount, locations))
      }
      
      // Emit location-based alert  
      if (locations.size > 3) {
        out.collect(SuspiciousActivity(accountId, "multiple-locations", totalAmount, locations))
      }
      
      // Emit frequency-based alert
      if (transactions.size > 5) {
        out.collect(SuspiciousActivity(accountId, "high-frequency", totalAmount, locations))
      }
    }
  )

// Process results and timeouts
suspiciousResults.print("Suspicious activities: ")
suspiciousResults.getSideOutput(timeoutTag).print("Timeout alerts: ")

Deprecated Either-Based Timeout Handling

Legacy timeout handling using Either types (deprecated in favor of side outputs).

class PatternStream[T] {
  /**
   * Select with timeout using Either return type (deprecated)
   * @param patternTimeoutFunction Function for timeout processing
   * @param patternSelectFunction Function for match processing
   * @tparam L Type of timeout result
   * @tparam R Type of success result
   * @return DataStream of Either[L, R] containing both results
   */
  @deprecated("Use version with side outputs", "1.18.0")
  def select[L: TypeInformation, R: TypeInformation](
    patternTimeoutFunction: PatternTimeoutFunction[T, L],
    patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[Either[L, R]]
  
  /**
   * Flat select with timeout using Either return type (deprecated)
   * @param patternFlatTimeoutFunction Function for timeout processing
   * @param patternFlatSelectFunction Function for match processing
   * @tparam L Type of timeout result
   * @tparam R Type of success result
   * @return DataStream of Either[L, R] containing both results
   */
  @deprecated("Use version with side outputs", "1.18.0")  
  def flatSelect[L: TypeInformation, R: TypeInformation](
    patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
    patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
  ): DataStream[Either[L, R]]
}

Usage Examples:

// Deprecated Either-based approach (not recommended)
val eitherResults = CEP.pattern(dataStream, pattern)
  .select(timeoutFunction, selectFunction)

// Separate Left (timeout) and Right (success) results
val (timeouts, successes) = eitherResults.split(
  either => either.isLeft,
  either => either.isRight
)

val timeoutResults = timeouts.map(_.left.get)
val successResults = successes.map(_.right.get)

Advanced Timeout Scenarios

Complex timeout handling with multiple patterns and cascading timeouts.

Usage Examples:

case class OrderEvent(orderId: String, stage: String, timestamp: Long, customerId: String)
case class OrderProgress(orderId: String, completedStages: List[String], timeSpent: Long)
case class OrderTimeout(orderId: String, lastStage: String, customerId: String, timeoutAt: Long)

// Multi-stage order processing pattern
val orderPattern = Pattern.begin[OrderEvent]("created")
  .where(_.stage == "created")
  .followedBy("paid")
  .where(_.stage == "paid")
  .followedBy("shipped")
  .where(_.stage == "shipped")
  .followedBy("delivered")
  .where(_.stage == "delivered")
  .within(Duration.ofDays(7)) // Orders should complete within a week

val orderTimeoutTag = OutputTag[OrderTimeout]("order-timeouts")

val orderResults = CEP.pattern(orderStream, orderPattern)
  .select(orderTimeoutTag)(
    // Timeout handler - determine which stage failed
    (pattern: Map[String, Iterable[OrderEvent]], timeoutTimestamp: Long) => {
      val allEvents = pattern.values.flatten.toList.sortBy(_.timestamp)
      val lastEvent = allEvents.lastOption
      
      lastEvent match {
        case Some(event) =>
          OrderTimeout(event.orderId, event.stage, event.customerId, timeoutTimestamp)
        case None =>
          // Should not happen, but handle gracefully
          OrderTimeout("unknown", "none", "unknown", timeoutTimestamp)
      }
    }
  )(
    // Success handler
    (pattern: Map[String, Iterable[OrderEvent]]) => {
      val created = pattern("created").head
      val delivered = pattern("delivered").head
      val stages = List("created", "paid", "shipped", "delivered")
      val timeSpent = delivered.timestamp - created.timestamp
      
      OrderProgress(created.orderId, stages, timeSpent)
    }
  )

// Advanced timeout analysis
val timeoutAnalysis = orderResults.getSideOutput(orderTimeoutTag)
  .map { timeout =>
    val stage = timeout.lastStage match {
      case "created" => "Payment failed"
      case "paid" => "Shipping failed" 
      case "shipped" => "Delivery failed"
      case _ => "Unknown failure"
    }
    s"Order ${timeout.orderId}: $stage (customer: ${timeout.customerId})"
  }

orderResults.print("Completed orders: ")
timeoutAnalysis.print("Failed orders: ")

Types

// Timeout function interfaces from Java CEP
import org.apache.flink.cep.PatternTimeoutFunction
import org.apache.flink.cep.PatternFlatTimeoutFunction

// Side output support
import org.apache.flink.streaming.api.scala.OutputTag

// Collection types
import java.util.{List => JList, Map => JMap}
import scala.collection.Map

// Abstract timeout function interfaces
abstract class PatternTimeoutFunction[T, L] {
  def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L
}

abstract class PatternFlatTimeoutFunction[T, L] {
  def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long, out: Collector[L]): Unit
}

// Either type for deprecated methods
sealed abstract class Either[+A, +B]
case class Left[+A](value: A) extends Either[A, Nothing]
case class Right[+B](value: B) extends Either[Nothing, B]