or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md data-stream.md functions.md index.md joining.md keyed-stream.md stream-execution-environment.md windowing.md

docs/data-stream.md

0

# Data Stream

1

2

DataStream represents a stream of elements of the same type. It provides the core stream processing operations for transforming, filtering, and routing data through your streaming application with type safety.

3

4

## Capabilities

5

6

### Basic Transformations

7

8

Core transformation operations that modify stream elements.

9

10

```scala { .api }

11

/**

12

* Basic stream transformations

13

*/

14

class DataStream[T] {

15

def map[R](mapper: T => R): DataStream[R]

16

def flatMap[R](flatMapper: T => TraversableOnce[R]): DataStream[R]

17

def filter(predicate: T => Boolean): DataStream[T]

18

def project(fieldIndexes: Int*): DataStream[Product]

19

}

20

```

21

22

**Usage Examples:**

23

24

```scala

25

import org.apache.flink.streaming.api.scala._

26

27

val dataStream = env.fromElements(1, 2, 3, 4, 5)

28

29

// Transform elements

30

val doubled = dataStream.map(_ * 2)

31

32

// Filter elements

33

val evenNumbers = dataStream.filter(_ % 2 == 0)

34

35

// Flat map for one-to-many transformations

36

val words = env.fromElements("hello world", "flink scala")

37

val splitWords = words.flatMap(_.split(" "))

38

```

39

40

### Keying Operations

41

42

Operations for partitioning streams by key for stateful processing.

43

44

```scala { .api }

45

/**

46

* Stream keying operations

47

*/

48

class DataStream[T] {

49

def keyBy[K](keySelector: T => K): KeyedStream[T, K]

50

def keyBy(fields: String*): KeyedStream[T, Tuple]

51

def keyBy(fields: Int*): KeyedStream[T, Tuple]

52

}

53

```

54

55

**Usage Examples:**

56

57

```scala

58

case class User(id: Int, name: String, age: Int)

59

60

val users = env.fromElements(

61

User(1, "Alice", 25),

62

User(2, "Bob", 30),

63

User(1, "Alice", 26)

64

)

65

66

// Key by a key selector function (here: the user's id)

67

val keyedByUserId = users.keyBy(_.id)

68

69

// Key by field name (for case classes)

70

val keyedByName = users.keyBy("name")

71

72

// Key by field index

73

val keyedByAge = users.keyBy(2)

74

```

75

76

### Stream Unions and Connections

77

78

Operations for combining multiple streams.

79

80

```scala { .api }

81

/**

82

* Stream combination operations

83

*/

84

class DataStream[T] {

85

def union(otherStreams: DataStream[T]*): DataStream[T]

86

def connect[T2](otherStream: DataStream[T2]): ConnectedStreams[T, T2]

87

def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]

88

def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]

89

}

90

```

91

92

**Usage Examples:**

93

94

```scala

95

val stream1 = env.fromElements(1, 2, 3)

96

val stream2 = env.fromElements(4, 5, 6)

97

val stream3 = env.fromElements(7, 8, 9)

98

99

// Union streams of the same type

100

val unionStream = stream1.union(stream2, stream3)

101

102

// Connect streams of different types

103

val connectedStream = stream1.connect(env.fromElements("a", "b", "c"))

104

105

// Join two streams (returns JoinedStreams; complete it with .where(...).equalTo(...).window(...).apply(...))

106

val joinedStream = stream1.join(stream2)

107

```

108

109

### Side Outputs

110

111

Split streams into multiple output streams using OutputTags. Prefer `getSideOutput`: the older `split` operator is deprecated and was removed in Flink 1.12.

112

113

```scala { .api }

114

/**

115

* Side output operations

116

*/

117

class DataStream[T] {

118

def getSideOutput[X](sideOutputTag: OutputTag[X]): DataStream[X]

119

def split(splitter: T => TraversableOnce[String]): SplitStream[T]

120

}

121

```

122

123

**Usage Examples:**

124

125

```scala

126

val evenTag = OutputTag[Int]("even-numbers")

127

val oddTag = OutputTag[Int]("odd-numbers")

128

129

val numbers = env.fromElements(1, 2, 3, 4, 5, 6)

130

131

val processedStream = numbers.process(new ProcessFunction[Int, String] {

132

override def processElement(value: Int, ctx: ProcessFunction[Int, String]#Context, out: Collector[String]): Unit = {

133

if (value % 2 == 0) {

134

ctx.output(evenTag, value)

135

} else {

136

ctx.output(oddTag, value)

137

}

138

out.collect(s"processed: $value")

139

}

140

})

141

142

val evenNumbers = processedStream.getSideOutput(evenTag)

143

val oddNumbers = processedStream.getSideOutput(oddTag)

144

```

145

146

### Sinks and Output

147

148

Operations for sending stream data to external systems.

149

150

```scala { .api }

151

/**

152

* Output and sink operations

153

*/

154

class DataStream[T] {

155

def print(): DataStreamSink[T]

156

def print(sinkIdentifier: String): DataStreamSink[T]

157

def printToErr(): DataStreamSink[T]

158

def printToErr(sinkIdentifier: String): DataStreamSink[T]

159

def writeAsText(path: String): DataStreamSink[T]

160

def writeAsText(path: String, writeMode: WriteMode): DataStreamSink[T]

161

def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]

162

def sinkTo(sink: Sink[T]): DataStreamSink[T]

163

}

164

```

165

166

**Usage Examples:**

167

168

```scala

169

val dataStream = env.fromElements("hello", "world", "flink")

170

171

// Print to console

172

dataStream.print()

173

174

// Print with identifier

175

dataStream.print("my-output")

176

177

// Write to file

178

dataStream.writeAsText("output/result.txt")

179

180

// Custom sink

181

dataStream.addSink(new MyCustomSinkFunction())

182

```

183

184

### Stream Configuration

185

186

Methods for configuring stream properties.

187

188

```scala { .api }

189

/**

190

* Stream configuration operations

191

*/

192

class DataStream[T] {

193

def setParallelism(parallelism: Int): DataStream[T]

194

def getParallelism: Int

195

def setMaxParallelism(maxParallelism: Int): DataStream[T]

196

def getMaxParallelism: Int

197

def name(name: String): DataStream[T]

198

def uid(uid: String): DataStream[T]

199

def setUidHash(uidHash: String): DataStream[T]

200

def disableChaining(): DataStream[T]

201

def startNewChain(): DataStream[T]

202

def slotSharingGroup(slotSharingGroup: String): DataStream[T]

203

}

204

```

205

206

### Iteration Operations

207

208

Operations for creating iterative streams (loops).

209

210

```scala { .api }

211

/**

212

* Iteration operations

213

*/

214

class DataStream[T] {

215

def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R]

216

def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), maxWaitTimeMillis: Long): DataStream[R]

217

}

218

```

219

220

### Time and Watermark Operations

221

222

Configure event time processing and watermark generation.

223

224

```scala { .api }

225

/**

226

* Time and watermark operations

227

*/

228

class DataStream[T] {

229

def assignAscendingTimestamps(timestampExtractor: T => Long): DataStream[T]

230

def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T]

231

}

232

```

233

234

**Usage Examples:**

235

236

```scala

237

case class Event(timestamp: Long, data: String)

238

239

val events = env.fromElements(

240

Event(1000L, "first"),

241

Event(2000L, "second"),

242

Event(3000L, "third")

243

)

244

245

// Assign ascending timestamps

246

val timestampedEvents = events.assignAscendingTimestamps(_.timestamp)

247

248

// Custom watermark strategy

249

val watermarkedEvents = events.assignTimestampsAndWatermarks(

250

WatermarkStrategy

251

.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))

252

.withTimestampAssigner(new SerializableTimestampAssigner[Event] {

253

override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp

254

})

255

)

256

```

257

258

### Process Functions

259

260

Apply custom processing logic using ProcessFunction.

261

262

```scala { .api }

263

/**

264

* Process function operations

265

*/

266

class DataStream[T] {

267

def process[R](processFunction: ProcessFunction[T, R]): DataStream[R]

268

}

269

```

270

271

## Types

272

273

```scala { .api }

274

// Main stream class

275

class DataStream[T]

276

277

// Related stream types

278

class KeyedStream[T, K]

279

class ConnectedStreams[T1, T2]

280

class JoinedStreams[T1, T2]

281

class CoGroupedStreams[T1, T2]

282

class SplitStream[T]

283

284

// Sink types

285

class DataStreamSink[T]

286

287

// Output and utility types

288

class OutputTag[T](id: String)

289

290

// Time-related types

291

trait WatermarkStrategy[T]

292

trait SerializableTimestampAssigner[T]

293

294

// Function types

295

trait ProcessFunction[I, O]

296

trait SinkFunction[T]

297

trait Sink[T]

298

```