Process detected pattern sequences and turn the matched patterns into result streams with the select, flatSelect, and process operations.
Transform each matched pattern into exactly one output element using a select function.
class PatternStream[T] {

  /**
   * Applies a Java-style [[PatternSelectFunction]] to every detected match, yielding exactly
   * one result per match.
   *
   * @param patternSelectFunction selector invoked with the matched events grouped by pattern name
   * @tparam R element type of the resulting stream
   * @return a DataStream with one element per detected match
   */
  def select[R: TypeInformation](
      patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[R]

  /**
   * Applies a plain Scala function to every detected match, yielding exactly one result per match.
   *
   * @param patternSelectFun function from the matched events, grouped by pattern name, to a result
   * @tparam R element type of the resulting stream
   * @return a DataStream with one element per detected match
   */
  def select[R: TypeInformation](
      patternSelectFun: Map[String, Iterable[T]] => R
  ): DataStream[R]
}
Usage Examples:
import java.util.{List => JList, Map => JMap}
// The Scala select callback receives a scala.collection.Map. Without this import the
// `Map` annotation below would mean Predef's immutable Map, and the lambda would not conform.
import scala.collection.Map

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
// Supplies the implicit TypeInformation[R] required by select[R: TypeInformation].
import org.apache.flink.streaming.api.scala._

case class Event(userId: String, action: String, timestamp: Long)
case class UserSession(userId: String, startAction: String, endAction: String, duration: Long)

// Assumes `dataStream: DataStream[Event]` is defined by the surrounding job.

// Scala function approach
val sessionPattern = Pattern.begin[Event]("start")
  .where(_.action == "login")
  .followedBy("end")
  .where(_.action == "logout")

val sessions = CEP.pattern(dataStream, sessionPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    // Neither "start" nor "end" is quantified, so each name binds exactly one event.
    val startEvent = pattern("start").head
    val endEvent = pattern("end").head
    UserSession(
      startEvent.userId,
      startEvent.action,
      endEvent.action,
      endEvent.timestamp - startEvent.timestamp
    )
  }

// Java interface approach: the matched events arrive as java.util collections.
val javaSelectFunction = new PatternSelectFunction[Event, String] {
  override def select(pattern: JMap[String, JList[Event]]): String = {
    val start = pattern.get("start").get(0)
    val end = pattern.get("end").get(0)
    s"Session: ${start.userId} from ${start.timestamp} to ${end.timestamp}"
  }
}

val sessionsJava = CEP.pattern(dataStream, sessionPattern)
  .select(javaSelectFunction)
Transform matched patterns into zero or more output elements using flatSelect functions.
class PatternStream[T] {

  /**
   * Hands every detected match to a Java-style [[PatternFlatSelectFunction]], which may emit
   * any number of results (including none).
   *
   * @param patternFlatSelectFunction flat selector receiving the matched events and a collector
   * @tparam R element type of the resulting stream
   * @return a DataStream holding everything the function emitted
   */
  def flatSelect[R: TypeInformation](
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
  ): DataStream[R]

  /**
   * Hands every detected match to a Scala function, which may emit any number of results
   * (including none) through the supplied collector.
   *
   * @param patternFlatSelectFun function receiving the matched events, grouped by pattern name,
   *                             and a collector for the results
   * @tparam R element type of the resulting stream
   * @return a DataStream holding everything the function emitted
   */
  def flatSelect[R: TypeInformation](
      patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit
  ): DataStream[R]
}
Usage Examples:
import java.time.Duration
import java.util.{List => JList, Map => JMap}
// The Scala flatSelect callback receives a scala.collection.Map, not Predef's immutable Map.
import scala.collection.Map

import org.apache.flink.cep.PatternFlatSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
// Supplies the implicit TypeInformation[R] required by flatSelect[R: TypeInformation].
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Purchase(userId: String, item: String, amount: Double, timestamp: Long)
case class Alert(userId: String, message: String, severity: String)

// Pattern for multiple purchases.
// NOTE(review): some Flink releases expect a windowing `Time` rather than java.time.Duration
// in `within` — confirm against the Flink version in use.
val purchasePattern = Pattern.begin[Purchase]("purchases")
  .where(_.amount > 0)
  .oneOrMore
  .within(Duration.ofHours(1))

// Assumes `purchaseStream: DataStream[Purchase]` is keyed by userId. The pattern itself never
// compares user ids, so on an unkeyed stream one match could mix users and
// `purchases.head.userId` would not describe them all.

// Scala flat select - generate multiple alerts per pattern
val alerts = CEP.pattern(purchaseStream, purchasePattern)
  .flatSelect { (pattern: Map[String, Iterable[Purchase]], out: Collector[Alert]) =>
    // "purchases" is quantified with oneOrMore, so every match holds at least one purchase.
    val purchases = pattern("purchases")
    val purchaseCount = purchases.size
    val totalAmount = purchases.map(_.amount).sum
    val userId = purchases.head.userId

    // Emit volume alert
    if (totalAmount > 1000) {
      out.collect(Alert(userId, s"High volume: $$${totalAmount}", "HIGH"))
    }

    // Emit frequency alert
    if (purchaseCount > 10) {
      out.collect(Alert(userId, s"High frequency: $purchaseCount purchases", "MEDIUM"))
    }

    // Emit item-specific alerts
    purchases.groupBy(_.item).foreach { case (item, itemPurchases) =>
      if (itemPurchases.size > 3) {
        out.collect(Alert(userId, s"Repeated item: $item (${itemPurchases.size}x)", "LOW"))
      }
    }
  }

// Java interface approach: the matched events arrive as java.util collections.
val javaFlatSelectFunction = new PatternFlatSelectFunction[Purchase, String] {
  override def flatSelect(pattern: JMap[String, JList[Purchase]], out: Collector[String]): Unit = {
    val purchases = pattern.get("purchases")
    purchases.forEach { purchase =>
      out.collect(s"Purchase: ${purchase.userId} bought ${purchase.item}")
    }
  }
}

val purchaseMessages = CEP.pattern(purchaseStream, purchasePattern)
  .flatSelect(javaFlatSelectFunction)
Use PatternProcessFunction for advanced pattern processing with rich context access.
class PatternStream[T] {

  /**
   * Runs a [[PatternProcessFunction]] on every detected match. Besides the matched events and a
   * collector, the function is also handed a context object.
   *
   * @param patternProcessFunction function invoked once per detected match
   * @tparam R element type of the resulting stream
   * @return a DataStream containing whatever the function collects
   */
  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R]): DataStream[R]
}
Usage Examples:
import java.time.Duration
import java.util.{List => JList, Map => JMap}
// JavaConverters compiles on both Scala 2.12 and 2.13. scala.jdk.CollectionConverters only
// exists from Scala 2.13 on (or with the scala-collection-compat library).
import scala.collection.JavaConverters._

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.configuration.Configuration
// Supplies the implicit TypeInformation[R] required by process[R: TypeInformation].
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Transaction(accountId: String, amount: Double, location: String, timestamp: Long)
case class FraudAlert(accountId: String, reason: String, transactions: List[Transaction], riskScore: Double)

/**
 * Scores every "suspicious" match against three heuristics: high total volume, more than two
 * distinct locations, and rapid succession. It emits one FraudAlert when the summed weight of
 * the heuristics that fired exceeds AlertThreshold.
 */
class FraudDetectionProcessFunction extends PatternProcessFunction[Transaction, FraudAlert] {
  import FraudDetectionProcessFunction._

  override def processMatch(
      `match`: JMap[String, JList[Transaction]],
      ctx: PatternProcessFunction.Context,
      out: Collector[FraudAlert]
  ): Unit = {
    // "suspicious" is quantified with oneOrMore, so every match holds at least one transaction.
    val transactions = `match`.get("suspicious").asScala.toList
    val timestamps = transactions.map(_.timestamp)

    // Each heuristic contributes (weight, reason) only when it fires. Reasons keep declaration order.
    val (weights, reasons) = List(
      (transactions.map(_.amount).sum > HighVolumeAmount) -> (0.4, "high-volume"),
      (transactions.map(_.location).distinct.size > 2) -> (0.3, "multiple-locations"),
      (timestamps.max - timestamps.min < RapidSuccessionMillis) -> (0.3, "rapid-succession")
    ).collect { case (true, signal) => signal }.unzip

    val riskScore = weights.sum
    if (riskScore > AlertThreshold) {
      out.collect(FraudAlert(
        transactions.head.accountId,
        reasons.mkString(", "),
        transactions,
        riskScore
      ))
    }
  }
}

object FraudDetectionProcessFunction {
  /** Total matched amount above which the "high-volume" heuristic fires. */
  val HighVolumeAmount: Double = 10000

  /** Maximum first-to-last timestamp span (5 minutes, in ms) for "rapid-succession". */
  val RapidSuccessionMillis: Long = 5 * 60 * 1000L

  /** Risk score a match must exceed before an alert is emitted. */
  val AlertThreshold: Double = 0.5
}

// Usage with process function.
// Assumes `transactionStream: DataStream[Transaction]` is keyed by accountId. The pattern does
// not compare account ids, so otherwise one match could span several accounts.
val fraudPattern = Pattern.begin[Transaction]("suspicious")
  .where(_.amount > 500)
  .oneOrMore
  .within(Duration.ofMinutes(10))

val fraudAlerts = CEP.pattern(transactionStream, fraudPattern)
  .process(new FraudDetectionProcessFunction())
Understanding how to access matched events from pattern maps.
Usage Examples:
// The Scala select callback receives a scala.collection.Map. Without this import the `Map`
// annotations below would mean Predef's immutable Map, and the lambdas would not conform.
import scala.collection.Map

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
// Supplies the implicit TypeInformation[R] required by select[R: TypeInformation].
import org.apache.flink.streaming.api.scala._

// Reuses the Event case class and assumes `dataStream: DataStream[Event]` from the select example.

// Pattern with multiple named elements
val complexPattern = Pattern.begin[Event]("first")
  .where(_.action == "start")
  .next("second")
  .where(_.action == "process")
  .followedBy("third")
  .where(_.action == "end")

val results = CEP.pattern(dataStream, complexPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    // Access events by the names given in the pattern definition
    val firstEvents = pattern("first")   // Iterable[Event]
    val secondEvents = pattern("second") // Iterable[Event]
    val thirdEvents = pattern("third")   // Iterable[Event]

    // None of these patterns is quantified, so each name binds exactly one event
    val firstEvent = firstEvents.head
    val secondEvent = secondEvents.head
    val thirdEvent = thirdEvents.head

    s"Sequence: ${firstEvent.action} -> ${secondEvent.action} -> ${thirdEvent.action}"
  }

// Pattern with quantified elements
val quantifiedPattern = Pattern.begin[Event]("start")
  .where(_.action == "begin")
  .followedBy("repeated")
  .where(_.action == "process")
  .oneOrMore // "repeated" can bind one or more events
  .followedBy("end")
  .where(_.action == "finish")

val quantifiedResults = CEP.pattern(dataStream, quantifiedPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    val startEvent = pattern("start").head
    val repeatedEvents = pattern("repeated") // one or more events
    val endEvent = pattern("end").head
    s"Start: ${startEvent.action}, Repeated: ${repeatedEvents.size} times, End: ${endEvent.action}"
  }
// Core collector interface
import org.apache.flink.util.Collector
// Pattern processing interfaces from Java CEP
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.PatternFlatSelectFunction
import org.apache.flink.cep.functions.PatternProcessFunction
// Type information for Scala
import org.apache.flink.api.common.typeinfo.TypeInformation
// Map type used by the Scala select/flatSelect callbacks (scala.collection.Map, not Predef's immutable Map)
import scala.collection.Map
import java.util.{List => JList, Map => JMap}
// Abstract interfaces
/**
 * Produces exactly one result from each detected match.
 *
 * Flink defines this as a Java interface (extending Serializable), so it is modelled here as a
 * trait rather than an abstract class. Scala implementations can therefore mix it in alongside
 * other base types.
 *
 * @tparam T type of the matched events
 * @tparam R type of the produced result
 */
trait PatternSelectFunction[T, R] extends Serializable {

  /**
   * @param pattern matched events, keyed by pattern name
   * @return the single result for this match
   */
  @throws[Exception]
  def select(pattern: JMap[String, JList[T]]): R
}
/**
 * Emits zero or more results from each detected match through a collector.
 *
 * Flink defines this as a Java interface (extending Serializable), so it is modelled here as a
 * trait rather than an abstract class.
 *
 * @tparam T type of the matched events
 * @tparam R type of the emitted results
 */
trait PatternFlatSelectFunction[T, R] extends Serializable {

  /**
   * @param pattern matched events, keyed by pattern name
   * @param out     collector receiving any number of results for this match
   */
  @throws[Exception]
  def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit
}
/**
 * Processes each detected match with access to a per-match context, emitting zero or more
 * results through a collector.
 *
 * Declared as an abstract class because it extends AbstractRichFunction.
 * NOTE(review): AbstractRichFunction presumably contributes the rich-function lifecycle
 * (open/close, runtime context) — confirm against the Flink API docs.
 *
 * @tparam T type of the matched events
 * @tparam R type of the emitted results
 */
abstract class PatternProcessFunction[T, R] extends AbstractRichFunction {

  /**
   * @param match matched events, keyed by pattern name
   * @param ctx   per-match context; PatternProcessFunction.Context is declared outside this excerpt
   * @param out   collector receiving any number of results for this match
   */
  @throws[Exception]
  def processMatch(
    `match`: JMap[String, JList[T]],
    ctx: PatternProcessFunction.Context,
    out: Collector[R]
  ): Unit
}