Advanced pattern composition using GroupPattern for complex pattern sequences that combine multiple patterns into composite structures.
GroupPattern extends Pattern but restricts certain operations to maintain consistency in complex pattern sequences.
/**
* GroupPattern represents a composite pattern created by chaining Pattern objects
* @param jGroupPattern Underlying Java GroupPattern
* @tparam T Base type of events
* @tparam F Subtype constraint
*/
class GroupPattern[T, F <: T] extends Pattern[T, F] {
// Inherits all Pattern methods except where(), or(), and subtype()
// These methods throw UnsupportedOperationException
}
object GroupPattern {
/**
* Wrap Java GroupPattern for Scala API usage
* @param jGroupPattern Java GroupPattern to wrap
* @return Scala GroupPattern wrapper
*/
def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
}GroupPatterns are created by chaining Pattern objects together or starting with existing patterns.
class Pattern[T, F <: T] {
/**
* Chain with another pattern using followedBy semantics
* @param pattern The pattern to follow this one
* @return GroupPattern for further composition
*/
def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Chain with another pattern using followedByAny semantics
* @param pattern The pattern to follow this one
* @return GroupPattern for further composition
*/
def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Chain with another pattern using strict next semantics
* @param pattern The pattern to follow this one immediately
* @return GroupPattern for further composition
*/
def next(pattern: Pattern[T, F]): GroupPattern[T, F]
}
object Pattern {
/**
* Start a new pattern sequence with an existing pattern
* @param pattern Initial pattern for the sequence
* @return GroupPattern for building complex sequences
*/
def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
/**
* Start a new pattern sequence with existing pattern and skip strategy
* @param pattern Initial pattern for the sequence
* @param afterMatchSkipStrategy Strategy for handling overlapping matches
* @return GroupPattern for building complex sequences
*/
def begin[T, F <: T](
pattern: Pattern[T, F],
afterMatchSkipStrategy: AfterMatchSkipStrategy
): GroupPattern[T, F]
}Usage Examples:
case class Event(eventType: String, userId: String, value: Int, timestamp: Long)
// Create individual patterns
val loginPattern = Pattern.begin[Event]("login")
.where(_.eventType == "login")
val activityPattern = Pattern.begin[Event]("activity")
.where(_.eventType == "click")
.oneOrMore
val logoutPattern = Pattern.begin[Event]("logout")
.where(_.eventType == "logout")
// Combine patterns into GroupPattern using chaining
val sessionPattern = loginPattern
.followedBy(activityPattern)
.followedBy(logoutPattern)
// Start with an existing pattern to create GroupPattern
val complexSessionPattern = Pattern.begin(loginPattern)
.followedBy("purchase")
.where(_.eventType == "purchase")
.followedBy(logoutPattern)GroupPatterns inherit all Pattern methods but restrict condition-setting operations.
class GroupPattern[T, F <: T] extends Pattern[T, F] {
// These methods throw UnsupportedOperationException:
override def where(condition: IterativeCondition[F]): Pattern[T, F] // NOT SUPPORTED
override def or(condition: IterativeCondition[F]): Pattern[T, F] // NOT SUPPORTED
override def subtype[S <: F](clazz: Class[S]): Pattern[T, S] // NOT SUPPORTED
// All other Pattern methods are inherited and functional:
// - Quantifiers: optional, oneOrMore, times(), etc.
// - Temporal: within(), until()
// - Chaining: next(), followedBy(), etc.
// - Properties: getName, getQuantifier, etc.
}Usage Examples:
val individualPattern = Pattern.begin[Event]("first")
.where(_.eventType == "start")
val groupPattern = individualPattern.followedBy(
Pattern.begin[Event]("second").where(_.eventType == "end")
)
// Valid GroupPattern operations
val timedGroupPattern = groupPattern
.within(Duration.ofMinutes(10))
.optional
// Invalid operations - these throw UnsupportedOperationException:
// groupPattern.where(_.value > 10) // ERROR!
// groupPattern.or(_.eventType == "other") // ERROR!
// groupPattern.subtype(classOf[SpecialEvent]) // ERROR!
// Workaround: Apply conditions to individual patterns before grouping
val conditionedPattern = Pattern.begin[Event]("filtered")
.where(_.value > 10)
.where(_.eventType == "special")
val validGroupPattern = individualPattern.followedBy(conditionedPattern)Complex pattern sequences with multiple levels of composition.
Usage Examples:
case class UserEvent(userId: String, action: String, sessionId: String, timestamp: Long)
// Multi-level pattern composition
val userLoginSequence = Pattern.begin[UserEvent]("login")
.where(_.action == "login")
.followedBy("initial_activity")
.where(_.action == "page_view")
val userEngagementSequence = Pattern.begin[UserEvent]("engagement")
.where(_.action == "click")
.oneOrMore
.followedBy("conversion")
.where(_.action == "purchase")
// Combine sequences into comprehensive user journey pattern
val userJourneyPattern = Pattern.begin(userLoginSequence)
.followedBy(userEngagementSequence)
.within(Duration.ofHours(2))
// Use with CEP
import org.apache.flink.cep.scala.CEP
val userJourneyStream = CEP.pattern(userEventStream, userJourneyPattern)
.select { pattern =>
val login = pattern("login").head
val engagementEvents = pattern("engagement")
val conversion = pattern("conversion").head
s"User ${login.userId} converted after ${engagementEvents.size} engagement events"
}Configure how overlapping patterns are handled in GroupPattern sequences.
Usage Examples:
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
case class MarketEvent(symbol: String, price: Double, volume: Long, timestamp: Long)
val priceDropPattern = Pattern.begin[MarketEvent]("drop")
.where(_.price < 100)
val recoveryPattern = Pattern.begin[MarketEvent]("recovery")
.where(_.price > 100)
// Different skip strategies for overlapping matches
val skipPastLastEvent = AfterMatchSkipStrategy.skipPastLastEvent()
val skipToNext = AfterMatchSkipStrategy.skipToNext()
// GroupPattern with skip past last event strategy
val marketCyclePattern = Pattern.begin(priceDropPattern, skipPastLastEvent)
.followedBy(recoveryPattern)
.within(Duration.ofDays(7))
// Alternative: skip to next match for more overlapping detection
val overlappingCyclesPattern = Pattern.begin(priceDropPattern, skipToNext)
.followedBy(recoveryPattern)
.within(Duration.ofDays(7))// GroupPattern class and companion
import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}
class GroupPattern[T, F <: T] extends Pattern[T, F]
object GroupPattern {
def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
}
// Skip strategies for pattern matching
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
// Duration for time windows
import java.time.Duration