
# Apache Flink Scala API

The Apache Flink Scala API (`flink-scala`) provides Scala language bindings for Flink's DataSet API, the batch processing API of Flink's distributed processing framework. Scala bindings for the DataStream API are published separately in `flink-streaming-scala`. This module lets Scala developers write type-safe data processing applications with idiomatic Scala constructs, including case classes, pattern matching, and functional programming patterns.

## Package Information

- **Package Name**: flink-scala_2.11
- **Package Type**: maven
- **Language**: Scala
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
```

## Core Imports

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
```

For type utilities:

```scala
import org.apache.flink.api.scala.typeutils.Types
```

For extension methods (partial function support):

```scala
import org.apache.flink.api.scala.extensions._
```
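
The core `DataSet` methods such as `filter` and `map` are overloaded (a Scala function variant and a Flink function-interface variant), so a bare `{ case ... }` pattern-matching literal may not type-check against them. The extensions import adds `...With` variants that accept a single Scala function. A minimal sketch, assuming local execution; the `Point` case class and the object name are hypothetical:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

object PartialFunctionExample {
  // Hypothetical record type used only for this sketch
  case class Point(x: Double, y: Double)

  def shifted(env: ExecutionEnvironment): Seq[(Double, Double)] =
    env
      .fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
      // filterWith / mapWith accept pattern-matching anonymous functions
      .filterWith { case Point(x, _) => x > 1 }
      .mapWith { case Point(x, y) => (x, y + 1) }
      .collect()

  def main(args: Array[String]): Unit =
    shifted(ExecutionEnvironment.getExecutionEnvironment).foreach(println)
}
```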

## Basic Usage

```scala
import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create DataSet from collection
val data = env.fromCollection(List(1, 2, 3, 4, 5))

// Transform data: keep 3, 4, 5, double them to 6, 8, 10, then sum
val result = data
  .filter(_ > 2)
  .map(_ * 2)
  .reduce(_ + _)

// collect() triggers execution and returns the result to the client
println(result.collect().head) // Prints: 24
```

## Architecture

The Flink Scala API is built around several key components:

- **ExecutionEnvironment**: Entry point for creating and configuring Flink programs
- **DataSet[T]**: Core abstraction representing distributed collections with type safety
- **Type System**: Automatic type information generation via Scala macros
- **Serialization Framework**: Specialized serializers for Scala types (Option, Either, Try, case classes)
- **Fluent API**: Method chaining for building complex data processing pipelines
- **Extensions**: Partial function support for pattern matching in transformations

## Capabilities

### Execution Environment

Sets up the environment, creates data sources, and manages job execution. `ExecutionEnvironment` is the entry point of every DataSet program.

```scala { .api }
object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
  def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```

[Execution Environment](./execution-environment.md)
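
A minimal sketch of these entry points, assuming the program runs in a single local JVM rather than on a cluster. It creates a local environment with an explicit parallelism, builds a source with `fromElements`, and relies on `collect()` to trigger execution, so no separate `execute()` call is made. The object and method names are illustrative:

```scala
import org.apache.flink.api.scala._

object EnvironmentExample {
  // Returns the configured parallelism and the transformed elements
  def run(): (Int, Seq[String]) = {
    // Local environment running inside this JVM with parallelism 2
    val env = ExecutionEnvironment.createLocalEnvironment(2)
    val words = env.fromElements("flink", "scala", "dataset")

    // collect() submits the job and ships the results back to the client
    val upper = words.map(_.toUpperCase).collect()
    (env.getParallelism, upper)
  }

  def main(args: Array[String]): Unit = {
    val (parallelism, words) = run()
    println(s"parallelism = $parallelism, words = ${words.mkString(", ")}")
  }
}
```

Programs that only write to sinks such as `writeAsText` must call `execute()` (or `execute(jobName)`) explicitly; it returns the `JobExecutionResult`.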

### Data Transformations

Core data processing operations, including `map`, `flatMap`, `filter`, `reduce`, and `union`, plus the entry points for grouping (`groupBy`) and joining (`join`).

```scala { .api }
class DataSet[T] {
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filter(fun: T => Boolean): DataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def union(other: DataSet[T]): DataSet[T]
}
```

[Data Transformations](./data-transformations.md)
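
A minimal sketch chaining several of these operations on two small in-memory DataSets, assuming local execution; the object name and inputs are hypothetical. `union` merges the lines, `flatMap` splits them into words, `filter` drops empty tokens, and `map` pairs each word with its length:

```scala
import org.apache.flink.api.scala._

object TransformationsExample {
  // Tokenizes two small text DataSets and pairs each word with its length
  def wordLengths(env: ExecutionEnvironment): Seq[(String, Int)] = {
    val first = env.fromElements("To be or not to be")
    val second = env.fromElements("That is the question")

    first
      .union(second)                         // one DataSet containing both lines
      .flatMap(_.toLowerCase.split("\\W+"))  // one element per word
      .filter(_.nonEmpty)                    // drop empty tokens
      .map(word => (word, word.length))      // DataSet[(String, Int)]
      .collect()
  }

  def main(args: Array[String]): Unit =
    wordLengths(ExecutionEnvironment.getExecutionEnvironment).foreach(println)
}
```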

### Grouping and Aggregation

Group-wise operations and aggregation functions for summarizing and analyzing grouped data.

```scala { .api }
class GroupedDataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def sum(field: String): AggregateDataSet[T]
  def max(field: String): AggregateDataSet[T]
  def min(field: String): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
```

[Grouping and Aggregation](./grouping-aggregation.md)
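
A minimal sketch, assuming local execution and hypothetical inputs. The first method uses field-position grouping with the built-in `sum` aggregation (the classic word count); the second groups by a key-selector function and counts each group with `reduceGroup`:

```scala
import org.apache.flink.api.scala._

object GroupingExample {
  // Field-position grouping with a built-in aggregation: word count
  def wordCounts(env: ExecutionEnvironment): Map[String, Int] =
    env
      .fromElements("a", "b", "a", "c", "b", "a")
      .map((_, 1))   // (word, 1) tuples
      .groupBy(0)    // group on tuple field 0 (the word)
      .sum(1)        // add up tuple field 1 within each group
      .collect()
      .toMap

  // Key-selector grouping with reduceGroup: number of words per initial letter
  def countsByInitial(env: ExecutionEnvironment): Map[Char, Int] =
    env
      .fromElements("apple", "avocado", "banana", "blueberry", "cherry")
      .groupBy(_.head)
      .reduceGroup { words =>
        val group = words.toList
        (group.head.head, group.size)
      }
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    println(wordCounts(env))
    println(countsByInitial(env))
  }
}
```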

### Binary Operations

Join, cross, and coGroup operations for combining multiple DataSets.

```scala { .api }
class DataSet[T] {
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
```

[Binary Operations](./binary-operations.md)
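
A minimal sketch of an inner join and a left outer join on tuple field 0, assuming local execution; the user and order data are hypothetical. The trailing block after `equalTo(0)` is the join function that shapes each matched pair. In the outer join, users without a matching order arrive with a `null` right-hand side, which the function must handle:

```scala
import org.apache.flink.api.scala._

object JoinExample {
  // Hypothetical inputs: (userId, name) and (userId, amount)
  def inputs(env: ExecutionEnvironment): (DataSet[(Int, String)], DataSet[(Int, Double)]) = (
    env.fromElements((1, "alice"), (2, "bob"), (3, "carol")),
    env.fromElements((1, 30.0), (1, 12.5), (3, 7.0))
  )

  // Inner join: only users with at least one order appear
  def innerJoin(env: ExecutionEnvironment): Seq[(String, Double)] = {
    val (users, orders) = inputs(env)
    users.join(orders).where(0).equalTo(0) { (u, o) => (u._2, o._2) }.collect()
  }

  // Left outer join: unmatched users are paired with null and mapped to 0.0
  def leftOuterJoin(env: ExecutionEnvironment): Seq[(String, Double)] = {
    val (users, orders) = inputs(env)
    users.leftOuterJoin(orders).where(0).equalTo(0) { (u, o) =>
      (u._2, if (o == null) 0.0 else o._2)
    }.collect()
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    innerJoin(env).foreach(println)
    leftOuterJoin(env).foreach(println)
  }
}
```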

### Type System and Serialization

Compile-time derivation of type information, plus Scala-specific serialization support for types such as `Option`, `Either`, `Try`, and case classes.

```scala { .api }
object Types {
  def of[T: TypeInformation]: TypeInformation[T]
  def OPTION[A](valueType: TypeInformation[A]): TypeInformation[Option[A]]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
  def TRY[A](valueType: TypeInformation[A]): TypeInformation[Try[A]]
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
}

// Automatic type information generation via macro
// (defined in the org.apache.flink.api.scala package object)
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
```

[Type System](./type-system.md)
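
A minimal sketch of the macro at work, assuming a hypothetical `Reading` case class. `createTypeInformation` is called explicitly here only to make the derivation visible; normally the implicit is resolved automatically wherever a `TypeInformation` context bound is required:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types

object TypeInfoExample {
  // Hypothetical record type for this sketch
  case class Reading(sensor: String, value: Double, note: Option[String])

  // Derived at compile time by the implicit createTypeInformation macro
  val readingInfo: TypeInformation[Reading] = createTypeInformation[Reading]

  // Types.of summons the implicitly derived TypeInformation for any type
  val optionInfo: TypeInformation[Option[String]] = Types.of[Option[String]]

  def main(args: Array[String]): Unit = {
    println(readingInfo) // describes the case class fields and their types
    println(optionInfo)
  }
}
```

Inside a generic method, the macro cannot see the concrete type, so such methods typically declare a `T: TypeInformation` context bound and let the caller supply it.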

### Partitioning and Distribution

Control over data distribution and partitioning strategies across the cluster.

```scala { .api }
class DataSet[T] {
  def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
  def rebalance(): DataSet[T]
  def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
```

[Partitioning and Distribution](./partitioning-distribution.md)
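
A minimal sketch combining `partitionCustom` and `sortPartition`, assuming local execution with parallelism 2; `ParityPartitioner` is a hypothetical partitioner written for this example. Even keys are routed to partition 0 and odd keys to partition 1, each partition is sorted in descending order, and `mapPartition` then exposes every partition's contents as one list. Note that `sortPartition` orders records within each partition only, not globally:

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object PartitioningExample {
  // Hypothetical partitioner: even keys go to partition 0, odd keys to partition 1
  class ParityPartitioner extends Partitioner[Int] {
    override def partition(key: Int, numPartitions: Int): Int =
      math.abs(key % 2) % numPartitions
  }

  // Returns the contents of each parallel partition as a list
  def partitions(env: ExecutionEnvironment): Seq[List[Int]] = {
    env.setParallelism(2)
    env
      .fromCollection(1 to 10)
      .partitionCustom(new ParityPartitioner, (k: Int) => k)
      .sortPartition((k: Int) => k, Order.DESCENDING) // sorts within each partition
      .mapPartition(it => Seq(it.toList))             // one list per partition
      .collect()
  }

  def main(args: Array[String]): Unit =
    partitions(ExecutionEnvironment.getExecutionEnvironment).foreach(println)
}
```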

### Input and Output Operations

Reading data from various sources and writing results to different sinks.

```scala { .api }
class ExecutionEnvironment {
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

class DataSet[T] {
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]
  def writeAsCsv(filePath: String): DataSink[T]
  def print(): Unit
  def collect(): Seq[T]
}
```

[Input and Output](./input-output.md)
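
A minimal sketch of a file-to-file job, assuming a writable temporary directory and a hypothetical `Sale` case class whose fields map to the CSV columns by position. Because `writeAsText` only registers a sink, the job runs when `execute(jobName)` is called, which also returns the `JobExecutionResult`. Parallelism 1 is set so that the text sink writes a single file:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object CsvToTextExample {
  // Hypothetical record type; CSV columns are mapped to fields by position
  case class Sale(region: String, amount: Double)

  // Reads a CSV file, writes one text line per record, and returns the parsed records
  def run(dir: Path): Seq[Sale] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // a single sink task writes a single output file

    val input = dir.resolve("sales.csv")
    Files.write(input, "north,10.5\nsouth,4.0\n".getBytes(StandardCharsets.UTF_8))

    val sales = env.readCsvFile[Sale](input.toString)
    sales
      .map(s => s"${s.region}=${s.amount}")
      .writeAsText(dir.resolve("out.txt").toString, WriteMode.OVERWRITE)

    // Sinks such as writeAsText only run when execute() is called
    val result = env.execute("csv-to-text")
    println(s"job ${result.getJobID} took ${result.getNetRuntime} ms")

    sales.collect() // collect() runs a second, separate job
  }

  def main(args: Array[String]): Unit =
    run(Files.createTempDirectory("flink-io")).foreach(println)
}
```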

### Utility Functions

Advanced utilities for sampling, indexing, and data analysis.

```scala { .api }
// Enabled by: import org.apache.flink.api.scala.utils._
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
  def zipWithIndex: DataSet[(Long, T)]
  def zipWithUniqueId: DataSet[(Long, T)]
  def sample(withReplacement: Boolean, fraction: Double): DataSet[T]
  def countElementsPerPartition: DataSet[(Int, Long)]
}
```

[Utilities](./utilities.md)
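
A minimal sketch of three of these utilities, assuming local execution; the object and method names are hypothetical. The tests below check only properties that do not depend on partitioning or randomness: `zipWithIndex` assigns the indices 0 to n-1, but which element receives which index depends on how the data is partitioned, and the size of a `sample` varies from run to run:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // brings the DataSetUtils implicit class into scope

object UtilitiesExample {
  // Consecutive Long indices 0..n-1 paired with each element
  def indexed(env: ExecutionEnvironment): Seq[(Long, String)] =
    env.fromElements("a", "b", "c").zipWithIndex.collect()

  // Random sample without replacement; roughly half of the elements are kept
  def sampled(env: ExecutionEnvironment): Seq[Int] =
    env.fromCollection(1 to 100).sample(withReplacement = false, fraction = 0.5).collect()

  // (partition index, element count) for every parallel partition
  def perPartition(env: ExecutionEnvironment): Seq[(Int, Long)] =
    env.fromCollection(1 to 100).rebalance().countElementsPerPartition.collect()

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    println(indexed(env))
    println(sampled(env).size)
    println(perPartition(env))
  }
}
```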

## Types

### Core Types

```scala { .api }
// org.apache.flink.api.common.typeinfo.TypeInformation (a Java abstract class)
abstract class TypeInformation[T] {
  def getTypeClass: Class[T]
  def createSerializer(config: ExecutionConfig): TypeSerializer[T]
}

class JobExecutionResult {
  def getJobID: JobID
  def getNetRuntime: Long
  def getNetRuntime(timeUnit: TimeUnit): Long
}

// org.apache.flink.api.common.operators.Order is a Java enum,
// shown here in Scala notation
sealed trait Order
object Order {
  case object NONE extends Order
  case object ASCENDING extends Order
  case object DESCENDING extends Order
  case object ANY extends Order
}

// org.apache.flink.api.common.functions.Partitioner (a Java interface)
trait Partitioner[K] {
  def partition(key: K, numPartitions: Int): Int
}
```
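
`createSerializer` is the bridge between a `TypeInformation` and the bytes Flink moves between tasks. A minimal round-trip sketch, assuming the Flink 1.14 `createSerializer(ExecutionConfig)` signature listed above and the in-memory `DataOutputSerializer` / `DataInputDeserializer` helpers from `org.apache.flink.core.memory`; the `Event` type and the `roundTrip` helper are hypothetical:

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}

object SerializerRoundTrip {
  // Hypothetical record type for this sketch
  case class Event(id: Long, tags: List[String], score: Option[Double])

  // Serializes a value with the serializer created from its TypeInformation, then reads it back
  def roundTrip[T: TypeInformation](value: T): T = {
    val serializer: TypeSerializer[T] =
      implicitly[TypeInformation[T]].createSerializer(new ExecutionConfig)

    val out = new DataOutputSerializer(64) // growable in-memory DataOutputView
    serializer.serialize(value, out)

    val in = new DataInputDeserializer(out.getCopyOfBuffer)
    serializer.deserialize(in)
  }

  def main(args: Array[String]): Unit =
    println(roundTrip(Event(1L, List("a", "b"), Some(0.5))))
}
```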