# Flink CEP Scala API

**⚠️ DEPRECATED**: This API is deprecated as of Flink 1.18+ and will be removed in a future version. Users should migrate to the Java CEP API. See [FLIP-265](https://s.apache.org/flip-265) for details.

The Apache Flink CEP Scala API provides Complex Event Processing capabilities for Scala applications built on Apache Flink. This library serves as a Scala wrapper around the Java-based Flink CEP engine, offering idiomatic Scala APIs for pattern matching, event stream processing, and complex event detection with type-safe transformations.

## Package Information

- **Package Name**: flink-cep-scala_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**:
  ```xml
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.12</artifactId>
    <version>1.20.2</version>
  </dependency>
  ```
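For sbt builds, the Maven coordinates above might translate to the following sketch. It assumes a Scala 2.12 project, so that `%%` resolves to the `_2.12` artifact. The `flink-streaming-scala` line is an assumption, added only because the examples in this document import `org.apache.flink.streaming.api.scala._`.

```scala
// build.sbt (sketch; assumes scalaVersion is set to a 2.12.x release)
val flinkVersion = "1.20.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-cep-scala" % flinkVersion,
  // Assumed companion dependency for org.apache.flink.streaming.api.scala._
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided
)
```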

## Core Imports

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
```

## Basic Usage

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

case class Event(name: String, value: Int, timestamp: Long)

// Create pattern: a "start" event, directly followed by an event with value > 10,
// eventually followed by an "end" event
val pattern = Pattern.begin[Event]("start")
  .where(_.name == "start")
  .next("middle")
  .where(_.value > 10)
  .followedBy("end")
  .where(_.name == "end")

// Apply pattern to data stream
// (dataStream is assumed to be an existing DataStream[Event] defined elsewhere)
val patternStream = CEP.pattern(dataStream, pattern)

// Process matched patterns: each match maps a pattern name to the events it captured.
// Annotating the parameter type selects the Scala-function overload of select.
val result = patternStream.select { (matched: scala.collection.Map[String, Iterable[Event]]) =>
  val startEvent = matched("start").head
  val endEvent = matched("end").head
  s"Pattern matched: ${startEvent.name} -> ${endEvent.name}"
}
```

## Architecture

The Flink CEP Scala API is built around several key components:

- **CEP Object**: Entry point for creating pattern streams from data streams
- **Pattern System**: Fluent API for defining complex event patterns with temporal constraints
- **PatternStream**: Stream abstraction for processing detected pattern sequences
- **Event Processing**: Type-safe pattern matching with Scala functions and Java interoperability
- **Time Handling**: Support for event-time and processing-time pattern detection

## Capabilities

### Pattern Creation

Define complex event patterns using a fluent Scala DSL with temporal constraints, conditions, and quantifiers.

```scala { .api }
object Pattern {
  def begin[X](name: String): Pattern[X, X]
  def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}

class Pattern[T, F <: T] {
  def where(condition: F => Boolean): Pattern[T, F]
  def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]
  def next(name: String): Pattern[T, T]
  def followedBy(name: String): Pattern[T, T]
  def within(windowTime: Duration): Pattern[T, F]
  def oneOrMore: Pattern[T, F]
  def optional: Pattern[T, F]
}
```

[Pattern Definition](./pattern-definition.md)
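As an illustration of how these methods combine, the sketch below chains a simple condition, a context-aware condition, two quantifiers, and a time window. The `Event` type and the pattern names are hypothetical, and the import path for `Context` (`org.apache.flink.cep.scala.conditions`) is an assumption that should be checked against the library version in use.

```scala
import java.time.Duration

import org.apache.flink.cep.scala.conditions.Context
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical event type used only for this sketch
case class Event(name: String, value: Int, timestamp: Long)

object PatternCreationSketch {
  val pattern: Pattern[Event, Event] = Pattern
    .begin[Event]("start")
    .where(_.name == "start")
    // Zero or more "rising" events, each larger than every "start" event captured so far
    .followedBy("rising")
    .where((e: Event, ctx: Context[Event]) =>
      ctx.getEventsForPattern("start").forall(_.value < e.value))
    .oneOrMore
    .optional
    // A later "end" event closes the match
    .followedBy("end")
    .where(_.name == "end")
    // The whole sequence has to complete within five minutes
    .within(Duration.ofMinutes(5))
}
```

The parameter types of the two-argument condition are written out explicitly so the compiler does not have to choose between the overloaded `where` variants on its own.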

### Pattern Stream Creation

Convert DataStreams into PatternStreams for complex event processing.

```scala { .api }
object CEP {
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T]
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T], comparator: EventComparator[T]): PatternStream[T]
}
```

[Pattern Stream Creation](./pattern-stream-creation.md)
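A minimal sketch of the three-argument variant follows. The `Event` type, the `spikes` pattern, and the `byValue` comparator are hypothetical names introduced for illustration. Keying the stream before calling `CEP.pattern` is one common choice, not a requirement of the signature.

```scala
import org.apache.flink.cep.EventComparator
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

// Hypothetical event type used only for this sketch
case class Event(name: String, value: Int, timestamp: Long)

object PatternStreamSketch {
  // Two consecutive high-value events
  val spikes: Pattern[Event, Event] = Pattern
    .begin[Event]("first").where(_.value > 10)
    .next("second").where(_.value > 10)

  // Orders events that carry the same timestamp by their value
  val byValue: EventComparator[Event] = new EventComparator[Event] {
    override def compare(a: Event, b: Event): Int = Integer.compare(a.value, b.value)
  }

  // Keying first means the pattern is evaluated separately for each event name
  def perName(events: DataStream[Event]): PatternStream[Event] =
    CEP.pattern(events.keyBy(_.name), spikes, byValue)
}
```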

### Pattern Processing

Process detected pattern sequences with flexible output generation including select, flatSelect, and process operations.

```scala { .api }
class PatternStream[T] {
  def select[R: TypeInformation](patternSelectFun: Map[String, Iterable[T]] => R): DataStream[R]
  def flatSelect[R: TypeInformation](patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit): DataStream[R]
  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R]): DataStream[R]
}
```

[Pattern Processing](./pattern-processing.md)
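The sketch below contrasts `select` (exactly one output per match) with `flatSelect` (any number of outputs per match). The `Event` type, the pattern names `start` and `end`, and the helper names are assumptions for illustration. The `Match` alias uses `scala.collection.Map`, which is assumed to be compatible with the `Map` in the signatures above.

```scala
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical event type used only for this sketch
case class Event(name: String, value: Int, timestamp: Long)

object PatternProcessingSketch {
  // One match: pattern name -> events captured under that name
  type Match = scala.collection.Map[String, Iterable[Event]]

  // Plain helper, kept free of Flink types so it can be checked in isolation
  def describe(m: Match): String =
    s"${m("start").head.name} -> ${m("end").head.name}"

  // select: emits exactly one summary string per match
  def summaries(matches: PatternStream[Event]): DataStream[String] =
    matches.select((m: Match) => describe(m))

  // flatSelect: emits one value per "end" event, or nothing if none was captured
  def endValues(matches: PatternStream[Event]): DataStream[Int] =
    matches.flatSelect { (m: Match, out: Collector[Int]) =>
      m.getOrElse("end", Nil).foreach(e => out.collect(e.value))
    }
}
```

Writing the lambda parameter types explicitly keeps the call on the Scala-function overloads rather than the Java-interface ones.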

### Timeout Handling

Handle partial pattern matches that timeout with side outputs for comprehensive event processing.

```scala { .api }
class PatternStream[T] {
  def select[L: TypeInformation, R: TypeInformation](
    outputTag: OutputTag[L],
    patternTimeoutFunction: PatternTimeoutFunction[T, L],
    patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[R]
}
```

[Timeout Handling](./timeout-handling.md)
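One possible way to wire this up is sketched below. It assumes the pattern was defined with `within(...)`, since partial matches can only time out when a window is set. The `Event` type, the tag id `timed-out`, and the names `onTimeout`, `onMatch`, and `split` are hypothetical.

```scala
import java.util.{List => JList, Map => JMap}

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.api.scala._

// Hypothetical event type used only for this sketch
case class Event(name: String, value: Int, timestamp: Long)

object TimeoutHandlingSketch {
  // Side-output tag for partial matches that exceeded the pattern's time window
  val timedOut: OutputTag[String] = OutputTag[String]("timed-out")

  // Called with the events captured so far and the timestamp at which the match expired
  val onTimeout: PatternTimeoutFunction[Event, String] =
    new PatternTimeoutFunction[Event, String] {
      override def timeout(partial: JMap[String, JList[Event]], timeoutTimestamp: Long): String =
        s"timed out at $timeoutTimestamp after ${partial.keySet()}"
    }

  // Called for complete matches; assumes the pattern has a stage named "end"
  val onMatch: PatternSelectFunction[Event, String] =
    new PatternSelectFunction[Event, String] {
      override def select(m: JMap[String, JList[Event]]): String =
        s"matched ${m.get("end").get(0).name}"
    }

  // Returns (complete matches, timed-out partial matches)
  def split(matches: PatternStream[Event]): (DataStream[String], DataStream[String]) = {
    val main = matches.select(timedOut, onTimeout, onMatch)
    (main, main.getSideOutput(timedOut))
  }
}
```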

### Group Pattern Management

Advanced pattern composition using GroupPattern for complex pattern sequences that combine multiple patterns.

```scala { .api }
class GroupPattern[T, F <: T] extends Pattern[T, F] {
  // Inherits Pattern methods but restricts where(), or(), and subtype()
}

object Pattern {
  def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
}
```

[Group Pattern Management](./group-pattern-management.md)
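To make the composition concrete, the sketch below wraps a two-stage sequence in a group and applies a quantifier to the group as a whole. The `Event` type and the pattern names are hypothetical. `times(2)` is not listed in the signatures above and is assumed to be inherited from `Pattern`.

```scala
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical event type used only for this sketch
case class Event(name: String, value: Int, timestamp: Long)

object GroupPatternSketch {
  // Inner sequence: a "start" event directly followed by a high-value event
  val inner: Pattern[Event, Event] = Pattern
    .begin[Event]("start").where(_.name == "start")
    .next("middle").where(_.value > 10)

  // Treat the inner sequence as one unit, require it twice, then wait for an "end" event
  val repeated: Pattern[Event, Event] = Pattern
    .begin(inner)
    .times(2)
    .followedBy("end")
    .where(_.name == "end")
}
```

Conditions are attached to the stages inside the group rather than to the group itself, which matches the restriction on `where()` noted above.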

## Types

```scala { .api }
// Core pattern types
trait Context[T] {
  def getEventsForPattern(name: String): Iterable[T]
}

// Imported from Java CEP (the three function types below are Java interfaces)
trait PatternSelectFunction[T, R] {
  def select(pattern: java.util.Map[String, java.util.List[T]]): R
}

trait PatternFlatSelectFunction[T, R] {
  def flatSelect(pattern: java.util.Map[String, java.util.List[T]], out: Collector[R]): Unit
}

trait PatternTimeoutFunction[T, L] {
  def timeout(pattern: java.util.Map[String, java.util.List[T]], timeoutTimestamp: Long): L
}

// Abstract class in Java CEP
abstract class PatternProcessFunction[T, R] extends AbstractRichFunction {
  def processMatch(
    `match`: java.util.Map[String, java.util.List[T]],
    ctx: PatternProcessFunction.Context,
    out: Collector[R]
  ): Unit
}
```
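A `PatternProcessFunction` subclass might look like the following sketch. `DescribeMatch` and the `Event` type are hypothetical, and the pattern is assumed to contain a stage named `start`. The function receives Java collections, so entries are read with `get` rather than Scala's `apply`.

```scala
import java.util.{List => JList, Map => JMap}

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event type used only for this sketch
case class Event(name: String, value: Int, timestamp: Long)

// Emits one line per match, tagged with the timestamp reported by the context
class DescribeMatch extends PatternProcessFunction[Event, String] {
  override def processMatch(
      m: JMap[String, JList[Event]],
      ctx: PatternProcessFunction.Context,
      out: Collector[String]): Unit = {
    // Assumes the pattern defines a stage named "start" that captured at least one event
    val start = m.get("start").get(0)
    out.collect(s"${start.name} matched at ${ctx.timestamp()}")
  }
}
```

An instance would then be passed to `process`, for example `patternStream.process(new DescribeMatch)`, with the implicit `TypeInformation[String]` supplied by the `org.apache.flink.streaming.api.scala._` import.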