# Flink Streaming Scala


Apache Flink Streaming Scala provides idiomatic Scala bindings for Apache Flink's streaming data processing capabilities. It wraps Flink's Java DataStream API with Scala-friendly constructs such as lambdas, tuples, case classes and implicitly derived `TypeInformation`. Developers can therefore build high-throughput, low-latency stream processing applications with familiar Scala syntax and compile-time type safety.


**⚠️ Deprecation Notice**: All Flink Scala APIs have been deprecated since version 1.18.0 and will be removed in a future Flink major version. Users should migrate to the Java DataStream API or the Table API. Both can still be used from applications written in Scala. See [FLIP-265](https://s.apache.org/flip-265) for details.


## Package Information

- **Package Name**: flink-streaming-scala_2.12
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-streaming-scala_2.12
- **Version**: 1.20.2
- **Installation**: Add to your Maven/SBT dependencies

Maven:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.20.2</version>
</dependency>
```

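SBT:

```scala
// %% appends the Scala binary version, so this resolves to flink-streaming-scala_2.12 when scalaVersion is 2.12.x
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.20.2"
```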

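## Core Imports

```scala
// Scala DataStream API, including the implicit TypeInformation derivation
import org.apache.flink.streaming.api.scala._
// Scala window function types (WindowFunction, ProcessWindowFunction, ...)
import org.apache.flink.streaming.api.scala.function._
```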

For specific functionality:

```scala
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, KeyedStream}
// AsyncDataStream lives in the scala package; the async function types live in scala.async
import org.apache.flink.streaming.api.scala.AsyncDataStream
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.ProcessFunction
```

## Basic Usage

```scala
import org.apache.flink.streaming.api.scala._

object SimpleJob {
  def main(args: Array[String]): Unit = {
    // Create execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Create data source
    val numbers = env.fromElements(1, 2, 3, 4, 5)

    // Transform data: double every number and keep results greater than 5
    val doubled = numbers
      .map(_ * 2)
      .filter(_ > 5)

    // Output results to stdout
    doubled.print()

    // Execute the job
    env.execute("Simple Flink Job")
  }
}
```

## Architecture

Flink Streaming Scala is built around several key components:

- **Execution Environment**: `StreamExecutionEnvironment` is the entry point for creating streaming applications
- **Data Streams**: `DataStream[T]` represents a typed, bounded or unbounded stream of elements
- **Keyed Streams**: `KeyedStream[T, K]` partitions a stream by key and enables keyed state and per-key aggregations
- **Windowing**: Time- and count-based windows for bounded computations over unbounded streams
- **Connectors**: Sources and sinks for external systems
- **Processing Functions**: Low-level functions for complex event-driven logic
- **Async I/O**: Non-blocking requests to external services from within a stream

## Capabilities

### Execution Environment

Environment setup and job execution. `StreamExecutionEnvironment` is the entry point of every Flink streaming application.

```scala { .api }
object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}

class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```

[Execution Environment](./execution-environment.md)
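
The following is a minimal sketch of the environment methods listed above. It assumes Flink 1.20.x with `flink-clients` on the classpath so the job can run in the local JVM. It creates a local environment, enables checkpointing and runs a small bounded job. The object name `EnvironmentSketch`, the job name and the 10-second checkpoint interval are illustrative choices, not Flink defaults.

```scala
import org.apache.flink.streaming.api.scala._

object EnvironmentSketch {
  // Builds and runs a tiny bounded job; returns its net runtime in milliseconds
  def run(): Long = {
    // Local environment running inside this JVM with parallelism 2
    val env = StreamExecutionEnvironment.createLocalEnvironment(2)
    // Take a checkpoint every 10 seconds (the interval is given in milliseconds)
    env.enableCheckpointing(10000L)

    env.fromElements("a", "b", "c")
      .map(_.toUpperCase)
      .print()

    // execute() submits the job and blocks until it finishes (the source is bounded)
    val result = env.execute("environment-sketch")
    result.getNetRuntime()
  }

  def main(args: Array[String]): Unit =
    println(s"Job finished in ${run()} ms")
}
```

With an unbounded source such as `socketTextStream`, `execute` would keep blocking until the job is cancelled.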

### Data Sources and Streams

Creating streams from sources, then transforming and partitioning them. Every pipeline is assembled from these operations.

```scala { .api }
class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def readTextFile(filePath: String): DataStream[String]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
}

class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def filter(fun: T => Boolean): DataStream[T]
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
}
```

[Data Sources and Streams](./data-streams.md)
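
A short sketch chaining `flatMap`, `filter` and `map` on a collection source. It assumes Flink 1.20.x with `flink-clients` on the classpath. `TransformSketch` is a hypothetical name. Results are pulled back with `executeAndCollect()`, which runs the job itself, so no separate `env.execute()` call is needed. With parallelism above 1 the collected order may vary.

```scala
import org.apache.flink.streaming.api.scala._

object TransformSketch {
  // Returns the transformed words collected back into the client
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] =
      env.fromCollection(Seq("to be or not to be", "that is the question"))

    val words: DataStream[String] = lines
      .flatMap(_.split(" "))   // emit one element per word
      .filter(_.length > 2)    // keep words longer than two characters
      .map(_.toUpperCase)

    // executeAndCollect() runs the job and streams the results back to the client
    val it = words.executeAndCollect()
    try it.toList finally it.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```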

### Keyed Streams and Stateful Processing

Key-partitioned processing with managed state and rolling aggregations. A keyed stream is required for keyed state, timers and keyed windows.

```scala { .api }
class KeyedStream[T, K] {
  def reduce(fun: (T, T) => T): DataStream[T]
  def sum(position: Int): DataStream[T]
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
  def process[R: TypeInformation](processFunction: KeyedProcessFunction[K, T, R]): DataStream[R]
  def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
}
```

[Keyed Streams and State](./keyed-streams.md)
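
A minimal sketch of `mapWithState`, keeping a per-key running total. It assumes Flink 1.20.x with `flink-clients` on the classpath, and `KeyedStateSketch` is a hypothetical name. The state argument is `None` the first time a key is seen. Returning `Some(total)` stores the updated value for that key.

```scala
import org.apache.flink.streaming.api.scala._

object KeyedStateSketch {
  // Emits (key, runningTotal) for every incoming element
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 1), ("a", 1))

    val totals: DataStream[(String, Int)] = events
      .keyBy(_._1)
      // state holds the running total for the current key
      .mapWithState[(String, Int), Int] { (event, state) =>
        val total = state.getOrElse(0) + event._2
        ((event._1, total), Some(total))
      }

    val it = totals.executeAndCollect()
    try it.toList finally it.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

For this particular input, `.keyBy(_._1).sum(1)` yields the same running totals without explicit state handling. `mapWithState` is the more general tool when the state differs from the output.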

### Windowing Operations

Time- and count-based windows that group an unbounded stream into finite chunks for computation. A windowed stream can be reduced, aggregated, or processed with a window function.

```scala { .api }
class WindowedStream[T, K, W <: Window] {
  def reduce(function: (T, T) => T): DataStream[T]
  def aggregate[ACC: TypeInformation, R: TypeInformation](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}
```

[Windowing Operations](./windowing.md)
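
The sketch below shows `aggregate` over a keyed count window. It assumes Flink 1.20.x with `flink-clients` on the classpath. Count windows are used here only because they give deterministic output for a tiny bounded input; the same `aggregate` call works on time windows. `AverageAggregate` and `WindowSketch` are hypothetical names. The accumulator is a `(sum, count)` pair, so partial results can be merged cheaply.

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._

// Averages the Double field of the elements in a window; the accumulator is (sum, count)
class AverageAggregate extends AggregateFunction[(String, Double), (Double, Long), Double] {
  override def createAccumulator(): (Double, Long) = (0.0, 0L)
  override def add(value: (String, Double), acc: (Double, Long)): (Double, Long) =
    (acc._1 + value._2, acc._2 + 1L)
  override def getResult(acc: (Double, Long)): Double = acc._1 / acc._2
  override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)
}

object WindowSketch {
  def run(): List[Double] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val readings = env.fromElements(("a", 1.0), ("b", 10.0), ("a", 3.0), ("b", 20.0))

    val averages: DataStream[Double] = readings
      .keyBy(_._1)
      .countWindow(2)                  // a window fires once its key has two elements
      .aggregate(new AverageAggregate)

    val it = averages.executeAndCollect()
    try it.toList finally it.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The sample input is chosen so that every key fills its count window exactly.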

### Stream Connections and Joins

Operations that combine several streams. `union` merges streams of the same type, and `connect` feeds two streams of possibly different types into one operator. `join` and `coGroup` combine elements of two streams that share a key within a window.

```scala { .api }
class DataStream[T] {
  def union(dataStreams: DataStream[T]*): DataStream[T]
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

class ConnectedStreams[IN1, IN2] {
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
```

[Stream Connections and Joins](./stream-connections.md)
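
A minimal sketch contrasting `connect` and `union`. It assumes Flink 1.20.x with `flink-clients` on the classpath, and `ConnectSketch` is a hypothetical name. The two lambdas passed to `ConnectedStreams.map` carry explicit parameter types, which keeps overload resolution unambiguous.

```scala
import org.apache.flink.streaming.api.scala._

object ConnectSketch {
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val numbers: DataStream[Int] = env.fromElements(1, 2, 3)
    val words: DataStream[String] = env.fromElements("four", "five")

    // connect: two inputs of different types, one function per input, a common output type
    val tagged: DataStream[String] = numbers
      .connect(words)
      .map((n: Int) => s"int:$n", (w: String) => s"str:$w")

    // union: merge streams that already share the same element type
    val all: DataStream[String] = tagged.union(env.fromElements("extra"))

    val it = all.executeAndCollect()
    try it.toList finally it.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```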

### Async I/O Operations

Asynchronous calls to external services that do not block the operator while requests are in flight. Typically used to enrich stream elements with data from a database or web service.

```scala { .api }
object AsyncDataStream {
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]

  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
}
```

[Async I/O Operations](./async-io.md)
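
The sketch below enriches a stream through `AsyncDataStream.unorderedWait`. It assumes Flink 1.20.x with `flink-clients` on the classpath. `UserLookup` and `AsyncSketch` are hypothetical names, and the Scala `Future` merely stands in for a real non-blocking client call. A blocking call inside `asyncInvoke` would defeat the purpose of async I/O.

```scala
import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical lookup: the Future below stands in for a real asynchronous client
class UserLookup extends AsyncFunction[Int, String] {
  override def asyncInvoke(userId: Int, resultFuture: ResultFuture[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    Future(s"user-$userId").onComplete {
      case Success(name) => resultFuture.complete(Iterable(name))
      case Failure(err)  => resultFuture.completeExceptionally(err)
    }
  }
}

object AsyncSketch {
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ids: DataStream[Int] = env.fromElements(1, 2, 3)

    // Results are emitted as soon as each request completes; each request times out after 1 second
    val names: DataStream[String] =
      AsyncDataStream.unorderedWait(ids, new UserLookup, 1, TimeUnit.SECONDS)

    val it = names.executeAndCollect()
    try it.toList finally it.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

`orderedWait` takes the same arguments but emits results in input order, at the cost of holding back requests that finish early.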

### Processing Functions

Low-level processing functions for complex event-driven logic with access to timers, state, and side outputs.

```scala { .api }
abstract class ProcessFunction[I, O] {
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}

abstract class KeyedProcessFunction[K, I, O] {
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}
```

[Processing Functions](./processing-functions.md)
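
A minimal sketch of side outputs from a `ProcessFunction`, assuming Flink 1.20.x with `flink-clients` on the classpath. `SplitNegatives`, `SideOutputSketch` and the tag id `"rejected"` are hypothetical names. Non-negative values go to the main output and negative values go to a side output, which is read back with `getSideOutput`. Timers are not shown because registering them requires a `KeyedProcessFunction` on a keyed stream.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Routes non-negative values to the main output and negative values to a side output
class SplitNegatives extends ProcessFunction[Int, Int] {
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, Int]#Context,
      out: Collector[Int]): Unit =
    if (value >= 0) out.collect(value) else ctx.output(SideOutputSketch.rejectedTag, value)
}

object SideOutputSketch {
  // Side-output tag for rejected values; the id is an arbitrary name
  val rejectedTag: OutputTag[Int] = new OutputTag[Int]("rejected")

  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val accepted: DataStream[Int] = env.fromElements(3, -1, 7, -5).process(new SplitNegatives)
    val rejected: DataStream[Int] = accepted.getSideOutput(rejectedTag)

    // Label both outputs and merge them so a single job collects everything
    val labelled = accepted.map(v => s"ok:$v").union(rejected.map(v => s"rejected:$v"))
    val it = labelled.executeAndCollect()
    try it.toList finally it.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```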

### Window Functions

Specialized functions for processing windowed data with access to window metadata and state.

```scala { .api }
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit
}
```

[Window Functions](./window-functions.md)
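
A short sketch of a Scala `ProcessWindowFunction` applied to a keyed count window. It assumes Flink 1.20.x with `flink-clients` on the classpath, and `SumPerWindow` and `WindowFunctionSketch` are hypothetical names. Unlike an `AggregateFunction`, a `ProcessWindowFunction` receives every buffered element of the window at once, together with the key.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Emits "key:sum" for each window; `elements` holds all buffered elements of the window
class SumPerWindow extends ProcessWindowFunction[(String, Int), String, String, GlobalWindow] {
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Int)],
      out: Collector[String]): Unit =
    out.collect(s"$key:${elements.map(_._2).sum}")
}

object WindowFunctionSketch {
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val results: DataStream[String] = env
      .fromElements(("a", 1), ("b", 5), ("a", 2), ("b", 6))
      .keyBy(_._1)
      .countWindow(2)          // one window per key, fired after two elements
      .process(new SumPerWindow)

    val it = results.executeAndCollect()
    try it.toList finally it.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```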

### Sinks and Output

Output operations for writing processed data to external systems and monitoring stream results.

```scala { .api }
class DataStream[T] {
  def print(): DataStreamSink[T]
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def sinkTo(sink: Sink[T]): DataStreamSink[T]
  def executeAndCollect(): CloseableIterator[T]
}

trait SinkFunction[T] {
  def invoke(value: T, context: Context): Unit
}
```

[Sinks and Output](./sinks-output.md)
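
A minimal custom `SinkFunction` in the style commonly used in tests, assuming Flink 1.20.x with `flink-clients` on the classpath. `CollectSink` and `SinkSketch` are hypothetical names. The sink appends values to a JVM-wide list. That only works when the job runs in the same JVM (a local or mini cluster); a production sink would write to an external system instead, for example via `sinkTo` with a connector's `Sink`.

```scala
import scala.collection.JavaConverters._

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

// Test-style sink that appends every value to a shared, synchronized list
class CollectSink extends SinkFunction[Int] {
  override def invoke(value: Int, context: SinkFunction.Context): Unit =
    CollectSink.values.add(value)
}

object CollectSink {
  // Shared across sink instances; visible only within this JVM
  val values: java.util.List[Int] =
    java.util.Collections.synchronizedList(new java.util.ArrayList[Int]())
}

object SinkSketch {
  def run(): List[Int] = {
    CollectSink.values.clear()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(1, 2, 3).map(_ * 10).addSink(new CollectSink)
    // Unlike executeAndCollect(), addSink only declares the sink; execute() runs the job
    env.execute("sink-sketch")
    CollectSink.values.asScala.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```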

## Types

```scala { .api }
// Core execution types
class JobExecutionResult {
  def getNetRuntime(): Long
  def getAccumulatorResult[V](accumulatorName: String): V
}

// Time type used by window operations such as allowedLateness
final class Time {
  def getSize: Long
  def getUnit: TimeUnit
  def toMilliseconds: Long
}
object Time {
  def milliseconds(milliseconds: Long): Time
  def seconds(seconds: Long): Time
  def minutes(minutes: Long): Time
  def hours(hours: Long): Time
  def days(days: Long): Time
}

// Type information for Scala types
abstract class TypeInformation[T] {
  def getTypeClass: Class[T]
  def isBasicType: Boolean
  def isTupleType: Boolean
}

// Output tag for side outputs
case class OutputTag[T: TypeInformation](id: String)

// Iterator for collecting results
trait CloseableIterator[T] extends Iterator[T] with AutoCloseable
```