Handle partial pattern matches that time out by routing them to side outputs, which keeps successful matches separate from timed-out partial matches.
Process patterns with timeout handling, emitting timed-out partial matches to a side output.
class PatternStream[T] {

  /**
   * Selects complete matches into the main stream and sends every timed-out
   * partial match to a side output (recommended approach).
   *
   * @param outputTag tag of the side output that receives the timeout results
   * @param patternTimeoutFunction turns one timed-out partial match into an `L`
   * @param patternSelectFunction turns one complete match into an `R`
   * @tparam L element type of the timeout side output
   * @tparam R element type of the main result stream
   * @return the stream of complete-match results; timeout results are read
   *         separately via `getSideOutput(outputTag)`
   */
  def select[L: TypeInformation, R: TypeInformation](
      outputTag: OutputTag[L],
      patternTimeoutFunction: PatternTimeoutFunction[T, L],
      patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R]

  /**
   * Curried Scala-closure form of the side-output `select`.
   *
   * @param outputTag tag of the side output that receives the timeout results
   * @param patternTimeoutFunction called with the partial match (pattern name to
   *                               matched events) and the timeout timestamp
   * @param patternSelectFunction called with the complete match (pattern name to
   *                              matched events)
   * @tparam L element type of the timeout side output
   * @tparam R element type of the main result stream
   * @return the stream of complete-match results
   */
  def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
      patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L)(
      patternSelectFunction: Map[String, Iterable[T]] => R): DataStream[R]
}

Usage Examples:
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import java.util.{List => JList, Map => JMap}
import java.time.Duration
// Match the closure parameter type: scala.collection.Map, not Predef's immutable Map
import scala.collection.Map

case class LoginEvent(userId: String, timestamp: Long, sessionId: String)
case class CompletedSession(userId: String, sessionId: String, startTime: Long, endTime: Long)
case class IncompleteSession(userId: String, sessionId: String, startTime: Long, timeoutTime: Long)

// Pattern with time window
val sessionPattern = Pattern.begin[LoginEvent]("login")
  .where(_.sessionId.nonEmpty)
  .followedBy("logout")
  .where(_.sessionId.nonEmpty)
  .within(Duration.ofMinutes(30)) // Sessions must complete within 30 minutes

// Define timeout output tag
val timeoutTag = OutputTag[IncompleteSession]("session-timeouts")

// Key by session so "login" and "logout" are matched against events of the same
// session only; on an unkeyed stream the pattern would pair unrelated sessions.
val keyedLogins = loginStream.keyBy(_.sessionId)

// Scala function approach
val sessionResults = CEP.pattern(keyedLogins, sessionPattern)
  .select(timeoutTag)(
    // Timeout handler - receives partial pattern and timeout timestamp
    (pattern: Map[String, Iterable[LoginEvent]], timeoutTimestamp: Long) => {
      val loginEvent = pattern("login").head
      IncompleteSession(
        loginEvent.userId,
        loginEvent.sessionId,
        loginEvent.timestamp,
        timeoutTimestamp
      )
    }
  )(
    // Success handler - receives complete pattern
    (pattern: Map[String, Iterable[LoginEvent]]) => {
      val loginEvent = pattern("login").head
      val logoutEvent = pattern("logout").head
      CompletedSession(
        loginEvent.userId,
        loginEvent.sessionId,
        loginEvent.timestamp,
        logoutEvent.timestamp
      )
    }
  )

// Process main results and timeouts separately
sessionResults.print("Completed sessions: ")
sessionResults.getSideOutput(timeoutTag).print("Timed-out sessions: ")

// Java interface approach
val timeoutFunction = new PatternTimeoutFunction[LoginEvent, String] {
  override def timeout(pattern: JMap[String, JList[LoginEvent]], timeoutTimestamp: Long): String = {
    val loginEvent = pattern.get("login").get(0)
    s"Session timeout: ${loginEvent.userId} at $timeoutTimestamp"
  }
}

val selectFunction = new PatternSelectFunction[LoginEvent, String] {
  override def select(pattern: JMap[String, JList[LoginEvent]]): String = {
    val loginEvent = pattern.get("login").get(0)
    val logoutEvent = pattern.get("logout").get(0)
    s"Completed session: ${loginEvent.userId} duration ${logoutEvent.timestamp - loginEvent.timestamp}ms"
  }
}

val javaTimeoutTag = OutputTag[String]("java-timeouts")
val javaResults = CEP.pattern(keyedLogins, sessionPattern)
  .select(javaTimeoutTag, timeoutFunction, selectFunction)

Process patterns with timeout handling that can emit multiple results per match.
class PatternStream[T] {

  /**
   * Flat-select form of the side-output API: each complete match may produce any
   * number of `R` results, and each timed-out partial match any number of `L`
   * results, which go to the side output.
   *
   * @param outputTag tag of the side output that collects the timeout results
   * @param patternFlatTimeoutFunction emits results for one timed-out partial match
   * @param patternFlatSelectFunction emits results for one complete match
   * @tparam L element type of the timeout side output
   * @tparam R element type of the main result stream
   * @return the stream of complete-match results
   */
  def flatSelect[L: TypeInformation, R: TypeInformation](
      outputTag: OutputTag[L],
      patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R]): DataStream[R]

  /**
   * Curried Scala-closure form of the side-output `flatSelect`.
   *
   * @param outputTag tag of the side output that collects the timeout results
   * @param patternFlatTimeoutFunction called with the partial match, the timeout
   *                                   timestamp and a collector for `L` results
   * @param patternFlatSelectFunction called with the complete match and a
   *                                  collector for `R` results
   * @tparam L element type of the timeout side output
   * @tparam R element type of the main result stream
   * @return the stream of complete-match results
   */
  def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit)(
      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit): DataStream[R]
}

Usage Examples:
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}
import java.time.Duration
// Match the closure parameter type: scala.collection.Map, not Predef's immutable Map
import scala.collection.Map

case class TransactionEvent(accountId: String, amount: Double, location: String, timestamp: Long)
case class SuspiciousActivity(accountId: String, reason: String, amount: Double, locations: Set[String])
case class TimeoutAlert(accountId: String, partialAmount: Double, duration: Long)

// Pattern for detecting suspicious transaction sequences
val suspiciousPattern = Pattern.begin[TransactionEvent]("transactions")
  .where(_.amount > 1000)
  .oneOrMore
  .within(Duration.ofMinutes(10))

val timeoutTag = OutputTag[TimeoutAlert]("suspicious-timeouts")

// Key by account: both handlers take the account from the first event and sum
// every event in the match, which is only meaningful if all events share it.
val suspiciousResults = CEP.pattern(transactionStream.keyBy(_.accountId), suspiciousPattern)
  .flatSelect(timeoutTag)(
    // Timeout handler - can emit multiple timeout alerts
    (pattern: Map[String, Iterable[TransactionEvent]], timeoutTimestamp: Long, out: Collector[TimeoutAlert]) => {
      val transactions = pattern("transactions")
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      val firstTransaction = transactions.minBy(_.timestamp)
      val duration = timeoutTimestamp - firstTransaction.timestamp
      // Emit timeout alert
      out.collect(TimeoutAlert(accountId, totalAmount, duration))
      // Additional timeout processing if needed.
      // TimeoutAlert has no reason field, so the marker is appended to accountId.
      if (totalAmount > 5000) {
        out.collect(TimeoutAlert(s"$accountId-high-value", totalAmount, duration))
      }
    }
  )(
    // Success handler - can emit multiple suspicious activity reports
    (pattern: Map[String, Iterable[TransactionEvent]], out: Collector[SuspiciousActivity]) => {
      val transactions = pattern("transactions")
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      val locations = transactions.map(_.location).toSet
      // Emit volume-based alert
      if (totalAmount > 10000) {
        out.collect(SuspiciousActivity(accountId, "high-volume", totalAmount, locations))
      }
      // Emit location-based alert
      if (locations.size > 3) {
        out.collect(SuspiciousActivity(accountId, "multiple-locations", totalAmount, locations))
      }
      // Emit frequency-based alert
      if (transactions.size > 5) {
        out.collect(SuspiciousActivity(accountId, "high-frequency", totalAmount, locations))
      }
    }
  )

// Process results and timeouts
suspiciousResults.print("Suspicious activities: ")
suspiciousResults.getSideOutput(timeoutTag).print("Timeout alerts: ")

Legacy timeout handling using Either types (deprecated in favor of side outputs).
class PatternStream[T] {

  /**
   * Legacy select that returns timeout results and match results interleaved in a
   * single stream: timeouts arrive as `Left` values, matches as `Right` values.
   *
   * @param patternTimeoutFunction turns one timed-out partial match into an `L`
   * @param patternSelectFunction turns one complete match into an `R`
   * @tparam L type carried by `Left` (timeout) elements
   * @tparam R type carried by `Right` (match) elements
   * @return one stream of `Either[L, R]` holding both kinds of result
   */
  @deprecated("Use version with side outputs", "1.18.0")
  def select[L: TypeInformation, R: TypeInformation](
      patternTimeoutFunction: PatternTimeoutFunction[T, L],
      patternSelectFunction: PatternSelectFunction[T, R]): DataStream[Either[L, R]]

  /**
   * Legacy flat select that returns timeout results and match results in one
   * stream, as `Left` and `Right` values respectively.
   *
   * @param patternFlatTimeoutFunction emits results for one timed-out partial match
   * @param patternFlatSelectFunction emits results for one complete match
   * @tparam L type carried by `Left` (timeout) elements
   * @tparam R type carried by `Right` (match) elements
   * @return one stream of `Either[L, R]` holding both kinds of result
   */
  @deprecated("Use version with side outputs", "1.18.0")
  def flatSelect[L: TypeInformation, R: TypeInformation](
      patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R]): DataStream[Either[L, R]]
}

Usage Examples:
// Deprecated Either-based approach (not recommended)
val eitherResults = CEP.pattern(dataStream, pattern)
  .select(timeoutFunction, selectFunction)

// Separate Left (timeout) and Right (success) results.
// DataStream has no predicate-based `split` that returns a pair of streams, so
// each side is derived with its own flatMap: the Option is empty for elements
// of the other side, which drops them. `swap` exposes the Left value.
val timeoutResults = eitherResults.flatMap(result => result.swap.toOption.toList)
val successResults = eitherResults.flatMap(result => result.toOption.toList)

Complex timeout handling with a multi-stage pattern, identifying the stage at which a partial match timed out.
Usage Examples:
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import java.time.Duration
// Match the closure parameter type: scala.collection.Map, not Predef's immutable Map
import scala.collection.Map

case class OrderEvent(orderId: String, stage: String, timestamp: Long, customerId: String)
case class OrderProgress(orderId: String, completedStages: List[String], timeSpent: Long)
case class OrderTimeout(orderId: String, lastStage: String, customerId: String, timeoutAt: Long)

// Stage names double as pattern names, listed in the order an order passes them.
val orderStages = List("created", "paid", "shipped", "delivered")

// Multi-stage order processing pattern
val orderPattern = Pattern.begin[OrderEvent]("created")
  .where(_.stage == "created")
  .followedBy("paid")
  .where(_.stage == "paid")
  .followedBy("shipped")
  .where(_.stage == "shipped")
  .followedBy("delivered")
  .where(_.stage == "delivered")
  .within(Duration.ofDays(7)) // Orders should complete within a week

val orderTimeoutTag = OutputTag[OrderTimeout]("order-timeouts")

// Key by order so each order's stages are matched only against its own events;
// unkeyed, a "paid" event from one order could advance another order's match.
val orderResults = CEP.pattern(orderStream.keyBy(_.orderId), orderPattern)
  .select(orderTimeoutTag)(
    // Timeout handler - determine which stage failed
    (pattern: Map[String, Iterable[OrderEvent]], timeoutTimestamp: Long) => {
      // Take the furthest stage reached in pipeline order rather than the latest
      // event timestamp, so equal timestamps cannot hide the real last stage.
      val lastEvent = orderStages.reverse.collectFirst {
        case stage if pattern.get(stage).exists(_.nonEmpty) => pattern(stage).head
      }
      lastEvent.fold(
        // Should not happen, but handle gracefully
        OrderTimeout("unknown", "none", "unknown", timeoutTimestamp)
      )(event => OrderTimeout(event.orderId, event.stage, event.customerId, timeoutTimestamp))
    }
  )(
    // Success handler
    (pattern: Map[String, Iterable[OrderEvent]]) => {
      val created = pattern("created").head
      val delivered = pattern("delivered").head
      val timeSpent = delivered.timestamp - created.timestamp
      OrderProgress(created.orderId, orderStages, timeSpent)
    }
  )

// Advanced timeout analysis
val timeoutAnalysis = orderResults.getSideOutput(orderTimeoutTag)
  .map { timeout =>
    val failure = timeout.lastStage match {
      case "created" => "Payment failed"
      case "paid" => "Shipping failed"
      case "shipped" => "Delivery failed"
      case _ => "Unknown failure"
    }
    s"Order ${timeout.orderId}: $failure (customer: ${timeout.customerId})"
  }

orderResults.print("Completed orders: ")
timeoutAnalysis.print("Failed orders: ")

// Timeout function interfaces from Java CEP
import org.apache.flink.cep.PatternTimeoutFunction
import org.apache.flink.cep.PatternFlatTimeoutFunction
// Side output support
import org.apache.flink.streaming.api.scala.OutputTag
// Collection types
import java.util.{List => JList, Map => JMap}
import scala.collection.Map
// Abstract timeout function interfaces

/**
 * Converts one partial match that timed out into a single result of type `L`.
 *
 * Declared as a trait extending `Serializable`, mirroring the Java CEP interface
 * it stands for, so implementations can be mixed in and serialized with the job.
 *
 * @tparam T type of the pattern's input events
 * @tparam L type of the timeout result
 */
trait PatternTimeoutFunction[T, L] extends java.io.Serializable {

  /**
   * @param pattern events matched before the timeout, keyed by pattern name
   * @param timeoutTimestamp timestamp at which the partial match timed out
   * @return the timeout result for this partial match
   */
  def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L
}
/**
 * Emits zero or more results of type `L` for one partial match that timed out.
 *
 * Declared as a trait extending `Serializable`, mirroring the Java CEP interface
 * it stands for, so implementations can be mixed in and serialized with the job.
 *
 * @tparam T type of the pattern's input events
 * @tparam L type of the emitted timeout results
 */
trait PatternFlatTimeoutFunction[T, L] extends java.io.Serializable {

  /**
   * @param pattern events matched before the timeout, keyed by pattern name
   * @param timeoutTimestamp timestamp at which the partial match timed out
   * @param out collector receiving the timeout results
   */
  def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long, out: Collector[L]): Unit
}
// Either type for deprecated methods: timeout results are Left, matches are Right.
// NOTE(review): simplified sketch; the deprecated Scala API presumably returns
// scala.util.Either (with swap/toOption/fold as well) — confirm before relying on it.
sealed abstract class Either[+A, +B] {

  /** True when this value is a `Left` (a timeout result). */
  def isLeft: Boolean = this match {
    case Left(_)  => true
    case Right(_) => false
  }

  /** True when this value is a `Right` (a match result). */
  def isRight: Boolean = !isLeft
}

final case class Left[+A](value: A) extends Either[A, Nothing]
final case class Right[+B](value: B) extends Either[Nothing, B]