or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

group-pattern-management.md · index.md · pattern-definition.md · pattern-processing.md · pattern-stream-creation.md · timeout-handling.md

docs/timeout-handling.md

0

# Timeout Handling

1

2

Handle partial pattern matches that time out by routing them to a side output, so that successful matches and timed-out partial matches can be processed separately.

3

4

## Capabilities

5

6

### Select with Timeout (Side Output)

7

8

Process patterns with timeout handling using side outputs for timed-out partial matches.

9

10

```scala { .api }

11

class PatternStream[T] {

12

/**

13

* Select with timeout using side output (recommended approach)

14

* @param outputTag OutputTag for timeout events

15

* @param patternTimeoutFunction Function to process timed-out patterns

16

* @param patternSelectFunction Function to process successful matches

17

* @tparam L Type of timeout result

18

* @tparam R Type of success result

19

* @return DataStream with successful matches, timeouts available via side output

20

*/

21

def select[L: TypeInformation, R: TypeInformation](

22

outputTag: OutputTag[L],

23

patternTimeoutFunction: PatternTimeoutFunction[T, L],

24

patternSelectFunction: PatternSelectFunction[T, R]

25

): DataStream[R]

26

27

/**

28

* Select with timeout using Scala functions and side output

29

* @param outputTag OutputTag for timeout events

30

* @param patternTimeoutFunction Scala function for timeouts

31

* @param patternSelectFunction Scala function for matches

32

* @tparam L Type of timeout result

33

* @tparam R Type of success result

34

* @return DataStream with successful matches

35

*/

36

def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(

37

patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L

38

)(

39

patternSelectFunction: Map[String, Iterable[T]] => R

40

): DataStream[R]

41

}

42

```

43

44

**Usage Examples:**

45

46

```scala

47

import org.apache.flink.streaming.api.scala.OutputTag

48

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}

49

import java.util.{List => JList, Map => JMap}

50

import java.time.Duration

51

52

case class LoginEvent(userId: String, timestamp: Long, sessionId: String)

53

case class CompletedSession(userId: String, sessionId: String, startTime: Long, endTime: Long)

54

case class IncompleteSession(userId: String, sessionId: String, startTime: Long, timeoutTime: Long)

55

56

// Pattern with time window

57

val sessionPattern = Pattern.begin[LoginEvent]("login")

58

.where(_.sessionId.nonEmpty)

59

.followedBy("logout")

60

.where(event => event.sessionId.nonEmpty)

61

.within(Duration.ofMinutes(30)) // Sessions must complete within 30 minutes

62

63

// Define timeout output tag

64

val timeoutTag = OutputTag[IncompleteSession]("session-timeouts")

65

66

// Scala function approach

67

val sessionResults = CEP.pattern(loginStream, sessionPattern)

68

.select(timeoutTag)(

69

// Timeout handler - receives partial pattern and timeout timestamp

70

(pattern: Map[String, Iterable[LoginEvent]], timeoutTimestamp: Long) => {

71

val loginEvent = pattern("login").head

72

IncompleteSession(

73

loginEvent.userId,

74

loginEvent.sessionId,

75

loginEvent.timestamp,

76

timeoutTimestamp

77

)

78

}

79

)(

80

// Success handler - receives complete pattern

81

(pattern: Map[String, Iterable[LoginEvent]]) => {

82

val loginEvent = pattern("login").head

83

val logoutEvent = pattern("logout").head

84

CompletedSession(

85

loginEvent.userId,

86

loginEvent.sessionId,

87

loginEvent.timestamp,

88

logoutEvent.timestamp

89

)

90

}

91

)

92

93

// Process main results and timeouts separately

94

sessionResults.print("Completed sessions: ")

95

sessionResults.getSideOutput(timeoutTag).print("Timed-out sessions: ")

96

97

// Java interface approach

98

val timeoutFunction = new PatternTimeoutFunction[LoginEvent, String] {

99

override def timeout(pattern: JMap[String, JList[LoginEvent]], timeoutTimestamp: Long): String = {

100

val loginEvent = pattern.get("login").get(0)

101

s"Session timeout: ${loginEvent.userId} at $timeoutTimestamp"

102

}

103

}

104

105

val selectFunction = new PatternSelectFunction[LoginEvent, String] {

106

override def select(pattern: JMap[String, JList[LoginEvent]]): String = {

107

val loginEvent = pattern.get("login").get(0)

108

val logoutEvent = pattern.get("logout").get(0)

109

s"Completed session: ${loginEvent.userId} duration ${logoutEvent.timestamp - loginEvent.timestamp}ms"

110

}

111

}

112

113

val javaTimeoutTag = OutputTag[String]("java-timeouts")

114

val javaResults = CEP.pattern(loginStream, sessionPattern)

115

.select(javaTimeoutTag, timeoutFunction, selectFunction)

116

```

117

118

### Flat Select with Timeout (Side Output)

119

120

Process patterns with timeout handling where both the match handler and the timeout handler can emit any number of results (zero, one, or many) through a `Collector`.

121

122

```scala { .api }

123

class PatternStream[T] {

124

/**

125

* Flat select with timeout using side output

126

* @param outputTag OutputTag for timeout events

127

* @param patternFlatTimeoutFunction Function to process timed-out patterns

128

* @param patternFlatSelectFunction Function to process successful matches

129

* @tparam L Type of timeout result

130

* @tparam R Type of success result

131

* @return DataStream with successful matches

132

*/

133

def flatSelect[L: TypeInformation, R: TypeInformation](

134

outputTag: OutputTag[L],

135

patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],

136

patternFlatSelectFunction: PatternFlatSelectFunction[T, R]

137

): DataStream[R]

138

139

/**

140

* Flat select with timeout using Scala functions and side output

141

* @param outputTag OutputTag for timeout events

142

* @param patternFlatTimeoutFunction Scala function for timeouts

143

* @param patternFlatSelectFunction Scala function for matches

144

* @tparam L Type of timeout result

145

* @tparam R Type of success result

146

* @return DataStream with successful matches

147

*/

148

def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(

149

patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit

150

)(

151

patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit

152

): DataStream[R]

153

}

154

```

155

156

**Usage Examples:**

157

158

```scala

159

import org.apache.flink.util.Collector

160

import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}

161

162

case class TransactionEvent(accountId: String, amount: Double, location: String, timestamp: Long)

163

case class SuspiciousActivity(accountId: String, reason: String, amount: Double, locations: Set[String])

164

case class TimeoutAlert(accountId: String, partialAmount: Double, duration: Long)

165

166

// Pattern for detecting suspicious transaction sequences

167

val suspiciousPattern = Pattern.begin[TransactionEvent]("transactions")

168

.where(_.amount > 1000)

169

.oneOrMore

170

.within(Duration.ofMinutes(10))

171

172

val timeoutTag = OutputTag[TimeoutAlert]("suspicious-timeouts")

173

174

// Scala flat select with timeout

175

val suspiciousResults = CEP.pattern(transactionStream, suspiciousPattern)

176

.flatSelect(timeoutTag)(

177

// Timeout handler - can emit multiple timeout alerts

178

(pattern: Map[String, Iterable[TransactionEvent]], timeoutTimestamp: Long, out: Collector[TimeoutAlert]) => {

179

val transactions = pattern("transactions")

180

val accountId = transactions.head.accountId

181

val totalAmount = transactions.map(_.amount).sum

182

val firstTransaction = transactions.minBy(_.timestamp)

183

val duration = timeoutTimestamp - firstTransaction.timestamp

184

185

// Emit timeout alert

186

out.collect(TimeoutAlert(accountId, totalAmount, duration))

187

188

// Additional timeout processing if needed

189

if (totalAmount > 5000) {

190

out.collect(TimeoutAlert(s"$accountId-high-value", totalAmount, duration))

191

}

192

}

193

)(

194

// Success handler - can emit multiple suspicious activity reports

195

(pattern: Map[String, Iterable[TransactionEvent]], out: Collector[SuspiciousActivity]) => {

196

val transactions = pattern("transactions")

197

val accountId = transactions.head.accountId

198

val totalAmount = transactions.map(_.amount).sum

199

val locations = transactions.map(_.location).toSet

200

201

// Emit volume-based alert

202

if (totalAmount > 10000) {

203

out.collect(SuspiciousActivity(accountId, "high-volume", totalAmount, locations))

204

}

205

206

// Emit location-based alert

207

if (locations.size > 3) {

208

out.collect(SuspiciousActivity(accountId, "multiple-locations", totalAmount, locations))

209

}

210

211

// Emit frequency-based alert

212

if (transactions.size > 5) {

213

out.collect(SuspiciousActivity(accountId, "high-frequency", totalAmount, locations))

214

}

215

}

216

)

217

218

// Process results and timeouts

219

suspiciousResults.print("Suspicious activities: ")

220

suspiciousResults.getSideOutput(timeoutTag).print("Timeout alerts: ")

221

```

222

223

### Deprecated Either-Based Timeout Handling

224

225

Legacy timeout handling using Either types (deprecated in favor of side outputs).

226

227

```scala { .api }

228

class PatternStream[T] {

229

/**

230

* Select with timeout using Either return type (deprecated)

231

* @param patternTimeoutFunction Function for timeout processing

232

* @param patternSelectFunction Function for match processing

233

* @tparam L Type of timeout result

234

* @tparam R Type of success result

235

* @return DataStream of Either[L, R] containing both results

236

*/

237

@deprecated("Use version with side outputs", "1.18.0")

238

def select[L: TypeInformation, R: TypeInformation](

239

patternTimeoutFunction: PatternTimeoutFunction[T, L],

240

patternSelectFunction: PatternSelectFunction[T, R]

241

): DataStream[Either[L, R]]

242

243

/**

244

* Flat select with timeout using Either return type (deprecated)

245

* @param patternFlatTimeoutFunction Function for timeout processing

246

* @param patternFlatSelectFunction Function for match processing

247

* @tparam L Type of timeout result

248

* @tparam R Type of success result

249

* @return DataStream of Either[L, R] containing both results

250

*/

251

@deprecated("Use version with side outputs", "1.18.0")

252

def flatSelect[L: TypeInformation, R: TypeInformation](

253

patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],

254

patternFlatSelectFunction: PatternFlatSelectFunction[T, R]

255

): DataStream[Either[L, R]]

256

}

257

```

258

259

**Usage Examples:**

260

261

```scala

262

// Deprecated Either-based approach (not recommended)

263

val eitherResults = CEP.pattern(dataStream, pattern)

264

.select(timeoutFunction, selectFunction)

265

266

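// Separate Left (timeout) and Right (success) results (illustrative only: DataStream.split is no longer available in recent Flink releases — verify against your version, or use filter/side outputs instead)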

267

val (timeouts, successes) = eitherResults.split(

268

either => either.isLeft,

269

either => either.isRight

270

)

271

272

val timeoutResults = timeouts.map(_.left.get)

273

val successResults = successes.map(_.right.get)

274

```

275

276

### Advanced Timeout Scenarios

277

278

Timeout handling for a multi-stage pattern: when the sequence does not complete within the time window, the last stage that matched is used to determine where it stalled.

279

280

**Usage Examples:**

281

282

```scala

283

case class OrderEvent(orderId: String, stage: String, timestamp: Long, customerId: String)

284

case class OrderProgress(orderId: String, completedStages: List[String], timeSpent: Long)

285

case class OrderTimeout(orderId: String, lastStage: String, customerId: String, timeoutAt: Long)

286

287

// Multi-stage order processing pattern

288

val orderPattern = Pattern.begin[OrderEvent]("created")

289

.where(_.stage == "created")

290

.followedBy("paid")

291

.where(_.stage == "paid")

292

.followedBy("shipped")

293

.where(_.stage == "shipped")

294

.followedBy("delivered")

295

.where(_.stage == "delivered")

296

.within(Duration.ofDays(7)) // Orders should complete within a week

297

298

val orderTimeoutTag = OutputTag[OrderTimeout]("order-timeouts")

299

300

val orderResults = CEP.pattern(orderStream, orderPattern)

301

.select(orderTimeoutTag)(

302

// Timeout handler - determine which stage failed

303

(pattern: Map[String, Iterable[OrderEvent]], timeoutTimestamp: Long) => {

304

val allEvents = pattern.values.flatten.toList.sortBy(_.timestamp)

305

val lastEvent = allEvents.lastOption

306

307

lastEvent match {

308

case Some(event) =>

309

OrderTimeout(event.orderId, event.stage, event.customerId, timeoutTimestamp)

310

case None =>

311

// Should not happen, but handle gracefully

312

OrderTimeout("unknown", "none", "unknown", timeoutTimestamp)

313

}

314

}

315

)(

316

// Success handler

317

(pattern: Map[String, Iterable[OrderEvent]]) => {

318

val created = pattern("created").head

319

val delivered = pattern("delivered").head

320

val stages = List("created", "paid", "shipped", "delivered")

321

val timeSpent = delivered.timestamp - created.timestamp

322

323

OrderProgress(created.orderId, stages, timeSpent)

324

}

325

)

326

327

// Advanced timeout analysis

328

val timeoutAnalysis = orderResults.getSideOutput(orderTimeoutTag)

329

.map { timeout =>

330

val stage = timeout.lastStage match {

331

case "created" => "Payment failed"

332

case "paid" => "Shipping failed"

333

case "shipped" => "Delivery failed"

334

case _ => "Unknown failure"

335

}

336

s"Order ${timeout.orderId}: $stage (customer: ${timeout.customerId})"

337

}

338

339

orderResults.print("Completed orders: ")

340

timeoutAnalysis.print("Failed orders: ")

341

```

342

343

## Types

344

345

```scala { .api }

346

// Timeout function interfaces from Java CEP

347

import org.apache.flink.cep.PatternTimeoutFunction

348

import org.apache.flink.cep.PatternFlatTimeoutFunction

349

350

// Side output support

351

import org.apache.flink.streaming.api.scala.OutputTag

352

353

// Collection types

354

import java.util.{List => JList, Map => JMap}

355

import scala.collection.Map

356

357

// Abstract timeout function interfaces

358

abstract class PatternTimeoutFunction[T, L] {

359

def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L

360

}

361

362

abstract class PatternFlatTimeoutFunction[T, L] {

363

def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long, out: Collector[L]): Unit

364

}

365

366

// Either type for deprecated methods

367

sealed abstract class Either[+A, +B]

368

case class Left[+A](value: A) extends Either[A, Nothing]

369

case class Right[+B](value: B) extends Either[Nothing, B]

370

```