# Pattern Stream Creation

Apply a `Pattern` to a `DataStream` to obtain a `PatternStream`, the starting point for complex event processing (CEP).

## Capabilities

### Basic Pattern Stream Creation

Create a `PatternStream` from a `DataStream` and a `Pattern` definition.

```scala { .api }
object CEP {

  /**
   * Applies a pattern to a data stream and returns the resulting pattern stream.
   *
   * @param input   the stream of events the pattern is matched against
   * @param pattern the pattern definition to detect in `input`
   * @tparam T      the type of the input events
   * @return a [[PatternStream]] from which matches can be selected or processed
   */
  def pattern[T](
      input: DataStream[T],
      pattern: Pattern[T, _ <: T]
  ): PatternStream[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

case class Event(id: String, eventType: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// CEP matches in event time by default, so every record needs a timestamp and
// the stream needs watermarks. assignAscendingTimestamps supplies both, using
// the event's own `timestamp` field (which is non-decreasing here).
val dataStream: DataStream[Event] = env
  .fromElements(
    Event("1", "start", 1000L),
    Event("2", "middle", 2000L),
    Event("3", "end", 3000L)
  )
  .assignAscendingTimestamps(_.timestamp)

// Define pattern: "start", immediately followed by "middle" (next = strict
// contiguity), then eventually "end" (followedBy = relaxed contiguity)
val pattern = Pattern.begin[Event]("start")
  .where(_.eventType == "start")
  .next("middle")
  .where(_.eventType == "middle")
  .followedBy("end")
  .where(_.eventType == "end")

// Create pattern stream
val patternStream = CEP.pattern(dataStream, pattern)
```

### Pattern Stream with Event Comparator

Create a `PatternStream` that uses a custom comparator to order events that have the same timestamp.

```scala { .api }
object CEP {

  /**
   * Applies a pattern to a data stream, using a custom comparator to order
   * events that carry the same timestamp.
   *
   * @param input      the stream of events the pattern is matched against
   * @param pattern    the pattern definition to detect in `input`
   * @param comparator tie-breaker for events whose timestamps are equal
   * @tparam T         the type of the input events
   * @return a [[PatternStream]] from which matches can be selected or processed
   */
  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T],
                 comparator: EventComparator[T]): PatternStream[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.cep.EventComparator

case class PriorityEvent(priority: Int, eventType: String, timestamp: Long)

// Tie-breaker for events that share a timestamp: a negative result orders o1
// first, so comparing o2 against o1 puts the higher-priority event first.
val priorityComparator = new EventComparator[PriorityEvent] {
  override def compare(o1: PriorityEvent, o2: PriorityEvent): Int =
    Integer.compare(o2.priority, o1.priority)
}

// CEP matches in event time by default, so take timestamps (and watermarks)
// from the `timestamp` field; equal timestamps are allowed and are resolved
// by the comparator above.
val dataStream: DataStream[PriorityEvent] = env
  .fromElements(
    PriorityEvent(1, "low", 1000L),
    PriorityEvent(5, "high", 1000L), // Same timestamp, higher priority
    PriorityEvent(3, "medium", 2000L)
  )
  .assignAscendingTimestamps(_.timestamp)

val pattern = Pattern.begin[PriorityEvent]("start")
  .where(_.eventType.nonEmpty)

// Pattern stream with custom ordering
val patternStream = CEP.pattern(dataStream, pattern, priorityComparator)
```

### Time Characteristics Configuration

Choose whether pattern matching is driven by event time or by processing time.

```scala { .api }
class PatternStream[T] {

  /**
   * Switches pattern detection to processing time.
   *
   * @return a PatternStream that matches events in processing time
   */
  def inProcessingTime(): PatternStream[T]

  /**
   * Switches pattern detection to event time.
   *
   * @return a PatternStream that matches events in event time
   */
  def inEventTime(): PatternStream[T]
}
```

**Usage Examples:**

```scala
// Match on processing time
val processingTimeStream =
  CEP.pattern(dataStream, pattern).inProcessingTime()

// Match on event time; the input stream needs timestamps and watermarks
val eventTimeStream =
  CEP.pattern(dataStream, pattern).inEventTime()
```

### Late Data Handling

Route late events (events that arrive after the watermark has already passed their timestamp) to a side output instead of dropping them.

```scala { .api }
class PatternStream[T] {

  /**
   * Collects late events in a side output so they can be handled separately.
   *
   * @param lateDataOutputTag tag of the side output that receives late events;
   *                          its element type must match the stream's type `T`
   * @return a PatternStream that emits late events under `lateDataOutputTag`
   */
  def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala.OutputTag

case class TimestampedEvent(data: String, eventTime: Long)

// sideOutputLateData takes an OutputTag[T] for a PatternStream[T], so the
// stream and pattern must be over TimestampedEvent as well. Event-time
// timestamps and watermarks come from the `eventTime` field.
val timestampedStream: DataStream[TimestampedEvent] = env
  .fromElements(
    TimestampedEvent("a", 1000L),
    TimestampedEvent("b", 2000L)
  )
  .assignAscendingTimestamps(_.eventTime)

val nonEmptyPattern = Pattern.begin[TimestampedEvent]("start")
  .where(_.data.nonEmpty)

val lateDataTag = OutputTag[TimestampedEvent]("late-data")

val patternStream = CEP.pattern(timestampedStream, nonEmptyPattern)
  .inEventTime()
  .sideOutputLateData(lateDataTag)

// Process main results. The explicit parameter type selects the Scala-function
// overload of `select` and avoids shadowing the pattern value above.
val results = patternStream.select { (matched: Map[String, Iterable[TimestampedEvent]]) =>
  // Process matched patterns
  matched.toString
}

// Handle late data separately: it is read from the side output of the select result
val lateData = results.getSideOutput(lateDataTag)
lateData.print("Late data: ")
```

### Advanced Pattern Stream Configuration

Combine a custom comparator, event time, late-data handling, and timeout handling in a single pattern stream.

**Usage Examples:**

```scala
import org.apache.flink.cep.EventComparator
import org.apache.flink.streaming.api.scala.OutputTag
import java.time.Duration

case class ComplexEvent(
  id: String,
  eventType: String,
  priority: Int,
  eventTime: Long,
  processingTime: Long
)

// Custom comparator for events with equal timestamps: higher priority first,
// then the smaller processingTime first.
val complexComparator = new EventComparator[ComplexEvent] {
  override def compare(o1: ComplexEvent, o2: ComplexEvent): Int = {
    val priorityComp = Integer.compare(o2.priority, o1.priority)
    // In Scala, `Long` names the scala.Long companion, which has no `compare`;
    // use the java.lang.Long helper explicitly.
    if (priorityComp != 0) priorityComp
    else java.lang.Long.compare(o1.processingTime, o2.processingTime)
  }
}

// Input stream whose event-time timestamps (and watermarks) come from `eventTime`
val complexStream: DataStream[ComplexEvent] = env
  .fromElements(
    ComplexEvent("1", "alert", 9, 1000L, 1000L),
    ComplexEvent("2", "follow-up", 3, 2000L, 2000L)
  )
  .assignAscendingTimestamps(_.eventTime)

// Complex pattern with time window
val complexPattern = Pattern.begin[ComplexEvent]("high-priority")
  .where(_.priority >= 8)
  .followedBy("any-event")
  .where(_.eventType.nonEmpty)
  .within(Duration.ofMinutes(5))

val lateDataTag = OutputTag[ComplexEvent]("late-events")

// Fully configured pattern stream
val fullPatternStream = CEP.pattern(complexStream, complexPattern, complexComparator)
  .inEventTime()
  .sideOutputLateData(lateDataTag)

// Process with timeout handling
val timeoutTag = OutputTag[String]("timeouts")
val results = fullPatternStream.select(timeoutTag)(
  // Timeout function: receives the partial match and its timeout timestamp
  (pattern: Map[String, Iterable[ComplexEvent]], timestamp: Long) => {
    s"Pattern timed out at $timestamp: ${pattern.keys.mkString(", ")}"
  }
)(
  // Success function: each stage matches a single event, so `.head` is safe
  (pattern: Map[String, Iterable[ComplexEvent]]) => {
    val highPriorityEvent = pattern("high-priority").head
    val followingEvent = pattern("any-event").head
    s"Complex pattern: ${highPriorityEvent.id} -> ${followingEvent.id}"
  }
)

// Handle all output streams
results.print("Matches: ")
results.getSideOutput(timeoutTag).print("Timeouts: ")
results.getSideOutput(lateDataTag).print("Late data: ")
```

## Types

```scala { .api }
// Core types from Flink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.cep.EventComparator

// Pattern stream wrapper: Scala facade over the Java org.apache.flink.cep.PatternStream
class PatternStream[T] {
  // Internal Java pattern stream (not directly accessible)
  private[flink] def wrappedPatternStream: org.apache.flink.cep.PatternStream[T]
}

// Event comparator: org.apache.flink.cep.EventComparator is a Java interface
// (not an abstract class), shown here in Scala syntax. It is a standard
// java.util.Comparator, so compare returns negative to order o1 first, zero
// for a tie, and positive to order o2 first. It is also Serializable.
trait EventComparator[T] extends java.util.Comparator[T] with java.io.Serializable {
  def compare(o1: T, o2: T): Int
}
```