or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-sources-sinks.md execution-environment.md grouping-aggregation.md hadoop-integration.md index.md iterations.md joins-cogroups.md transformations.md type-system.md

docs/transformations.md

0

# Transformations

1

2

The Apache Flink Scala API provides a rich set of transformation operations that enable functional programming patterns with type safety. All transformations are lazy: they build a directed acyclic graph (DAG) that runs only when execution is triggered, for example by `env.execute()` or by an eager operation such as `print()`, `collect()`, or `count()`.

3

4

## Basic Transformations

5

6

### Map

7

8

Transform each element using a function, producing exactly one output element per input element.

9

10

```scala { .api }

11

class DataSet[T] {

12

// Using function literal

13

def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]

14

15

// Using MapFunction

16

def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]

17

18

// Using RichMapFunction with RuntimeContext access

19

def map[R: TypeInformation: ClassTag](mapper: RichMapFunction[T, R]): DataSet[R]

20

}

21

```

22

23

### FlatMap

24

25

Transform each element into zero, one, or more output elements.

26

27

```scala { .api }

28

class DataSet[T] {

29

// Using function returning TraversableOnce

30

def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]

31

32

// Using function with Collector

33

def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]

34

35

// Using FlatMapFunction

36

def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]

37

38

// Using RichFlatMapFunction with RuntimeContext access

39

def flatMap[R: TypeInformation: ClassTag](flatMapper: RichFlatMapFunction[T, R]): DataSet[R]

40

}

41

```

42

43

### Filter

44

45

Keep only elements that satisfy a predicate function.

46

47

```scala { .api }

48

class DataSet[T] {

49

// Using boolean function

50

def filter(fun: T => Boolean): DataSet[T]

51

52

// Using FilterFunction

53

def filter(filter: FilterFunction[T]): DataSet[T]

54

55

// Using RichFilterFunction with RuntimeContext access

56

def filter(filter: RichFilterFunction[T]): DataSet[T]

57

}

58

```

59

60

## Partition-wise Transformations

61

62

### MapPartition

63

64

Transform an entire partition in a single call. This is useful when expensive initialization should happen once per partition rather than once per element.

65

66

```scala { .api }

67

class DataSet[T] {

68

// Using function on Iterator

69

def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

70

71

// Using function with Collector

72

def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

73

74

// Using MapPartitionFunction

75

def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]

76

}

77

```

78

79

## Transformations with Broadcast Variables

80

81

### MapWithBroadcastSet

82

83

Transform elements with access to broadcast variables for enrichment or lookup operations.

84

85

```scala { .api }

86

class DataSet[T] {

87

def mapWithBroadcastSet[R: TypeInformation: ClassTag](

88

fun: (T, Iterable[_]) => R,

89

broadcastSetName: String

90

): DataSet[R]

91

}

92

```

93

94

### FlatMapWithBroadcastSet

95

96

```scala { .api }

97

class DataSet[T] {

98

def flatMapWithBroadcastSet[R: TypeInformation: ClassTag](

99

fun: (T, Iterable[_], Collector[R]) => Unit,

100

broadcastSetName: String

101

): DataSet[R]

102

}

103

```

104

105

### MapPartitionWithBroadcastSet

106

107

```scala { .api }

108

class DataSet[T] {

109

def mapPartitionWithBroadcastSet[R: TypeInformation: ClassTag](

110

fun: (Iterator[T], Iterable[_], Collector[R]) => Unit,

111

broadcastSetName: String

112

): DataSet[R]

113

}

114

```

115

116

### FlatMapPartitionWithBroadcastSet

117

118

```scala { .api }

119

class DataSet[T] {

120

def flatMapPartitionWithBroadcastSet[R: TypeInformation: ClassTag](

121

fun: (Iterator[T], Iterable[_], Collector[R]) => Unit,

122

broadcastSetName: String

123

): DataSet[R]

124

}

125

```

126

127

### FilterWithBroadcastSet

128

129

```scala { .api }

130

class DataSet[T] {

131

def filterWithBroadcastSet(

132

fun: (T, Iterable[_]) => Boolean,

133

broadcastSetName: String

134

): DataSet[T]

135

}

136

```

137

138

## Reduce Operations

139

140

### Reduce

141

142

Combine all elements of the DataSet into a single element by repeatedly applying an associative and commutative function to pairs of elements.

143

144

```scala { .api }

145

class DataSet[T] {

146

// Using function

147

def reduce(fun: (T, T) => T): DataSet[T]

148

149

// Using ReduceFunction

150

def reduce(reducer: ReduceFunction[T]): DataSet[T]

151

152

// Using RichReduceFunction with RuntimeContext access

153

def reduce(reducer: RichReduceFunction[T]): DataSet[T]

154

}

155

```

156

157

## Set Operations

158

159

### Union

160

161

Combine multiple DataSets into one containing all elements.

162

163

```scala { .api }

164

class DataSet[T] {

165

def union(other: DataSet[T]*): DataSet[T]

166

}

167

```

168

169

### Distinct

170

171

Remove duplicate elements based on equality or key extraction.

172

173

```scala { .api }

174

class DataSet[T] {

175

// Distinct by element equality

176

def distinct(): DataSet[T]

177

178

// Distinct by key function

179

def distinct[K: TypeInformation](fun: T => K): DataSet[T]

180

181

// Distinct by field positions

182

def distinct(fields: Int*): DataSet[T]

183

184

// Distinct by field names

185

def distinct(firstField: String, otherFields: String*): DataSet[T]

186

}

187

```

188

189

## Sorting and Partitioning

190

191

### SortPartition

192

193

Sort elements within each partition.

194

195

```scala { .api }

196

class DataSet[T] {

197

// Sort by key function

198

def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]

199

200

// Sort by field position

201

def sortPartition(field: Int, order: Order): DataSet[T]

202

203

// Sort by field name

204

def sortPartition(field: String, order: Order): DataSet[T]

205

}

206

```

207

208

### Partitioning Operations

209

210

Control how data is distributed across parallel instances.

211

212

```scala { .api }

213

class DataSet[T] {

214

// Hash partitioning

215

def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]

216

def partitionByHash(fields: Int*): DataSet[T]

217

def partitionByHash(firstField: String, otherFields: String*): DataSet[T]

218

219

// Range partitioning

220

def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]

221

def partitionByRange(fields: Int*): DataSet[T]

222

def partitionByRange(firstField: String, otherFields: String*): DataSet[T]

223

224

// Custom partitioning

225

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]

226

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fields: Int*): DataSet[T]

227

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], firstField: String, otherFields: String*): DataSet[T]

228

229

// Round-robin rebalancing

230

def rebalance(): DataSet[T]

231

}

232

```

233

234

## Sampling

235

236

### Sample

237

238

Extract a random sample of elements from the DataSet.

239

240

```scala { .api }

241

class DataSet[T] {

242

// Sample with fraction

243

def sample(withReplacement: Boolean, fraction: Double): DataSet[T]

244

def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataSet[T]

245

}

246

```

247

248

### First

249

250

Get the first n elements. On a DataSet that has not been sorted, which n elements are returned is arbitrary.

251

252

```scala { .api }

253

class DataSet[T] {

254

def first(n: Int): DataSet[T]

255

}

256

```

257

258

## Configuration Operations

259

260

### Naming and Parallelism

261

262

```scala { .api }

263

class DataSet[T] {

264

// Set operation name for debugging

265

def name(name: String): DataSet[T]

266

267

// Set parallelism for this operation

268

def setParallelism(parallelism: Int): DataSet[T]

269

def getParallelism: Int

270

}

271

```

272

273

### Resource Configuration

274

275

```scala { .api }

276

class DataSet[T] {

277

// Get resource specifications

278

def minResources: ResourceSpec

279

def preferredResources: ResourceSpec

280

281

// Set resource requirements

282

def setResources(minResources: ResourceSpec, preferredResources: ResourceSpec): DataSet[T]

283

}

284

```

285

286

### Broadcast Variables

287

288

```scala { .api }

289

class DataSet[T] {

290

// Add broadcast variable

291

def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]

292

}

293

```

294

295

## Usage Examples

296

297

### Basic Transformations

298

299

```scala

300

import org.apache.flink.api.scala._

301

302

val env = ExecutionEnvironment.getExecutionEnvironment

303

304

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

305

306

// Map: square each number

307

val squares = numbers.map(x => x * x)

308

309

// Filter: keep only even numbers

310

val evens = numbers.filter(_ % 2 == 0)

311

312

// FlatMap: split range into individual numbers

313

val ranges = env.fromElements("1-3", "4-6", "7-9")

314

val flatMapped = ranges.flatMap { range =>

315

val parts = range.split("-")

316

val start = parts(0).toInt

317

val end = parts(1).toInt

318

start to end

319

}

320

```

321

322

### Working with Case Classes

323

324

```scala

325

import org.apache.flink.api.scala._

326

327

case class Person(name: String, age: Int, city: String)

328

329

val env = ExecutionEnvironment.getExecutionEnvironment

330

val people = env.fromElements(

331

Person("Alice", 25, "New York"),

332

Person("Bob", 30, "London"),

333

Person("Charlie", 35, "Paris")

334

)

335

336

// Map: extract names

337

val names = people.map(_.name)

338

339

// Filter: adults only

340

val adults = people.filter(_.age >= 18)

341

342

// Transform: create summary

343

val summaries = people.map(p => s"${p.name} (${p.age}) from ${p.city}")

344

```

345

346

### Partition Operations

347

348

```scala

349

import org.apache.flink.api.scala._

350

import org.apache.flink.api.common.operators.Order

351

352

val env = ExecutionEnvironment.getExecutionEnvironment

353

354

case class Sale(product: String, amount: Double, region: String)

355

356

val sales = env.fromElements(

357

Sale("laptop", 1000.0, "US"),

358

Sale("phone", 500.0, "EU"),

359

Sale("tablet", 300.0, "US")

360

)

361

362

// Partition by region for co-location

363

val partitioned = sales.partitionByHash(_.region)

364

365

// Sort within partitions by amount

366

val sorted = sales.sortPartition(_.amount, Order.DESCENDING)

367

368

// Rebalance for load distribution

369

val rebalanced = sales.rebalance()

370

```

371

372

### Broadcast Variables

373

374

```scala

375

import org.apache.flink.api.scala._

376

377

val env = ExecutionEnvironment.getExecutionEnvironment

378

379

// Lookup data

380

val exchangeRates = env.fromElements(

381

("USD", 1.0),

382

("EUR", 0.85),

383

("GBP", 0.73)

384

)

385

386

// Transaction data

387

val transactions = env.fromElements(

388

("USD", 100.0),

389

("EUR", 85.0),

390

("GBP", 73.0)

391

)

392

393

// Convert to USD using broadcast variable

394

val convertedTransactions = transactions

395

.map { case (currency, amount) =>

396

// A plain lambda cannot read the broadcast set; a real implementation would use a RichMapFunction and call getRuntimeContext.getBroadcastVariable("rates")

397

amount * 1.0 // Simplified example

398

}

399

.withBroadcastSet(exchangeRates, "rates")

400

```

401

402

### MapPartition for Expensive Operations

403

404

```scala

405

import org.apache.flink.api.scala._

406

407

val env = ExecutionEnvironment.getExecutionEnvironment

408

val data = env.fromElements("hello", "world", "flink", "scala")

409

410

// Expensive initialization per partition

411

val processed = data.mapPartition { iter =>

412

// Expensive setup (done once per partition)

413

val expensiveResource = initializeExpensiveResource()

414

415

// Process all elements in partition

416

iter.map { element =>

417

expensiveResource.process(element)

418

}

419

}

420

421

def initializeExpensiveResource(): AnyRef = {

422

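// Simulate expensive initialization (placeholder only: AnyRef has no process(...) method, so a real resource type exposing process must be returned for this example to compile)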

423

new AnyRef

424

}

425

```

426

427

## Rich Functions

428

429

Rich functions add `open` and `close` lifecycle hooks and provide access to the RuntimeContext for advanced features such as accumulators, broadcast variables, and execution parameters.

430

431

### Rich Function Types

432

433

```scala { .api }

434

// Rich function base classes

435

abstract class RichMapFunction[T, O] extends MapFunction[T, O] with RichFunction

436

abstract class RichFlatMapFunction[T, O] extends FlatMapFunction[T, O] with RichFunction

437

abstract class RichFilterFunction[T] extends FilterFunction[T] with RichFunction

438

abstract class RichReduceFunction[T] extends ReduceFunction[T] with RichFunction

439

abstract class RichGroupReduceFunction[T, O] extends GroupReduceFunction[T, O] with RichFunction

440

abstract class RichMapPartitionFunction[T, O] extends MapPartitionFunction[T, O] with RichFunction

441

442

// RichFunction interface

443

trait RichFunction {

444

def getRuntimeContext: RuntimeContext

445

def open(parameters: Configuration): Unit = {}

446

def close(): Unit = {}

447

def setRuntimeContext(t: RuntimeContext): Unit

448

}

449

450

// RuntimeContext interface

451

trait RuntimeContext {

452

def getExecutionConfig: ExecutionConfig

453

def getJobId: JobID

454

def getTaskName: String

455

def getTaskNameWithSubtasks: String

456

def getNumberOfParallelSubtasks: Int

457

def getIndexOfThisSubtask: Int

458

def getAttemptNumber: Int

459

460

// Accumulators

461

def getAccumulator[V, A](name: String): A

462

def getIntCounter(name: String): IntCounter

463

def getLongCounter(name: String): LongCounter

464

def getDoubleCounter(name: String): DoubleCounter

465

def getHistogram(name: String): Histogram

466

467

// Broadcast variables

468

def getBroadcastVariable[T](name: String): util.List[T]

469

def getBroadcastVariableWithInitializer[T, C](name: String, initializer: BroadcastVariableInitializer[T, C]): C

470

}

471

```

472

473

### Rich Function Usage Examples

474

475

```scala

476

import org.apache.flink.api.scala._

477

import org.apache.flink.api.common.functions.RichMapFunction

478

import org.apache.flink.configuration.Configuration

479

480

val env = ExecutionEnvironment.getExecutionEnvironment

481

482

// Rich function with accumulator access (also requires: import org.apache.flink.api.common.accumulators.IntCounter)

483

class CountingMapper extends RichMapFunction[String, String] {

484

var counter: IntCounter = _

485

486

override def open(parameters: Configuration): Unit = {

487

counter = getRuntimeContext.getIntCounter("processed-records")

488

}

489

490

override def map(value: String): String = {

491

counter.add(1)

492

value.toUpperCase

493

}

494

}

495

496

val data = env.fromElements("hello", "world", "flink")

497

val result = data.map(new CountingMapper())

498

499

// Rich function with broadcast variables (also requires: import java.util, for util.List)

500

class EnrichingMapper extends RichMapFunction[String, String] {

501

var broadcastData: util.List[String] = _

502

503

override def open(parameters: Configuration): Unit = {

504

broadcastData = getRuntimeContext.getBroadcastVariable("lookup-data")

505

}

506

507

override def map(value: String): String = {

508

// Use broadcast data for enrichment

509

val enrichment = if (broadcastData.contains(value)) " [FOUND]" else " [NOT_FOUND]"

510

value + enrichment

511

}

512

}

513

514

val lookupData = env.fromElements("hello", "flink")

515

val enriched = data

516

.map(new EnrichingMapper())

517

.withBroadcastSet(lookupData, "lookup-data")

518

```