or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

group-pattern-management.md · index.md · pattern-definition.md · pattern-processing.md · pattern-stream-creation.md · timeout-handling.md
tile.json

docs/pattern-processing.md

Pattern Processing

Process detected pattern sequences with flexible output generation including select, flatSelect, and process operations for transforming matched patterns into result streams.

Capabilities

Select Operations

Transform matched patterns into single output elements using select functions.

class PatternStream[T] {
  /**
   * Process patterns with a Scala function returning a single result per match.
   *
   * The function receives the match as a map keyed by pattern name; each value holds the
   * events matched under that name.
   * NOTE(review): `Map` here is presumably `scala.collection.Map` (it is the one imported
   * in the Types section of this page), not the default immutable `Map` — confirm.
   *
   * @param patternSelectFun Function transforming the pattern map into one result
   * @tparam R Type of result elements; the context bound requires an implicit
   *           TypeInformation[R] at the call site
   * @return DataStream containing one result per match
   */
  def select[R: TypeInformation](patternSelectFun: Map[String, Iterable[T]] => R): DataStream[R]
  
  /**
   * Process patterns with the PatternSelectFunction interface.
   *
   * Same contract as the Scala-function overload, but the match is handed over as
   * java.util collections (see the usage example with JMap / JList).
   *
   * @param patternSelectFunction Java interface for pattern selection
   * @tparam R Type of result elements; the context bound requires an implicit
   *           TypeInformation[R] at the call site
   * @return DataStream containing one result per match
   */
  def select[R: TypeInformation](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R]
}

Usage Examples:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.PatternSelectFunction
import java.util.{List => JList, Map => JMap}

/** A user action observed on the input stream. */
case class Event(userId: String, action: String, timestamp: Long)

/** One login-to-logout session derived from a single pattern match. */
case class UserSession(userId: String, startAction: String, endAction: String, duration: Long)

// Scala function approach: a "login" event followed by a "logout" event
val sessionPattern = Pattern
  .begin[Event]("start").where(_.action == "login")
  .followedBy("end").where(_.action == "logout")

val sessions = CEP
  .pattern(dataStream, sessionPattern)
  .select { matched: Map[String, Iterable[Event]] =>
    // Neither step is quantified, so only the first event under each name is read
    val login = matched("start").head
    val logout = matched("end").head
    UserSession(
      userId = login.userId,
      startAction = login.action,
      endAction = logout.action,
      duration = logout.timestamp - login.timestamp
    )
  }

// Java interface approach: the match arrives as java.util collections,
// so events are read with get(key) / get(index) instead of apply / head
val javaSelectFunction = new PatternSelectFunction[Event, String] {
  override def select(pattern: JMap[String, JList[Event]]): String = {
    val login = pattern.get("start").get(0)
    val logout = pattern.get("end").get(0)
    s"Session: ${login.userId} from ${login.timestamp} to ${logout.timestamp}"
  }
}

val sessionsJava = CEP.pattern(dataStream, sessionPattern).select(javaSelectFunction)

Flat Select Operations

Transform matched patterns into zero or more output elements using flatSelect functions.

class PatternStream[T] {
  /**
   * Process patterns with a Scala function that can emit zero or more results per match.
   *
   * The function receives the match (pattern name -> matched events) together with a
   * Collector; every element passed to the collector becomes an element of the result stream.
   *
   * @param patternFlatSelectFun Function taking the pattern map and the output collector
   * @tparam R Type of result elements; the context bound requires an implicit
   *           TypeInformation[R] at the call site
   * @return DataStream containing the collected results
   */
  def flatSelect[R: TypeInformation](
    patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit
  ): DataStream[R]
  
  /**
   * Process patterns with the PatternFlatSelectFunction interface.
   *
   * Same contract as the Scala-function overload, but the match is handed over as
   * java.util collections (see the usage example with JMap / JList).
   *
   * @param patternFlatSelectFunction Java interface for flat pattern selection
   * @tparam R Type of result elements; the context bound requires an implicit
   *           TypeInformation[R] at the call site
   * @return DataStream containing the collected results
   */
  def flatSelect[R: TypeInformation](
    patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
  ): DataStream[R]
}

Usage Examples:

import org.apache.flink.util.Collector
import org.apache.flink.cep.PatternFlatSelectFunction

/** A single purchase event on the input stream. */
case class Purchase(userId: String, item: String, amount: Double, timestamp: Long)

/** Alert produced from a matched run of purchases. */
case class Alert(userId: String, message: String, severity: String)

// Pattern for multiple purchases: one or more positive-amount purchases within one hour.
// Duration is written fully qualified because the snippet's imports do not bring it into
// scope and the bare name is easily confused with scala.concurrent.duration.Duration.
val purchasePattern = Pattern.begin[Purchase]("purchases")
  .where(_.amount > 0)
  .oneOrMore
  .within(java.time.Duration.ofHours(1))

// Scala flat select - a single match may produce several alerts (or none)
val alerts = CEP
  .pattern(purchaseStream, purchasePattern)
  .flatSelect { (pattern: Map[String, Iterable[Purchase]], out: Collector[Alert]) =>
    val purchases = pattern("purchases")
    val totalAmount = purchases.map(_.amount).sum
    val userId = purchases.head.userId
    val purchaseCount = purchases.size

    // Every alert of this match is addressed to the same user
    def emit(message: String, severity: String): Unit =
      out.collect(Alert(userId, message, severity))

    // Volume alert
    if (totalAmount > 1000) emit(s"High volume: $$${totalAmount}", "HIGH")

    // Frequency alert
    if (purchaseCount > 10) emit(s"High frequency: ${purchaseCount} purchases", "MEDIUM")

    // Item-specific alerts: one per item bought more than three times
    for {
      (item, itemPurchases) <- purchases.groupBy(_.item)
      if itemPurchases.size > 3
    } emit(s"Repeated item: $item (${itemPurchases.size}x)", "LOW")
  }

// Java interface approach: emits one summary line per matched purchase.
// JMap / JList are the java.util aliases imported in the select example
// (import java.util.{List => JList, Map => JMap}).
val javaFlatSelectFunction = new PatternFlatSelectFunction[Purchase, String] {
  override def flatSelect(pattern: JMap[String, JList[Purchase]], out: Collector[String]): Unit = {
    val purchases = pattern.get("purchases")
    purchases.forEach { purchase =>
      out.collect(s"Purchase: ${purchase.userId} bought ${purchase.item}")
    }
  }
}

// Apply it the same way as the Scala function, mirroring the select example
val purchaseSummaries = CEP.pattern(purchaseStream, purchasePattern)
  .flatSelect(javaFlatSelectFunction)

Process Function Operations

Use PatternProcessFunction for advanced pattern processing with rich context access.

class PatternStream[T] {
  /**
   * Process patterns using a PatternProcessFunction with full context.
   *
   * For each match the function's processMatch is invoked with the match (pattern name ->
   * matched events, as java.util collections), a PatternProcessFunction.Context and a
   * Collector; every element passed to the collector becomes an element of the result stream.
   *
   * @param patternProcessFunction Process function with rich context
   * @tparam R Type of result elements; the context bound requires an implicit
   *           TypeInformation[R] at the call site
   * @return DataStream containing the collected results
   */
  def process[R: TypeInformation](
    patternProcessFunction: PatternProcessFunction[T, R]
  ): DataStream[R]
}

Usage Examples:

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

/** A single account transaction on the input stream. */
case class Transaction(accountId: String, amount: Double, location: String, timestamp: Long)

/** Alert emitted when a matched run of transactions scores above the risk threshold. */
case class FraudAlert(accountId: String, reason: String, transactions: List[Transaction], riskScore: Double)

/**
 * Scores the transactions matched under the "suspicious" pattern name and emits a
 * [[FraudAlert]] when the combined risk score exceeds 0.5.
 *
 * Risk factors and their weights:
 *  - total amount above 10000 ("high-volume", 0.4)
 *  - more than two distinct locations ("multiple-locations", 0.3)
 *  - all transactions within 5 minutes ("rapid-succession", 0.3)
 *
 * A match that carries no "suspicious" events produces no output.
 */
class FraudDetectionProcessFunction extends PatternProcessFunction[Transaction, FraudAlert] {

  /**
   * @param `match` Matched events keyed by pattern name
   * @param ctx     Processing context (not used by this function)
   * @param out     Collector receiving at most one alert per match
   */
  override def processMatch(
    `match`: JMap[String, JList[Transaction]],
    ctx: PatternProcessFunction.Context,
    out: Collector[FraudAlert]
  ): Unit = {
    // JavaConverters exists on both Scala 2.12 and 2.13; scala.jdk.CollectionConverters is
    // 2.13-only and would not compile against a Scala 2.12 build.
    import scala.collection.JavaConverters._

    // java.util.Map.get returns null for an absent key, so wrap it before converting
    val transactions = Option(`match`.get("suspicious")).map(_.asScala.toList).getOrElse(Nil)

    if (transactions.nonEmpty) {
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      val locations = transactions.map(_.location).toSet
      val timestamps = transactions.map(_.timestamp)
      val timeSpan = timestamps.max - timestamps.min

      // (triggered, weight, label), listed in evaluation order
      val factors = List(
        (totalAmount > 10000, 0.4, "high-volume"),
        (locations.size > 2, 0.3, "multiple-locations"),
        (timeSpan < 300000, 0.3, "rapid-succession") // 5 minutes
      )
      val triggered = factors.collect { case (true, weight, label) => (weight, label) }

      val riskScore = triggered.foldLeft(0.0) { case (acc, (weight, _)) => acc + weight }
      // Most recently triggered factor first, matching the order produced by prepending
      val reasons = triggered.map(_._2).reverse

      if (riskScore > 0.5) {
        out.collect(FraudAlert(
          accountId,
          reasons.mkString(", "),
          transactions,
          riskScore
        ))
      }
    }
  }
}

// Usage with process function: one or more transactions above 500 within ten minutes,
// all collected under the name "suspicious" that FraudDetectionProcessFunction reads.
// Duration is written fully qualified because the snippet's imports do not bring it into
// scope and the bare name is easily confused with scala.concurrent.duration.Duration.
val fraudPattern = Pattern.begin[Transaction]("suspicious")
  .where(_.amount > 500)
  .oneOrMore
  .within(java.time.Duration.ofMinutes(10))

val fraudAlerts = CEP.pattern(transactionStream, fraudPattern)
  .process(new FraudDetectionProcessFunction())

Pattern Map Access

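Each match is delivered as a map from pattern name to the events matched under that name. The examples below show how to read events from that map for both single-event and quantified patterns.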

Usage Examples:

// Pattern with multiple named elements
val complexPattern = Pattern
  .begin[Event]("first").where(_.action == "start")
  .next("second").where(_.action == "process")
  .followedBy("third").where(_.action == "end")

val results = CEP
  .pattern(dataStream, complexPattern)
  .select { matched: Map[String, Iterable[Event]] =>
    // Each lookup by pattern name yields an Iterable[Event]
    val first = matched("first")
    val second = matched("second")
    val third = matched("third")

    // For non-quantified patterns there is typically one event, so take the head of each
    val actions = Seq(first, second, third).map(_.head.action)

    actions.mkString("Sequence: ", " -> ", "")
  }

// Pattern with quantified elements
val quantifiedPattern = Pattern
  .begin[Event]("start").where(_.action == "begin")
  .followedBy("repeated").where(_.action == "process")
  .oneOrMore // This step can match multiple events
  .followedBy("end").where(_.action == "finish")

val quantifiedResults = CEP
  .pattern(dataStream, quantifiedPattern)
  .select { matched: Map[String, Iterable[Event]] =>
    val opening = matched("start").head
    // The quantified step may hold several events, so it is read as a whole
    val repeatCount = matched("repeated").size
    val closing = matched("end").head

    s"Start: ${opening.action}, Repeated: ${repeatCount} times, End: ${closing.action}"
  }

Types

// Core collector interface
import org.apache.flink.util.Collector

// Pattern processing interfaces from Java CEP
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.PatternFlatSelectFunction  
import org.apache.flink.cep.functions.PatternProcessFunction

// Type information for Scala
import org.apache.flink.api.common.typeinfo.TypeInformation

// Collection conversions
import scala.collection.Map
import java.util.{List => JList, Map => JMap}

// Reference shapes of the callback types used on this page. The select / flatSelect sections
// describe PatternSelectFunction and PatternFlatSelectFunction as Java interfaces; they are
// sketched here as abstract classes only to show the single method each one requires.
// NOTE(review): these stubs share their names with the imports listed above, so they are
// presumably illustrative rather than meant to be compiled alongside those imports — confirm.

// Called with one match (pattern name -> matched events); returns the single result for it.
abstract class PatternSelectFunction[T, R] {
  def select(pattern: JMap[String, JList[T]]): R
}

// Called with one match and a collector; may emit any number of results through `out`.
abstract class PatternFlatSelectFunction[T, R] {
  def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit
}

// Like the flat-select callback, but additionally receives a PatternProcessFunction.Context.
// NOTE(review): AbstractRichFunction is not among the imports listed in this section —
// presumably org.apache.flink.api.common.functions.AbstractRichFunction; confirm.
abstract class PatternProcessFunction[T, R] extends AbstractRichFunction {
  def processMatch(
    `match`: JMap[String, JList[T]],
    ctx: PatternProcessFunction.Context,
    out: Collector[R]
  ): Unit
}