tessl/maven-org-apache-flink--flink-cep-scala_2.12

Apache Flink CEP Scala API provides Complex Event Processing capabilities for Scala applications with pattern matching and event stream processing.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-cep-scala_2.12@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-cep-scala_2.12@1.20.0

# Flink CEP Scala API

**⚠️ DEPRECATED**: This API has been deprecated since Flink 1.18 and will be removed in a future version. Migrate to the Java CEP API. See [FLIP-265](https://s.apache.org/flip-265) for details.

The Apache Flink CEP Scala API provides Complex Event Processing for Scala applications built on Apache Flink. It is a Scala wrapper around the Java-based Flink CEP engine and offers idiomatic Scala APIs for pattern matching, event stream processing, and complex event detection with type-safe transformations.

## Package Information

- **Package Name**: flink-cep-scala_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.12</artifactId>
  <version>1.20.2</version>
</dependency>
```


## Core Imports

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
```


## Basic Usage

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

case class Event(name: String, value: Int, timestamp: Long)

// Create pattern: "start", immediately followed by a value above 10,
// eventually followed by "end"
val pattern = Pattern.begin[Event]("start")
  .where(_.name == "start")
  .next("middle")
  .where(_.value > 10)
  .followedBy("end")
  .where(_.name == "end")

// Apply pattern to data stream.
// dataStream is assumed to be a DataStream[Event] defined elsewhere,
// with timestamps and watermarks assigned when running in event time.
val patternStream = CEP.pattern(dataStream, pattern)

// Process matched patterns.
// The explicit parameter type selects the Scala-function overload of select.
val result = patternStream.select {
  (matched: scala.collection.Map[String, Iterable[Event]]) =>
    val startEvent = matched("start").head
    val endEvent = matched("end").head
    s"Pattern matched: ${startEvent.name} -> ${endEvent.name}"
}
```


## Architecture

The Flink CEP Scala API is built around several key components:

- **CEP Object**: Entry point for creating pattern streams from data streams
- **Pattern System**: Fluent API for defining complex event patterns with temporal constraints
- **PatternStream**: Stream abstraction for processing detected pattern sequences
- **Event Processing**: Type-safe pattern matching with Scala functions and Java interoperability
- **Time Handling**: Support for event-time and processing-time pattern detection

## Capabilities


### Pattern Creation

Define complex event patterns using a fluent Scala DSL with temporal constraints, conditions, and quantifiers.

```scala { .api }
object Pattern {
  def begin[X](name: String): Pattern[X, X]
  def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}

class Pattern[T, F <: T] {
  def where(condition: F => Boolean): Pattern[T, F]
  def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]
  def next(name: String): Pattern[T, T]
  def followedBy(name: String): Pattern[T, T]
  def within(windowTime: Duration): Pattern[T, F]
  def oneOrMore: Pattern[T, F]
  def optional: Pattern[T, F]
}
```

[Pattern Definition](./pattern-definition.md)
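
To illustrate how the quantifiers and the context-aware `where` overload listed above combine, here is a minimal sketch. The `Event` case class and the pattern names are hypothetical, the `Context` import path (`org.apache.flink.cep.scala.conditions`) is an assumption about where the Scala wrapper keeps that trait, and `within` is assumed to accept a `java.time.Duration` as the signature above suggests.

```scala
import java.time.Duration

import org.apache.flink.cep.scala.conditions.Context
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical event type used only for this illustration
case class Event(name: String, value: Int, timestamp: Long)

object PatternSketch {
  val risingValues: Pattern[Event, Event] =
    Pattern.begin[Event]("first")
      .where(_.name == "reading")
      .followedBy("rising")
      // Context-aware condition: accept an event only if its value exceeds
      // every event already matched under the name "first".
      // Parameter types are spelled out to pick the two-argument overload.
      .where((e: Event, ctx: Context[Event]) =>
        ctx.getEventsForPattern("first").forall(_.value < e.value))
      .oneOrMore                      // one or more "rising" events
      .optional                       // ... or none at all
      .within(Duration.ofSeconds(10)) // whole match must fit in 10 seconds
}
```

The quantifiers apply to the most recently named step (`"rising"` here), while `within` constrains the pattern as a whole.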


### Pattern Stream Creation

Convert DataStreams into PatternStreams for complex event processing.

```scala { .api }
object CEP {
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T]
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T], comparator: EventComparator[T]): PatternStream[T]
}
```

[Pattern Stream Creation](./pattern-stream-creation.md)
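
The three-argument overload takes an `EventComparator`, which can be sketched as follows. `Event`, `ByValue`, and `detect` are hypothetical names; keying the stream by `name` is only an illustrative choice so that patterns are matched per key.

```scala
import org.apache.flink.cep.EventComparator
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

// Hypothetical event type used only for this illustration
case class Event(name: String, value: Int, timestamp: Long)

object PatternStreamSketch {
  // Breaks ties between events that carry the same timestamp
  // by ordering them on their value field.
  class ByValue extends EventComparator[Event] {
    override def compare(a: Event, b: Event): Int =
      Integer.compare(a.value, b.value)
  }

  // Applies the pattern independently for each key of the input stream.
  def detect(
      input: DataStream[Event],
      pattern: Pattern[Event, _ <: Event]): PatternStream[Event] =
    CEP.pattern(input.keyBy(_.name), pattern, new ByValue)
}
```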


### Pattern Processing

Turn detected pattern sequences into output records with `select` (one result per match), `flatSelect` (zero or more results per match through a `Collector`), or `process` (a `PatternProcessFunction` with access to a context).

```scala { .api }
class PatternStream[T] {
  def select[R: TypeInformation](patternSelectFun: Map[String, Iterable[T]] => R): DataStream[R]
  def flatSelect[R: TypeInformation](patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit): DataStream[R]
  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R]): DataStream[R]
}
```

[Pattern Processing](./pattern-processing.md)
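
As a rough sketch of the `process` variant, the function below formats each match as a string. `Event`, `Describe`, and `describe` are hypothetical names, and the code assumes the pattern defines steps called `"start"` and `"end"` as in the Basic Usage example.

```scala
import java.util.{List => JList, Map => JMap}

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical event type used only for this illustration
case class Event(name: String, value: Int, timestamp: Long)

object ProcessSketch {
  class Describe extends PatternProcessFunction[Event, String] {
    override def processMatch(
        matched: JMap[String, JList[Event]],
        ctx: PatternProcessFunction.Context,
        out: Collector[String]): Unit = {
      // Assumes the pattern has steps named "start" and "end"
      val start = matched.get("start").get(0)
      val end = matched.get("end").get(0)
      // ctx.timestamp() is the timestamp associated with the match
      out.collect(s"${start.name} -> ${end.name} at ${ctx.timestamp()}")
    }
  }

  def describe(matches: PatternStream[Event]): DataStream[String] =
    matches.process(new Describe)
}
```

Because `PatternProcessFunction` is a rich function, it is also the variant to reach for when the processing logic needs `open`/`close` lifecycle hooks.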


### Timeout Handling

Route partial matches that time out to a side output so they can be handled separately from complete matches.

```scala { .api }
class PatternStream[T] {
  def select[L: TypeInformation, R: TypeInformation](
    outputTag: OutputTag[L],
    patternTimeoutFunction: PatternTimeoutFunction[T, L],
    patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[R]
}
```

[Timeout Handling](./timeout-handling.md)
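
A minimal sketch of this overload, assuming the pattern was defined with `within(...)` so that partial matches can actually expire. `Event`, `timedOut`, and `split` are hypothetical names; the side output is read back with `getSideOutput` on the resulting stream.

```scala
import java.util.{List => JList, Map => JMap}

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.api.scala._

// Hypothetical event type used only for this illustration
case class Event(name: String, value: Int, timestamp: Long)

object TimeoutSketch {
  // Tag identifying the side output that receives timed-out partial matches
  val timedOut: OutputTag[String] = OutputTag[String]("timed-out")

  // Returns (complete matches, timed-out partial matches)
  def split(matches: PatternStream[Event]): (DataStream[String], DataStream[String]) = {
    val complete: DataStream[String] = matches.select[String, String](
      timedOut,
      new PatternTimeoutFunction[Event, String] {
        override def timeout(
            partial: JMap[String, JList[Event]],
            timeoutTimestamp: Long): String =
          s"timed out at $timeoutTimestamp after steps ${partial.keySet()}"
      },
      new PatternSelectFunction[Event, String] {
        override def select(matched: JMap[String, JList[Event]]): String =
          s"matched steps ${matched.keySet()}"
      }
    )
    (complete, complete.getSideOutput(timedOut))
  }
}
```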


### Group Pattern Management

Compose patterns with GroupPattern, which wraps an existing pattern sequence so that it can serve as a single step of a larger pattern.

```scala { .api }
class GroupPattern[T, F <: T] extends Pattern[T, F] {
  // Inherits Pattern methods but restricts where(), or(), and subtype()
}

object Pattern {
  def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
}
```

[Group Pattern Management](./group-pattern-management.md)
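
The following sketch shows one way a group could be used: an inner two-step sequence is wrapped with `Pattern.begin(pattern)` and then quantified as a unit. `Event` and the step names are hypothetical.

```scala
import org.apache.flink.cep.scala.pattern.{GroupPattern, Pattern}

// Hypothetical event type used only for this illustration
case class Event(name: String, value: Int, timestamp: Long)

object GroupPatternSketch {
  // Inner sequence: a "login" immediately followed by a "failure"
  val attempt: Pattern[Event, Event] =
    Pattern.begin[Event]("login").where(_.name == "login")
      .next("failure").where(_.name == "failure")

  // Wrap the inner sequence so it behaves as one step
  val group: GroupPattern[Event, Event] = Pattern.begin(attempt)

  // One or more failed attempts, eventually followed by a "lockout" event.
  // Conditions go on the inner steps or on later steps, not on the group.
  val repeated: Pattern[Event, Event] =
    group.oneOrMore
      .followedBy("lockout").where(_.name == "lockout")
}
```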


## Types

```scala { .api }
// Core pattern types
trait Context[T] {
  def getEventsForPattern(name: String): Iterable[T]
}

// Imported from Java CEP (the next three are Java interfaces)
trait PatternSelectFunction[T, R] {
  def select(pattern: java.util.Map[String, java.util.List[T]]): R
}

trait PatternFlatSelectFunction[T, R] {
  def flatSelect(pattern: java.util.Map[String, java.util.List[T]], out: Collector[R]): Unit
}

trait PatternTimeoutFunction[T, L] {
  def timeout(pattern: java.util.Map[String, java.util.List[T]], timeoutTimestamp: Long): L
}

// Java abstract class
abstract class PatternProcessFunction[T, R] extends AbstractRichFunction {
  def processMatch(
    `match`: java.util.Map[String, java.util.List[T]],
    ctx: PatternProcessFunction.Context,
    out: Collector[R]
  ): Unit
}
```
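
The Java function types above hand matches over as `java.util.Map[String, java.util.List[T]]`. When implementing them from Scala, a small conversion helper can make the match easier to work with. The sketch below is a hypothetical utility (`MatchConversions.toScala` is not part of Flink) and uses `scala.collection.JavaConverters`, which is available on Scala 2.12.

```scala
import java.util.{List => JList, Map => JMap}

import scala.collection.JavaConverters._

object MatchConversions {
  // Converts the Java match representation into immutable Scala collections,
  // keeping the order of events within each pattern step.
  def toScala[T](matched: JMap[String, JList[T]]): Map[String, List[T]] =
    matched.asScala.map { case (name, events) =>
      name -> events.asScala.toList
    }.toMap
}
```

Inside a `PatternSelectFunction.select` implementation, this would allow writing `MatchConversions.toScala(pattern)("start").head` instead of chained `get` calls.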