tessl/maven-org-apache-flink--flink-cep-scala-2-12

Apache Flink CEP Scala API provides Complex Event Processing capabilities for Scala applications with pattern matching and event stream processing.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-cep-scala_2.12@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-cep-scala-2-12@1.20.0

Flink CEP Scala API

⚠️ DEPRECATED: This API is deprecated as of Flink 1.18+ and will be removed in a future version. Users should migrate to the Java CEP API. See FLIP-265 for details.

The Apache Flink CEP Scala API adds Complex Event Processing to Scala applications built on Apache Flink. It is a Scala wrapper around the Java-based Flink CEP engine: patterns, conditions, and match handlers are written with Scala functions and collections, and results are returned as typed Scala DataStreams.

Package Information

  • Package Name: flink-cep-scala_2.12
  • Package Type: maven
  • Language: Scala
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep-scala_2.12</artifactId>
      <version>1.20.2</version>
    </dependency>

Core Imports

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

Basic Usage

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

case class Event(name: String, value: Int, timestamp: Long)

// Create pattern
val pattern = Pattern.begin[Event]("start")
  .where(_.name == "start")
  .next("middle")
  .where(_.value > 10)
  .followedBy("end")
  .where(_.name == "end")

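// Example input; a real job would read from a source instead
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[Event] = env.fromElements(
  Event("start", 1, 1000L), Event("middle", 42, 2000L), Event("end", 3, 3000L))

// Apply pattern to data stream
val patternStream = CEP.pattern(dataStream, pattern)

// Process matched patterns; the map is keyed by the stage names used in the pattern
val result = patternStream.select { matched =>
  val startEvent = matched("start").head
  val endEvent = matched("end").head
  s"Pattern matched: ${startEvent.name} -> ${endEvent.name}"
}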

Architecture

The Flink CEP Scala API is built around several key components:

  • CEP Object: Entry point for creating pattern streams from data streams
  • Pattern System: Fluent API for defining complex event patterns with temporal constraints
  • PatternStream: Stream abstraction for processing detected pattern sequences
  • Event Processing: Type-safe pattern matching with Scala functions and Java interoperability
  • Time Handling: Support for event-time and processing-time pattern detection
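
For event-time detection, the input stream needs timestamps and watermarks before it is handed to CEP. The following is a minimal sketch of that step, assuming the `Event` case class from Basic Usage and an arbitrary 5-second out-of-orderness bound; `EventTimeSketch` and `withEventTime` are hypothetical names chosen for illustration.

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

// Same shape as the Event type in Basic Usage.
case class Event(name: String, value: Int, timestamp: Long)

object EventTimeSketch {
  // Attaches event-time timestamps and watermarks to the stream.
  // The 5-second bound is an assumption; tune it to the real lateness of the data.
  def withEventTime(events: DataStream[Event]): DataStream[Event] =
    events.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
          // Use the timestamp carried by the event itself.
          override def extractTimestamp(e: Event, recordTs: Long): Long = e.timestamp
        })
    )
}
```

The returned stream can then be passed to `CEP.pattern` in place of the raw input.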

Capabilities

Pattern Creation

Define complex event patterns using a fluent Scala DSL with temporal constraints, conditions, and quantifiers.

object Pattern {
  def begin[X](name: String): Pattern[X, X]
  def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}

class Pattern[T, F <: T] {
  def where(condition: F => Boolean): Pattern[T, F]
  def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]
  def next(name: String): Pattern[T, T]
  def followedBy(name: String): Pattern[T, T]
  def within(windowTime: Duration): Pattern[T, F]
  def oneOrMore: Pattern[T, F]
  def optional: Pattern[T, F]
}

Details: pattern-definition.md
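
The methods listed above can be combined as in the sketch below. It is an illustration only: the `Reading` type, the stage names, and the thresholds are hypothetical, `Context` is assumed to live in `org.apache.flink.cep.scala.conditions`, and `within` is called with a `java.time.Duration` as in the signature listing above.

```scala
import java.time.Duration
import org.apache.flink.cep.scala.conditions.Context
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical event type, used only for this sketch.
case class Reading(sensor: String, value: Int)

object PatternSketch {
  val spike: Pattern[Reading, Reading] = Pattern
    .begin[Reading]("first")
    .where(_.value > 100)                 // simple condition: F => Boolean
    .followedBy("rising")                 // relaxed contiguity
    .where((r: Reading, ctx: Context[Reading]) =>
      // context-aware condition: compare against events already accepted for "first"
      ctx.getEventsForPattern("first").forall(_.value < r.value))
    .oneOrMore                            // one or more "rising" events ...
    .optional                             // ... or none at all
    .followedBy("reset")
    .where(_.value < 10)
    .within(Duration.ofSeconds(30))       // whole sequence must complete within 30 s
}
```

The lambda parameters of the two-argument `where` are annotated explicitly here to keep overload resolution unambiguous.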

Pattern Stream Creation

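Convert a DataStream into a PatternStream by pairing it with a Pattern. The optional EventComparator defines how events that carry the same timestamp are ordered before matching.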

object CEP {
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T]
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T], comparator: EventComparator[T]): PatternStream[T]
}

Details: pattern-stream-creation.md
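
As a rough sketch of the comparator overload, the example below keys a stream and supplies an `EventComparator` that breaks timestamp ties by a sequence number. The `Trade` type, `SeqComparator`, and `PatternStreamSketch.build` are hypothetical names, and keying by symbol is a design choice of this example rather than a requirement of the API.

```scala
import org.apache.flink.cep.EventComparator
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

// Hypothetical event type, used only for this sketch.
case class Trade(symbol: String, price: Double, seq: Long)

// Orders events that share a timestamp by their sequence number.
class SeqComparator extends EventComparator[Trade] {
  override def compare(a: Trade, b: Trade): Int =
    java.lang.Long.compare(a.seq, b.seq)
}

object PatternStreamSketch {
  def build(trades: DataStream[Trade]): PatternStream[Trade] = {
    val lowThenHigh = Pattern
      .begin[Trade]("low").where(_.price < 10.0)
      .next("high").where(_.price > 20.0)

    // Keying the stream makes each symbol match independently of the others.
    CEP.pattern(trades.keyBy(_.symbol), lowThenHigh, new SeqComparator)
  }
}
```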

Pattern Processing

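Process detected pattern sequences with select, flatSelect, or process. select returns exactly one result per match, flatSelect emits any number of results through a Collector, and process takes a PatternProcessFunction, which also receives a context object.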

class PatternStream[T] {
  def select[R: TypeInformation](patternSelectFun: Map[String, Iterable[T]] => R): DataStream[R]
  def flatSelect[R: TypeInformation](patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit): DataStream[R]
  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R]): DataStream[R]
}

Details: pattern-processing.md
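
A `PatternProcessFunction` receives each match as a Java map from stage name to the events accepted for that stage. The sketch below shows one possible implementation; the `Login` type and the stage names "fail" and "ok" are hypothetical and must agree with whatever names the pattern actually uses.

```scala
import java.util.{List => JList, Map => JMap}
import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event type, used only for this sketch.
case class Login(user: String, ok: Boolean)

// Emits one summary line per match of a "fail"+ then "ok" pattern.
class FailedThenOk extends PatternProcessFunction[Login, String] {
  override def processMatch(
      m: JMap[String, JList[Login]],
      ctx: PatternProcessFunction.Context,
      out: Collector[String]): Unit = {
    val fails = m.get("fail").size          // events accepted for stage "fail"
    val user  = m.get("ok").get(0).user     // first event of stage "ok"
    out.collect(s"$user: $fails failed attempts before success")
  }
}
```

Assuming `patternStream` is a `PatternStream[Login]` built from such a pattern, the function would be attached with `patternStream.process(new FailedThenOk)`.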

Timeout Handling

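Handle partial pattern matches that time out by routing them to a side output instead of dropping them. A partial match times out when the pattern's within window elapses before the sequence completes.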

class PatternStream[T] {
  def select[L: TypeInformation, R: TypeInformation](
    outputTag: OutputTag[L],
    patternTimeoutFunction: PatternTimeoutFunction[T, L],
    patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[R]
}

Details: timeout-handling.md
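
The sketch below wires the three-argument `select` shown above to a side output. The `Order` type, the stage names "created" and "paid", the tag id, and the helper names are all hypothetical; the pattern behind the `PatternStream` is assumed to define a `within` window, since otherwise nothing can time out.

```scala
import java.util.{List => JList, Map => JMap}
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.api.scala._

// Hypothetical event type, used only for this sketch.
case class Order(id: String, status: String)

// Called for partial matches whose window elapsed.
class OnTimeout extends PatternTimeoutFunction[Order, String] {
  override def timeout(partial: JMap[String, JList[Order]], ts: Long): String =
    s"order ${partial.get("created").get(0).id} not paid by $ts"
}

// Called for complete matches.
class OnMatch extends PatternSelectFunction[Order, String] {
  override def select(m: JMap[String, JList[Order]]): String =
    s"order ${m.get("paid").get(0).id} paid"
}

object TimeoutSketch {
  val timedOut: OutputTag[String] = OutputTag[String]("timed-out")

  // Returns (complete matches, timed-out partial matches).
  def wire(ps: PatternStream[Order]): (DataStream[String], DataStream[String]) = {
    val paid = ps.select(timedOut, new OnTimeout, new OnMatch)
    (paid, paid.getSideOutput(timedOut))
  }
}
```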

Group Pattern Management

Compose patterns from other patterns. A GroupPattern wraps an entire pattern sequence so that it can be used as a single stage, for example to apply a quantifier to the whole sequence.

class GroupPattern[T, F <: T] extends Pattern[T, F] {
  // Inherits the Pattern methods, but where(), or(), and subtype() are not
  // supported on a group and throw UnsupportedOperationException
}

object Pattern {
  def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
}

Details: group-pattern-management.md
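
As an illustration of the `begin(pattern)` overload, the sketch below wraps a two-stage sequence in a group and repeats the group as a unit. The `Step` type and stage names are hypothetical, and `times` is assumed to be available on `Pattern` as in the Java CEP API.

```scala
import org.apache.flink.cep.scala.pattern.{GroupPattern, Pattern}

// Hypothetical event type, used only for this sketch.
case class Step(kind: String)

object GroupSketch {
  // Inner sequence: "open" immediately followed by "close".
  val pair: Pattern[Step, Step] = Pattern
    .begin[Step]("open").where(_.kind == "open")
    .next("close").where(_.kind == "close")

  // Wrapping the sequence yields a GroupPattern that acts as one stage.
  val group: GroupPattern[Step, Step] = Pattern.begin(pair)

  // The quantifier applies to the whole open/close pair, not to a single event.
  val repeated: Pattern[Step, Step] = group
    .times(2)
    .followedBy("done").where(_.kind == "done")
}
```

Conditions belong on the inner stages ("open", "close") rather than on the group itself.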

Types

// Core pattern types
trait Context[T] {
  def getEventsForPattern(name: String): Iterable[T]
}

// Imported from Java CEP (Java interfaces, shown here as Scala traits)
trait PatternSelectFunction[T, R] {
  def select(pattern: java.util.Map[String, java.util.List[T]]): R
}

trait PatternFlatSelectFunction[T, R] {
  def flatSelect(pattern: java.util.Map[String, java.util.List[T]], out: Collector[R]): Unit
}

trait PatternTimeoutFunction[T, L] {
  def timeout(pattern: java.util.Map[String, java.util.List[T]], timeoutTimestamp: Long): L
}

abstract class PatternProcessFunction[T, R] extends AbstractRichFunction {
  def processMatch(
    `match`: java.util.Map[String, java.util.List[T]],
    ctx: PatternProcessFunction.Context,
    out: Collector[R]
  ): Unit
}
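
The Java function types above receive matches as `java.util.Map[String, java.util.List[T]]`, while the Scala lambda variants work with `Map[String, Iterable[T]]`. When implementing one of the Java types from Scala, a small conversion helper can make the match easier to work with. The helper below is a hypothetical sketch using only the Scala 2.12 standard library; it is not part of the Flink API.

```scala
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._

object MatchConversions {
  // Copies a Java match map into an immutable Scala map of Scala collections.
  def toScala[T](m: JMap[String, JList[T]]): Map[String, Iterable[T]] =
    m.asScala.map { case (stage, events) =>
      stage -> (events.asScala.toList: Iterable[T])
    }.toMap
}
```

Inside a `PatternSelectFunction.select` implementation, for instance, `MatchConversions.toScala(pattern)("start").head` would read the first event of a stage named "start".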