or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

group-pattern-management.md · index.md · pattern-definition.md · pattern-processing.md · pattern-stream-creation.md · timeout-handling.md

pattern-stream-creation.md (docs/)

0

# Pattern Stream Creation

1

2

Convert DataStreams into PatternStreams for complex event processing by applying patterns to data streams.

3

4

## Capabilities

5

6

### Basic Pattern Stream Creation

7

8

Create a PatternStream from a DataStream and Pattern definition.

9

10

```scala { .api }

11

object CEP {

12

/**

13

* Create PatternStream from DataStream and Pattern

14

* @param input DataStream containing the input events

15

* @param pattern Pattern specification to detect

16

* @tparam T Type of input events

17

* @return PatternStream for processing matches

18

*/

19

def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T]

20

}

21

```

22

23

**Usage Examples:**

24

25

```scala

26

import org.apache.flink.cep.scala.CEP

27

import org.apache.flink.cep.scala.pattern.Pattern

28

import org.apache.flink.streaming.api.scala._

29

30

case class Event(id: String, eventType: String, timestamp: Long)

31

32

val env = StreamExecutionEnvironment.getExecutionEnvironment

33

val dataStream: DataStream[Event] = env.fromElements(

34

Event("1", "start", 1000L),

35

Event("2", "middle", 2000L),

36

Event("3", "end", 3000L)

37

)

38

39

// Define pattern

40

val pattern = Pattern.begin[Event]("start")

41

.where(_.eventType == "start")

42

.next("middle")

43

.where(_.eventType == "middle")

44

.followedBy("end")

45

.where(_.eventType == "end")

46

47

// Create pattern stream

48

val patternStream = CEP.pattern(dataStream, pattern)

49

```

50

51

### Pattern Stream with Event Comparator

52

53

Create a PatternStream with custom event ordering for events with equal timestamps.

54

55

```scala { .api }

56

object CEP {

57

/**

58

* Create PatternStream with custom event comparator

59

* @param input DataStream containing the input events

60

* @param pattern Pattern specification to detect

61

* @param comparator Comparator for events with equal timestamps

62

* @tparam T Type of input events

63

* @return PatternStream for processing matches

64

*/

65

def pattern[T](

66

input: DataStream[T],

67

pattern: Pattern[T, _ <: T],

68

comparator: EventComparator[T]

69

): PatternStream[T]

70

}

71

```

72

73

**Usage Examples:**

74

75

```scala

76

import org.apache.flink.cep.EventComparator

77

78

case class PriorityEvent(priority: Int, eventType: String, timestamp: Long)

79

80

// Custom comparator for events with same timestamp

81

val priorityComparator = new EventComparator[PriorityEvent] {

82

override def compare(o1: PriorityEvent, o2: PriorityEvent): Int = {

83

// Higher priority events first

84

Integer.compare(o2.priority, o1.priority)

85

}

86

}

87

88

val dataStream: DataStream[PriorityEvent] = env.fromElements(

89

PriorityEvent(1, "low", 1000L),

90

PriorityEvent(5, "high", 1000L), // Same timestamp, higher priority

91

PriorityEvent(3, "medium", 2000L)

92

)

93

94

val pattern = Pattern.begin[PriorityEvent]("start")

95

.where(_.eventType.nonEmpty)

96

97

// Pattern stream with custom ordering

98

val patternStream = CEP.pattern(dataStream, pattern, priorityComparator)

99

```

100

101

### Time Characteristics Configuration

102

103

Configure time characteristics for the pattern stream processing.

104

105

```scala { .api }

106

class PatternStream[T] {

107

/**

108

* Use processing time for pattern detection

109

* @return PatternStream configured for processing time

110

*/

111

def inProcessingTime(): PatternStream[T]

112

113

/**

114

* Use event time for pattern detection

115

* @return PatternStream configured for event time

116

*/

117

def inEventTime(): PatternStream[T]

118

}

119

```

120

121

**Usage Examples:**

122

123

```scala

124

// Processing time pattern stream

125

val processingTimeStream = CEP.pattern(dataStream, pattern)

126

.inProcessingTime()

127

128

// Event time pattern stream (requires watermarks in source)

129

val eventTimeStream = CEP.pattern(dataStream, pattern)

130

.inEventTime()

131

```

132

133

### Late Data Handling

134

135

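Configure a side output for late-arriving data, i.e. events whose timestamp is already behind the current watermark when they arrive and which are therefore no longer considered for pattern matching.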

136

137

```scala { .api }

138

class PatternStream[T] {

139

/**

140

* Configure side output for late data

141

* @param lateDataOutputTag OutputTag for late events

142

* @return PatternStream with late data handling

143

*/

144

def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T]

145

}

146

```

147

148

**Usage Examples:**

149

150

```scala

151

import org.apache.flink.streaming.api.scala.OutputTag

152

153

case class TimestampedEvent(data: String, eventTime: Long)

154

155

val lateDataTag = OutputTag[TimestampedEvent]("late-data")

156

157

val patternStream = CEP.pattern(dataStream, pattern)

158

.inEventTime()

159

.sideOutputLateData(lateDataTag)

160

161

// Process main results

162

val results = patternStream.select { pattern =>

163

// Process matched patterns

164

pattern.toString

165

}

166

167

// Handle late data separately

168

val lateData = results.getSideOutput(lateDataTag)

169

lateData.print("Late data: ")

170

```

171

172

### Advanced Pattern Stream Configuration

173

174

Combine multiple configuration options for comprehensive pattern stream setup.

175

176

**Usage Examples:**

177

178

```scala

179

import org.apache.flink.cep.EventComparator

180

import org.apache.flink.streaming.api.scala.OutputTag

181

import java.time.Duration

182

183

case class ComplexEvent(

184

id: String,

185

eventType: String,

186

priority: Int,

187

eventTime: Long,

188

processingTime: Long

189

)

190

191

// Custom comparator considering both priority and processing time

192

val complexComparator = new EventComparator[ComplexEvent] {

193

override def compare(o1: ComplexEvent, o2: ComplexEvent): Int = {

194

val priorityComp = Integer.compare(o2.priority, o1.priority)

195

if (priorityComp != 0) priorityComp

196

else java.lang.Long.compare(o1.processingTime, o2.processingTime)

197

}

198

}

199

200

// Complex pattern with time window

201

val complexPattern = Pattern.begin[ComplexEvent]("high-priority")

202

.where(_.priority >= 8)

203

.followedBy("any-event")

204

.where(_.eventType.nonEmpty)

205

.within(Duration.ofMinutes(5))

206

207

val lateDataTag = OutputTag[ComplexEvent]("late-events")

208

209

// Fully configured pattern stream

210

val fullPatternStream = CEP.pattern(dataStream, complexPattern, complexComparator)

211

.inEventTime()

212

.sideOutputLateData(lateDataTag)

213

214

// Process with timeout handling

215

val timeoutTag = OutputTag[String]("timeouts")

216

val results = fullPatternStream.select(timeoutTag)(

217

// Timeout function

218

(pattern: Map[String, Iterable[ComplexEvent]], timestamp: Long) => {

219

s"Pattern timed out at $timestamp: ${pattern.keys.mkString(", ")}"

220

}

221

)(

222

// Success function

223

(pattern: Map[String, Iterable[ComplexEvent]]) => {

224

val highPriorityEvent = pattern("high-priority").head

225

val followingEvent = pattern("any-event").head

226

s"Complex pattern: ${highPriorityEvent.id} -> ${followingEvent.id}"

227

}

228

)

229

230

// Handle all output streams

231

results.print("Matches: ")

232

results.getSideOutput(timeoutTag).print("Timeouts: ")

233

results.getSideOutput(lateDataTag).print("Late data: ")

234

```

235

236

## Types

237

238

```scala { .api }

239

// Core types from Flink

240

import org.apache.flink.streaming.api.scala.DataStream

241

import org.apache.flink.streaming.api.scala.OutputTag

242

import org.apache.flink.cep.EventComparator

243

244

// Pattern stream wrapper

245

class PatternStream[T] {

246

// Internal Java pattern stream (not directly accessible)

247

private[flink] def wrappedPatternStream: org.apache.flink.cep.PatternStream[T]

248

}

249

250

// Event comparator interface

251

trait EventComparator[T] extends java.util.Comparator[T] with java.io.Serializable {

252

def compare(o1: T, o2: T): Int

253

}

254

```