or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md · data-sources.md · function-interfaces.md · index.md · keyed-streams.md · output-operations.md · scala-extensions.md · stream-composition.md · stream-environment.md · stream-partitioning.md · stream-transformations.md · window-operations.md · windowing.md

stream-transformations.mddocs/

0

# Stream Transformations and Operations

1

2

DataStream provides comprehensive transformation operations for processing stream elements, including element-wise transformations, filtering, and stateful processing with type-safe functional interfaces.

3

4

## Basic Transformations

5

6

### Map Operations

7

8

```scala { .api }

9

class DataStream[T] {

10

def map[R: TypeInformation](fun: T => R): DataStream[R]

11

def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]

12

}

13

```

14

15

Transform each element to a new element:

16

17

```scala

18

import org.apache.flink.streaming.api.scala._

19

import org.apache.flink.api.common.functions.MapFunction

20

21

val env = StreamExecutionEnvironment.getExecutionEnvironment

22

val stream = env.fromElements(1, 2, 3, 4, 5)

23

24

// Map with lambda function

25

val doubled = stream.map(_ * 2)

26

27

// Map with explicit function

28

val stringified = stream.map(x => s"Number: $x")

29

30

// Map with MapFunction interface

31

/** Maps every incoming integer to its square. */
class SquareMapper extends MapFunction[Int, Int] {
  override def map(value: Int): Int = {
    val squaredValue = value * value
    squaredValue
  }
}

34

val squared = stream.map(new SquareMapper)

35

36

// Map with case class transformation

37

case class User(id: Int, name: String)

38

val users = env.fromElements(1, 2, 3)

39

.map(id => User(id, s"User$id"))

40

```

41

42

### FlatMap Operations

43

44

```scala { .api }

45

class DataStream[T] {

46

def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]

47

def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]

48

def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R]

49

}

50

```

51

52

Transform each element to zero or more elements:

53

54

```scala

55

import org.apache.flink.util.Collector

56

import org.apache.flink.api.common.functions.FlatMapFunction

57

58

val env = StreamExecutionEnvironment.getExecutionEnvironment

59

val sentences = env.fromElements("Hello world", "How are you", "Flink streaming")

60

61

// FlatMap with lambda returning TraversableOnce

62

val words = sentences.flatMap(_.split("\\s+"))

63

64

// FlatMap with lambda using Collector

65

val wordsWithCollector = sentences.flatMap { (sentence, out) =>

66

sentence.split("\\s+").foreach(out.collect)

67

}

68

69

// FlatMap with FlatMapFunction

70

/**
 * Splits a sentence into lower-cased words.
 *
 * The sentence is lower-cased, split on runs of non-word characters, and
 * every non-empty token is emitted through the collector.
 */
class WordSplitter extends FlatMapFunction[String, String] {
  override def flatMap(sentence: String, out: Collector[String]): Unit = {
    val tokens = sentence.toLowerCase.split("\\W+")
    // split can yield a leading empty token when the sentence starts with a separator
    for (token <- tokens if token.nonEmpty) {
      out.collect(token)
    }
  }
}

77

val cleanWords = sentences.flatMap(new WordSplitter)

78

79

// FlatMap for conditional emission

80

val evenNumbers = env.fromElements(1, 2, 3, 4, 5, 6)

81

.flatMap(x => if (x % 2 == 0) Some(x) else None)

82

```

83

84

### Filter Operations

85

86

```scala { .api }

87

class DataStream[T] {

88

def filter(fun: T => Boolean): DataStream[T]

89

def filter(filter: FilterFunction[T]): DataStream[T]

90

}

91

```

92

93

Filter elements based on predicates:

94

95

```scala

96

import org.apache.flink.api.common.functions.FilterFunction

97

98

val env = StreamExecutionEnvironment.getExecutionEnvironment

99

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

100

101

// Filter with lambda

102

val evenNumbers = numbers.filter(_ % 2 == 0)

103

104

// Filter with complex condition

105

case class Person(name: String, age: Int)

106

val people = env.fromElements(

107

Person("Alice", 25),

108

Person("Bob", 17),

109

Person("Charlie", 30)

110

)

111

val adults = people.filter(_.age >= 18)

112

113

// Filter with FilterFunction

114

/** Keeps only strictly positive integers (zero is dropped). */
class PositiveFilter extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
    val isPositive = 0 < value
    isPositive
  }
}

117

val positives = numbers.filter(new PositiveFilter)

118

```

119

120

## Advanced Transformations

121

122

### Process Functions

123

124

```scala { .api }

125

class DataStream[T] {

126

def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]

127

}

128

```

129

130

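Low-level processing with access to element timestamps and watermarks and, when the function runs on a keyed stream (after `keyBy`), to timers and keyed state: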

131

132

```scala

133

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
 * Emits a "Processed" record for every input line and a "Timer fired" record
 * ten seconds (processing time) after each element was seen.
 *
 * Timers can only be registered on a keyed stream, so this is a
 * KeyedProcessFunction and must be applied after `keyBy`.
 *
 * The context parameters use the `#Context` / `#OnTimerContext` type
 * projections: the context classes are inner classes of the Java function
 * class, and the overriding signature has to name them that way in Scala.
 */
class EventProcessor extends KeyedProcessFunction[String, String, String] {

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, String]#Context,
      out: Collector[String]
  ): Unit = {
    // Access current timestamp (null when the record carries no timestamp,
    // e.g. when running purely on processing time)
    val timestamp = ctx.timestamp()

    // Access current watermark
    val watermark = ctx.timerService().currentWatermark()

    // Register timer for 10 seconds from now
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 10000
    )

    // Emit result
    out.collect(s"Processed: $value at $timestamp")
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    out.collect(s"Timer fired at $timestamp")
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val processed = env.socketTextStream("localhost", 9999)
  .keyBy(line => line) // timers require a keyed stream
  .process(new EventProcessor)

170

```

171

172

## Side Outputs

173

174

### OutputTag and Side Output Streams

175

176

```scala { .api }

177

class DataStream[T] {

178

def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X]

179

}

180

181

case class OutputTag[T](id: String)(implicit val typeInfo: TypeInformation[T])

182

```

183

184

Route different types of data to side outputs:

185

186

```scala

187

import org.apache.flink.streaming.api.scala.OutputTag

188

189

val env = StreamExecutionEnvironment.getExecutionEnvironment

190

191

// Define output tags

192

val evenTag = OutputTag[Int]("even-numbers")

193

val oddTag = OutputTag[Int]("odd-numbers")

194

195

/**
 * Routes every integer to the `evenTag` or `oddTag` side output and also
 * emits a "Processed" message for it on the main output.
 *
 * The context parameter uses the `ProcessFunction[Int, String]#Context` type
 * projection; a bare `Context` does not match the inherited Java signature.
 */
class NumberSplitter extends ProcessFunction[Int, String] {
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, String]#Context,
      out: Collector[String]
  ): Unit = {
    // Side outputs: exactly one of the two tags receives each value
    if (value % 2 == 0) {
      ctx.output(evenTag, value)
    } else {
      ctx.output(oddTag, value)
    }
    // Main output: every value is reported, regardless of parity
    out.collect(s"Processed: $value")
  }
}

209

210

val mainStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

211

.process(new NumberSplitter)

212

213

// Get side output streams

214

val evenNumbers = mainStream.getSideOutput(evenTag)

215

val oddNumbers = mainStream.getSideOutput(oddTag)

216

217

evenNumbers.print("Even")

218

oddNumbers.print("Odd")

219

mainStream.print("Main")

220

```

221

222

## Stream Splitting (Legacy)

223

224

### Split and Select Operations

225

226

```scala { .api }

227

class DataStream[T] {

228

def split(selector: OutputSelector[T]): SplitStream[T]

229

def split(fun: T => TraversableOnce[String]): SplitStream[T]

230

}

231

232

class SplitStream[T] {

233

def select(outputNames: String*): DataStream[T]

234

}

235

```

236

237

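Split streams based on conditions. The `split`/`select` API is deprecated and was removed in Flink 1.12; use side outputs instead. The example below applies only to older Flink versions: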

238

239

```scala

240

import org.apache.flink.streaming.api.collector.selector.OutputSelector

241

242

val env = StreamExecutionEnvironment.getExecutionEnvironment

243

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

244

245

// Split with OutputSelector

246

/**
 * Tags every integer with the output name "even" or "odd".
 *
 * The result is built with `java.util.Collections.singletonList`, so the
 * snippet needs no Scala-to-Java collection converter import (the `.asJava`
 * extension is unavailable unless such an import is in scope).
 */
class EvenOddSelector extends OutputSelector[Int] {
  override def select(value: Int): java.lang.Iterable[String] = {
    val outputName = if (value % 2 == 0) "even" else "odd"
    java.util.Collections.singletonList(outputName)
  }
}

251

252

val splitStream = numbers.split(new EvenOddSelector)

253

val evenStream = splitStream.select("even")

254

val oddStream = splitStream.select("odd")

255

256

// Split with lambda

257

val splitStreamLambda = numbers.split(value =>

258

if (value % 2 == 0) List("even") else List("odd")

259

)

260

```

261

262

## Stream Configuration

263

264

### Operator Configuration

265

266

```scala { .api }

267

class DataStream[T] {

268

def setParallelism(parallelism: Int): DataStream[T]

269

def setMaxParallelism(maxParallelism: Int): DataStream[T]

270

def name(name: String): DataStream[T]

271

def uid(uid: String): DataStream[T]

272

def setUidHash(hash: String): DataStream[T]

273

def disableChaining(): DataStream[T]

274

def startNewChain(): DataStream[T]

275

def slotSharingGroup(slotSharingGroup: String): DataStream[T]

276

def setBufferTimeout(timeoutMillis: Long): DataStream[T]

277

}

278

```

279

280

Configure transformation operators:

281

282

```scala

283

val env = StreamExecutionEnvironment.getExecutionEnvironment

284

val stream = env.fromElements(1, 2, 3, 4, 5)

285

286

val result = stream

287

.map(_ * 2)

288

.name("Doubler") // Set operator name

289

.uid("doubler-operator") // Set unique ID

290

.setParallelism(4) // Set parallelism

291

.disableChaining() // Disable operator chaining

292

.slotSharingGroup("group1") // Set slot sharing group

293

.filter(_ > 5)

294

.name("Filter Large Numbers")

295

.startNewChain() // Start new operator chain

296

```

297

298

## Error Handling in Transformations

299

300

### Exception Handling

301

302

```scala

303

import org.apache.flink.api.common.functions.RichMapFunction

304

import org.apache.flink.configuration.Configuration

305

306

/**
 * Parses each input string as an integer, counting failures in the
 * "parsing-errors" metric and substituting -1 for unparsable input.
 *
 * The counter is registered once in `open()`. Registering it on every failure
 * would hit a metric name collision from the second failure on, and those
 * increments would land on counters that are never reported.
 */
class SafeMapper extends RichMapFunction[String, Int] {

  // Assigned in open(); a var because the runtime context is not available
  // at construction time. Transient so it is not serialized with the function.
  @transient private var parsingErrors: org.apache.flink.metrics.Counter = _

  override def open(parameters: Configuration): Unit = {
    parsingErrors = getRuntimeContext.getMetricGroup.counter("parsing-errors")
  }

  override def map(value: String): Int = {
    try {
      value.toInt
    } catch {
      case _: NumberFormatException =>
        // Record the failure; alternatively emit the raw value to a side output
        parsingErrors.inc()
        -1 // Default value
    }
  }
}

320

```

321

322

### Dead Letter Queue Pattern

323

324

```scala

325

val validTag = OutputTag[Int]("valid")

326

val invalidTag = OutputTag[String]("invalid")

327

328

/**
 * Dead-letter-queue style validator.
 *
 * Strings that parse as integers go to the `validTag` side output and produce
 * a "Processed" message on the main output. Strings that fail to parse go
 * unchanged to the `invalidTag` side output and produce no main output.
 *
 * The context parameter uses the `ProcessFunction[String, String]#Context`
 * type projection; a bare `Context` does not match the inherited signature.
 */
class ValidatingProcessor extends ProcessFunction[String, String] {
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]
  ): Unit = {
    try {
      val number = value.toInt
      ctx.output(validTag, number)
      out.collect(s"Processed: $number")
    } catch {
      case _: NumberFormatException =>
        // Keep the raw input so it can be inspected or replayed later
        ctx.output(invalidTag, value)
    }
  }
}

344

345

val env = StreamExecutionEnvironment.getExecutionEnvironment

346

val input = env.fromElements("1", "2", "invalid", "4", "not-a-number", "6")

347

348

val processed = input.process(new ValidatingProcessor)

349

val validNumbers = processed.getSideOutput(validTag)

350

val invalidInputs = processed.getSideOutput(invalidTag)

351

352

// Handle valid and invalid data separately

353

validNumbers.print("Valid")

354

invalidInputs.print("Invalid")

355

```

356

357

## Performance Considerations

358

359

### Operator Fusion and Chaining

360

361

```scala

362

val env = StreamExecutionEnvironment.getExecutionEnvironment

363

364

// Operations are chained together by default for efficiency

365

val stream = env.fromElements(1, 2, 3, 4, 5)

366

.map(_ * 2) // Chained with filter

367

.filter(_ > 5) // Chained with map

368

.map(_ + 1) // Chained with previous operations

369

370

// Explicit chaining control

371

val explicitChaining = env.fromElements(1, 2, 3, 4, 5)

372

.map(_ * 2)

373

.startNewChain() // Force new chain

374

.filter(_ > 5)

375

.disableChaining() // Disable chaining for this operator

376

.map(_ + 1)

377

```

378

379

### Memory and State Considerations

380

381

```scala

382

// Prefer stateless transformations when possible

383

val stateless = stream.map(_ * 2) // No state required

384

385

// Use process functions judiciously as they can maintain state

386

/**
 * Skeleton of a process function that could hold state.
 *
 * Per-key state (and timers) are only available when the function is applied
 * to a keyed stream, i.e. after `keyBy`. On a non-keyed DataStream there is
 * no per-key state to maintain.
 */
class StatefulProcessor extends ProcessFunction[Int, Int] {
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, Int]#Context,
      out: Collector[Int]
  ): Unit = {
    // Processing logic
  }
}

392

```

393

394

## Complete Example: Text Processing Pipeline

395

396

```scala

397

import org.apache.flink.streaming.api.scala._

398

import org.apache.flink.streaming.api.scala.OutputTag

399

import org.apache.flink.streaming.api.functions.ProcessFunction

400

import org.apache.flink.util.Collector

401

402

/**
 * End-to-end example: reads lines from a socket, splits them into words,
 * classifies each word by length via side outputs, and prints all three
 * resulting streams.
 */
object TextProcessingPipeline {

  /** A word paired with its occurrence count (always 1 per emitted record here). */
  case class WordCount(word: String, count: Int)

  // Output tags for different types of words
  val shortWordsTag = OutputTag[String]("short-words")
  val longWordsTag = OutputTag[String]("long-words")

  /**
   * Normalizes each word (lower-case, trimmed) and emits it as a WordCount on
   * the main output. Words shorter than 4 characters are additionally sent to
   * `shortWordsTag`, words longer than 8 characters to `longWordsTag`; words
   * of length 4 to 8 go to the main output only.
   *
   * The context parameter uses the `#Context` type projection; a bare
   * `Context` does not match the inherited Java signature.
   */
  class WordClassifier extends ProcessFunction[String, WordCount] {
    override def processElement(
        word: String,
        ctx: ProcessFunction[String, WordCount]#Context,
        out: Collector[WordCount]
    ): Unit = {
      val cleanWord = word.toLowerCase.trim

      if (cleanWord.length < 4) {
        ctx.output(shortWordsTag, cleanWord)
      } else if (cleanWord.length > 8) {
        ctx.output(longWordsTag, cleanWord)
      }

      out.collect(WordCount(cleanWord, 1))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read text input
    val textStream = env.socketTextStream("localhost", 9999)

    // Process text through transformation pipeline.
    // Note: name() and setParallelism() configure the operator they directly
    // follow, which here is the filter.
    val words = textStream
      .flatMap(_.split("\\W+"))
      .filter(_.nonEmpty)
      .name("Word Splitter")
      .setParallelism(4)

    // Classify and count words
    val wordCounts = words
      .process(new WordClassifier)
      .name("Word Classifier")

    // Get side outputs
    val shortWords = wordCounts.getSideOutput(shortWordsTag)
    val longWords = wordCounts.getSideOutput(longWordsTag)

    // Print results
    wordCounts.print("Word Counts")
    shortWords.print("Short Words")
    longWords.print("Long Words")

    env.execute("Text Processing Pipeline")
  }
}

458

```