Apache Flink CEP Scala API provides Complex Event Processing capabilities for Scala applications with pattern matching and event stream processing.
Complex event pattern definition using the fluent Scala DSL with temporal constraints, conditions, and quantifiers.
Start a new pattern sequence with a named initial pattern.
/**
* Start a new pattern sequence with the given name
* @param name The name of starting pattern
* @tparam X Base type of events in the pattern
* @return The first pattern of a pattern sequence
*/
object Pattern {
def begin[X](name: String): Pattern[X, X]
def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}Usage Examples:
import org.apache.flink.cep.scala.pattern.Pattern
case class LoginEvent(userId: String, timestamp: Long)
// Simple pattern start
val pattern = Pattern.begin[LoginEvent]("login")
.where(_.userId.nonEmpty)
// Pattern with skip strategy
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
val skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent()
val pattern = Pattern.begin[LoginEvent]("login", skipStrategy)Add conditions that events must satisfy to be considered matches.
class Pattern[T, F <: T] {
/**
* Add AND condition using simple predicate function
* @param condition Predicate function for the event
* @return Pattern with the new condition
*/
def where(condition: F => Boolean): Pattern[T, F]
/**
* Add AND condition with access to context
* @param condition Function taking event and context
* @return Pattern with the new condition
*/
def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]
/**
* Add AND condition using IterativeCondition
* @param condition The IterativeCondition to apply
* @return Pattern with the new condition
*/
def where(condition: IterativeCondition[F]): Pattern[T, F]
/**
* Add OR condition using simple predicate function
* @param condition Predicate function for the event
* @return Pattern with the new condition
*/
def or(condition: F => Boolean): Pattern[T, F]
/**
* Add OR condition with access to context
* @param condition Function taking event and context
* @return Pattern with the new condition
*/
def or(condition: (F, Context[F]) => Boolean): Pattern[T, F]
/**
* Add OR condition using IterativeCondition
* @param condition The IterativeCondition to apply
* @return Pattern with the new condition
*/
def or(condition: IterativeCondition[F]): Pattern[T, F]
}Usage Examples:
case class Event(eventType: String, value: Int, userId: String)
// Simple condition
val pattern = Pattern.begin[Event]("start")
.where(_.eventType == "login")
.or(_.eventType == "signup")
// Context-aware condition
val pattern = Pattern.begin[Event]("start")
.where(_.eventType == "purchase")
.next("confirmation")
.where((event, ctx) => {
val previousEvents = ctx.getEventsForPattern("start")
event.userId == previousEvents.head.userId
})Apply subtype constraints to patterns for type-safe event matching.
class Pattern[T, F <: T] {
/**
* Apply subtype constraint requiring events to be of specific subtype
* @param clazz Class of the required subtype
* @tparam S The subtype
* @return Pattern constrained to the subtype
*/
def subtype[S <: F](clazz: Class[S]): Pattern[T, S]
}Usage Examples:
abstract class Event(eventType: String)
case class LoginEvent(userId: String) extends Event("login")
case class PurchaseEvent(userId: String, amount: Double) extends Event("purchase")
val pattern = Pattern.begin[Event]("start")
.subtype(classOf[LoginEvent])
.next("purchase")
.subtype(classOf[PurchaseEvent])Chain patterns with different temporal contiguity requirements.
class Pattern[T, F <: T] {
/**
* Strict temporal contiguity - no events between matches
* @param name Name of the next pattern
* @return New pattern enforcing strict contiguity
*/
def next(name: String): Pattern[T, T]
/**
* Non-strict temporal contiguity - events may be interleaved
* @param name Name of the following pattern
* @return New pattern allowing interleaved events
*/
def followedBy(name: String): Pattern[T, T]
/**
* Non-deterministic following - matches any occurrence
* @param name Name of the following pattern
* @return New pattern with non-deterministic following
*/
def followedByAny(name: String): Pattern[T, T]
/**
* Negative pattern - no matching event should follow
* @param name Name of the pattern that should not occur
* @return New pattern with negative constraint
*/
def notNext(name: String): Pattern[T, T]
/**
* Negative following pattern - no matching event should occur between
* @param name Name of the pattern that should not occur
* @return New pattern with negative constraint
*/
def notFollowedBy(name: String): Pattern[T, T]
}Chain patterns with other Pattern objects to create GroupPatterns.
class Pattern[T, F <: T] {
/**
* Chain with another pattern using followedBy semantics
* @param pattern The pattern to follow this one
* @return GroupPattern for further chaining
*/
def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Chain with another pattern using followedByAny semantics
* @param pattern The pattern to follow this one
* @return GroupPattern for further chaining
*/
def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Chain with another pattern using next semantics
* @param pattern The pattern to follow this one strictly
* @return GroupPattern for further chaining
*/
def next(pattern: Pattern[T, F]): GroupPattern[T, F]
}
object Pattern {
/**
* Start pattern sequence with existing pattern
* @param pattern Initial pattern for the sequence
* @return GroupPattern for chaining
*/
def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Start pattern sequence with existing pattern and skip strategy
* @param pattern Initial pattern for the sequence
* @param afterMatchSkipStrategy Skip strategy after matches
* @return GroupPattern for chaining
*/
def begin[T, F <: T](
pattern: Pattern[T, F],
afterMatchSkipStrategy: AfterMatchSkipStrategy
): GroupPattern[T, F]
}Usage Examples:
// Strict sequence - login immediately followed by purchase
val strictPattern = Pattern.begin[Event]("login")
.where(_.eventType == "login")
.next("purchase")
.where(_.eventType == "purchase")
// Relaxed sequence - login followed by purchase (other events allowed)
val relaxedPattern = Pattern.begin[Event]("login")
.where(_.eventType == "login")
.followedBy("purchase")
.where(_.eventType == "purchase")
// Negative pattern - login not followed by logout
val negativePattern = Pattern.begin[Event]("login")
.where(_.eventType == "login")
.notFollowedBy("logout")Define time constraints for pattern completion.
class Pattern[T, F <: T] {
/**
* Set maximum time for pattern completion
* @param windowTime Duration for pattern completion
* @return Pattern with time constraint
*/
def within(windowTime: Duration): Pattern[T, F]
/**
* Set maximum time for pattern completion (deprecated)
* @param windowTime Time window for pattern completion
* @return Pattern with time constraint
*/
@deprecated("Use within(Duration)", "1.19.0")
def within(windowTime: Time): Pattern[T, F]
}Usage Examples:
import java.time.Duration
// Pattern must complete within 5 minutes
val timedPattern = Pattern.begin[Event]("start")
.where(_.eventType == "start")
.followedBy("end")
.where(_.eventType == "end")
.within(Duration.ofMinutes(5))Apply quantifiers to specify repetition patterns.
class Pattern[T, F <: T] {
/**
* Make pattern optional (0 or 1 occurrence)
* @return Pattern marked as optional
*/
def optional: Pattern[T, F]
/**
* Pattern can occur one or more times
* @return Pattern with one-or-more quantifier
*/
def oneOrMore: Pattern[T, F]
/**
* Pattern occurs exactly N times
* @param times Exact number of occurrences
* @return Pattern with exact count quantifier
*/
def times(times: Int): Pattern[T, F]
/**
* Pattern occurs between from and to times
* @param from Minimum occurrences
* @param to Maximum occurrences
* @return Pattern with range quantifier
*/
def times(from: Int, to: Int): Pattern[T, F]
/**
* Pattern occurs at least N times
* @param times Minimum number of occurrences
* @return Pattern with at-least quantifier
*/
def timesOrMore(times: Int): Pattern[T, F]
/**
* Use greedy matching (match as many as possible)
* @return Pattern with greedy matching
*/
def greedy: Pattern[T, F]
/**
* Allow combinations in quantified patterns
* @return Pattern allowing combinations
*/
def allowCombinations(): Pattern[T, F]
/**
* Require consecutive matching for quantified patterns
* @return Pattern requiring consecutive matches
*/
def consecutive(): Pattern[T, F]
}Usage Examples:
// Optional pattern
val optionalPattern = Pattern.begin[Event]("optional")
.where(_.eventType == "init")
.optional
.followedBy("required")
.where(_.eventType == "process")
// Repeated pattern
val repeatedPattern = Pattern.begin[Event]("repeated")
.where(_.eventType == "click")
.oneOrMore
.consecutive()
.followedBy("submit")
.where(_.eventType == "submit")
// Exact count
val exactPattern = Pattern.begin[Event]("exactly")
.where(_.eventType == "attempt")
.times(3)
.followedBy("success")
.where(_.eventType == "success")Apply stop conditions for looping patterns.
class Pattern[T, F <: T] {
/**
* Stop condition with simple predicate
* @param untilCondition Condition to stop pattern matching
* @return Pattern with until condition
*/
def until(untilCondition: F => Boolean): Pattern[T, F]
/**
* Stop condition with context access
* @param untilCondition Condition function with context
* @return Pattern with until condition
*/
def until(untilCondition: (F, Context[F]) => Boolean): Pattern[T, F]
/**
* Stop condition with IterativeCondition
* @param untilCondition The IterativeCondition for stopping
* @return Pattern with until condition
*/
def until(untilCondition: IterativeCondition[F]): Pattern[T, F]
}Usage Examples:
// Loop until condition is met
val loopingPattern = Pattern.begin[Event]("loop")
.where(_.eventType == "process")
.oneOrMore
.until(_.eventType == "complete")
.followedBy("final")
.where(_.eventType == "finish")Access pattern properties and configuration.
class Pattern[T, F <: T] {
/**
* Get the previous pattern in the chain
* @return Optional previous pattern
*/
def getPrevious: Option[Pattern[T, _ <: T]]
/**
* Get the name of this pattern
* @return Pattern name
*/
def getName: String
/**
* Get the time window for pattern completion (deprecated)
* @return Optional time window
*/
@deprecated("Use getWindowSize", "1.19.0")
def getWindowTime: Option[Time]
/**
* Get the time window duration for pattern completion
* @return Optional duration window
*/
def getWindowSize: Option[Duration]
/**
* Get the quantifier applied to this pattern
* @return Pattern quantifier
*/
def getQuantifier: Quantifier
/**
* Get the condition applied to this pattern
* @return Optional iterative condition
*/
def getCondition: Option[IterativeCondition[F]]
/**
* Get the until condition for looping patterns
* @return Optional until condition
*/
def getUntilCondition: Option[IterativeCondition[F]]
/**
* Get the after-match skip strategy
* @return After match skip strategy
*/
def getAfterMatchSkipStrategy: AfterMatchSkipStrategy
}GroupPattern extends Pattern but restricts certain operations.
class GroupPattern[T, F <: T] extends Pattern[T, F] {
// Inherits all Pattern methods except:
// - where() methods throw UnsupportedOperationException
// - or() methods throw UnsupportedOperationException
// - subtype() throws UnsupportedOperationException
}
object GroupPattern {
/**
* Wrap Java GroupPattern
* @param jGroupPattern Java GroupPattern to wrap
* @return Scala GroupPattern wrapper
*/
def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
}Usage Examples:
// GroupPattern creation from Pattern chaining
val individualPattern = Pattern.begin[Event]("first").where(_.eventType == "start")
val anotherPattern = Pattern.begin[Event]("second").where(_.eventType == "end")
// Chain patterns to create GroupPattern
val groupPattern = individualPattern.followedBy(anotherPattern)
// Start with existing pattern
val groupFromPattern = Pattern.begin(individualPattern)
.followedBy("additional")
.where(_.eventType == "middle")
// Note: GroupPattern cannot use where(), or(), or subtype()
// This would throw UnsupportedOperationException:
// groupPattern.where(_.eventType == "invalid") // ERROR!// Context for condition evaluation
trait Context[T] {
def getEventsForPattern(name: String): Iterable[T]
}
// Java interop types
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}
import org.apache.flink.cep.pattern.Quantifier
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.DurationInstall with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-cep-scala-2-12