Convert DataStreams into PatternStreams for complex event processing by applying patterns to data streams.
Create a PatternStream from a DataStream and Pattern definition.
object CEP {
  /**
   * Create a PatternStream by applying a Pattern to a DataStream.
   *
   * @param input   DataStream containing the input events
   * @param pattern Pattern specification to detect; its second type parameter may be any
   *                subtype of `T` (`Pattern[T, _ <: T]`)
   * @tparam T Type of input events
   * @return PatternStream for processing matches
   */
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T]
}
Usage Examples:
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
case class Event(id: String, eventType: String, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[Event] = env.fromElements(
  Event("1", "start", 1000L),
  Event("2", "middle", 2000L),
  Event("3", "end", 3000L)
)
// Define pattern: "start" directly followed by "middle" (next = strict contiguity),
// then eventually "end" (followedBy = relaxed contiguity, other events may occur in between)
val pattern = Pattern.begin[Event]("start")
  .where(_.eventType == "start")
  .next("middle")
  .where(_.eventType == "middle")
  .followedBy("end")
  .where(_.eventType == "end")
// Create pattern stream
val patternStream = CEP.pattern(dataStream, pattern)
Create a PatternStream with custom event ordering for events with equal timestamps.
object CEP {
  /**
   * Create a PatternStream that uses a custom comparator to order events.
   *
   * @param input      DataStream containing the input events
   * @param pattern    Pattern specification to detect
   * @param comparator Comparator that decides the processing order of events with equal timestamps
   * @tparam T Type of input events
   * @return PatternStream for processing matches
   */
  def pattern[T](
      input: DataStream[T],
      pattern: Pattern[T, _ <: T],
      comparator: EventComparator[T]
  ): PatternStream[T]
}
Usage Examples:
import org.apache.flink.cep.EventComparator
case class PriorityEvent(priority: Int, eventType: String, timestamp: Long)
// Custom comparator for events with same timestamp
val priorityComparator = new EventComparator[PriorityEvent] {
  override def compare(o1: PriorityEvent, o2: PriorityEvent): Int = {
    // Higher priority events first: arguments are swapped so that a higher o1.priority
    // yields a negative result, i.e. o1 is ordered before o2
    Integer.compare(o2.priority, o1.priority)
  }
}
val dataStream: DataStream[PriorityEvent] = env.fromElements(
  PriorityEvent(1, "low", 1000L),
  PriorityEvent(5, "high", 1000L), // Same timestamp, higher priority
  PriorityEvent(3, "medium", 2000L)
)
val pattern = Pattern.begin[PriorityEvent]("start")
  .where(_.eventType.nonEmpty)
// Pattern stream with custom ordering
val patternStream = CEP.pattern(dataStream, pattern, priorityComparator)
Configure whether pattern detection runs in processing time or event time.
class PatternStream[T] {
  /**
   * Use processing time (the clock of the machine running the operator) for pattern detection.
   * @return PatternStream configured for processing time
   */
  def inProcessingTime(): PatternStream[T]

  /**
   * Use event time for pattern detection.
   * The input stream must carry event timestamps and watermarks; events whose timestamp is
   * behind the last seen watermark are treated as late.
   * @return PatternStream configured for event time
   */
  def inEventTime(): PatternStream[T]
}
Usage Examples:
// Processing time pattern stream
val processingTimeStream = CEP.pattern(dataStream, pattern)
  .inProcessingTime()
// Event time pattern stream: the input must carry timestamps and watermarks
// (e.g. assigned with assignTimestampsAndWatermarks or assignAscendingTimestamps)
val eventTimeStream = CEP.pattern(dataStream, pattern)
  .inEventTime()
Configure a side output for late data. In event time, an event whose timestamp is smaller than the last seen watermark is considered late and is not processed further unless it is collected in a side output.
class PatternStream[T] {
  /**
   * Send late events to a side output instead of discarding them.
   * @param lateDataOutputTag OutputTag identifying the side output that receives late events;
   *                          it must have the same element type `T` as the pattern stream
   * @return PatternStream with late data handling
   */
  def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T]
}
Usage Examples:
import org.apache.flink.streaming.api.scala.OutputTag
case class TimestampedEvent(data: String, eventTime: Long)
// The input stream, the pattern and the late-data tag must share one event type:
// sideOutputLateData on a PatternStream[T] expects an OutputTag[T].
// inEventTime() also needs event timestamps; without them every event would count as late.
val timestampedStream: DataStream[TimestampedEvent] = env
  .fromElements(
    TimestampedEvent("a", 1000L),
    TimestampedEvent("b", 2000L)
  )
  .assignAscendingTimestamps(_.eventTime)
val timestampedPattern = Pattern.begin[TimestampedEvent]("start")
  .where(_.data.nonEmpty)
val lateDataTag = OutputTag[TimestampedEvent]("late-data")
val patternStream = CEP.pattern(timestampedStream, timestampedPattern)
  .inEventTime()
  .sideOutputLateData(lateDataTag)
// Process main results; the parameter type is spelled out because `select` is overloaded
val results = patternStream.select { (matched: Map[String, Iterable[TimestampedEvent]]) =>
  // Process matched patterns
  matched.toString
}
// Handle late data separately
val lateData = results.getSideOutput(lateDataTag)
lateData.print("Late data: ")
Combine multiple configuration options for comprehensive pattern stream setup.
Usage Examples:
import org.apache.flink.cep.EventComparator
import org.apache.flink.streaming.api.scala.OutputTag
import java.time.Duration
case class ComplexEvent(
    id: String,
    eventType: String,
    priority: Int,
    eventTime: Long,
    processingTime: Long
)
// Custom comparator considering both priority and processing time
val complexComparator = new EventComparator[ComplexEvent] {
  override def compare(o1: ComplexEvent, o2: ComplexEvent): Int = {
    // Higher priority first (arguments swapped), then earlier processingTime first
    val priorityComp = Integer.compare(o2.priority, o1.priority)
    if (priorityComp != 0) priorityComp
    // scala.Long's companion object has no compare method; use the java.lang.Long static one
    else java.lang.Long.compare(o1.processingTime, o2.processingTime)
  }
}
// Input of the matching event type, with event-time timestamps required by inEventTime()
val complexStream: DataStream[ComplexEvent] = env
  .fromElements(
    ComplexEvent("1", "alert", 9, 1000L, 1L),
    ComplexEvent("2", "update", 3, 1000L, 2L)
  )
  .assignAscendingTimestamps(_.eventTime)
// Complex pattern with time window
val complexPattern = Pattern.begin[ComplexEvent]("high-priority")
  .where(_.priority >= 8)
  .followedBy("any-event")
  .where(_.eventType.nonEmpty)
  .within(Duration.ofMinutes(5))
val lateDataTag = OutputTag[ComplexEvent]("late-events")
// Fully configured pattern stream
val fullPatternStream = CEP.pattern(complexStream, complexPattern, complexComparator)
  .inEventTime()
  .sideOutputLateData(lateDataTag)
// Process with timeout handling
val timeoutTag = OutputTag[String]("timeouts")
val results = fullPatternStream.select(timeoutTag)(
  // Timeout function
  (pattern: Map[String, Iterable[ComplexEvent]], timestamp: Long) => {
    s"Pattern timed out at $timestamp: ${pattern.keys.mkString(", ")}"
  }
)(
  // Success function
  (pattern: Map[String, Iterable[ComplexEvent]]) => {
    val highPriorityEvent = pattern("high-priority").head
    val followingEvent = pattern("any-event").head
    s"Complex pattern: ${highPriorityEvent.id} -> ${followingEvent.id}"
  }
)
// Handle all output streams
results.print("Matches: ")
results.getSideOutput(timeoutTag).print("Timeouts: ")
results.getSideOutput(lateDataTag).print("Late data: ")
// Core types from Flink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.cep.EventComparator
// Pattern stream wrapper: Scala-facing PatternStream that delegates to Flink's Java PatternStream
class PatternStream[T] {
  // Underlying Java pattern stream. `private[flink]` restricts visibility to code inside the
  // org.apache.flink package, so user code cannot call it directly.
  private[flink] def wrappedPatternStream: org.apache.flink.cep.PatternStream[T]
}
// Event comparator interface. In Flink it is a Java interface that extends
// java.util.Comparator and java.io.Serializable, so from Scala it behaves like a trait:
// implement `compare`, and the instance is usable wherever a java.util.Comparator is expected.
trait EventComparator[T] extends java.util.Comparator[T] with java.io.Serializable {
  /**
   * Order two events that share the same timestamp.
   * @return negative if o1 should be processed first, positive if o2 should, 0 if tied
   */
  def compare(o1: T, o2: T): Int
}