tessl/maven-org-apache-flink--flink-cep-scala-2-12

Apache Flink CEP Scala API provides Complex Event Processing capabilities for Scala applications with pattern matching and event stream processing.

Flink CEP Scala API

⚠️ DEPRECATED: This API is deprecated as of Flink 1.18 and will be removed in a future version. Users should migrate to the Java CEP API. See FLIP-265 for details.

The Apache Flink CEP Scala API provides Complex Event Processing capabilities for Scala applications built on Apache Flink. The library wraps the Java-based Flink CEP engine and exposes idiomatic Scala APIs for defining event patterns, detecting them on data streams, and transforming the matches in a type-safe way.

Package Information

  • Package Name: flink-cep-scala_2.12
  • Package Type: maven
  • Language: Scala
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep-scala_2.12</artifactId>
      <version>1.20.2</version>
    </dependency>

Core Imports

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

Basic Usage

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

case class Event(name: String, value: Int, timestamp: Long)

// Create pattern
val pattern = Pattern.begin[Event]("start")
  .where(_.name == "start")
  .next("middle")
  .where(_.value > 10)
  .followedBy("end")
  .where(_.name == "end")

// Apply pattern to data stream
// (dataStream is assumed to be an existing DataStream[Event])
val patternStream = CEP.pattern(dataStream, pattern)

// Process matched patterns; the map is keyed by the pattern stage names
val result = patternStream.select { matched =>
  val startEvent = matched("start").head
  val endEvent = matched("end").head
  s"Pattern matched: ${startEvent.name} -> ${endEvent.name}"
}

Architecture

The Flink CEP Scala API is built around several key components:

  • CEP Object: Entry point for creating pattern streams from data streams
  • Pattern System: Fluent API for defining complex event patterns with temporal constraints
  • PatternStream: Stream abstraction for processing detected pattern sequences
  • Event Processing: Type-safe pattern matching with Scala functions and Java interoperability
  • Time Handling: Support for event-time and processing-time pattern detection
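Event-time pattern detection depends on the input stream carrying timestamps and watermarks. The following is a minimal sketch of one way to attach them before handing the stream to CEP. It assumes the `Event` case class from Basic Usage; the `EventTimeSketch` object, the `withEventTime` helper, and the five-second out-of-orderness bound are illustrative choices, not part of the library.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

case class Event(name: String, value: Int, timestamp: Long)

object EventTimeSketch {
  // Hypothetical assigner: reads the event-time timestamp from the record itself.
  val timestampAssigner: SerializableTimestampAssigner[Event] =
    new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(element: Event, recordTimestamp: Long): Long =
        element.timestamp
    }

  // Tolerate events arriving up to 5 seconds out of order (illustrative value).
  val strategy: WatermarkStrategy[Event] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(timestampAssigner)

  // Returns a stream whose records carry event-time timestamps and watermarks.
  def withEventTime(events: DataStream[Event]): DataStream[Event] =
    events.assignTimestampsAndWatermarks(strategy)
}
```

The resulting stream can then be passed to `CEP.pattern` in place of the raw input.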

Capabilities

Pattern Creation

Define complex event patterns using a fluent Scala DSL with temporal constraints, conditions, and quantifiers.

object Pattern {
  def begin[X](name: String): Pattern[X, X]
  def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}

class Pattern[T, F <: T] {
  def where(condition: F => Boolean): Pattern[T, F]
  def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]
  def next(name: String): Pattern[T, T]
  def followedBy(name: String): Pattern[T, T]
  def within(windowTime: Duration): Pattern[T, F]
  def oneOrMore: Pattern[T, F]
  def optional: Pattern[T, F]
}
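
As an illustration of how these methods compose, here is a hedged sketch that combines a simple condition, a context-aware condition, quantifiers, and a time window. The stage names, the `exceedsPrevious` helper, and the 30-second window are hypothetical, and it assumes `Duration` in the signature above is `java.time.Duration` and that `Context` lives in `org.apache.flink.cep.scala.conditions`.

```scala
import java.time.Duration

import org.apache.flink.cep.scala.conditions.Context
import org.apache.flink.cep.scala.pattern.Pattern

case class Event(name: String, value: Int, timestamp: Long)

object PatternSketch {
  // Hypothetical predicate: an event qualifies only if its value exceeds
  // every event already accepted for the same stage.
  def exceedsPrevious(event: Event, previous: Iterable[Event]): Boolean =
    previous.forall(_.value < event.value)

  val risingThenEnd: Pattern[Event, Event] =
    Pattern
      .begin[Event]("start")
      .where(_.name == "start")
      .followedBy("rising")
      // Context-aware condition: inspects events already matched for "rising".
      .where((event: Event, ctx: Context[Event]) =>
        exceedsPrevious(event, ctx.getEventsForPattern("rising")))
      .oneOrMore
      .optional
      .followedBy("end")
      .where(_.name == "end")
      // The whole sequence must complete within 30 seconds (illustrative value).
      .within(Duration.ofSeconds(30))
}
```

Because `oneOrMore` is combined with `optional`, the "rising" stage may match zero or more events before "end".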

Pattern Definition

Pattern Stream Creation

Convert DataStreams into PatternStreams for complex event processing.

object CEP {
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T]
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T], comparator: EventComparator[T]): PatternStream[T]
}
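
The second overload accepts an `EventComparator`, which Flink uses to order events that share the same timestamp. The sketch below shows one possible comparator; ordering by `value` and the names `PatternStreamSketch`, `byValue`, and `detect` are assumptions made for illustration.

```scala
import org.apache.flink.cep.EventComparator
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

case class Event(name: String, value: Int, timestamp: Long)

object PatternStreamSketch {
  // Illustrative tie-breaker: events with equal timestamps are ordered by value.
  val byValue: EventComparator[Event] = new EventComparator[Event] {
    override def compare(a: Event, b: Event): Int = Integer.compare(a.value, b.value)
  }

  // Builds a PatternStream from an existing stream and pattern.
  def detect(events: DataStream[Event], pattern: Pattern[Event, Event]): PatternStream[Event] =
    CEP.pattern(events, pattern, byValue)
}
```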

Pattern Stream Creation

Pattern Processing

Process detected pattern sequences with flexible output generation including select, flatSelect, and process operations.

class PatternStream[T] {
  def select[R: TypeInformation](patternSelectFun: Map[String, Iterable[T]] => R): DataStream[R]
  def flatSelect[R: TypeInformation](patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit): DataStream[R]
  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R]): DataStream[R]
}
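
Where `select` produces exactly one result per match, `flatSelect` can emit any number of records through the `Collector`. A minimal sketch follows, assuming the "start" and "end" stage names from Basic Usage; the `describe` helper and its output format are hypothetical, and the explicit `scala.collection.Map` annotation reflects an assumption about the parameter type behind `Map` in the signature above.

```scala
import scala.collection.Map

import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Event(name: String, value: Int, timestamp: Long)

object ProcessingSketch {
  // Pure helper: one line per (start, end) pair found in the match.
  def describe(matched: Map[String, Iterable[Event]]): Seq[String] =
    for {
      start <- matched.getOrElse("start", Nil).toSeq
      end   <- matched.getOrElse("end", Nil).toSeq
    } yield s"${start.name}@${start.timestamp} -> ${end.name}@${end.timestamp}"

  // Emits zero, one, or many records for each detected match.
  def emitAll(matches: PatternStream[Event]): DataStream[String] =
    matches.flatSelect { (matched: Map[String, Iterable[Event]], out: Collector[String]) =>
      describe(matched).foreach(out.collect)
    }
}
```

Keeping the formatting logic in a plain function makes it checkable without starting a Flink job.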

Pattern Processing

Timeout Handling

Handle partial pattern matches that timeout with side outputs for comprehensive event processing.

class PatternStream[T] {
  def select[L: TypeInformation, R: TypeInformation](
    outputTag: OutputTag[L],
    patternTimeoutFunction: PatternTimeoutFunction[T, L],
    patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[R]
}
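
A hedged sketch of how this overload might be used: complete matches go to the main output, while partial matches whose `within(...)` window expired go to a side output read back with `getSideOutput`. The tag id, the message formats, and the `TimeoutSketch` names are illustrative, and the stage names assume the pattern from Basic Usage.

```scala
import java.util.{List => JList, Map => JMap}

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.api.scala._

case class Event(name: String, value: Int, timestamp: Long)

object TimeoutSketch {
  // Side-output tag for timed-out partial matches; the id is illustrative.
  val timedOutTag: OutputTag[String] = OutputTag[String]("timed-out")

  // Called for a partial match whose time window expired.
  val onTimeout: PatternTimeoutFunction[Event, String] =
    new PatternTimeoutFunction[Event, String] {
      override def timeout(partial: JMap[String, JList[Event]], timeoutTimestamp: Long): String =
        s"timed out at $timeoutTimestamp after stages ${partial.keySet}"
    }

  // Called for every complete match.
  val onMatch: PatternSelectFunction[Event, String] =
    new PatternSelectFunction[Event, String] {
      override def select(matched: JMap[String, JList[Event]]): String =
        s"matched ${matched.get("start").get(0).name} -> ${matched.get("end").get(0).name}"
    }

  // Returns (complete matches, timed-out partial matches).
  def split(matches: PatternStream[Event]): (DataStream[String], DataStream[String]) = {
    val complete = matches.select(timedOutTag, onTimeout, onMatch)
    (complete, complete.getSideOutput(timedOutTag))
  }
}
```

Timeouts are only reported when the pattern defines a window with `within`.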

Timeout Handling

Group Pattern Management

Advanced pattern composition using GroupPattern for complex pattern sequences that combine multiple patterns.

class GroupPattern[T, F <: T] extends Pattern[T, F] {
  // Inherits the Pattern methods, except that where(), or(), and subtype()
  // are not supported on a group and throw UnsupportedOperationException
}

object Pattern {
  def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
}
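
To make the idea concrete, here is a small sketch in which a two-stage sequence is wrapped as a group and then repeated as a unit. The stage names ("login", "failure", "lockout") and the repetition count are hypothetical; `times` is the standard quantifier inherited from `Pattern`.

```scala
import org.apache.flink.cep.scala.pattern.{GroupPattern, Pattern}

case class Event(name: String, value: Int, timestamp: Long)

object GroupPatternSketch {
  // Inner sequence: a "login" event immediately followed by a "failure" event.
  val loginThenFailure: Pattern[Event, Event] =
    Pattern
      .begin[Event]("login")
      .where(_.name == "login")
      .next("failure")
      .where(_.name == "failure")

  // Wrap the sequence as a group so quantifiers apply to it as a whole.
  val group: GroupPattern[Event, Event] = Pattern.begin(loginThenFailure)

  // Three repetitions of the group, then a closing "lockout" event.
  val repeated: Pattern[Event, Event] =
    group
      .times(3)
      .followedBy("lockout")
      .where(_.name == "lockout")
}
```

Conditions are attached to the inner stages and to the stage added after the group, not to the group itself.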

Group Pattern Management

Types

// Core pattern types
trait Context[T] {
  def getEventsForPattern(name: String): Iterable[T]
}

// Imported from Java CEP (the next three are Java interfaces, shown here as traits)
trait PatternSelectFunction[T, R] {
  def select(pattern: java.util.Map[String, java.util.List[T]]): R
}

trait PatternFlatSelectFunction[T, R] {
  def flatSelect(pattern: java.util.Map[String, java.util.List[T]], out: Collector[R]): Unit
}

trait PatternTimeoutFunction[T, L] {
  def timeout(pattern: java.util.Map[String, java.util.List[T]], timeoutTimestamp: Long): L
}

abstract class PatternProcessFunction[T, R] extends AbstractRichFunction {
  def processMatch(
    `match`: java.util.Map[String, java.util.List[T]],
    ctx: PatternProcessFunction.Context,
    out: Collector[R]
  ): Unit
}
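
Since these function types receive Java collections, Scala code usually converts them before use. The sketch below implements `PatternProcessFunction` that way; the `SummarizeMatch` class and its summary format are hypothetical, and `scala.collection.JavaConverters` is used because the artifact targets Scala 2.12.

```scala
import java.util.{List => JList, Map => JMap}

import scala.collection.JavaConverters._

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.util.Collector

case class Event(name: String, value: Int, timestamp: Long)

// Emits one summary line per match; the format is illustrative.
class SummarizeMatch extends PatternProcessFunction[Event, String] {
  override def processMatch(
      matched: JMap[String, JList[Event]],
      ctx: PatternProcessFunction.Context,
      out: Collector[String]): Unit =
    out.collect(SummarizeMatch.summarize(matched))
}

object SummarizeMatch {
  // Converts the Java collections to Scala and totals the values per stage,
  // sorted by stage name so the output is deterministic.
  def summarize(matched: JMap[String, JList[Event]]): String =
    matched.asScala.toSeq
      .sortBy(_._1)
      .map { case (stage, events) => s"$stage=${events.asScala.map(_.value).sum}" }
      .mkString(", ")
}
```

An instance can be passed to `PatternStream.process`, for example `patternStream.process(new SummarizeMatch)`.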
