⚠️ DEPRECATED: This API is deprecated as of Flink 1.18+ and will be removed in a future version. Users should migrate to the Java CEP API. See FLIP-265 for details.
The Apache Flink CEP Scala API provides Complex Event Processing capabilities for Scala applications built on Apache Flink. This library serves as a Scala wrapper around the Java-based Flink CEP engine, offering idiomatic Scala APIs for pattern matching, event stream processing, and complex event detection with type-safe transformations.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.12</artifactId>
<version>1.20.2</version>
</dependency>

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
// Sample event type used by the pattern below.
case class Event(name: String, value: Int, timestamp: Long)

// Create pattern: three named stages ("start", "middle", "end"), each with its own condition.
val pattern = Pattern.begin[Event]("start")
  .where(_.name == "start")
  .next("middle")
  .where(_.value > 10)
  .followedBy("end")
  .where(_.name == "end")

// Apply pattern to data stream
// NOTE: `dataStream` is not defined in this snippet; it is assumed to be an
// existing DataStream[Event] created elsewhere.
val patternStream = CEP.pattern(dataStream, pattern)

// Process matched patterns
// The function receives the matched events keyed by stage name. Its parameter is
// called `matched` so that it does not shadow the `pattern` value defined above.
val result = patternStream.select { matched =>
  val startEvent = matched("start").head
  val endEvent = matched("end").head
  s"Pattern matched: ${startEvent.name} -> ${endEvent.name}"
}

The Flink CEP Scala API is built around several key components:
Define complex event patterns using a fluent Scala DSL with temporal constraints, conditions, and quantifiers.
// Entry points for starting a pattern definition (signatures only).
object Pattern {
// Starts a pattern over events of type X whose first stage is called `name`.
def begin[X](name: String): Pattern[X, X]
// Same as the overload above, additionally taking an AfterMatchSkipStrategy.
def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}
// Fluent pattern builder (signatures only). T is the base event type; F is a
// subtype of T over which the conditions of the current stage are expressed.
class Pattern[T, F <: T] {
  // Adds a condition that is evaluated on an event of type F.
  def where(condition: F => Boolean): Pattern[T, F]
  // Adds a condition that receives a Context[F] in addition to the event.
  def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]
  // Appends a new stage called `name`.
  // NOTE(review): Flink CEP documents `next` as strict contiguity — confirm.
  def next(name: String): Pattern[T, T]
  // Appends a new stage called `name`.
  // NOTE(review): Flink CEP documents `followedBy` as relaxed contiguity — confirm.
  def followedBy(name: String): Pattern[T, T]
  // Applies a time constraint of `windowTime` to the pattern.
  def within(windowTime: Duration): Pattern[T, F]
  // Quantifier: named after "one or more" occurrences of the current stage.
  def oneOrMore: Pattern[T, F]
  // Quantifier: marks the current stage as optional.
  def optional: Pattern[T, F]
}

Convert DataStreams into PatternStreams for complex event processing.
// Converts a DataStream into a PatternStream (signatures only).
object CEP {
  // Applies `pattern` to `input` and returns the resulting PatternStream.
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T]
  // Same as the overload above, additionally taking an EventComparator[T].
  // NOTE(review): presumably used to order events with equal timestamps — confirm.
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T], comparator: EventComparator[T]): PatternStream[T]
}

Process detected pattern sequences with flexible output generation including select, flatSelect, and process operations.
// Operations for turning detected pattern sequences into an output stream (signatures only).
// Each operation requires TypeInformation for the result type R.
class PatternStream[T] {
  // Maps the matched events, keyed by stage name, to a single result of type R.
  def select[R: TypeInformation](patternSelectFun: Map[String, Iterable[T]] => R): DataStream[R]
  // Receives the matched events, keyed by stage name, and emits results through the Collector.
  def flatSelect[R: TypeInformation](patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit): DataStream[R]
  // Delegates handling of matches to a PatternProcessFunction.
  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R]): DataStream[R]
}

Handle partial pattern matches that timeout with side outputs for comprehensive event processing.
// Timeout-aware variant of select (signature only).
class PatternStream[T] {
  // L is the type produced for timed-out partial matches; R is the type produced
  // for regular matches. Both require TypeInformation.
  def select[L: TypeInformation, R: TypeInformation](
    outputTag: OutputTag[L],                              // tag identifying the side output for values of type L
    patternTimeoutFunction: PatternTimeoutFunction[T, L], // produces an L from a timed-out partial match
    patternSelectFunction: PatternSelectFunction[T, R]    // produces an R from a match
  ): DataStream[R]
}

Advanced pattern composition using GroupPattern for complex pattern sequences that combine multiple patterns.
// A Pattern subtype used when composing patterns out of other patterns.
class GroupPattern[T, F <: T] extends Pattern[T, F] {
// Inherits Pattern methods but restricts where(), or(), and subtype()
// NOTE(review): the exact failure mode of the restricted methods is not shown here — confirm.
}
// Entry point for starting a pattern from an existing pattern (signature only).
object Pattern {
// Takes an already defined pattern and returns a GroupPattern with the same type parameters.
def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
}

// Core pattern types
// Context handed to the two-argument form of a pattern condition.
trait Context[T] {
// Returns the events associated with the pattern stage called `name`.
def getEventsForPattern(name: String): Iterable[T]
}
// Imported from Java CEP
// Produces exactly one result of type R from a match.
// NOTE(review): the Java CEP type is believed to be an interface rather than an
// abstract class — confirm against the Java API.
abstract class PatternSelectFunction[T, R] {
// `pattern` maps each stage name to the list of events matched for that stage.
def select(pattern: java.util.Map[String, java.util.List[T]]): R
}
// Emits results of type R for a match through a Collector instead of returning a value.
// NOTE(review): the Java CEP type is believed to be an interface rather than an
// abstract class — confirm against the Java API.
abstract class PatternFlatSelectFunction[T, R] {
// `pattern` maps each stage name to the list of events matched for that stage;
// results are written to `out`.
def flatSelect(pattern: java.util.Map[String, java.util.List[T]], out: Collector[R]): Unit
}
// Produces a value of type L for a partial match that timed out.
// NOTE(review): the Java CEP type is believed to be an interface rather than an
// abstract class — confirm against the Java API.
abstract class PatternTimeoutFunction[T, L] {
// `pattern` maps each stage name to the list of events matched so far;
// `timeoutTimestamp` is the timestamp associated with the timeout.
def timeout(pattern: java.util.Map[String, java.util.List[T]], timeoutTimestamp: Long): L
}
// Rich function (extends AbstractRichFunction) invoked for matches; emits results
// of type R through a Collector.
abstract class PatternProcessFunction[T, R] extends AbstractRichFunction {
// Handles one match.
def processMatch(
// Maps each stage name to the list of events matched for that stage.
// Backticks are required because `match` is a Scala keyword.
`match`: java.util.Map[String, java.util.List[T]],
// Context object supplied alongside the match.
ctx: PatternProcessFunction.Context,
// Collector that receives the emitted results.
out: Collector[R]
): Unit
}