Complex event pattern definition using the fluent Scala DSL with temporal constraints, conditions, and quantifiers.
Start a new pattern sequence with a named initial pattern.
/**
* Start a new pattern sequence with the given name
* @param name The name of starting pattern
* @tparam X Base type of events in the pattern
* @return The first pattern of a pattern sequence
*/
object Pattern {
def begin[X](name: String): Pattern[X, X]
def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}Usage Examples:
import org.apache.flink.cep.scala.pattern.Pattern
case class LoginEvent(userId: String, timestamp: Long)
// Simple pattern start
val pattern = Pattern.begin[LoginEvent]("login")
.where(_.userId.nonEmpty)
// Pattern with skip strategy
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
val skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent()
val pattern = Pattern.begin[LoginEvent]("login", skipStrategy)Add conditions that events must satisfy to be considered matches.
class Pattern[T, F <: T] {
/**
* Add AND condition using simple predicate function
* @param condition Predicate function for the event
* @return Pattern with the new condition
*/
def where(condition: F => Boolean): Pattern[T, F]
/**
* Add AND condition with access to context
* @param condition Function taking event and context
* @return Pattern with the new condition
*/
def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]
/**
* Add AND condition using IterativeCondition
* @param condition The IterativeCondition to apply
* @return Pattern with the new condition
*/
def where(condition: IterativeCondition[F]): Pattern[T, F]
/**
* Add OR condition using simple predicate function
* @param condition Predicate function for the event
* @return Pattern with the new condition
*/
def or(condition: F => Boolean): Pattern[T, F]
/**
* Add OR condition with access to context
* @param condition Function taking event and context
* @return Pattern with the new condition
*/
def or(condition: (F, Context[F]) => Boolean): Pattern[T, F]
/**
* Add OR condition using IterativeCondition
* @param condition The IterativeCondition to apply
* @return Pattern with the new condition
*/
def or(condition: IterativeCondition[F]): Pattern[T, F]
}Usage Examples:
case class Event(eventType: String, value: Int, userId: String)
// Simple condition
val pattern = Pattern.begin[Event]("start")
.where(_.eventType == "login")
.or(_.eventType == "signup")
// Context-aware condition
val pattern = Pattern.begin[Event]("start")
.where(_.eventType == "purchase")
.next("confirmation")
.where((event, ctx) => {
val previousEvents = ctx.getEventsForPattern("start")
event.userId == previousEvents.head.userId
})Apply subtype constraints to patterns for type-safe event matching.
class Pattern[T, F <: T] {
/**
* Apply subtype constraint requiring events to be of specific subtype
* @param clazz Class of the required subtype
* @tparam S The subtype
* @return Pattern constrained to the subtype
*/
def subtype[S <: F](clazz: Class[S]): Pattern[T, S]
}Usage Examples:
abstract class Event(eventType: String)
case class LoginEvent(userId: String) extends Event("login")
case class PurchaseEvent(userId: String, amount: Double) extends Event("purchase")
val pattern = Pattern.begin[Event]("start")
.subtype(classOf[LoginEvent])
.next("purchase")
.subtype(classOf[PurchaseEvent])Chain patterns with different temporal contiguity requirements.
class Pattern[T, F <: T] {
/**
* Strict temporal contiguity - no events between matches
* @param name Name of the next pattern
* @return New pattern enforcing strict contiguity
*/
def next(name: String): Pattern[T, T]
/**
* Non-strict temporal contiguity - events may be interleaved
* @param name Name of the following pattern
* @return New pattern allowing interleaved events
*/
def followedBy(name: String): Pattern[T, T]
/**
* Non-deterministic following - matches any occurrence
* @param name Name of the following pattern
* @return New pattern with non-deterministic following
*/
def followedByAny(name: String): Pattern[T, T]
/**
* Negative pattern - no matching event should follow
* @param name Name of the pattern that should not occur
* @return New pattern with negative constraint
*/
def notNext(name: String): Pattern[T, T]
/**
* Negative following pattern - no matching event should occur between
* @param name Name of the pattern that should not occur
* @return New pattern with negative constraint
*/
def notFollowedBy(name: String): Pattern[T, T]
}Chain patterns with other Pattern objects to create GroupPatterns.
class Pattern[T, F <: T] {
/**
* Chain with another pattern using followedBy semantics
* @param pattern The pattern to follow this one
* @return GroupPattern for further chaining
*/
def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Chain with another pattern using followedByAny semantics
* @param pattern The pattern to follow this one
* @return GroupPattern for further chaining
*/
def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Chain with another pattern using next semantics
* @param pattern The pattern to follow this one strictly
* @return GroupPattern for further chaining
*/
def next(pattern: Pattern[T, F]): GroupPattern[T, F]
}
object Pattern {
/**
* Start pattern sequence with existing pattern
* @param pattern Initial pattern for the sequence
* @return GroupPattern for chaining
*/
def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Start pattern sequence with existing pattern and skip strategy
* @param pattern Initial pattern for the sequence
* @param afterMatchSkipStrategy Skip strategy after matches
* @return GroupPattern for chaining
*/
def begin[T, F <: T](
pattern: Pattern[T, F],
afterMatchSkipStrategy: AfterMatchSkipStrategy
): GroupPattern[T, F]
}Usage Examples:
// Strict sequence - login immediately followed by purchase
val strictPattern = Pattern.begin[Event]("login")
.where(_.eventType == "login")
.next("purchase")
.where(_.eventType == "purchase")
// Relaxed sequence - login followed by purchase (other events allowed)
val relaxedPattern = Pattern.begin[Event]("login")
.where(_.eventType == "login")
.followedBy("purchase")
.where(_.eventType == "purchase")
// Negative pattern - login not followed by logout
val negativePattern = Pattern.begin[Event]("login")
.where(_.eventType == "login")
.notFollowedBy("logout")Define time constraints for pattern completion.
class Pattern[T, F <: T] {
/**
* Set maximum time for pattern completion
* @param windowTime Duration for pattern completion
* @return Pattern with time constraint
*/
def within(windowTime: Duration): Pattern[T, F]
/**
* Set maximum time for pattern completion (deprecated)
* @param windowTime Time window for pattern completion
* @return Pattern with time constraint
*/
@deprecated("Use within(Duration)", "1.19.0")
def within(windowTime: Time): Pattern[T, F]
}Usage Examples:
import java.time.Duration
// Pattern must complete within 5 minutes
val timedPattern = Pattern.begin[Event]("start")
.where(_.eventType == "start")
.followedBy("end")
.where(_.eventType == "end")
.within(Duration.ofMinutes(5))Apply quantifiers to specify repetition patterns.
class Pattern[T, F <: T] {
/**
* Make pattern optional (0 or 1 occurrence)
* @return Pattern marked as optional
*/
def optional: Pattern[T, F]
/**
* Pattern can occur one or more times
* @return Pattern with one-or-more quantifier
*/
def oneOrMore: Pattern[T, F]
/**
* Pattern occurs exactly N times
* @param times Exact number of occurrences
* @return Pattern with exact count quantifier
*/
def times(times: Int): Pattern[T, F]
/**
* Pattern occurs between from and to times
* @param from Minimum occurrences
* @param to Maximum occurrences
* @return Pattern with range quantifier
*/
def times(from: Int, to: Int): Pattern[T, F]
/**
* Pattern occurs at least N times
* @param times Minimum number of occurrences
* @return Pattern with at-least quantifier
*/
def timesOrMore(times: Int): Pattern[T, F]
/**
* Use greedy matching (match as many as possible)
* @return Pattern with greedy matching
*/
def greedy: Pattern[T, F]
/**
* Allow combinations in quantified patterns
* @return Pattern allowing combinations
*/
def allowCombinations(): Pattern[T, F]
/**
* Require consecutive matching for quantified patterns
* @return Pattern requiring consecutive matches
*/
def consecutive(): Pattern[T, F]
}Usage Examples:
// Optional pattern
val optionalPattern = Pattern.begin[Event]("optional")
.where(_.eventType == "init")
.optional
.followedBy("required")
.where(_.eventType == "process")
// Repeated pattern
val repeatedPattern = Pattern.begin[Event]("repeated")
.where(_.eventType == "click")
.oneOrMore
.consecutive()
.followedBy("submit")
.where(_.eventType == "submit")
// Exact count
val exactPattern = Pattern.begin[Event]("exactly")
.where(_.eventType == "attempt")
.times(3)
.followedBy("success")
.where(_.eventType == "success")Apply stop conditions for looping patterns.
class Pattern[T, F <: T] {
/**
* Stop condition with simple predicate
* @param untilCondition Condition to stop pattern matching
* @return Pattern with until condition
*/
def until(untilCondition: F => Boolean): Pattern[T, F]
/**
* Stop condition with context access
* @param untilCondition Condition function with context
* @return Pattern with until condition
*/
def until(untilCondition: (F, Context[F]) => Boolean): Pattern[T, F]
/**
* Stop condition with IterativeCondition
* @param untilCondition The IterativeCondition for stopping
* @return Pattern with until condition
*/
def until(untilCondition: IterativeCondition[F]): Pattern[T, F]
}Usage Examples:
// Loop until condition is met
val loopingPattern = Pattern.begin[Event]("loop")
.where(_.eventType == "process")
.oneOrMore
.until(_.eventType == "complete")
.followedBy("final")
.where(_.eventType == "finish")Access pattern properties and configuration.
class Pattern[T, F <: T] {
/**
* Get the previous pattern in the chain
* @return Optional previous pattern
*/
def getPrevious: Option[Pattern[T, _ <: T]]
/**
* Get the name of this pattern
* @return Pattern name
*/
def getName: String
/**
* Get the time window for pattern completion (deprecated)
* @return Optional time window
*/
@deprecated("Use getWindowSize", "1.19.0")
def getWindowTime: Option[Time]
/**
* Get the time window duration for pattern completion
* @return Optional duration window
*/
def getWindowSize: Option[Duration]
/**
* Get the quantifier applied to this pattern
* @return Pattern quantifier
*/
def getQuantifier: Quantifier
/**
* Get the condition applied to this pattern
* @return Optional iterative condition
*/
def getCondition: Option[IterativeCondition[F]]
/**
* Get the until condition for looping patterns
* @return Optional until condition
*/
def getUntilCondition: Option[IterativeCondition[F]]
/**
* Get the after-match skip strategy
* @return After match skip strategy
*/
def getAfterMatchSkipStrategy: AfterMatchSkipStrategy
}GroupPattern extends Pattern but restricts certain operations.
class GroupPattern[T, F <: T] extends Pattern[T, F] {
// Inherits all Pattern methods except:
// - where() methods throw UnsupportedOperationException
// - or() methods throw UnsupportedOperationException
// - subtype() throws UnsupportedOperationException
}
object GroupPattern {
/**
* Wrap Java GroupPattern
* @param jGroupPattern Java GroupPattern to wrap
* @return Scala GroupPattern wrapper
*/
def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
}Usage Examples:
// GroupPattern creation from Pattern chaining
val individualPattern = Pattern.begin[Event]("first").where(_.eventType == "start")
val anotherPattern = Pattern.begin[Event]("second").where(_.eventType == "end")
// Chain patterns to create GroupPattern
val groupPattern = individualPattern.followedBy(anotherPattern)
// Start with existing pattern
val groupFromPattern = Pattern.begin(individualPattern)
.followedBy("additional")
.where(_.eventType == "middle")
// Note: GroupPattern cannot use where(), or(), or subtype()
// This would throw UnsupportedOperationException:
// groupPattern.where(_.eventType == "invalid") // ERROR!// Context for condition evaluation
trait Context[T] {
def getEventsForPattern(name: String): Iterable[T]
}
// Java interop types
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}
import org.apache.flink.cep.pattern.Quantifier
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.Duration