0
# Pattern Processing
1
2
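Turn detected pattern matches into result streams. A `PatternStream` offers three operations: `select` (exactly one result per match), `flatSelect` (zero or more results per match, emitted through a `Collector`), and `process` (zero or more results, with access to a per-match context and the rich-function lifecycle).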
3
4
## Capabilities
5
6
### Select Operations
7
8
Transform each matched pattern into exactly one output element using a select function, given either as a Scala function or as a Java `PatternSelectFunction`.
9
10
```scala { .api }
class PatternStream[T] {
  /**
   * Applies a Scala select function to every detected match, producing exactly one result per match.
   *
   * The function receives the matched events keyed by pattern-element name. The map type is
   * `scala.collection.Map`, not the immutable `Map` that `Predef` provides by default: a lambda
   * whose parameter is annotated with the immutable `Map` does not conform to this signature,
   * so import `scala.collection.Map` (or omit the annotation) when writing the function.
   *
   * @param patternSelectFun function transforming the pattern map into a single result
   * @tparam R type of the result elements (a `TypeInformation[R]` must be available implicitly)
   * @return DataStream containing one result per match
   */
  def select[R: TypeInformation](
      patternSelectFun: scala.collection.Map[String, Iterable[T]] => R
  ): DataStream[R]

  /**
   * Applies a Java `PatternSelectFunction` to every detected match, producing exactly one
   * result per match. The function receives a `java.util.Map` of `java.util.List`s.
   *
   * @param patternSelectFunction Java interface for pattern selection
   * @tparam R type of the result elements (a `TypeInformation[R]` must be available implicitly)
   * @return DataStream containing one result per match
   */
  def select[R: TypeInformation](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R]
}
```
29
30
**Usage Examples:**
31
32
```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import java.util.{List => JList, Map => JMap}
// The Scala select/flatSelect functions receive a scala.collection.Map,
// not the immutable Map that Predef brings into scope by default.
import scala.collection.Map

case class Event(userId: String, action: String, timestamp: Long)
case class UserSession(userId: String, startAction: String, endAction: String, duration: Long)

// Assumes `dataStream: DataStream[Event]` is defined elsewhere.

// Scala function approach
val sessionPattern = Pattern.begin[Event]("start")
  .where(_.action == "login")
  .followedBy("end")
  .where(_.action == "logout")

val sessions: DataStream[UserSession] = CEP.pattern(dataStream, sessionPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    // Neither element is quantified, so each key holds exactly one event.
    val startEvent = pattern("start").head
    val endEvent = pattern("end").head
    UserSession(
      startEvent.userId,
      startEvent.action,
      endEvent.action,
      endEvent.timestamp - startEvent.timestamp
    )
  }

// Java interface approach
val javaSelectFunction = new PatternSelectFunction[Event, String] {
  override def select(pattern: JMap[String, JList[Event]]): String = {
    val start = pattern.get("start").get(0)
    val end = pattern.get("end").get(0)
    s"Session: ${start.userId} from ${start.timestamp} to ${end.timestamp}"
  }
}

val sessionsJava: DataStream[String] = CEP.pattern(dataStream, sessionPattern)
  .select(javaSelectFunction)
```
70
71
### Flat Select Operations
72
73
Transform each matched pattern into zero or more output elements by emitting them through a `Collector`, using either a Scala function or a Java `PatternFlatSelectFunction`.
74
75
```scala { .api }
class PatternStream[T] {
  /**
   * Applies a Scala function to every detected match; the function may emit any number of
   * results (including none) through the supplied collector.
   *
   * The matched events arrive as a `scala.collection.Map` keyed by pattern-element name.
   * Annotate the lambda parameter with that type (import `scala.collection.Map`) rather than
   * `Predef`'s immutable `Map`, which does not conform to this signature.
   *
   * @param patternFlatSelectFun function receiving the pattern map and a collector for results
   * @tparam R type of the result elements (a `TypeInformation[R]` must be available implicitly)
   * @return DataStream containing every emitted result
   */
  def flatSelect[R: TypeInformation](
      patternFlatSelectFun: (scala.collection.Map[String, Iterable[T]], Collector[R]) => Unit
  ): DataStream[R]

  /**
   * Applies a Java `PatternFlatSelectFunction` to every detected match; the function may emit
   * any number of results through the supplied collector.
   *
   * @param patternFlatSelectFunction Java interface for flat pattern selection
   * @tparam R type of the result elements (a `TypeInformation[R]` must be available implicitly)
   * @return DataStream containing every emitted result
   */
  def flatSelect[R: TypeInformation](
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
  ): DataStream[R]
}
```
98
99
**Usage Examples:**
100
101
```scala
import org.apache.flink.cep.PatternFlatSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import java.time.Duration
import java.util.{List => JList, Map => JMap}
// Scala 2.12-compatible Java/Scala collection converters (used by the Java-interface example).
import scala.collection.JavaConverters._
// The Scala flatSelect function receives a scala.collection.Map, not Predef's immutable Map.
import scala.collection.Map

case class Purchase(userId: String, item: String, amount: Double, timestamp: Long)
case class Alert(userId: String, message: String, severity: String)

// Assumes `purchaseStream: DataStream[Purchase]` is defined elsewhere.

// Pattern for one or more purchases within one hour.
// NOTE(review): older Flink releases take a Time here (Time.hours(1)) instead of a
// java.time.Duration — check which overload your Flink version provides.
val purchasePattern = Pattern.begin[Purchase]("purchases")
  .where(_.amount > 0)
  .oneOrMore
  .within(Duration.ofHours(1))

// Scala flat select - emit zero or more alerts per match
val alerts: DataStream[Alert] = CEP.pattern(purchaseStream, purchasePattern)
  .flatSelect { (pattern: Map[String, Iterable[Purchase]], out: Collector[Alert]) =>
    // `oneOrMore` guarantees at least one purchase, so `head` is safe here.
    val purchases = pattern("purchases")
    val totalAmount = purchases.map(_.amount).sum
    val userId = purchases.head.userId

    // Emit volume alert
    if (totalAmount > 1000) {
      out.collect(Alert(userId, s"High volume: $$${totalAmount}", "HIGH"))
    }

    // Emit frequency alert
    if (purchases.size > 10) {
      out.collect(Alert(userId, s"High frequency: ${purchases.size} purchases", "MEDIUM"))
    }

    // Emit one alert per item bought more than three times
    purchases.groupBy(_.item).foreach { case (item, itemPurchases) =>
      if (itemPurchases.size > 3) {
        out.collect(Alert(userId, s"Repeated item: $item (${itemPurchases.size}x)", "LOW"))
      }
    }
  }

// Java interface approach: one output line per matched purchase
val javaFlatSelectFunction = new PatternFlatSelectFunction[Purchase, String] {
  override def flatSelect(pattern: JMap[String, JList[Purchase]], out: Collector[String]): Unit = {
    pattern.get("purchases").asScala.foreach { purchase =>
      out.collect(s"Purchase: ${purchase.userId} bought ${purchase.item}")
    }
  }
}

val purchaseLog: DataStream[String] = CEP.pattern(purchaseStream, purchasePattern)
  .flatSelect(javaFlatSelectFunction)
```
149
150
### Process Function Operations
151
152
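Use a `PatternProcessFunction` when you need more than the matched events. It is a rich function, so it has `open`/`close` lifecycle methods. It also receives a `Context` that exposes the match timestamp, the current processing time, and side outputs.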
153
154
```scala { .api }
class PatternStream[T] {
  /**
   * Hands each detected match to a `PatternProcessFunction`, which may emit any number of
   * results through its collector and can consult the per-match context.
   *
   * @param patternProcessFunction the process function invoked once per match
   * @tparam R element type of the output stream (a `TypeInformation[R]` must be available implicitly)
   * @return DataStream of everything the process function emits
   */
  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R]): DataStream[R]
}
```
167
168
**Usage Examples:**
169
170
```scala
import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import java.time.Duration
import java.util.{List => JList, Map => JMap}
// The Flink Scala API is built for Scala 2.12, where the converters live here;
// scala.jdk.CollectionConverters only exists from Scala 2.13 on.
import scala.collection.JavaConverters._

case class Transaction(accountId: String, amount: Double, location: String, timestamp: Long)
case class FraudAlert(accountId: String, reason: String, transactions: List[Transaction], riskScore: Double)

/**
 * Scores every "suspicious" match and emits a [[FraudAlert]] when the combined weight of the
 * triggered risk signals exceeds the alert threshold.
 */
class FraudDetectionProcessFunction extends PatternProcessFunction[Transaction, FraudAlert] {
  import FraudDetectionProcessFunction._

  override def processMatch(
      `match`: JMap[String, JList[Transaction]],
      ctx: PatternProcessFunction.Context,
      out: Collector[FraudAlert]
  ): Unit = {
    // The "suspicious" element is `oneOrMore`, so the list is non-empty and head/min/max are safe.
    val transactions = `match`.get("suspicious").asScala.toList

    val totalAmount = transactions.map(_.amount).sum
    val locations = transactions.map(_.location).toSet
    val timestamps = transactions.map(_.timestamp)
    val timeSpan = timestamps.max - timestamps.min

    // Each (fired, reason, weight) signal contributes its weight when it fires.
    val triggered = List(
      (totalAmount > HighVolumeAmount, "high-volume", 0.4),
      (locations.size > MaxLocations, "multiple-locations", 0.3),
      (timeSpan < RapidSuccessionMillis, "rapid-succession", 0.3)
    ).collect { case (true, reason, weight) => (reason, weight) }

    val riskScore = triggered.map(_._2).sum

    if (riskScore > AlertThreshold) {
      out.collect(FraudAlert(
        transactions.head.accountId,
        triggered.map(_._1).mkString(", "),
        transactions,
        riskScore
      ))
    }
  }
}

object FraudDetectionProcessFunction {
  private val HighVolumeAmount = 10000.0
  private val MaxLocations = 2
  private val RapidSuccessionMillis = 5 * 60 * 1000L // 5 minutes
  private val AlertThreshold = 0.5
}

// Usage with process function
// Assumes `transactionStream: DataStream[Transaction]` is defined elsewhere.
val fraudPattern = Pattern.begin[Transaction]("suspicious")
  .where(_.amount > 500)
  .oneOrMore
  .within(Duration.ofMinutes(10))

val fraudAlerts: DataStream[FraudAlert] = CEP.pattern(transactionStream, fraudPattern)
  .process(new FraudDetectionProcessFunction())
```
237
238
### Pattern Map Access
239
240
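Matched events are grouped by the name of the pattern element that accepted them. A non-quantified element maps to a single event, while a quantified element (for example one using `oneOrMore`) maps to every event it matched. In the Scala function variants the map is a `scala.collection.Map` rather than `Predef`'s immutable `Map`, so import `scala.collection.Map` before annotating the parameter type.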
241
242
**Usage Examples:**
243
244
```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
// Scala select functions receive a scala.collection.Map, not Predef's immutable Map.
import scala.collection.Map

// Assumes the `Event` case class from the select example and `dataStream: DataStream[Event]`.

// Pattern with multiple named elements
// (`next` requires "second" to directly follow "first"; `followedBy` allows events in between)
val complexPattern = Pattern.begin[Event]("first")
  .where(_.action == "start")
  .next("second")
  .where(_.action == "process")
  .followedBy("third")
  .where(_.action == "end")

val results: DataStream[String] = CEP.pattern(dataStream, complexPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    // Access events by pattern name
    val firstEvents = pattern("first")   // Iterable[Event]
    val secondEvents = pattern("second") // Iterable[Event]
    val thirdEvents = pattern("third")   // Iterable[Event]

    // Non-quantified elements hold exactly one event
    val firstEvent = firstEvents.head
    val secondEvent = secondEvents.head
    val thirdEvent = thirdEvents.head

    s"Sequence: ${firstEvent.action} -> ${secondEvent.action} -> ${thirdEvent.action}"
  }

// Pattern with quantified elements
val quantifiedPattern = Pattern.begin[Event]("start")
  .where(_.action == "begin")
  .followedBy("repeated")
  .where(_.action == "process")
  .oneOrMore // This can match multiple events
  .followedBy("end")
  .where(_.action == "finish")

val quantifiedResults: DataStream[String] = CEP.pattern(dataStream, quantifiedPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    val startEvent = pattern("start").head
    val repeatedEvents = pattern("repeated") // one or more events
    val endEvent = pattern("end").head

    s"Start: ${startEvent.action}, Repeated: ${repeatedEvents.size} times, End: ${endEvent.action}"
  }
```
286
287
## Types
288
289
```scala { .api }
// Core collector interface
import org.apache.flink.util.Collector

// Pattern processing interfaces from Java CEP
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.PatternFlatSelectFunction
import org.apache.flink.cep.functions.PatternProcessFunction

// Type information for Scala
import org.apache.flink.api.common.typeinfo.TypeInformation

// Collection types: the Scala function variants receive scala.collection.Map,
// the Java interfaces receive java.util.Map of java.util.List
import scala.collection.Map
import java.util.{List => JList, Map => JMap}

// Reference shapes of the Java CEP function types imported above, written in Scala syntax.
// They are defined in flink-cep (Java); implement them, do not redeclare them.

// Java interface: exactly one result per match.
trait PatternSelectFunction[T, R]
    extends org.apache.flink.api.common.functions.Function with java.io.Serializable {
  @throws[Exception]
  def select(pattern: JMap[String, JList[T]]): R
}

// Java interface: zero or more results per match, emitted through the collector.
trait PatternFlatSelectFunction[T, R]
    extends org.apache.flink.api.common.functions.Function with java.io.Serializable {
  @throws[Exception]
  def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit
}

// Java abstract class: a rich function (open/close lifecycle) that also receives a match context.
abstract class PatternProcessFunction[T, R]
    extends org.apache.flink.api.common.functions.AbstractRichFunction {
  @throws[Exception]
  def processMatch(
      `match`: JMap[String, JList[T]],
      ctx: PatternProcessFunction.Context,
      out: Collector[R]
  ): Unit
}

object PatternProcessFunction {
  // Per-match context: timestamp and current processing time (from TimeContext) plus side outputs.
  trait Context extends org.apache.flink.cep.time.TimeContext {
    def output[X](outputTag: org.apache.flink.util.OutputTag[X], value: X): Unit
  }
}
```