
# Flink Scala API

Apache Flink Scala API provides type-safe operations and functional programming idioms for distributed batch processing with the DataSet API. It offers Scala-friendly APIs with case class support, pattern matching, and functional composition for building scalable data processing pipelines.

**⚠️ Deprecation Notice**: All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should migrate to the Java DataStream API and/or the Java Table API.

## Package Information

- **Package Name**: flink-scala_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**: Add to Maven pom.xml:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.12</artifactId>
  <version>1.20.2</version>
</dependency>
```

## Core Imports

```scala
import org.apache.flink.api.scala._ // DataSet, ExecutionEnvironment, and implicit TypeInformation derivation
import org.apache.flink.api.scala.ExecutionEnvironment // already covered by the wildcard import above
```

For specific functionality:

```scala
import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.extensions._ // For partial function support
import org.apache.flink.api.common.operators.Order // For sortGroup / sortPartition
```

## Basic Usage

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._ // enables filterWith, mapWith, groupingBy, reduceWith, ...

// Record type for the partial-function example
case class Sale(region: String, product: String, amount: Double)

object BasicUsage {
  def main(args: Array[String]): Unit = {
    // Create execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create dataset from elements
    val data: DataSet[String] = env.fromElements("Hello", "World", "from", "Flink")

    // Transform data
    val wordCounts = data
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // Output results. print() is eager: it triggers execution itself, so no
    // env.execute() call follows (one with no new sinks would throw).
    wordCounts.print()

    // Example with partial functions (requires the extensions import)
    val sales = env.fromElements(
      Sale("US", "ProductA", 100.0),
      Sale("EU", "ProductA", 150.0)
    )

    val result = sales
      .filterWith { case Sale(region, _, _) => region == "US" }
      .mapWith { case Sale(_, product, amount) => (product, amount) }
      .groupingBy { case (product, _) => product }
      // sum(1) needs field-position keys; with a key selector, reduce instead
      .reduceWith { case ((product, a1), (_, a2)) => (product, a1 + a2) }

    result.print()
  }
}
```

## Architecture

The Flink Scala API is built around these core concepts:

- **ExecutionEnvironment**: Context for creating and executing Flink programs
- **DataSet**: Immutable collection representing distributed data
- **Transformations**: Lazy operations like map, filter, and join that build new DataSets without running anything
- **Actions**: Operations like collect, count, and print that trigger execution immediately; sinks such as writeAsText run only when `env.execute()` is called
- **Type System**: Automatic TypeInformation generation for Scala types
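
The split between lazy transformations and eager actions shows up in a minimal sketch like the following. It assumes `flink-scala_2.12` is on the classpath and uses a local environment. The object name `LazyVsEager` is hypothetical.

```scala
import org.apache.flink.api.scala._

object LazyVsEager {
  def run(): (Seq[Int], Long) = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)

    // Transformations only extend the dataflow plan; no job runs here
    val squares: DataSet[Int] = env.fromElements(1, 2, 3).map(x => x * x)

    // collect() is an action: it submits a job and ships the results to the client
    val collected: Seq[Int] = squares.collect()

    // count() is also eager and submits a second, separate job
    val n: Long = squares.count()

    (collected, n)
  }
}
```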

## Capabilities

### Execution Environment

The entry point for all Flink Scala programs, providing methods to create DataSets and configure execution.

```scala { .api }
object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def getConfig: ExecutionConfig

  // Data source creation
  def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String, /* ... */): DataSet[T]
  def generateSequence(from: Long, to: Long): DataSet[Long]

  // Execution
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  def executeAsync(): JobClient
}
```

[Execution Environment](./execution-environment.md)
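
Here is a minimal sketch of creating a local environment, setting the default parallelism, and using two of the sources listed above. `EnvDemo` is a hypothetical name. `createLocalEnvironment()` runs the job inside the current JVM, which makes it convenient for tests.

```scala
import org.apache.flink.api.scala._

object EnvDemo {
  def run(): (Int, Seq[Long], Seq[String]) = {
    // Local environment: the job runs inside the current JVM
    val env = ExecutionEnvironment.createLocalEnvironment()
    env.setParallelism(2) // default parallelism for this program's operators

    val numbers: DataSet[Long] = env.generateSequence(1, 5) // 1 to 5, both ends inclusive
    val words: DataSet[String] = env.fromCollection(List("flink", "scala"))

    (env.getParallelism, numbers.collect(), words.collect())
  }
}
```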

### DataSet Operations

Core data transformation and processing operations on distributed datasets.

```scala { .api }
class DataSet[T] {
  // Basic transformations
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filter(fun: T => Boolean): DataSet[T]
  def distinct(): DataSet[T]

  // Grouping
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def groupBy(fields: Int*): GroupedDataSet[T]

  // Aggregations
  def reduce(fun: (T, T) => T): DataSet[T]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]

  // Output operations
  // collect, print, and count are eager: each call triggers a job execution immediately
  def collect(): Seq[T]
  def print(): Unit
  def count(): Long
  // writeAsText is a lazy sink: it runs only when env.execute() is called
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]
}
```

[DataSet Operations](./dataset-operations.md)
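
The following small sketch combines several of the transformations and eager output operations above. It uses a hypothetical object `DataSetOpsDemo` and a local environment with parallelism 1. Each `collect()` and `count()` call here submits its own job.

```scala
import org.apache.flink.api.scala._

object DataSetOpsDemo {
  def run(): (Seq[Int], Int, Long) = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    val nums = env.fromElements(1, 2, 2, 3, 4, 4, 5)

    // Keep even numbers, then drop duplicates: leaves 2 and 4
    val evensDistinct = nums.filter(_ % 2 == 0).distinct()

    // reduce yields a single-element DataSet holding the sum
    val total = nums.reduce(_ + _)

    (evensDistinct.collect(), total.collect().head, nums.count())
  }
}
```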

### Join Operations

Joining datasets on keys with various join types and optimization hints.

```scala { .api }
class DataSet[T] {
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}

// where(...) selects the key of the left input, equalTo(...) the key of the right input
class UnfinishedJoinOperation[L, R] {
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedKeyPairOperation[L, R, JoinDataSet[L, R]]
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[L, R, JoinDataSet[L, R]]
}

class HalfUnfinishedKeyPairOperation[L, R, O] {
  def equalTo[K: TypeInformation](fun: R => K): O
  def equalTo(fields: Int*): O
}

class JoinDataSet[L, R] {
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
```

[Join Operations](./join-operations.md)
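
Here is a minimal inner-join sketch that uses key selector functions on both sides. The case classes `Customer` and `Purchase` and the object `JoinDemo` are hypothetical example types. `where` picks the left key, `equalTo` picks the right key, and `apply` builds one output record from each matching pair.

```scala
import org.apache.flink.api.scala._

// Hypothetical example record types
case class Customer(id: Int, name: String)
case class Purchase(customerId: Int, amount: Double)

object JoinDemo {
  def run(): Seq[(String, Double)] = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    val customers = env.fromElements(Customer(1, "Ada"), Customer(2, "Bo"))
    val purchases = env.fromElements(Purchase(1, 10.0), Purchase(1, 5.0), Purchase(3, 7.0))

    customers
      .join(purchases)
      .where(_.id)            // key selector on the left input
      .equalTo(_.customerId)  // key selector on the right input
      .apply((c, p) => (c.name, p.amount)) // inner join: unmatched rows are dropped
      .collect()
  }
}
```

Customer 2 has no purchases and purchase 3 has no customer, so only Ada's two purchases survive the inner join.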

### Grouped DataSet Operations

Operations available on grouped datasets including sorting and specialized aggregations.

```scala { .api }
class GroupedDataSet[T] {
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]
  def maxBy(fields: Int*): DataSet[T]
  def minBy(fields: Int*): DataSet[T]
  def first(n: Int): DataSet[T]
}
```

[Grouped DataSet Operations](./grouped-dataset-operations.md)
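
The following sketch shows two common grouped patterns: top-n per group with `sortGroup` plus `first`, and a custom aggregate with `reduceGroup`. It assumes tuple input so that field positions can be used. `GroupedDemo` and its records are hypothetical. `Order` is Flink's `org.apache.flink.api.common.operators.Order`.

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object GroupedDemo {
  // Hypothetical (region, amount) records
  private val records = Seq(("US", 5), ("US", 9), ("US", 7), ("EU", 3), ("EU", 8))

  // Top two amounts per region: group by field 0, sort each group by field 1 descending
  def topTwoPerRegion(): Seq[(String, Int)] = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    env.fromCollection(records)
      .groupBy(0)
      .sortGroup(1, Order.DESCENDING)
      .first(2)
      .collect()
  }

  // Total per region; reduceGroup sees each group as a single-pass Iterator
  def totals(): Seq[(String, Int)] = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    env.fromCollection(records)
      .groupBy(0)
      .reduceGroup { rows =>
        val all = rows.toList // materialize once, since the Iterator can be consumed only once
        (all.head._1, all.map(_._2).sum)
      }
      .collect()
  }
}
```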

### Type System

Scala-specific type information system for serialization and type safety.

```scala { .api }
object Types {
  // Basic types
  val STRING: TypeInformation[String]
  val INT: TypeInformation[Int]
  val LONG: TypeInformation[Long]
  val DOUBLE: TypeInformation[Double]
  val BOOLEAN: TypeInformation[Boolean]

  // Factory methods
  def of[T: TypeInformation]: TypeInformation[T]
  def TUPLE[T: TypeInformation]: TypeInformation[T]
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
}

// Implicit type information generation (macro-based), defined in the
// org.apache.flink.api.scala package object and brought into scope by
// `import org.apache.flink.api.scala._`
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
```

[Type System](./type-system.md)
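
The following minimal sketch shows how `TypeInformation` is obtained for a case class. The wildcard import brings the `createTypeInformation` macro into scope, and `Types.of` summons the same implicit. `Person` and `TypeInfoDemo` are hypothetical names.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types

// Hypothetical record type
case class Person(name: String, age: Int)

object TypeInfoDemo {
  // Derived at compile time by the macro from the wildcard import
  val personInfo: TypeInformation[Person] = createTypeInformation[Person]

  // Types.of just looks up the implicit TypeInformation in scope
  val viaTypes: TypeInformation[Person] = Types.of[Person]
}
```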

### Partitioned and Sorted DataSets

Specialized DataSet types for partitioned and sorted data operations.

```scala { .api }
// DataSet.sortPartition(...) returns a PartitionSortedDataSet; calling sortPartition
// on it again adds a secondary sort key within each partition
class PartitionSortedDataSet[T] extends DataSet[T] {
  def sortPartition(field: Int, order: Order): DataSet[T]
  def sortPartition(field: String, order: Order): DataSet[T]
  // Note: key selector functions cannot be chained
}
```
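
Here is a sketch of chained partition sorting. It assumes parallelism 1, so the single partition's order is also the collected order. With more partitions, only each partition would be sorted, not the whole result. `SortPartitionDemo` is hypothetical.

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object SortPartitionDemo {
  def sorted(): Seq[(String, Int)] = {
    // One partition, so the per-partition order is also the collected order
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    val scores = env.fromElements(("b", 1), ("c", 0), ("a", 1))

    scores
      .sortPartition(1, Order.ASCENDING) // primary key: field 1
      .sortPartition(0, Order.ASCENDING) // chained: secondary key, field 0
      .collect()
  }
}
```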

### Extension Methods for Partial Functions

Scala-friendly extension methods that accept anonymous pattern-matching functions such as `{ case (a, b) => a + b }`.

```scala { .api }
// Import extensions
import org.apache.flink.api.scala.extensions._

class OnDataSet[T] {
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filterWith(fun: T => Boolean): DataSet[T]
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
}
```
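
These methods exist because the regular DataSet methods are overloaded, so an anonymous pattern-matching function passed to them does not type-check. The `*With` variants are not overloaded and accept it. The small word-frequency sketch below works under that assumption, using hypothetical `(documentId, text)` records and a hypothetical `ExtensionsDemo` object.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

object ExtensionsDemo {
  def wordFrequencies(): Seq[(String, Int)] = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    // Hypothetical (documentId, text) records
    val docs = env.fromElements(("doc1", "a b"), ("doc2", "b c b"))

    docs
      .flatMapWith { case (_, text) => text.split(" ").toSeq } // destructure, emit words
      .mapWith(word => (word, 1))
      .groupingBy { case (word, _) => word }                   // key selector as a pattern match
      .reduceGroupWith { group => (group.head._1, group.size) } // group is a Stream of (word, 1)
      .collect()
  }
}
```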

### Utility Functions

Additional utilities for DataSet operations including sampling, partitioning, and indexing.

```scala { .api }
// Import utilities
import org.apache.flink.api.scala.utils._

class DataSet[T] {
  // Available via implicit conversions from utils package
  def countElementsPerPartition(): DataSet[(Int, Long)]
  def zipWithIndex(): DataSet[(Long, T)]
  def zipWithUniqueId(): DataSet[(Long, T)]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.RNG.nextLong()): DataSet[T]
  def sampleWithSize(withReplacement: Boolean, numSamples: Int, seed: Long = Utils.RNG.nextLong()): DataSet[T]
  def checksumHashCode(): ChecksumHashCode
}
```

[Utility Functions](./utility-functions.md)
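
Here is a sketch of two of these utilities. It assumes the `org.apache.flink.api.scala.utils._` import and parallelism 1, so that `zipWithIndex` assigns indices in input order. `UtilsDemo` is hypothetical. Sampling without replacement via `sampleWithSize` should return exactly `numSamples` elements when the input is larger than that.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

object UtilsDemo {
  // (index, element) pairs; with one partition the indices follow input order
  def indexed(): Seq[(Long, String)] = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    env.fromElements("a", "b", "c")
      .zipWithIndex
      .collect()
  }

  // Fixed-size sample without replacement, seeded for repeatability
  def sampleSize(): Long = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    env.generateSequence(1, 100)
      .sampleWithSize(false, 10, 42L)
      .count()
  }
}
```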

## Common Types

```scala { .api }
// Simplified signatures. Aggregations, FileSystem.WriteMode, and Order are Java enums in Flink
// (org.apache.flink.api.java.aggregation.Aggregations, org.apache.flink.core.fs.FileSystem.WriteMode,
// org.apache.flink.api.common.operators.Order); only the constants used in this document are shown.

// Execution configuration
class ExecutionConfig {
  def setParallelism(parallelism: Int): ExecutionConfig
  def getParallelism: Int
  def enableClosureCleaner(): ExecutionConfig
  def disableClosureCleaner(): ExecutionConfig
}

// Job execution result
class JobExecutionResult {
  def getNetRuntime: Long // net job runtime in milliseconds
  def getAccumulatorResult[T](accumulatorName: String): T
}

// Aggregation types
object Aggregations extends Enumeration {
  val SUM, MAX, MIN = Value
}

// File system write modes
object FileSystem {
  object WriteMode extends Enumeration {
    val NO_OVERWRITE, OVERWRITE = Value
  }
}

// Ordering for sorting
object Order extends Enumeration {
  val ASCENDING, DESCENDING = Value
}
```
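
The `JobExecutionResult` returned by `execute()` carries accumulator values and the job's net runtime (`getNetRuntime`). The sketch below assumes Flink 1.x, where overriding `open(Configuration)` still compiles, although newer releases deprecate it in favor of `open(OpenContext)`. `CountingMapper`, the accumulator name `"records"`, and `AccumulatorDemo` are hypothetical. A lazy sink (`DiscardingOutputFormat`) is attached because `execute()` only runs pending sinks.

```scala
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Hypothetical mapper that counts records in an accumulator named "records"
class CountingMapper extends RichMapFunction[String, String] {
  private val counter = new IntCounter()

  // Register the accumulator once per parallel task instance
  override def open(parameters: Configuration): Unit =
    getRuntimeContext.addAccumulator("records", counter)

  override def map(value: String): String = {
    counter.add(1)
    value.toUpperCase
  }
}

object AccumulatorDemo {
  def run(): (Int, Long) = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    env.fromElements("a", "b", "c")
      .map(new CountingMapper)
      .output(new DiscardingOutputFormat[String]) // lazy sink: runs on env.execute()

    val result = env.execute("accumulator demo")
    val records = result.getAccumulatorResult[Integer]("records").intValue
    (records, result.getNetRuntime)
  }
}
```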

## Error Handling

The Flink Scala API commonly raises these exceptions:

- `IllegalArgumentException` - Invalid parameters or field names
- `UnsupportedOperationException` - Operations that are not supported for certain data or key types
- `RuntimeException` - Runtime execution errors, including calling `execute()` when no new data sinks have been defined
- `IOException` - File I/O errors
- `JobExecutionException` - Job execution failures
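
A common `RuntimeException` in DataSet programs comes from calling `execute()` after an eager action such as `print()` has already run every pending sink. The minimal sketch below captures that message, assuming a local environment. `MissingSinkDemo` is hypothetical.

```scala
import org.apache.flink.api.scala._

object MissingSinkDemo {
  // Returns the message Flink raises when execute() has no new sinks to run
  def executeWithoutSink(): Option[String] = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    val doubled = env.fromElements(1, 2, 3).map(_ * 2)
    doubled.print() // eager: runs its own job and leaves no pending sinks

    try {
      env.execute("nothing left to run")
      None
    } catch {
      case e: RuntimeException => Some(e.getMessage)
    }
  }
}
```

The usual fix is to drop the trailing `execute()` when the program ends with `print()`, `collect()`, or `count()`. Keep it only when lazy sinks such as `writeAsText` are defined.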