or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

group-pattern-management.mdindex.mdpattern-definition.mdpattern-processing.mdpattern-stream-creation.mdtimeout-handling.md
tile.json

pattern-definition.mddocs/

Pattern Definition

Complex event pattern definition using the fluent Scala DSL with temporal constraints, conditions, and quantifiers.

Capabilities

Pattern Creation

Start a new pattern sequence with a named initial pattern.

/**
 * Start a new pattern sequence with the given name
 * @param name The name of starting pattern
 * @tparam X Base type of events in the pattern
 * @return The first pattern of a pattern sequence
 */
object Pattern {
  def begin[X](name: String): Pattern[X, X]
  def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}

Usage Examples:

import org.apache.flink.cep.scala.pattern.Pattern

case class LoginEvent(userId: String, timestamp: Long)

// Simple pattern start
val pattern = Pattern.begin[LoginEvent]("login")
  .where(_.userId.nonEmpty)

// Pattern with skip strategy
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
val skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent()
val pattern = Pattern.begin[LoginEvent]("login", skipStrategy)

Condition Definition

Add conditions that events must satisfy to be considered matches.

class Pattern[T, F <: T] {
  /**
   * Add AND condition using simple predicate function
   * @param condition Predicate function for the event
   * @return Pattern with the new condition
   */
  def where(condition: F => Boolean): Pattern[T, F]
  
  /**
   * Add AND condition with access to context
   * @param condition Function taking event and context
   * @return Pattern with the new condition
   */
  def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]
  
  /**
   * Add AND condition using IterativeCondition
   * @param condition The IterativeCondition to apply
   * @return Pattern with the new condition
   */
  def where(condition: IterativeCondition[F]): Pattern[T, F]
  
  /**
   * Add OR condition using simple predicate function
   * @param condition Predicate function for the event
   * @return Pattern with the new condition
   */
  def or(condition: F => Boolean): Pattern[T, F]
  
  /**
   * Add OR condition with access to context
   * @param condition Function taking event and context
   * @return Pattern with the new condition
   */
  def or(condition: (F, Context[F]) => Boolean): Pattern[T, F]
  
  /**
   * Add OR condition using IterativeCondition
   * @param condition The IterativeCondition to apply
   * @return Pattern with the new condition
   */
  def or(condition: IterativeCondition[F]): Pattern[T, F]
}

Usage Examples:

case class Event(eventType: String, value: Int, userId: String)

// Simple condition
val pattern = Pattern.begin[Event]("start")
  .where(_.eventType == "login")
  .or(_.eventType == "signup")

// Context-aware condition
val pattern = Pattern.begin[Event]("start")
  .where(_.eventType == "purchase")
  .next("confirmation")
  .where((event, ctx) => {
    val previousEvents = ctx.getEventsForPattern("start")
    event.userId == previousEvents.head.userId
  })

Subtype Constraints

Apply subtype constraints to patterns for type-safe event matching.

class Pattern[T, F <: T] {
  /**
   * Apply subtype constraint requiring events to be of specific subtype
   * @param clazz Class of the required subtype
   * @tparam S The subtype
   * @return Pattern constrained to the subtype
   */
  def subtype[S <: F](clazz: Class[S]): Pattern[T, S]
}

Usage Examples:

abstract class Event(eventType: String)
case class LoginEvent(userId: String) extends Event("login")
case class PurchaseEvent(userId: String, amount: Double) extends Event("purchase")

val pattern = Pattern.begin[Event]("start")
  .subtype(classOf[LoginEvent])
  .next("purchase")
  .subtype(classOf[PurchaseEvent])

Temporal Pattern Chaining

Chain patterns with different temporal contiguity requirements.

class Pattern[T, F <: T] {
  /**
   * Strict temporal contiguity - no events between matches
   * @param name Name of the next pattern
   * @return New pattern enforcing strict contiguity
   */
  def next(name: String): Pattern[T, T]
  
  /**
   * Non-strict temporal contiguity - events may be interleaved
   * @param name Name of the following pattern
   * @return New pattern allowing interleaved events
   */
  def followedBy(name: String): Pattern[T, T]
  
  /**
   * Non-deterministic following - matches any occurrence
   * @param name Name of the following pattern
   * @return New pattern with non-deterministic following
   */
  def followedByAny(name: String): Pattern[T, T]
  
  /**
   * Negative pattern - no matching event should follow
   * @param name Name of the pattern that should not occur
   * @return New pattern with negative constraint
   */
  def notNext(name: String): Pattern[T, T]
  
  /**
   * Negative following pattern - no matching event should occur between
   * @param name Name of the pattern that should not occur
   * @return New pattern with negative constraint
   */
  def notFollowedBy(name: String): Pattern[T, T]
}

Group Pattern Chaining

Chain patterns with other Pattern objects to create GroupPatterns.

class Pattern[T, F <: T] {
  /**
   * Chain with another pattern using followedBy semantics
   * @param pattern The pattern to follow this one
   * @return GroupPattern for further chaining
   */
  def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]
  
  /**
   * Chain with another pattern using followedByAny semantics
   * @param pattern The pattern to follow this one
   * @return GroupPattern for further chaining
   */
  def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]
  
  /**
   * Chain with another pattern using next semantics
   * @param pattern The pattern to follow this one strictly
   * @return GroupPattern for further chaining
   */
  def next(pattern: Pattern[T, F]): GroupPattern[T, F]
}

object Pattern {
  /**
   * Start pattern sequence with existing pattern
   * @param pattern Initial pattern for the sequence
   * @return GroupPattern for chaining
   */
  def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
  
  /**
   * Start pattern sequence with existing pattern and skip strategy
   * @param pattern Initial pattern for the sequence
   * @param afterMatchSkipStrategy Skip strategy after matches
   * @return GroupPattern for chaining
   */
  def begin[T, F <: T](
    pattern: Pattern[T, F],
    afterMatchSkipStrategy: AfterMatchSkipStrategy
  ): GroupPattern[T, F]
}

Usage Examples:

// Strict sequence - login immediately followed by purchase
val strictPattern = Pattern.begin[Event]("login")
  .where(_.eventType == "login")
  .next("purchase")
  .where(_.eventType == "purchase")

// Relaxed sequence - login followed by purchase (other events allowed)
val relaxedPattern = Pattern.begin[Event]("login")
  .where(_.eventType == "login")
  .followedBy("purchase")
  .where(_.eventType == "purchase")

// Negative pattern - login not followed by logout
val negativePattern = Pattern.begin[Event]("login")
  .where(_.eventType == "login")
  .notFollowedBy("logout")

Time Windows

Define time constraints for pattern completion.

class Pattern[T, F <: T] {
  /**
   * Set maximum time for pattern completion
   * @param windowTime Duration for pattern completion
   * @return Pattern with time constraint
   */
  def within(windowTime: Duration): Pattern[T, F]
  
  /**
   * Set maximum time for pattern completion (deprecated)
   * @param windowTime Time window for pattern completion
   * @return Pattern with time constraint
   */
  @deprecated("Use within(Duration)", "1.19.0")
  def within(windowTime: Time): Pattern[T, F]
}

Usage Examples:

import java.time.Duration

// Pattern must complete within 5 minutes
val timedPattern = Pattern.begin[Event]("start")
  .where(_.eventType == "start")
  .followedBy("end")
  .where(_.eventType == "end")
  .within(Duration.ofMinutes(5))

Pattern Quantifiers

Apply quantifiers to specify repetition patterns.

class Pattern[T, F <: T] {
  /**
   * Make pattern optional (0 or 1 occurrence)
   * @return Pattern marked as optional
   */
  def optional: Pattern[T, F]
  
  /**
   * Pattern can occur one or more times
   * @return Pattern with one-or-more quantifier 
   */
  def oneOrMore: Pattern[T, F]
  
  /**
   * Pattern occurs exactly N times
   * @param times Exact number of occurrences
   * @return Pattern with exact count quantifier
   */
  def times(times: Int): Pattern[T, F]
  
  /**
   * Pattern occurs between from and to times
   * @param from Minimum occurrences
   * @param to Maximum occurrences  
   * @return Pattern with range quantifier
   */
  def times(from: Int, to: Int): Pattern[T, F]
  
  /**
   * Pattern occurs at least N times
   * @param times Minimum number of occurrences
   * @return Pattern with at-least quantifier
   */
  def timesOrMore(times: Int): Pattern[T, F]
  
  /**
   * Use greedy matching (match as many as possible)
   * @return Pattern with greedy matching
   */
  def greedy: Pattern[T, F]
  
  /**
   * Allow combinations in quantified patterns
   * @return Pattern allowing combinations
   */
  def allowCombinations(): Pattern[T, F]
  
  /**
   * Require consecutive matching for quantified patterns
   * @return Pattern requiring consecutive matches
   */
  def consecutive(): Pattern[T, F]
}

Usage Examples:

// Optional pattern
val optionalPattern = Pattern.begin[Event]("optional")
  .where(_.eventType == "init")
  .optional
  .followedBy("required")
  .where(_.eventType == "process")

// Repeated pattern
val repeatedPattern = Pattern.begin[Event]("repeated")
  .where(_.eventType == "click")
  .oneOrMore
  .consecutive()
  .followedBy("submit")
  .where(_.eventType == "submit")

// Exact count
val exactPattern = Pattern.begin[Event]("exactly")
  .where(_.eventType == "attempt")
  .times(3)
  .followedBy("success")
  .where(_.eventType == "success")

Until Conditions

Apply stop conditions for looping patterns.

class Pattern[T, F <: T] {
  /**
   * Stop condition with simple predicate
   * @param untilCondition Condition to stop pattern matching
   * @return Pattern with until condition
   */
  def until(untilCondition: F => Boolean): Pattern[T, F]
  
  /**
   * Stop condition with context access
   * @param untilCondition Condition function with context
   * @return Pattern with until condition
   */
  def until(untilCondition: (F, Context[F]) => Boolean): Pattern[T, F]
  
  /**
   * Stop condition with IterativeCondition
   * @param untilCondition The IterativeCondition for stopping
   * @return Pattern with until condition
   */
  def until(untilCondition: IterativeCondition[F]): Pattern[T, F]
}

Usage Examples:

// Loop until condition is met
val loopingPattern = Pattern.begin[Event]("loop")
  .where(_.eventType == "process")
  .oneOrMore
  .until(_.eventType == "complete")
  .followedBy("final")
  .where(_.eventType == "finish")

Pattern Properties

Access pattern properties and configuration.

class Pattern[T, F <: T] {
  /**
   * Get the previous pattern in the chain
   * @return Optional previous pattern
   */
  def getPrevious: Option[Pattern[T, _ <: T]]
  
  /**
   * Get the name of this pattern
   * @return Pattern name
   */
  def getName: String
  
  /**
   * Get the time window for pattern completion (deprecated)
   * @return Optional time window
   */
  @deprecated("Use getWindowSize", "1.19.0")
  def getWindowTime: Option[Time]
  
  /**
   * Get the time window duration for pattern completion
   * @return Optional duration window
   */
  def getWindowSize: Option[Duration]
  
  /**
   * Get the quantifier applied to this pattern
   * @return Pattern quantifier
   */
  def getQuantifier: Quantifier
  
  /**
   * Get the condition applied to this pattern
   * @return Optional iterative condition
   */
  def getCondition: Option[IterativeCondition[F]]
  
  /**
   * Get the until condition for looping patterns
   * @return Optional until condition
   */
  def getUntilCondition: Option[IterativeCondition[F]]
  
  /**
   * Get the after-match skip strategy
   * @return After match skip strategy
   */
  def getAfterMatchSkipStrategy: AfterMatchSkipStrategy
}

GroupPattern

GroupPattern extends Pattern but restricts certain operations.

class GroupPattern[T, F <: T] extends Pattern[T, F] {
  // Inherits all Pattern methods except:
  // - where() methods throw UnsupportedOperationException
  // - or() methods throw UnsupportedOperationException  
  // - subtype() throws UnsupportedOperationException
}

object GroupPattern {
  /**
   * Wrap Java GroupPattern
   * @param jGroupPattern Java GroupPattern to wrap
   * @return Scala GroupPattern wrapper
   */
  def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
}

Usage Examples:

// GroupPattern creation from Pattern chaining
val individualPattern = Pattern.begin[Event]("first").where(_.eventType == "start")
val anotherPattern = Pattern.begin[Event]("second").where(_.eventType == "end")

// Chain patterns to create GroupPattern
val groupPattern = individualPattern.followedBy(anotherPattern)

// Start with existing pattern
val groupFromPattern = Pattern.begin(individualPattern)
  .followedBy("additional")
  .where(_.eventType == "middle")

// Note: GroupPattern cannot use where(), or(), or subtype()
// This would throw UnsupportedOperationException:
// groupPattern.where(_.eventType == "invalid") // ERROR!

Types

// Context for condition evaluation
trait Context[T] {
  def getEventsForPattern(name: String): Iterable[T]
}

// Java interop types
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}
import org.apache.flink.cep.pattern.Quantifier
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.Duration