or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-operations.md · data-transformations.md · execution-environment.md · grouping-aggregation.md · index.md · input-output.md · partitioning-distribution.md · type-system.md · utilities.md

docs/data-transformations.md

# Data Transformations

Core data processing operations for mapping, filtering, reducing, and transforming DataSets. These operations form the foundation of Flink's data processing capabilities.

## Capabilities

### Map Transformations

Transform each element in a DataSet to a new element, potentially of a different type.

```scala { .api }
class DataSet[T] {
  /**
   * Applies a transformation function to each element
   * @param fun Transformation function T => R
   * @return DataSet with transformed elements
   */
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]

  /**
   * Applies a MapFunction to each element
   * @param mapper MapFunction implementation
   * @return DataSet with transformed elements
   */
  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Simple transformation
val doubled = numbers.map(_ * 2)

// Type transformation
case class Person(name: String, age: Int)
val people = env.fromElements("Alice,25", "Bob,30")
val persons = people.map { line =>
  val parts = line.split(",")
  Person(parts(0), parts(1).toInt)
}
```

### FlatMap Transformations

Transform each element into zero or more elements, useful for splitting and expanding data.

```scala { .api }
class DataSet[T] {
  /**
   * Transforms each element into a traversable collection
   * @param fun Function returning TraversableOnce[R]
   * @return DataSet with flattened results
   */
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]

  /**
   * Transforms using a FlatMapFunction with collector
   * @param fun Function using collector to emit results
   * @return DataSet with collected results
   */
  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a FlatMapFunction to each element
   * @param flatMapper FlatMapFunction implementation
   * @return DataSet with flattened results
   */
  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
}
```

**Usage Examples:**

```scala
val sentences = env.fromElements("Hello world", "Scala Flink", "Data processing")

// Split sentences into words
val words = sentences.flatMap(_.split(" "))

// Generate multiple values per input
val numbers = env.fromElements(1, 2, 3)
val expanded = numbers.flatMap(n => 1 to n)
// Result: [1, 1, 2, 1, 2, 3]
```

### MapPartition Transformations

Transform entire partitions at once, useful for initialization-heavy operations.

```scala { .api }
class DataSet[T] {
  /**
   * Transforms entire partitions using an iterator
   * @param fun Function transforming Iterator[T] to TraversableOnce[R]
   * @return DataSet with partition-wise transformations
   */
  def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

  /**
   * Transforms partitions using iterator and collector
   * @param fun Function with iterator input and collector output
   * @return DataSet with collected partition results
   */
  def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a MapPartitionFunction to each partition
   * @param partitionMapper MapPartitionFunction implementation
   * @return DataSet with partition transformations
   */
  def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]
}
```

### Filter Operations

Select elements based on predicates, removing elements that don't match criteria.

```scala { .api }
class DataSet[T] {
  /**
   * Filters elements using a predicate function
   * @param fun Predicate function returning Boolean
   * @return DataSet with filtered elements
   */
  def filter(fun: T => Boolean): DataSet[T]

  /**
   * Filters elements using a FilterFunction
   * @param filter FilterFunction implementation
   * @return DataSet with filtered elements
   */
  def filter(filter: FilterFunction[T]): DataSet[T]
}
```

**Usage Examples:**

```scala
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Filter even numbers
val evenNumbers = numbers.filter(_ % 2 == 0)

// Filter with complex condition
case class Product(name: String, price: Double, inStock: Boolean)
val products = env.fromElements(
  Product("Laptop", 999.99, true),
  Product("Mouse", 29.99, false),
  Product("Keyboard", 79.99, true)
)

val availableProducts = products.filter(p => p.inStock && p.price < 100)
```

### Reduce Operations

Combine elements using associative operations to produce aggregated results.

```scala { .api }
class DataSet[T] {
  /**
   * Reduces all elements using a combining function
   * @param fun Binary combining function (T, T) => T
   * @return DataSet with single reduced element
   */
  def reduce(fun: (T, T) => T): DataSet[T]

  /**
   * Reduces elements using a ReduceFunction
   * @param reducer ReduceFunction implementation
   * @return DataSet with reduced result
   */
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
```

### ReduceGroup Operations

Process all elements together, potentially producing different output types.

```scala { .api }
class DataSet[T] {
  /**
   * Processes all elements as a group using iterator
   * @param fun Function processing Iterator[T] to produce R
   * @return DataSet with group processing result
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  /**
   * Processes groups using iterator and collector
   * @param fun Function with iterator input and collector output
   * @return DataSet with collected group results
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a GroupReduceFunction to process groups
   * @param reducer GroupReduceFunction implementation
   * @return DataSet with group reduction results
   */
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
```

### CombineGroup Operations

Pre-aggregate elements within partitions before final grouping, improving performance.

```scala { .api }
class DataSet[T] {
  /**
   * Combines elements within partitions using iterator and collector
   * @param fun Function for partition-wise combining
   * @return DataSet with partition-combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a GroupCombineFunction for partition-wise combining
   * @param combiner GroupCombineFunction implementation
   * @return DataSet with combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
```

### Distinct Operations

Remove duplicate elements from DataSets with various key selection strategies.

```scala { .api }
class DataSet[T] {
  /**
   * Removes all duplicates from the DataSet
   * @return DataSet with unique elements
   */
  def distinct(): DataSet[T]

  /**
   * Removes duplicates based on field positions
   * @param fields Field positions to consider for uniqueness
   * @return DataSet with unique elements by fields
   */
  def distinct(fields: Int*): DataSet[T]

  /**
   * Removes duplicates based on field names
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return DataSet with unique elements by named fields
   */
  def distinct(firstField: String, otherFields: String*): DataSet[T]

  /**
   * Removes duplicates based on key selector function
   * @param fun Key selector function
   * @return DataSet with unique elements by key
   */
  def distinct[K: TypeInformation](fun: T => K): DataSet[T]
}
```

**Usage Examples:**

```scala
val numbers = env.fromElements(1, 2, 2, 3, 3, 3, 4, 5)

// Remove all duplicates
val unique = numbers.distinct()

// Remove duplicates by key
case class Person(name: String, age: Int, city: String)
val people = env.fromElements(
  Person("Alice", 25, "NYC"),
  Person("Bob", 25, "NYC"),
  Person("Alice", 30, "LA")
)

// Unique by name
val uniqueByName = people.distinct(_.name)

// Unique by age and city
val uniqueByAgeCity = people.distinct(p => (p.age, p.city))
```

### Selection Operations

Select specific elements or subsets from DataSets.

```scala { .api }
class DataSet[T] {
  /**
   * Selects the first n elements
   * @param n Number of elements to select
   * @return DataSet with first n elements
   */
  def first(n: Int): DataSet[T]

  /**
   * Selects elements with minimum values in specified fields
   * @param fields Field positions for minimum comparison
   * @return DataSet with minimum elements
   */
  def minBy(fields: Int*): DataSet[T]

  /**
   * Selects elements with maximum values in specified fields
   * @param fields Field positions for maximum comparison
   * @return DataSet with maximum elements
   */
  def maxBy(fields: Int*): DataSet[T]
}
```

### Counting and Collection

Get counts and collect elements for inspection.

```scala { .api }
class DataSet[T] {
  /**
   * Counts the number of elements in the DataSet
   * @return Number of elements
   */
  def count(): Long

  /**
   * Collects all elements to the driver program
   * @return Sequence containing all elements
   */
  def collect(): Seq[T]
}
```

**Usage Examples:**

```scala
val data = env.fromElements(1, 2, 3, 4, 5)

// Count elements
val elementCount = data.count()
println(s"Total elements: $elementCount")

// Collect results (use carefully with large datasets)
val results = data.map(_ * 2).collect()
results.foreach(println)
```

## Types

```scala { .api }
abstract class MapFunction[T, O] extends Function {
  def map(value: T): O
}

abstract class FlatMapFunction[T, O] extends Function {
  def flatMap(value: T, out: Collector[O]): Unit
}

abstract class MapPartitionFunction[T, O] extends Function {
  def mapPartition(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

abstract class FilterFunction[T] extends Function {
  def filter(value: T): Boolean
}

abstract class ReduceFunction[T] extends Function {
  def reduce(value1: T, value2: T): T
}

abstract class GroupReduceFunction[T, O] extends Function {
  def reduce(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

abstract class GroupCombineFunction[T, O] extends Function {
  def combine(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

trait Collector[T] {
  def collect(record: T): Unit
}
```