or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

group-pattern-management.mdindex.mdpattern-definition.mdpattern-processing.mdpattern-stream-creation.mdtimeout-handling.md
tile.json

group-pattern-management.mddocs/

Group Pattern Management

Advanced pattern composition using GroupPattern for complex pattern sequences that combine multiple patterns into composite structures.

Capabilities

GroupPattern Overview

GroupPattern extends Pattern but restricts certain operations to maintain consistency in complex pattern sequences.

/**
 * GroupPattern represents a composite pattern created by chaining Pattern objects
 * @param jGroupPattern Underlying Java GroupPattern
 * @tparam T Base type of events
 * @tparam F Subtype constraint
 */
class GroupPattern[T, F <: T] extends Pattern[T, F] {
  // Inherits all Pattern methods except where(), or(), and subtype()
  // These methods throw UnsupportedOperationException
}

object GroupPattern {
  /**
   * Wrap Java GroupPattern for Scala API usage
   * @param jGroupPattern Java GroupPattern to wrap
   * @return Scala GroupPattern wrapper
   */
  def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
}

Creating GroupPatterns

GroupPatterns are created by chaining Pattern objects together or starting with existing patterns.

class Pattern[T, F <: T] {
  /**
   * Chain with another pattern using followedBy semantics
   * @param pattern The pattern to follow this one
   * @return GroupPattern for further composition
   */
  def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]
  
  /**
   * Chain with another pattern using followedByAny semantics  
   * @param pattern The pattern to follow this one
   * @return GroupPattern for further composition
   */
  def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]
  
  /**
   * Chain with another pattern using strict next semantics
   * @param pattern The pattern to follow this one immediately
   * @return GroupPattern for further composition
   */
  def next(pattern: Pattern[T, F]): GroupPattern[T, F]
}

object Pattern {
  /**
   * Start a new pattern sequence with an existing pattern
   * @param pattern Initial pattern for the sequence
   * @return GroupPattern for building complex sequences
   */
  def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
  
  /**
   * Start a new pattern sequence with existing pattern and skip strategy
   * @param pattern Initial pattern for the sequence
   * @param afterMatchSkipStrategy Strategy for handling overlapping matches
   * @return GroupPattern for building complex sequences
   */
  def begin[T, F <: T](
    pattern: Pattern[T, F],
    afterMatchSkipStrategy: AfterMatchSkipStrategy
  ): GroupPattern[T, F]
}

Usage Examples:

case class Event(eventType: String, userId: String, value: Int, timestamp: Long)

// Create individual patterns
val loginPattern = Pattern.begin[Event]("login")
  .where(_.eventType == "login")

val activityPattern = Pattern.begin[Event]("activity")  
  .where(_.eventType == "click")
  .oneOrMore

val logoutPattern = Pattern.begin[Event]("logout")
  .where(_.eventType == "logout")

// Combine patterns into GroupPattern using chaining
val sessionPattern = loginPattern
  .followedBy(activityPattern)
  .followedBy(logoutPattern)

// Start with an existing pattern to create GroupPattern
val complexSessionPattern = Pattern.begin(loginPattern)
  .followedBy("purchase") 
  .where(_.eventType == "purchase")
  .followedBy(logoutPattern)

GroupPattern Restrictions

GroupPatterns inherit all Pattern methods but restrict condition-setting operations.

class GroupPattern[T, F <: T] extends Pattern[T, F] {
  // These methods throw UnsupportedOperationException:
  override def where(condition: IterativeCondition[F]): Pattern[T, F] // NOT SUPPORTED
  override def or(condition: IterativeCondition[F]): Pattern[T, F] // NOT SUPPORTED
  override def subtype[S <: F](clazz: Class[S]): Pattern[T, S] // NOT SUPPORTED
  
  // All other Pattern methods are inherited and functional:
  // - Quantifiers: optional, oneOrMore, times(), etc.
  // - Temporal: within(), until()
  // - Chaining: next(), followedBy(), etc.
  // - Properties: getName, getQuantifier, etc.
}

Usage Examples:

val individualPattern = Pattern.begin[Event]("first")
  .where(_.eventType == "start")

val groupPattern = individualPattern.followedBy(
  Pattern.begin[Event]("second").where(_.eventType == "end")
)

// Valid GroupPattern operations
val timedGroupPattern = groupPattern
  .within(Duration.ofMinutes(10))
  .optional

// Invalid operations - these throw UnsupportedOperationException:
// groupPattern.where(_.value > 10) // ERROR!
// groupPattern.or(_.eventType == "other") // ERROR!
// groupPattern.subtype(classOf[SpecialEvent]) // ERROR!

// Workaround: Apply conditions to individual patterns before grouping
val conditionedPattern = Pattern.begin[Event]("filtered")
  .where(_.value > 10)
  .where(_.eventType == "special")

val validGroupPattern = individualPattern.followedBy(conditionedPattern)

Advanced GroupPattern Composition

Complex pattern sequences with multiple levels of composition.

Usage Examples:

case class UserEvent(userId: String, action: String, sessionId: String, timestamp: Long)

// Multi-level pattern composition
val userLoginSequence = Pattern.begin[UserEvent]("login")
  .where(_.action == "login")
  .followedBy("initial_activity")
  .where(_.action == "page_view")

val userEngagementSequence = Pattern.begin[UserEvent]("engagement")
  .where(_.action == "click")
  .oneOrMore
  .followedBy("conversion")
  .where(_.action == "purchase")

// Combine sequences into comprehensive user journey pattern
val userJourneyPattern = Pattern.begin(userLoginSequence)
  .followedBy(userEngagementSequence)
  .within(Duration.ofHours(2))

// Use with CEP
import org.apache.flink.cep.scala.CEP

val userJourneyStream = CEP.pattern(userEventStream, userJourneyPattern)
  .select { pattern =>
    val login = pattern("login").head
    val engagementEvents = pattern("engagement")
    val conversion = pattern("conversion").head
    
    s"User ${login.userId} converted after ${engagementEvents.size} engagement events"
  }

GroupPattern with Skip Strategies

Configure how overlapping patterns are handled in GroupPattern sequences.

Usage Examples:

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy

case class MarketEvent(symbol: String, price: Double, volume: Long, timestamp: Long)

val priceDropPattern = Pattern.begin[MarketEvent]("drop")
  .where(_.price < 100)

val recoveryPattern = Pattern.begin[MarketEvent]("recovery")
  .where(_.price > 100)

// Different skip strategies for overlapping matches
val skipPastLastEvent = AfterMatchSkipStrategy.skipPastLastEvent()
val skipToNext = AfterMatchSkipStrategy.skipToNext()

// GroupPattern with skip past last event strategy
val marketCyclePattern = Pattern.begin(priceDropPattern, skipPastLastEvent)
  .followedBy(recoveryPattern)
  .within(Duration.ofDays(7))

// Alternative: skip to next match for more overlapping detection
val overlappingCyclesPattern = Pattern.begin(priceDropPattern, skipToNext)
  .followedBy(recoveryPattern)
  .within(Duration.ofDays(7))

Types

// GroupPattern class and companion
import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}

class GroupPattern[T, F <: T] extends Pattern[T, F]

object GroupPattern {
  def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
}

// Skip strategies for pattern matching
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy

// Duration for time windows
import java.time.Duration