Apache Flink CEP Scala API provides Complex Event Processing capabilities for Scala applications with pattern matching and event stream processing.
Handle partial pattern matches that time out by emitting them to a side output, which keeps successful matches and timed-out partial matches in separate streams for comprehensive event processing.
Process patterns with timeout handling using side outputs for timed-out partial matches.
// Signature excerpt of the Scala CEP PatternStream[T]: method bodies are omitted, so this
// listing documents the API shape only and does not compile on its own.
class PatternStream[T] {
/**
* Applies `patternSelectFunction` to every complete match and `patternTimeoutFunction` to
* every partial match that timed out. This side-output form is the recommended way to
* handle timeouts: match results and timeout results end up in separate streams.
* @param outputTag side-output tag that receives the timeout results
* @param patternTimeoutFunction Java-style callback invoked once per timed-out partial match
* @param patternSelectFunction Java-style callback invoked once per complete match
* @tparam L type of a timeout result
* @tparam R type of a match result
* @return stream of match results; read timeout results with `getSideOutput(outputTag)`
*/
def select[L: TypeInformation, R: TypeInformation](
outputTag: OutputTag[L],
patternTimeoutFunction: PatternTimeoutFunction[T, L],
patternSelectFunction: PatternSelectFunction[T, R]
): DataStream[R]
/**
* Scala-function variant of the side-output `select`, using curried parameter lists.
* `Map` here is `scala.collection.Map` (see the collection-type imports of the type listing).
* @param outputTag side-output tag that receives the timeout results
* @param patternTimeoutFunction receives the partial match (pattern name -> matched events)
*                               and the timeout timestamp; returns one timeout result
* @param patternSelectFunction receives the complete match (pattern name -> matched events);
*                              returns one match result
* @tparam L type of a timeout result
* @tparam R type of a match result
* @return stream of match results; read timeout results with `getSideOutput(outputTag)`
*/
def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L
)(
patternSelectFunction: Map[String, Iterable[T]] => R
): DataStream[R]
}Usage Examples:
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.streaming.api.scala.OutputTag

import java.time.Duration
import java.util.{List => JList, Map => JMap}

// The Scala CEP callbacks take scala.collection.Map, not Predef's immutable Map
import scala.collection.Map
case class LoginEvent(userId: String, timestamp: Long, sessionId: String)
case class CompletedSession(userId: String, sessionId: String, startTime: Long, endTime: Long)
case class IncompleteSession(userId: String, sessionId: String, startTime: Long, timeoutTime: Long)
// Session pattern: a login followed, within 30 minutes, by a later event of the SAME session.
// LoginEvent has no event-type field, so the "logout" step is recognised by correlating its
// sessionId with the already-matched "login" event through an iterative condition. A plain
// `sessionId.nonEmpty` check would accept any later event, even one from another session.
val sessionPattern = Pattern.begin[LoginEvent]("login")
  .where(_.sessionId.nonEmpty)
  .followedBy("logout")
  .where((event, ctx) => ctx.getEventsForPattern("login").exists(_.sessionId == event.sessionId))
  .within(Duration.ofMinutes(30)) // Sessions must complete within 30 minutes
// Side output that receives one IncompleteSession per timed-out partial match
val timeoutTag = OutputTag[IncompleteSession]("session-timeouts")
// Scala function approach. `Map` refers to scala.collection.Map (imported at the top), the map
// type the Scala CEP callbacks are declared with.
val sessionResults = CEP.pattern(loginStream, sessionPattern)
  .select(timeoutTag)(
    // Timeout handler - receives the partial match and the timeout timestamp. The partial
    // match includes at least the "login" step, because matching starts there.
    (pattern: Map[String, Iterable[LoginEvent]], timeoutTimestamp: Long) => {
      val loginEvent = pattern("login").head
      IncompleteSession(
        loginEvent.userId,
        loginEvent.sessionId,
        loginEvent.timestamp,
        timeoutTimestamp
      )
    }
  )(
    // Success handler - receives the complete match
    (pattern: Map[String, Iterable[LoginEvent]]) => {
      val loginEvent = pattern("login").head
      val logoutEvent = pattern("logout").head
      CompletedSession(
        loginEvent.userId,
        loginEvent.sessionId,
        loginEvent.timestamp,
        logoutEvent.timestamp
      )
    }
  )
// Process main results and timeouts separately
sessionResults.print("Completed sessions: ")
sessionResults.getSideOutput(timeoutTag).print("Timed-out sessions: ")
// Java interface approach: the same pattern, with callbacks that receive java.util.Map / List
val timeoutFunction = new PatternTimeoutFunction[LoginEvent, String] {
  override def timeout(pattern: JMap[String, JList[LoginEvent]], timeoutTimestamp: Long): String = {
    val loginEvent = pattern.get("login").get(0)
    s"Session timeout: ${loginEvent.userId} at $timeoutTimestamp"
  }
}
val selectFunction = new PatternSelectFunction[LoginEvent, String] {
  override def select(pattern: JMap[String, JList[LoginEvent]]): String = {
    val loginEvent = pattern.get("login").get(0)
    val logoutEvent = pattern.get("logout").get(0)
    s"Completed session: ${loginEvent.userId} duration ${logoutEvent.timestamp - loginEvent.timestamp}ms"
  }
}
val javaTimeoutTag = OutputTag[String]("java-timeouts")
val javaResults = CEP.pattern(loginStream, sessionPattern)
  .select(javaTimeoutTag, timeoutFunction, selectFunction)
Process patterns with timeout handling that can emit multiple results per match.
// Signature excerpt of the Scala CEP PatternStream[T] (flatSelect with a timeout side output):
// method bodies are omitted, so this listing documents the API shape only.
class PatternStream[T] {
/**
* Like the side-output `select`, but each callback writes results through a `Collector`.
* It can therefore emit zero, one or several results per complete match and per timed-out
* partial match.
* @param outputTag side-output tag that receives everything the timeout callback collects
* @param patternFlatTimeoutFunction Java-style callback invoked once per timed-out partial match
* @param patternFlatSelectFunction Java-style callback invoked once per complete match
* @tparam L type of a timeout result
* @tparam R type of a match result
* @return stream of collected match results; read timeout results with `getSideOutput(outputTag)`
*/
def flatSelect[L: TypeInformation, R: TypeInformation](
outputTag: OutputTag[L],
patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
): DataStream[R]
/**
* Scala-function variant of the side-output `flatSelect`, using curried parameter lists.
* `Map` here is `scala.collection.Map` (see the collection-type imports of the type listing).
* @param outputTag side-output tag that receives everything the timeout callback collects
* @param patternFlatTimeoutFunction receives the partial match, the timeout timestamp and a
*                                   Collector for timeout results
* @param patternFlatSelectFunction receives the complete match and a Collector for match results
* @tparam L type of a timeout result
* @tparam R type of a match result
* @return stream of collected match results
*/
def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit
)(
patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit
): DataStream[R]
}Usage Examples:
import org.apache.flink.util.Collector
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}
case class TransactionEvent(accountId: String, amount: Double, location: String, timestamp: Long)
case class SuspiciousActivity(accountId: String, reason: String, amount: Double, locations: Set[String])
case class TimeoutAlert(accountId: String, partialAmount: Double, duration: Long)
// Pattern for detecting suspicious transaction sequences: one or more transactions above 1000,
// all within a 10-minute window.
// NOTE(review): both handlers take accountId from the first matched event only, which presumably
// relies on transactionStream being keyed by accountId - confirm.
val suspiciousPattern = Pattern.begin[TransactionEvent]("transactions")
  .where(_.amount > 1000)
  .oneOrMore
  .within(Duration.ofMinutes(10))
// Side output for timed-out partial matches. Named distinctly (like javaTimeoutTag and
// orderTimeoutTag) so it does not redefine the session example's `timeoutTag` when these
// snippets are compiled in one scope.
val suspiciousTimeoutTag = OutputTag[TimeoutAlert]("suspicious-timeouts")
// Scala flat select with timeout: each handler may emit zero or more records via its Collector
val suspiciousResults = CEP.pattern(transactionStream, suspiciousPattern)
  .flatSelect(suspiciousTimeoutTag)(
    // Timeout handler - can emit multiple timeout alerts
    (pattern: Map[String, Iterable[TransactionEvent]], timeoutTimestamp: Long, out: Collector[TimeoutAlert]) => {
      val transactions = pattern("transactions")
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      val firstTransaction = transactions.minBy(_.timestamp)
      // Time from the earliest matched transaction until the partial match timed out
      val duration = timeoutTimestamp - firstTransaction.timestamp
      // Emit timeout alert
      out.collect(TimeoutAlert(accountId, totalAmount, duration))
      // Additional alert for high partial totals.
      // NOTE(review): the flag is encoded into the accountId field ("<id>-high-value"), so
      // consumers grouping by accountId will treat it as a separate account.
      if (totalAmount > 5000) {
        out.collect(TimeoutAlert(s"$accountId-high-value", totalAmount, duration))
      }
    }
  )(
    // Success handler - can emit multiple suspicious activity reports (one per triggered rule)
    (pattern: Map[String, Iterable[TransactionEvent]], out: Collector[SuspiciousActivity]) => {
      val transactions = pattern("transactions")
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      val locations = transactions.map(_.location).toSet
      // Emit volume-based alert
      if (totalAmount > 10000) {
        out.collect(SuspiciousActivity(accountId, "high-volume", totalAmount, locations))
      }
      // Emit location-based alert
      if (locations.size > 3) {
        out.collect(SuspiciousActivity(accountId, "multiple-locations", totalAmount, locations))
      }
      // Emit frequency-based alert
      if (transactions.size > 5) {
        out.collect(SuspiciousActivity(accountId, "high-frequency", totalAmount, locations))
      }
    }
  )
// Process results and timeouts
suspiciousResults.print("Suspicious activities: ")
suspiciousResults.getSideOutput(suspiciousTimeoutTag).print("Timeout alerts: ")
Legacy timeout handling using Either types (deprecated in favor of side outputs).
// Signature excerpt of the deprecated, Either-based timeout methods of the Scala CEP
// PatternStream[T]: method bodies are omitted, so this listing documents the API shape only.
class PatternStream[T] {
/**
* Deprecated: emits timeout results and match results on ONE stream, wrapped as
* `Left` (timed-out partial match) and `Right` (complete match). Prefer the side-output
* variant, which keeps the two kinds of result in separate streams.
* @param patternTimeoutFunction callback invoked once per timed-out partial match
* @param patternSelectFunction callback invoked once per complete match
* @tparam L type of a timeout result (carried in `Left`)
* @tparam R type of a match result (carried in `Right`)
* @return single stream mixing `Left` timeout results and `Right` match results
*/
@deprecated("Use version with side outputs", "1.18.0")
def select[L: TypeInformation, R: TypeInformation](
patternTimeoutFunction: PatternTimeoutFunction[T, L],
patternSelectFunction: PatternSelectFunction[T, R]
): DataStream[Either[L, R]]
/**
* Deprecated flat variant: both callbacks may collect any number of results, which are
* emitted on one stream as `Left` (timeout) and `Right` (match) values.
* @param patternFlatTimeoutFunction callback invoked once per timed-out partial match
* @param patternFlatSelectFunction callback invoked once per complete match
* @tparam L type of a timeout result (carried in `Left`)
* @tparam R type of a match result (carried in `Right`)
* @return single stream mixing `Left` timeout results and `Right` match results
*/
@deprecated("Use version with side outputs", "1.18.0")
def flatSelect[L: TypeInformation, R: TypeInformation](
patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
): DataStream[Either[L, R]]
}Usage Examples:
// Deprecated Either-based approach (not recommended): timeouts and matches share one stream.
// Reuses loginStream, sessionPattern, timeoutFunction and selectFunction from the session
// example; the two functions are typed for LoginEvent, so they must run on that pattern.
val eitherResults = CEP.pattern(loginStream, sessionPattern)
  .select(timeoutFunction, selectFunction)
// Separate Left (timeout) and Right (success) results.
// DataStream offers no `split` that takes predicates and returns a pair (the old
// OutputSelector-based `split` was deprecated and later removed), so project each side out
// of the shared stream instead. `toSeq` yields zero or one element, so no partial `.get` is needed.
val timeoutResults = eitherResults.flatMap(_.left.toSeq)
val successResults = eitherResults.flatMap(_.toSeq)
Complex timeout handling with multiple patterns and cascading timeouts.
Usage Examples:
case class OrderEvent(orderId: String, stage: String, timestamp: Long, customerId: String)
case class OrderProgress(orderId: String, completedStages: List[String], timeSpent: Long)
case class OrderTimeout(orderId: String, lastStage: String, customerId: String, timeoutAt: Long)
// Order lifecycle created -> paid -> shipped -> delivered; each step is recognised only by
// its `stage` string, and the whole sequence must finish within seven days.
// NOTE(review): the steps are not correlated by orderId, which presumably relies on
// orderStream being keyed by orderId - confirm.
val orderPattern = Pattern.begin[OrderEvent]("created")
  .where(_.stage == "created")
  .followedBy("paid")
  .where(_.stage == "paid")
  .followedBy("shipped")
  .where(_.stage == "shipped")
  .followedBy("delivered")
  .where(_.stage == "delivered")
  .within(Duration.ofDays(7)) // Orders should complete within a week
val orderTimeoutTag = OutputTag[OrderTimeout]("order-timeouts")
val orderResults = CEP.pattern(orderStream, orderPattern)
  .select(orderTimeoutTag)(
    // Timeout handler: the chronologically last event of the partial match tells which
    // stage the order had reached when the window expired.
    (pattern: Map[String, Iterable[OrderEvent]], timeoutTimestamp: Long) =>
      pattern.values.flatten.toList
        .sortBy(_.timestamp)
        .lastOption
        .fold(
          // A timed-out partial match should never be empty; fall back to a placeholder anyway
          OrderTimeout("unknown", "none", "unknown", timeoutTimestamp)
        )(last => OrderTimeout(last.orderId, last.stage, last.customerId, timeoutTimestamp))
  )(
    // Success handler: every stage was reached; report the time from creation to delivery
    (pattern: Map[String, Iterable[OrderEvent]]) => {
      val firstEvent = pattern("created").head
      val finalEvent = pattern("delivered").head
      OrderProgress(
        firstEvent.orderId,
        List("created", "paid", "shipped", "delivered"),
        finalEvent.timestamp - firstEvent.timestamp
      )
    }
  )
// Advanced timeout analysis: map the last reached stage to the step that never happened
val timeoutAnalysis = orderResults.getSideOutput(orderTimeoutTag)
  .map { timeout =>
    val failure = timeout.lastStage match {
      case "created" => "Payment failed"
      case "paid"    => "Shipping failed"
      case "shipped" => "Delivery failed"
      case _         => "Unknown failure"
    }
    s"Order ${timeout.orderId}: $failure (customer: ${timeout.customerId})"
  }
orderResults.print("Completed orders: ")
timeoutAnalysis.print("Failed orders: ")
// Timeout function interfaces from Java CEP
import org.apache.flink.cep.PatternTimeoutFunction
import org.apache.flink.cep.PatternFlatTimeoutFunction
// Side output support
import org.apache.flink.streaming.api.scala.OutputTag
// Collection types
import java.util.{List => JList, Map => JMap}
import scala.collection.Map
// Timeout function interfaces. They are Java interfaces in the CEP library, so they are shown
// as Scala traits: an implementation can mix them in next to its own superclass, which an
// abstract class declaration would not allow.
trait PatternTimeoutFunction[T, L] {
  /**
   * Called once for each partial match that did not complete within the pattern's window.
   * @param pattern partial match: pattern name -> events matched so far
   * @param timeoutTimestamp timestamp at which the partial match timed out
   * @return the single timeout result for this partial match
   */
  def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L
}
trait PatternFlatTimeoutFunction[T, L] {
  /**
   * Flat variant: called once per timed-out partial match and may emit any number of
   * timeout results through `out`.
   * @param pattern partial match: pattern name -> events matched so far
   * @param timeoutTimestamp timestamp at which the partial match timed out
   * @param out collector receiving the timeout results
   */
  def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long, out: Collector[L]): Unit
}
// Either type for deprecated methods: Left carries a timeout result, Right a match result.
// The sealed base extends Product with Serializable (as scala.util.Either does), so a mix such
// as List(Left(1), Right("a")) is inferred as List[Either[Int, String]] rather than
// List[Product with Serializable with Either[Int, String]].
sealed abstract class Either[+A, +B] extends Product with Serializable
/** Timeout side of the result. */
case class Left[+A](value: A) extends Either[A, Nothing]
/** Match side of the result. */
case class Right[+B](value: B) extends Either[Nothing, B]
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-cep-scala-2-12