or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

group-pattern-management.md, index.md, pattern-definition.md, pattern-processing.md, pattern-stream-creation.md, timeout-handling.md
tile.json

docs/pattern-stream-creation.md

Pattern Stream Creation

Convert DataStreams into PatternStreams for complex event processing by applying patterns to data streams.

Capabilities

Basic Pattern Stream Creation

Create a PatternStream from a DataStream and Pattern definition.

object CEP {
  /**
   * Create a PatternStream from a DataStream and a Pattern.
   *
   * Signature summary only; the implementation is not shown here.
   *
   * @param input DataStream containing the input events
   * @param pattern Pattern specification to detect; its second type
   *                parameter may be any subtype of T (`_ <: T`)
   * @tparam T Type of input events
   * @return PatternStream for processing matches
   */
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T]
}

Usage Examples:

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

case class Event(id: String, eventType: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// One event for each stage of the pattern, in timestamp order
val events = Seq(
  Event("1", "start", 1000L),
  Event("2", "middle", 2000L),
  Event("3", "end", 3000L)
)
val dataStream: DataStream[Event] = env.fromElements(events: _*)

// Builds a condition that accepts events whose eventType equals `expected`
def hasType(expected: String): Event => Boolean = _.eventType == expected

// Define pattern: stages named "start", "middle" and "end"
val pattern = Pattern
  .begin[Event]("start").where(hasType("start"))
  .next("middle").where(hasType("middle"))
  .followedBy("end").where(hasType("end"))

// Create pattern stream
val patternStream = CEP.pattern(dataStream, pattern)

Pattern Stream with Event Comparator

Create a PatternStream that uses a custom EventComparator to order events that have equal timestamps.

object CEP {
  /**
   * Create a PatternStream that orders equal-timestamp events with a custom comparator.
   *
   * Signature summary only; the implementation is not shown here.
   *
   * @param input DataStream containing the input events
   * @param pattern Pattern specification to detect; its second type
   *                parameter may be any subtype of T (`_ <: T`)
   * @param comparator Comparator used to order events that have equal timestamps
   * @tparam T Type of input events
   * @return PatternStream for processing matches
   */
  def pattern[T](
    input: DataStream[T], 
    pattern: Pattern[T, _ <: T], 
    comparator: EventComparator[T]
  ): PatternStream[T]
}

Usage Examples:

import org.apache.flink.cep.EventComparator

case class PriorityEvent(priority: Int, eventType: String, timestamp: Long)

// Breaks timestamp ties by priority: the event with the larger priority sorts first
val priorityComparator = new EventComparator[PriorityEvent] {
  override def compare(o1: PriorityEvent, o2: PriorityEvent): Int =
    o2.priority compare o1.priority
}

val dataStream: DataStream[PriorityEvent] = env.fromElements(
  PriorityEvent(1, "low", 1000L),
  PriorityEvent(5, "high", 1000L), // ties with "low" on timestamp; wins on priority
  PriorityEvent(3, "medium", 2000L)
)

// Single-stage pattern accepting any event with a non-empty eventType
val pattern = Pattern
  .begin[PriorityEvent]("start")
  .where(_.eventType.nonEmpty)

// Equal-timestamp events reach the pattern in priorityComparator order
val patternStream = CEP.pattern(dataStream, pattern, priorityComparator)

Time Characteristics Configuration

Choose whether the pattern stream detects patterns in processing time or in event time.

class PatternStream[T] {
  /**
   * Use processing time for pattern detection.
   *
   * Signature summary only. Returns a PatternStream rather than Unit,
   * so it can be chained after CEP.pattern(...).
   *
   * @return PatternStream configured for processing time
   */
  def inProcessingTime(): PatternStream[T]
  
  /**
   * Use event time for pattern detection.
   *
   * The input stream is expected to carry watermarks.
   *
   * @return PatternStream configured for event time
   */
  def inEventTime(): PatternStream[T]
}

Usage Examples:

// Same input and pattern, detected in processing time
val processingTimeStream =
  CEP.pattern(dataStream, pattern).inProcessingTime()

// Same input and pattern, detected in event time
// (the input stream must carry watermarks)
val eventTimeStream =
  CEP.pattern(dataStream, pattern).inEventTime()

Late Data Handling

Send late events (in event time, events whose timestamp is already behind the current watermark) to a side output instead of silently dropping them.

class PatternStream[T] {
  /**
   * Configure a side output for late data.
   *
   * Signature summary only. The tag's element type must be T, the same
   * element type as this pattern stream. Late events are read back with
   * getSideOutput(tag) on the stream returned by select/process.
   *
   * @param lateDataOutputTag OutputTag for late events
   * @return PatternStream with late data handling
   */
  def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T]
}

Usage Examples:

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala.OutputTag
import java.time.Duration

case class TimestampedEvent(data: String, eventTime: Long)

// Late data only exists in event time, so the input needs timestamps and watermarks.
// The element type must be TimestampedEvent to match the OutputTag below.
// The 500 ms out-of-orderness bound is only an example value.
val timestampedStream: DataStream[TimestampedEvent] = env
  .fromElements(
    TimestampedEvent("a", 1000L),
    TimestampedEvent("b", 3000L),
    TimestampedEvent("c", 2000L)
  )
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[TimestampedEvent](Duration.ofMillis(500))
      .withTimestampAssigner(new SerializableTimestampAssigner[TimestampedEvent] {
        override def extractTimestamp(event: TimestampedEvent, recordTimestamp: Long): Long =
          event.eventTime
      })
  )

val timestampedPattern = Pattern.begin[TimestampedEvent]("start")
  .where(_.data.nonEmpty)

// sideOutputLateData takes OutputTag[T], so the tag's type must equal the stream's type
val lateDataTag = OutputTag[TimestampedEvent]("late-data")

val patternStream = CEP.pattern(timestampedStream, timestampedPattern)
  .inEventTime()
  .sideOutputLateData(lateDataTag)

// Process main results
val results = patternStream.select { matched =>
  // matched: Map from pattern name to the events matched for it
  matched.toString
}

// Late events are read from the stream returned by select
val lateData = results.getSideOutput(lateDataTag)
lateData.print("Late data: ")

Advanced Pattern Stream Configuration

Combine multiple configuration options for comprehensive pattern stream setup.

Usage Examples:

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.cep.EventComparator
import org.apache.flink.streaming.api.scala.OutputTag
import java.time.Duration

case class ComplexEvent(
  id: String,
  eventType: String,
  priority: Int,
  eventTime: Long,
  processingTime: Long
)

// Input stream with the same element type as the pattern (ComplexEvent).
// Event-time detection needs timestamps and watermarks, taken from `eventTime`.
val complexStream: DataStream[ComplexEvent] = env
  .fromElements(
    ComplexEvent("a", "alert", 9, 1000L, 1005L),
    ComplexEvent("b", "update", 2, 1000L, 1010L), // same eventTime as "a"
    ComplexEvent("c", "update", 4, 2000L, 2003L)
  )
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[ComplexEvent](Duration.ofSeconds(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[ComplexEvent] {
        override def extractTimestamp(event: ComplexEvent, recordTimestamp: Long): Long =
          event.eventTime
      })
  )

// Orders equal-timestamp events: higher priority first, then smaller processingTime first
val complexComparator = new EventComparator[ComplexEvent] {
  override def compare(o1: ComplexEvent, o2: ComplexEvent): Int = {
    // The scala.Long companion has no `compare`, so use the static java.lang.Long.compare
    val priorityComp = Integer.compare(o2.priority, o1.priority)
    if (priorityComp != 0) priorityComp
    else java.lang.Long.compare(o1.processingTime, o2.processingTime)
  }
}

// Complex pattern with time window: a high-priority event followed by
// any event with a non-empty eventType, bounded by a 5-minute window
val complexPattern = Pattern.begin[ComplexEvent]("high-priority")
  .where(_.priority >= 8)
  .followedBy("any-event")
  .where(_.eventType.nonEmpty)
  .within(Duration.ofMinutes(5))

val lateDataTag = OutputTag[ComplexEvent]("late-events")

// Fully configured pattern stream
val fullPatternStream = CEP.pattern(complexStream, complexPattern, complexComparator)
  .inEventTime()
  .sideOutputLateData(lateDataTag)

// Process with timeout handling; timeout results go to the `timeoutTag` side output
val timeoutTag = OutputTag[String]("timeouts")
val results = fullPatternStream.select(timeoutTag)(
  // Timeout function: receives the partial match and the timeout timestamp
  (partial: Map[String, Iterable[ComplexEvent]], timestamp: Long) => {
    s"Pattern timed out at $timestamp: ${partial.keys.mkString(", ")}"
  }
)(
  // Success function: receives a complete match
  (matched: Map[String, Iterable[ComplexEvent]]) => {
    val highPriorityEvent = matched("high-priority").head
    val followingEvent = matched("any-event").head
    s"Complex pattern: ${highPriorityEvent.id} -> ${followingEvent.id}"
  }
)

// Handle all output streams
results.print("Matches: ")
results.getSideOutput(timeoutTag).print("Timeouts: ")
results.getSideOutput(lateDataTag).print("Late data: ")

Types

// Core types from Flink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.cep.EventComparator

// Pattern stream wrapper (Scala API)
class PatternStream[T] {
  // Internal Java pattern stream; private[flink] restricts access to the
  // org.apache.flink package and its subpackages
  private[flink] def wrappedPatternStream: org.apache.flink.cep.PatternStream[T]
}

// Event comparator: a Java interface in org.apache.flink.cep, shown in Scala syntax.
// It extends java.util.Comparator (the source of `compare`) and java.io.Serializable,
// so implementations must be serializable.
trait EventComparator[T] extends java.util.Comparator[T] with java.io.Serializable {
  def compare(o1: T, o2: T): Int
}