The Apache Flink CEP Scala API provides Complex Event Processing (CEP) for Scala applications: it detects patterns of events in data streams.
Convert a DataStream into a PatternStream for complex event processing by applying a Pattern to it.
Create a PatternStream from a DataStream and a Pattern definition.
object CEP {

  /**
   * Applies a pattern to a data stream and returns the resulting pattern stream,
   * on which the detected matches can be processed.
   *
   * @param input   stream of input events to search
   * @param pattern pattern specification to detect in `input`
   * @tparam T      type of the input events
   * @return        a PatternStream for processing the matches
   */
  def pattern[T](
      input: DataStream[T],
      pattern: Pattern[T, _ <: T]
  ): PatternStream[T]
}

Usage Examples:
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
case class Event(id: String, eventType: String, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Event-time pattern streams require timestamps and watermarks on the input,
// so derive them from each event's `timestamp` field (ascending in this data).
val dataStream: DataStream[Event] = env
  .fromElements(
    Event("1", "start", 1000L),
    Event("2", "middle", 2000L),
    Event("3", "end", 3000L)
  )
  .assignAscendingTimestamps(_.timestamp)
// Define pattern: "start" -> "middle" (via next) -> "end" (via followedBy)
val pattern = Pattern.begin[Event]("start")
  .where(_.eventType == "start")
  .next("middle")
  .where(_.eventType == "middle")
  .followedBy("end")
  .where(_.eventType == "end")
// Create pattern stream
val patternStream = CEP.pattern(dataStream, pattern)

Create a PatternStream with custom event ordering for events with equal timestamps.
object CEP {

  /**
   * Applies a pattern to a data stream, using a custom comparator to order
   * events whose timestamps are equal.
   *
   * @param input      stream of input events to search
   * @param pattern    pattern specification to detect in `input`
   * @param comparator ordering used for events that share the same timestamp
   * @tparam T         type of the input events
   * @return           a PatternStream for processing the matches
   */
  def pattern[T](
      input: DataStream[T],
      pattern: Pattern[T, _ <: T],
      comparator: EventComparator[T]
  ): PatternStream[T]
}

Usage Examples:
import org.apache.flink.cep.EventComparator
case class PriorityEvent(priority: Int, eventType: String, timestamp: Long)
// Custom comparator for events with the same timestamp: a negative result
// orders o1 before o2.
val priorityComparator = new EventComparator[PriorityEvent] {
  override def compare(o1: PriorityEvent, o2: PriorityEvent): Int = {
    // Higher priority events first (operands swapped for descending order)
    Integer.compare(o2.priority, o1.priority)
  }
}
// The comparator only breaks ties between equal event timestamps, so the
// stream must carry timestamps; take them from the `timestamp` field.
val dataStream: DataStream[PriorityEvent] = env
  .fromElements(
    PriorityEvent(1, "low", 1000L),
    PriorityEvent(5, "high", 1000L), // Same timestamp, higher priority
    PriorityEvent(3, "medium", 2000L)
  )
  .assignAscendingTimestamps(_.timestamp)
val pattern = Pattern.begin[PriorityEvent]("start")
  .where(_.eventType.nonEmpty)
// Pattern stream with custom ordering
val patternStream = CEP.pattern(dataStream, pattern, priorityComparator)

Configure time characteristics for the pattern stream processing.
class PatternStream[T] {

  /**
   * Detects patterns using processing time.
   *
   * @return a PatternStream configured for processing time
   */
  def inProcessingTime(): PatternStream[T]

  /**
   * Detects patterns using event time.
   *
   * @return a PatternStream configured for event time
   */
  def inEventTime(): PatternStream[T]
}

Usage Examples:
// Pattern stream evaluated in processing time
val processingTimeStream =
  CEP.pattern(dataStream, pattern).inProcessingTime()
// Pattern stream evaluated in event time; the input stream must carry watermarks
val eventTimeStream =
  CEP.pattern(dataStream, pattern).inEventTime()

Configure side output for late arriving data that misses pattern windows.
class PatternStream[T] {

  /**
   * Routes late events to a side output instead of the main result stream.
   *
   * @param lateDataOutputTag tag identifying the side output for late events
   * @return a PatternStream with late-data handling configured
   */
  def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T]
}

Usage Examples:
import org.apache.flink.streaming.api.scala.OutputTag
case class TimestampedEvent(data: String, eventTime: Long)
// sideOutputLateData takes an OutputTag of the pattern stream's own element type,
// so the stream and pattern must both be over TimestampedEvent. The example runs
// in event time, so the input needs timestamps; take them from `eventTime`.
val timedStream: DataStream[TimestampedEvent] = env
  .fromElements(
    TimestampedEvent("a", 1000L),
    TimestampedEvent("b", 2000L)
  )
  .assignAscendingTimestamps(_.eventTime)
val timedPattern = Pattern.begin[TimestampedEvent]("start")
  .where(_.data.nonEmpty)
val lateDataTag = OutputTag[TimestampedEvent]("late-data")
val patternStream = CEP.pattern(timedStream, timedPattern)
  .inEventTime()
  .sideOutputLateData(lateDataTag)
// Process main results (explicit parameter type selects the Scala-function overload)
val results = patternStream.select { (matched: Map[String, Iterable[TimestampedEvent]]) =>
  // Process matched patterns
  matched.toString
}
// Handle late data separately
val lateData = results.getSideOutput(lateDataTag)
lateData.print("Late data: ")

Combine multiple configuration options for comprehensive pattern stream setup.
Usage Examples:
import org.apache.flink.cep.EventComparator
import org.apache.flink.streaming.api.scala.OutputTag
import java.time.Duration
case class ComplexEvent(
  id: String,
  eventType: String,
  priority: Int,
  eventTime: Long,
  processingTime: Long
)
// Custom comparator for events with equal timestamps: higher priority first,
// ties broken by the smaller processingTime value first.
val complexComparator = new EventComparator[ComplexEvent] {
  override def compare(o1: ComplexEvent, o2: ComplexEvent): Int = {
    // Operands swapped so that higher priorities sort first
    val priorityComp = Integer.compare(o2.priority, o1.priority)
    if (priorityComp != 0) priorityComp
    // scala.Long has no `compare` on its companion; use java.lang.Long like Integer above
    else java.lang.Long.compare(o1.processingTime, o2.processingTime)
  }
}
// Input stream whose element type matches the pattern, comparator and late-data tag.
// The stream runs in event time, so timestamps come from the `eventTime` field.
val complexStream: DataStream[ComplexEvent] = env
  .fromElements(
    ComplexEvent("1", "alert", 9, 1000L, 10L),
    ComplexEvent("2", "follow-up", 3, 2000L, 20L)
  )
  .assignAscendingTimestamps(_.eventTime)
// Complex pattern with time window: a priority >= 8 event followed by an event
// with a non-empty type, within 5 minutes
val complexPattern = Pattern.begin[ComplexEvent]("high-priority")
  .where(_.priority >= 8)
  .followedBy("any-event")
  .where(_.eventType.nonEmpty)
  .within(Duration.ofMinutes(5))
// Side output that receives late events
val lateDataTag = OutputTag[ComplexEvent]("late-events")
// Fully configured pattern stream
val fullPatternStream = CEP.pattern(complexStream, complexPattern, complexComparator)
  .inEventTime()
  .sideOutputLateData(lateDataTag)
// Process with timeout handling: timeout results go to the `timeoutTag` side output
val timeoutTag = OutputTag[String]("timeouts")
val results = fullPatternStream.select(timeoutTag)(
  // Timeout function: receives the partial match and the timeout timestamp
  (partial: Map[String, Iterable[ComplexEvent]], timestamp: Long) => {
    s"Pattern timed out at $timestamp: ${partial.keys.mkString(", ")}"
  }
)(
  // Success function: summarizes the first event of each pattern stage
  (matched: Map[String, Iterable[ComplexEvent]]) => {
    val highPriorityEvent = matched("high-priority").head
    val followingEvent = matched("any-event").head
    s"Complex pattern: ${highPriorityEvent.id} -> ${followingEvent.id}"
  }
)
// Handle all output streams
results.print("Matches: ")
results.getSideOutput(timeoutTag).print("Timeouts: ")
results.getSideOutput(lateDataTag).print("Late data: ")
// Core types from Flink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.cep.EventComparator
// Pattern stream wrapper
class PatternStream[T] {

  /**
   * The Java CEP PatternStream wrapped by this Scala class. Package-private to
   * `org.apache.flink`, so user code cannot access it directly.
   */
  private[flink] def wrappedPatternStream: org.apache.flink.cep.PatternStream[T]
}
// Event comparator interface: Scala view of the Java interface
// org.apache.flink.cep.EventComparator, which extends java.util.Comparator
// and java.io.Serializable.
trait EventComparator[T] extends java.util.Comparator[T] with java.io.Serializable {

  /**
   * Orders two events that share the same timestamp.
   *
   * @return a negative value to place `o1` before `o2`, positive to place it
   *         after, zero if they are equal in this ordering
   */
  def compare(o1: T, o2: T): Int
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-cep-scala-2-12