0
# Timeout Handling
1
2
Handle partial pattern matches that time out by routing them to side outputs, so that successful matches and timed-out partial matches can be processed as separate streams.
3
4
## Capabilities
5
6
### Select with Timeout (Side Output)
7
8
Process patterns with timeout handling using side outputs for timed-out partial matches.
9
10
```scala { .api }
11
class PatternStream[T] {

  /**
   * Select with timeout handling via a side output (recommended approach).
   *
   * Successful matches are processed by `patternSelectFunction` and form the
   * returned stream; timed-out partial matches are processed by
   * `patternTimeoutFunction` and are emitted under `outputTag`.
   *
   * @param outputTag              OutputTag identifying the side output for timeout results
   * @param patternTimeoutFunction Function applied to each timed-out partial match
   * @param patternSelectFunction  Function applied to each successful match
   * @tparam L Type of the timeout result
   * @tparam R Type of the success result
   * @return DataStream of successful matches; timeouts are available via the side output
   */
  def select[L: TypeInformation, R: TypeInformation](
      outputTag: OutputTag[L],
      patternTimeoutFunction: PatternTimeoutFunction[T, L],
      patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[R]

  /**
   * Select with timeout handling via a side output, using plain Scala functions.
   *
   * The timeout function receives the partial match together with the timeout
   * timestamp; the select function receives the complete match.
   *
   * @param outputTag              OutputTag identifying the side output for timeout results
   * @param patternTimeoutFunction Scala function applied to each timed-out partial match
   * @param patternSelectFunction  Scala function applied to each successful match
   * @tparam L Type of the timeout result
   * @tparam R Type of the success result
   * @return DataStream of successful matches
   */
  def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
      patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L
  )(
      patternSelectFunction: Map[String, Iterable[T]] => R
  ): DataStream[R]
}
42
```
43
44
**Usage Examples:**
45
46
```scala
47
import org.apache.flink.streaming.api.scala.OutputTag
48
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
49
import java.util.{List => JList, Map => JMap}
50
import java.time.Duration
51
52
// The function-based `select` overloads are declared against scala.collection.Map.
// Without this import the `Map` annotations on the handlers below would refer to
// Predef's immutable Map and the lambdas would not type-check.
import scala.collection.Map

// Input event and the two result shapes: one for completed sessions, one for timeouts
case class LoginEvent(userId: String, timestamp: Long, sessionId: String)
case class CompletedSession(userId: String, sessionId: String, startTime: Long, endTime: Long)
case class IncompleteSession(userId: String, sessionId: String, startTime: Long, timeoutTime: Long)

// Pattern with time window: "login" followed by "logout"
val sessionPattern = Pattern.begin[LoginEvent]("login")
  .where(_.sessionId.nonEmpty)
  .followedBy("logout")
  .where(_.sessionId.nonEmpty)
  .within(Duration.ofMinutes(30)) // Sessions must complete within 30 minutes

// Side output tag under which timed-out partial matches are emitted
val timeoutTag = OutputTag[IncompleteSession]("session-timeouts")

// Scala function approach
val sessionResults = CEP.pattern(loginStream, sessionPattern)
  .select(timeoutTag)(
    // Timeout handler - receives the partial pattern and the timeout timestamp
    (pattern: Map[String, Iterable[LoginEvent]], timeoutTimestamp: Long) => {
      val login = pattern("login").head
      IncompleteSession(login.userId, login.sessionId, login.timestamp, timeoutTimestamp)
    }
  )(
    // Success handler - receives the complete pattern
    (pattern: Map[String, Iterable[LoginEvent]]) => {
      val login = pattern("login").head
      val logout = pattern("logout").head
      CompletedSession(login.userId, login.sessionId, login.timestamp, logout.timestamp)
    }
  )

// Process main results and timeouts separately
sessionResults.print("Completed sessions: ")
sessionResults.getSideOutput(timeoutTag).print("Timed-out sessions: ")
96
97
// Java interface approach: handlers implemented as anonymous classes
val timeoutFunction = new PatternTimeoutFunction[LoginEvent, String] {
  // Describes a timed-out partial match by the user of its "login" event
  override def timeout(pattern: JMap[String, JList[LoginEvent]], timeoutTimestamp: Long): String = {
    val userId = pattern.get("login").get(0).userId
    s"Session timeout: $userId at $timeoutTimestamp"
  }
}

val selectFunction = new PatternSelectFunction[LoginEvent, String] {
  // Describes a complete match by user and elapsed time between login and logout
  override def select(pattern: JMap[String, JList[LoginEvent]]): String = {
    val login = pattern.get("login").get(0)
    val logout = pattern.get("logout").get(0)
    val elapsed = logout.timestamp - login.timestamp
    s"Completed session: ${login.userId} duration ${elapsed}ms"
  }
}

// Timeout strings go to this side output; completed-session strings form the main stream
val javaTimeoutTag = OutputTag[String]("java-timeouts")
val javaResults = CEP.pattern(loginStream, sessionPattern)
  .select(javaTimeoutTag, timeoutFunction, selectFunction)
116
```
117
118
### Flat Select with Timeout (Side Output)
119
120
Process patterns with timeout handling that can emit multiple results per match.
121
122
```scala { .api }
123
class PatternStream[T] {

  /**
   * Flat select with timeout handling via a side output.
   *
   * Both functions emit their results through a collector, so each match or
   * timed-out partial match can produce multiple results.
   *
   * @param outputTag                  OutputTag identifying the side output for timeout results
   * @param patternFlatTimeoutFunction Function applied to each timed-out partial match
   * @param patternFlatSelectFunction  Function applied to each successful match
   * @tparam L Type of the timeout result
   * @tparam R Type of the success result
   * @return DataStream of successful matches
   */
  def flatSelect[L: TypeInformation, R: TypeInformation](
      outputTag: OutputTag[L],
      patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
  ): DataStream[R]

  /**
   * Flat select with timeout handling via a side output, using plain Scala functions.
   *
   * The timeout function receives the partial match, the timeout timestamp and a
   * collector for timeout results; the select function receives the complete
   * match and a collector for success results.
   *
   * @param outputTag                  OutputTag identifying the side output for timeout results
   * @param patternFlatTimeoutFunction Scala function applied to each timed-out partial match
   * @param patternFlatSelectFunction  Scala function applied to each successful match
   * @tparam L Type of the timeout result
   * @tparam R Type of the success result
   * @return DataStream of successful matches
   */
  def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit
  )(
      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit
  ): DataStream[R]
}
154
```
155
156
**Usage Examples:**
157
158
```scala
159
import org.apache.flink.util.Collector
160
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}
161
162
// The function-based `flatSelect` overloads are declared against scala.collection.Map.
// Without this import the `Map` annotations on the handlers below would refer to
// Predef's immutable Map and the lambdas would not type-check.
import scala.collection.Map

case class TransactionEvent(accountId: String, amount: Double, location: String, timestamp: Long)
case class SuspiciousActivity(accountId: String, reason: String, amount: Double, locations: Set[String])
case class TimeoutAlert(accountId: String, partialAmount: Double, duration: Long)

// Pattern for detecting suspicious transaction sequences:
// one or more transactions above 1000 inside a 10-minute window
val suspiciousPattern = Pattern.begin[TransactionEvent]("transactions")
  .where(_.amount > 1000)
  .oneOrMore
  .within(Duration.ofMinutes(10))

val timeoutTag = OutputTag[TimeoutAlert]("suspicious-timeouts")

// Scala flat select with timeout
val suspiciousResults = CEP.pattern(transactionStream, suspiciousPattern)
  .flatSelect(timeoutTag)(
    // Timeout handler - can emit multiple timeout alerts
    (pattern: Map[String, Iterable[TransactionEvent]], timeoutTimestamp: Long, out: Collector[TimeoutAlert]) => {
      val transactions = pattern("transactions")
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      // Duration is measured from the earliest transaction in the partial match
      val firstTransaction = transactions.minBy(_.timestamp)
      val duration = timeoutTimestamp - firstTransaction.timestamp

      // Emit timeout alert
      out.collect(TimeoutAlert(accountId, totalAmount, duration))

      // Additional alert for high-value partial matches
      if (totalAmount > 5000) {
        out.collect(TimeoutAlert(s"$accountId-high-value", totalAmount, duration))
      }
    }
  )(
    // Success handler - can emit multiple suspicious activity reports
    (pattern: Map[String, Iterable[TransactionEvent]], out: Collector[SuspiciousActivity]) => {
      val transactions = pattern("transactions")
      val accountId = transactions.head.accountId
      val totalAmount = transactions.map(_.amount).sum
      val locations = transactions.map(_.location).toSet

      // Each check that holds produces one report; reports are emitted in this order:
      // volume-based, location-based, frequency-based
      val checks = Seq(
        (totalAmount > 10000) -> "high-volume",
        (locations.size > 3) -> "multiple-locations",
        (transactions.size > 5) -> "high-frequency"
      )
      checks.foreach { case (triggered, reason) =>
        if (triggered) {
          out.collect(SuspiciousActivity(accountId, reason, totalAmount, locations))
        }
      }
    }
  )

// Process results and timeouts
suspiciousResults.print("Suspicious activities: ")
suspiciousResults.getSideOutput(timeoutTag).print("Timeout alerts: ")
221
```
222
223
### Deprecated Either-Based Timeout Handling
224
225
Legacy timeout handling using Either types (deprecated in favor of side outputs).
226
227
```scala { .api }
228
class PatternStream[T] {

  /**
   * Select with timeout handling through an Either result (deprecated).
   *
   * Timeout results and success results share one stream: timeouts as the left
   * type `L`, successful matches as the right type `R`.
   *
   * @param patternTimeoutFunction Function applied to each timed-out partial match
   * @param patternSelectFunction  Function applied to each successful match
   * @tparam L Type of the timeout result
   * @tparam R Type of the success result
   * @return DataStream of Either[L, R] containing both kinds of result
   */
  @deprecated("Use version with side outputs", "1.18.0")
  def select[L: TypeInformation, R: TypeInformation](
      patternTimeoutFunction: PatternTimeoutFunction[T, L],
      patternSelectFunction: PatternSelectFunction[T, R]
  ): DataStream[Either[L, R]]

  /**
   * Flat select with timeout handling through an Either result (deprecated).
   *
   * Timeout results and success results share one stream: timeouts as the left
   * type `L`, successful matches as the right type `R`.
   *
   * @param patternFlatTimeoutFunction Function applied to each timed-out partial match
   * @param patternFlatSelectFunction  Function applied to each successful match
   * @tparam L Type of the timeout result
   * @tparam R Type of the success result
   * @return DataStream of Either[L, R] containing both kinds of result
   */
  @deprecated("Use version with side outputs", "1.18.0")
  def flatSelect[L: TypeInformation, R: TypeInformation](
      patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
  ): DataStream[Either[L, R]]
}
257
```
258
259
**Usage Examples:**
260
261
```scala
262
// Deprecated Either-based approach (not recommended)
val eitherResults = CEP.pattern(dataStream, pattern)
  .select(timeoutFunction, selectFunction)

// Separate Left (timeout) and Right (success) results.
// DataStream offers no predicate-pair `split`, so derive one filtered stream per side.
val timeouts = eitherResults.filter(_.isLeft)
val successes = eitherResults.filter(_.isRight)

// Unwrapping is safe here: each stream was filtered down to a single side above
val timeoutResults = timeouts.map(_.left.get)
val successResults = successes.map(_.right.get)
274
```
275
276
### Advanced Timeout Scenarios
277
278
Timeout handling for a multi-stage pattern, using the events in the timed-out partial match to determine which stage was reached last.
279
280
**Usage Examples:**
281
282
```scala
283
// The function-based `select` overloads are declared against scala.collection.Map.
// Without this import the `Map` annotations on the handlers below would refer to
// Predef's immutable Map and the lambdas would not type-check.
import scala.collection.Map

case class OrderEvent(orderId: String, stage: String, timestamp: Long, customerId: String)
case class OrderProgress(orderId: String, completedStages: List[String], timeSpent: Long)
case class OrderTimeout(orderId: String, lastStage: String, customerId: String, timeoutAt: Long)

// Multi-stage order processing pattern: created -> paid -> shipped -> delivered
val orderPattern = Pattern.begin[OrderEvent]("created")
  .where(_.stage == "created")
  .followedBy("paid")
  .where(_.stage == "paid")
  .followedBy("shipped")
  .where(_.stage == "shipped")
  .followedBy("delivered")
  .where(_.stage == "delivered")
  .within(Duration.ofDays(7)) // Orders should complete within a week

val orderTimeoutTag = OutputTag[OrderTimeout]("order-timeouts")

val orderResults = CEP.pattern(orderStream, orderPattern)
  .select(orderTimeoutTag)(
    // Timeout handler - report the last stage the order reached before timing out
    (pattern: Map[String, Iterable[OrderEvent]], timeoutTimestamp: Long) => {
      // The latest event by timestamp across all matched stages
      val lastEvent = pattern.values.flatten.toList.sortBy(_.timestamp).lastOption

      // An empty partial match should not happen, but is handled gracefully
      lastEvent.fold(OrderTimeout("unknown", "none", "unknown", timeoutTimestamp)) { event =>
        OrderTimeout(event.orderId, event.stage, event.customerId, timeoutTimestamp)
      }
    }
  )(
    // Success handler
    (pattern: Map[String, Iterable[OrderEvent]]) => {
      val created = pattern("created").head
      val delivered = pattern("delivered").head
      val stages = List("created", "paid", "shipped", "delivered")

      OrderProgress(created.orderId, stages, delivered.timestamp - created.timestamp)
    }
  )

// Advanced timeout analysis: translate the last reached stage into a failure description
val timeoutAnalysis = orderResults.getSideOutput(orderTimeoutTag)
  .map { timeout =>
    val failure = timeout.lastStage match {
      case "created" => "Payment failed"
      case "paid" => "Shipping failed"
      case "shipped" => "Delivery failed"
      case _ => "Unknown failure"
    }
    s"Order ${timeout.orderId}: $failure (customer: ${timeout.customerId})"
  }

orderResults.print("Completed orders: ")
timeoutAnalysis.print("Failed orders: ")
341
```
342
343
## Types
344
345
```scala { .api }
346
// Timeout function interfaces from Java CEP
347
import org.apache.flink.cep.PatternTimeoutFunction
348
import org.apache.flink.cep.PatternFlatTimeoutFunction
349
350
// Side output support
351
import org.apache.flink.streaming.api.scala.OutputTag
352
353
// Collection types
354
import java.util.{List => JList, Map => JMap}
355
import scala.collection.Map
356
357
// Timeout function interfaces. These are Java interfaces in org.apache.flink.cep,
// shown here as Scala traits; implement them with an anonymous class.
import org.apache.flink.util.Collector

trait PatternTimeoutFunction[T, L] {

  /**
   * Produces exactly one result for a timed-out partial match.
   *
   * @param pattern          Events matched so far, keyed by pattern name
   * @param timeoutTimestamp Timestamp of the timeout
   * @return The timeout result
   */
  def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L
}

trait PatternFlatTimeoutFunction[T, L] {

  /**
   * Emits results for a timed-out partial match through the collector.
   *
   * @param pattern          Events matched so far, keyed by pattern name
   * @param timeoutTimestamp Timestamp of the timeout
   * @param out              Collector receiving the timeout results
   */
  def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long, out: Collector[L]): Unit
}
365
366
// Either type returned by the deprecated methods. This is the standard library's
// scala.util.Either, shown for reference; Left holds a timeout result and
// Right holds a successful match result.
sealed abstract class Either[+A, +B]
final case class Left[+A, +B](value: A) extends Either[A, B]
final case class Right[+A, +B](value: B) extends Either[A, B]
370
```