tessl/maven-org-apache-flink--flink-streaming-scala_2-10

Apache Flink Scala API for DataStream processing with type-safe, functional programming constructs for building streaming data processing applications.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: `mavenpkg:maven/org.apache.flink/flink-streaming-scala_2.10@1.3.x`

To install, run:

```sh
npx @tessl/cli install tessl/maven-org-apache-flink--flink-streaming-scala_2-10@1.3.0
```

# Apache Flink Scala API for DataStream Processing

Apache Flink Scala API provides a type-safe, functional programming interface for building streaming data processing applications. It wraps Flink's Java DataStream API with Scala-friendly constructs. Developers can therefore use functional programming patterns, compile-time type safety, and concise syntax for real-time stream processing, backed by Flink's exactly-once state consistency guarantees.

## Package Information

- **Package Name**: flink-streaming-scala_2.10
- **Package Type**: Maven
- **Language**: Scala 2.10
- **Installation**: `org.apache.flink:flink-streaming-scala_2.10:1.3.3`

## Core Imports

```scala
import org.apache.flink.streaming.api.scala._
```

For specific functionality:

```scala
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
```

## Basic Usage

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create a data stream from a collection
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Transform the data
val result = stream
  .filter(_ % 2 == 0)          // Filter even numbers
  .map(_ * 2)                  // Double each number
  .keyBy(x => x)               // Key by value
  .timeWindow(Time.seconds(5)) // 5-second tumbling windows (processing time by default)
  .sum(0)                      // Sum values in each window

// Add a sink to print results
result.print()

// Execute the streaming program. This bounded source finishes almost at once,
// so the job may end before a processing-time window fires and print little or nothing.
env.execute("Basic Flink Scala Example")
```

## Architecture

Flink Scala API is built around several key architectural components:

- **StreamExecutionEnvironment**: Entry point for creating and configuring streaming applications
- **DataStream**: Core abstraction representing unbounded streams of elements with type safety
- **KeyedStream**: Partitioned streams enabling stateful operations and keyed transformations
- **WindowedStream**: Keyed streams grouped into time- or count-based windows for aggregations over bounded intervals
- **Function Interfaces**: Type-safe interfaces for user-defined operations (transformations, aggregations, windows)
- **Type System Integration**: Automatic TypeInformation generation via Scala macros for serialization
- **State Management**: Managed state with exactly-once consistency guarantees and fault tolerance
- **Time Processing**: Support for event time, processing time, and ingestion time, with watermarks tracking event-time progress

## Capabilities

### Stream Environment and Execution

Core functionality for creating and configuring Flink streaming applications, including environment setup, parallelism control, checkpointing, and program execution.

```scala { .api }
object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(): StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}

class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```

[Stream Environment and Execution](./stream-environment.md)
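
The sketch below walks through the environment lifecycle. It assumes `flink-streaming-scala_2.10` 1.3.x and the Flink local runtime are on the classpath. The hypothetical `EnvironmentExample` builds a local environment with parallelism 1, enables checkpointing, and runs a tiny job. To make the output inspectable, it collects results into a `ConcurrentLinkedQueue` held in a Scala `object`. That trick only works because a local environment runs every operator inside the calling JVM; on a cluster you would use a real sink instead.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.scala._

object EnvironmentExample {
  // Sink output lands here; valid only for local execution in this JVM
  val results = new ConcurrentLinkedQueue[Long]()

  def run(): (Int, List[Long]) = {
    results.clear()

    // Local environment with one parallel task, so output order is deterministic
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)

    // Request a checkpoint every 10 s; this short job will likely finish first
    env.enableCheckpointing(10000)

    env.generateSequence(1, 5)
      .map(_ * 10)
      .addSink(x => results.add(x))

    // Nothing runs until execute() is called; the job name appears in the logs
    env.execute("environment example")

    (env.getParallelism, results.asScala.toList)
  }
}
```

Building the pipeline only declares a dataflow graph. The `execute` call submits it and blocks until this bounded job finishes.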

### Data Sources and Stream Creation

Comprehensive functionality for creating DataStreams from various sources including collections, files, sockets, and custom source functions.

```scala { .api }
class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def readTextFile(filePath: String): DataStream[String]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]
  def generateSequence(from: Long, to: Long): DataStream[Long]
}
```

[Data Sources and Stream Creation](./data-sources.md)
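
The sketch below combines three of the source methods listed above, assuming Flink 1.3.x with a local runtime. `CountdownSource` and `SourcesExample` are hypothetical names. The custom source follows the usual `SourceFunction` pattern: `run` checks a `@volatile` running flag, and `cancel` clears it. Results are gathered in an in-JVM queue, which is only valid for local execution. They are sorted because `union` does not guarantee how its inputs interleave.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical bounded source that counts down from `from` to 1
class CountdownSource(from: Int) extends SourceFunction[Int] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    var i = from
    while (running && i > 0) {
      // Emit while holding the checkpoint lock, the pattern SourceFunction documents
      ctx.getCheckpointLock.synchronized {
        ctx.collect(i)
      }
      i -= 1
    }
  }

  override def cancel(): Unit = running = false
}

object SourcesExample {
  // Sink output lands here; valid only for local execution in this JVM
  val results = new ConcurrentLinkedQueue[String]()

  def run(): List[String] = {
    results.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)

    val fromElems: DataStream[String] = env.fromElements("a", "b")
    val fromColl: DataStream[String] = env.fromCollection(Seq("c", "d"))
    val custom: DataStream[String] = env.addSource(new CountdownSource(2)).map(_.toString)

    // union merges all three; the interleaving between inputs is not guaranteed
    fromElems.union(fromColl, custom).addSink(x => results.add(x))

    env.execute("sources example")
    results.asScala.toList.sorted
  }
}
```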

### Stream Transformations and Operations

Core stream processing operations including element-wise transformations, filtering, and stateful processing with type-safe functional interfaces.

```scala { .api }
class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def filter(fun: T => Boolean): DataStream[T]
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}
```

[Stream Transformations and Operations](./stream-transformations.md)
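
The following minimal sketch chains `flatMap`, `filter`, `map`, and `process`, assuming Flink 1.3.x and a local environment. `WordLength` and `TransformExample` are hypothetical. With parallelism 1 and a single source, element order is preserved end to end, so the collected list is deterministic.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical ProcessFunction: emits the length of every non-empty word
class WordLength extends ProcessFunction[String, Int] {
  override def processElement(
      word: String,
      ctx: ProcessFunction[String, Int]#Context,
      out: Collector[Int]): Unit =
    if (word.nonEmpty) out.collect(word.length)
}

object TransformExample {
  // Sink output lands here; valid only for local execution in this JVM
  val results = new ConcurrentLinkedQueue[Int]()

  def run(): List[Int] = {
    results.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)

    env.fromElements("to be", "or not", "", "to be")
      .flatMap(_.split(" "))   // one element per word ("" yields a single empty word)
      .filter(_ != "be")       // drop one word entirely
      .map(_.toUpperCase)      // element-wise transformation
      .process(new WordLength) // low-level per-element processing
      .addSink(x => results.add(x))

    env.execute("transformation example")
    results.asScala.toList
  }
}
```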

### Stream Partitioning and Distribution

Functionality for controlling data distribution across parallel operators including key-based partitioning, broadcasting, and custom partitioning strategies.

```scala { .api }
class DataStream[T] {
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
  def keyBy(fields: Int*): KeyedStream[T, _]
  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, _]
  def broadcast: DataStream[T]
  def shuffle: DataStream[T]
  def rebalance: DataStream[T]
}
```

[Stream Partitioning and Distribution](./stream-partitioning.md)
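
To show what `keyBy` and `broadcast` do to record placement, the hypothetical job below runs with parallelism 2. It tags each keyed record with the index of the subtask that processed it, using `getRuntimeContext.getIndexOfThisSubtask` in a `RichMapFunction`. The sketch assumes Flink 1.3.x and a local environment, with results gathered in in-JVM queues. Under those assumptions, every record of a given key should report the same subtask, and each broadcast record should appear once per downstream subtask.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical function: replaces the value with the index of the subtask that saw it
class TagWithSubtask extends RichMapFunction[(String, Int), (String, Int)] {
  override def map(value: (String, Int)): (String, Int) =
    (value._1, getRuntimeContext.getIndexOfThisSubtask)
}

object PartitioningExample {
  // Sink outputs land here; valid only for local execution in this JVM
  val keyed = new ConcurrentLinkedQueue[(String, Int)]()
  val broadcasted = new ConcurrentLinkedQueue[String]()

  def run(): (List[(String, Int)], List[String]) = {
    keyed.clear(); broadcasted.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(2)

    val pairs = env.fromElements(("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5))

    // keyBy(0): all records with the same first field are routed to the same subtask
    pairs.keyBy(0).map(new TagWithSubtask).addSink(x => keyed.add(x))

    // broadcast: every downstream subtask (2 here) receives every record
    env.fromElements("x", "y").broadcast.map(s => s).addSink(x => broadcasted.add(x))

    env.execute("partitioning example")
    (keyed.asScala.toList, broadcasted.asScala.toList.sorted)
  }
}
```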

### Keyed Streams and Stateful Processing

Advanced operations on partitioned streams including stateful transformations, aggregations, and state management with exactly-once consistency.

```scala { .api }
class KeyedStream[T, K] {
  def reduce(fun: (T, T) => T): DataStream[T]
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
  def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
  def sum(position: Int): DataStream[T]
  def min(position: Int): DataStream[T]
  def max(position: Int): DataStream[T]
}
```

[Keyed Streams and Stateful Processing](./keyed-streams.md)
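
Here is a small sketch of rolling keyed aggregation and `mapWithState`, assuming Flink 1.3.x and a local environment with parallelism 1. `KeyedExample` is hypothetical. The state passed to `mapWithState` is scoped to the current key, so returning `Some(count)` updates only that key's counter.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.scala._

object KeyedExample {
  // Sink outputs land here; valid only for local execution in this JVM
  val sums = new ConcurrentLinkedQueue[(String, Int)]()
  val counts = new ConcurrentLinkedQueue[(String, Int)]()

  def run(): (List[(String, Int)], List[(String, Int)]) = {
    sums.clear(); counts.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    val events = env.fromElements(("a", 1), ("b", 10), ("a", 2), ("b", 20), ("a", 3))

    // Rolling aggregation: emits the updated per-key sum of field 1 for every input
    events.keyBy(_._1).sum(1).addSink(x => sums.add(x))

    // mapWithState: the Option[Int] is this key's state (None the first time a key is seen)
    events.keyBy(_._1)
      .mapWithState[(String, Int), Int] { (event, state) =>
        val seen = state.getOrElse(0) + 1
        ((event._1, seen), Some(seen))
      }
      .addSink(x => counts.add(x))

    env.execute("keyed stream example")
    (sums.asScala.toList, counts.asScala.toList)
  }
}
```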

### Windowing and Time-Based Processing

Comprehensive windowing functionality for both keyed and non-keyed streams, including time-based and count-based windows with custom triggers and evictors. Whether `timeWindow` creates processing-time or event-time windows depends on the environment's time characteristic.

```scala { .api }
class KeyedStream[T, K] {
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}

class DataStream[T] {
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
}
```

[Windowing and Time-Based Processing](./windowing.md)
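
The sketch below contrasts a tumbling event-time window with a count window, assuming Flink 1.3.x and a local environment. It switches to event time (via `setStreamTimeCharacteristic` and `assignAscendingTimestamps`) because a bounded source emits a final watermark when it finishes, which closes any open event-time windows. A processing-time window over such a short job might never fire. `WindowingExample` is hypothetical. Results are sorted because windows that close on the same watermark may be emitted in any order.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowingExample {
  // Sink outputs land here; valid only for local execution in this JVM
  val timed = new ConcurrentLinkedQueue[(String, Int)]()
  val counted = new ConcurrentLinkedQueue[(String, Int)]()

  def run(): (List[(String, Int)], List[(String, Int)]) = {
    timed.clear(); counted.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // (key, event timestamp in ms, value); timestamps are ascending
    val events = env
      .fromElements(("a", 1000L, 1), ("a", 2000L, 2), ("b", 3000L, 5), ("a", 6000L, 4))
      .assignAscendingTimestamps(_._2)

    // 5-second tumbling event-time windows per key: [0, 5000) and [5000, 10000)
    events.keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(2)
      .map(t => (t._1, t._3))
      .addSink(x => timed.add(x))

    // Count windows of 2 elements per key; an incomplete window never fires
    events.keyBy(_._1)
      .countWindow(2)
      .sum(2)
      .map(t => (t._1, t._3))
      .addSink(x => counted.add(x))

    env.execute("windowing example")
    (timed.asScala.toList.sorted, counted.asScala.toList.sorted)
  }
}
```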

### Window Operations and Aggregations

Operations that can be applied to windowed streams including built-in aggregations, custom window functions, and incremental aggregations.

```scala { .api }
class WindowedStream[T, K, W <: Window] {
  def reduce(function: (T, T) => T): DataStream[T]
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  def sum(position: Int): DataStream[T]
  def min(position: Int): DataStream[T]
  def max(position: Int): DataStream[T]
}
```

[Window Operations and Aggregations](./window-operations.md)
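
In this hypothetical sketch, both `reduce` and `process` are applied to the same keyed event-time windows (Flink 1.3.x and a local environment assumed). `reduce` folds elements into one value as they arrive. The `ProcessWindowFunction` instead receives every buffered element plus a `Context` that exposes the window; here it reports the window's end timestamp. `CountPerWindow` and `WindowOpsExample` are illustrative names.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical function: emits (key, window end, number of elements in the window)
class CountPerWindow
    extends ProcessWindowFunction[(String, Long), (String, Long, Int), String, TimeWindow] {
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Long)],
      out: Collector[(String, Long, Int)]): Unit =
    out.collect((key, context.window.getEnd, elements.size))
}

object WindowOpsExample {
  // Sink outputs land here; valid only for local execution in this JVM
  val latest = new ConcurrentLinkedQueue[(String, Long)]()
  val counts = new ConcurrentLinkedQueue[(String, Long, Int)]()

  def run(): (List[(String, Long)], List[(String, Long, Int)]) = {
    latest.clear(); counts.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // (key, event timestamp in ms)
    val events = env
      .fromElements(("a", 1000L), ("a", 2000L), ("b", 3000L), ("a", 6000L))
      .assignAscendingTimestamps(_._2)

    // reduce: incremental aggregation that keeps the latest element per window
    events.keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .reduce((x, y) => if (x._2 >= y._2) x else y)
      .addSink(x => latest.add(x))

    // process: all buffered elements plus window metadata via the context
    events.keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .process(new CountPerWindow)
      .addSink(x => counts.add(x))

    env.execute("window operations example")
    (latest.asScala.toList.sorted, counts.asScala.toList.sorted)
  }
}
```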

### Stream Composition and Joining

Advanced stream composition operations including union, connect, join, and co-group operations for processing multiple streams together.

```scala { .api }
class DataStream[T] {
  def union(dataStreams: DataStream[T]*): DataStream[T]
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

class ConnectedStreams[IN1, IN2] {
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
```

[Stream Composition and Joining](./stream-composition.md)
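
The combined sketch below exercises `union`, `connect`, and a windowed `join`, assuming Flink 1.3.x and a local environment; `CompositionExample` and its data are hypothetical. `union` requires all inputs to share one element type. `connect` lets each side keep its own type and map function. The join pairs elements with equal keys that fall into the same 5-second event-time window.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object CompositionExample {
  // Sink outputs land here; valid only for local execution in this JVM
  val unioned = new ConcurrentLinkedQueue[Int]()
  val connected = new ConcurrentLinkedQueue[String]()
  val joined = new ConcurrentLinkedQueue[(String, Int, String)]()

  def run(): (List[Int], List[String], List[(String, Int, String)]) = {
    unioned.clear(); connected.clear(); joined.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // union: merge streams of the same element type
    env.fromElements(1, 2)
      .union(env.fromElements(3))
      .addSink(x => unioned.add(x))

    // connect: two element types, one map function per side, one output type
    env.fromElements(1, 2)
      .connect(env.fromElements("x"))
      .map(i => s"int:$i", s => s"str:$s")
      .addSink(x => connected.add(x))

    // join inputs: (user, timestamp ms, amount) and (user, timestamp ms, name)
    val orders = env.fromElements(("u1", 1000L, 30), ("u2", 2000L, 15))
      .assignAscendingTimestamps(_._2)
    val users = env.fromElements(("u1", 1500L, "Alice"), ("u3", 1500L, "Carol"))
      .assignAscendingTimestamps(_._2)

    orders.join(users)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply((order, user) => (order._1, order._3, user._3))
      .addSink(x => joined.add(x))

    env.execute("composition example")
    (unioned.asScala.toList.sorted, connected.asScala.toList.sorted, joined.asScala.toList)
  }
}
```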

### Asynchronous I/O Operations

Asynchronous I/O for enriching a stream with calls to external systems without blocking on each request, with configurable timeouts, concurrency (in-flight request capacity), and result ordering.

```scala { .api }
object AsyncDataStream {
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]

  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
```

[Asynchronous I/O Operations](./async-operations.md)
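
The hypothetical `AsyncLookup` below sketches the `AsyncFunction` contract. It assumes the Flink 1.3.x Scala API, in which `AsyncCollector` offers `collect(Iterable[OUT])` and `collect(Throwable)`. A local `Future` stands in for a real non-blocking client. The essential point is that `asyncInvoke` returns immediately and hands the result to the collector later. `orderedWait` is used so the output order matches the input order.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction}

// Hypothetical lookup; a real one would call a non-blocking database or HTTP client
class AsyncLookup extends AsyncFunction[Int, String] {
  override def asyncInvoke(input: Int, collector: AsyncCollector[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Future(s"value-$input").onComplete {
      case Success(result) => collector.collect(Iterable(result))
      case Failure(error)  => collector.collect(error)
    }
  }
}

object AsyncExample {
  // Sink output lands here; valid only for local execution in this JVM
  val results = new ConcurrentLinkedQueue[String]()

  def run(): List[String] = {
    results.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    val ids: DataStream[Int] = env.fromElements(1, 2, 3)

    // orderedWait keeps input order; unorderedWait emits results as they complete
    val enriched: DataStream[String] =
      AsyncDataStream.orderedWait(ids, new AsyncLookup, 1, TimeUnit.SECONDS)

    enriched.addSink(x => results.add(x))
    env.execute("async I/O example")
    results.asScala.toList
  }
}
```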

### Output Operations and Sinks

Comprehensive output functionality for writing stream results to various destinations including files, databases, message queues, and custom sinks.

```scala { .api }
class DataStream[T] {
  def print(): DataStreamSink[T]
  def writeAsText(path: String): DataStreamSink[T]
  def writeAsCsv(path: String): DataStreamSink[T]
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def addSink(fun: T => Unit): DataStreamSink[T]
}
```

[Output Operations and Sinks](./output-operations.md)
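
The sketch below shows two sink styles, assuming Flink 1.3.x and a local environment. The first is a custom `SinkFunction`: the hypothetical `QueueSink`, which only makes sense in a single JVM. The second is `writeAsText` with an explicit overwrite mode. With parallelism 1, `writeAsText` writes a single file at the given path rather than a directory with one file per parallel instance. The file read at the end relies on that behavior.

```scala
import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical sink that hands each record to an in-JVM queue (local mode only)
class QueueSink extends SinkFunction[String] {
  override def invoke(value: String): Unit = SinksExample.received.add(value)
}

object SinksExample {
  val received = new ConcurrentLinkedQueue[String]()

  def run(): (List[String], List[String]) = {
    received.clear()
    val outFile = File.createTempFile("flink-sink", ".txt")
    outFile.deleteOnExit()

    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    val words = env.fromElements("a", "b").map(_.toUpperCase)

    // Custom SinkFunction instance
    words.addSink(new QueueSink)
    // Built-in text sink; OVERWRITE replaces the empty temp file created above
    words.writeAsText(outFile.getAbsolutePath, WriteMode.OVERWRITE)

    env.execute("sinks example")

    val source = scala.io.Source.fromFile(outFile)
    val lines = try source.getLines().toList finally source.close()
    (received.asScala.toList, lines)
  }
}
```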

### Function Interfaces and User-Defined Functions

Type-safe interfaces for implementing custom processing logic including window functions, process functions, and rich functions with lifecycle management.

```scala { .api }
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
```

[Function Interfaces and User-Defined Functions](./function-interfaces.md)
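
The following hypothetical sketch implements the Scala `WindowFunction` trait and pairs it with a rich function whose `open` method runs once per parallel instance before any records arrive. It assumes Flink 1.3.x and a local environment. Count windows keep the example independent of time: each key's window fires as soon as it holds two elements, and the leftover `("a", "5")` never fires.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical WindowFunction: joins the values of one full count window
class JoinValues extends WindowFunction[(String, String), String, String, GlobalWindow] {
  override def apply(
      key: String,
      window: GlobalWindow,
      input: Iterable[(String, String)],
      out: Collector[String]): Unit =
    out.collect(key + ":" + input.map(_._2).mkString(","))
}

// Hypothetical rich function: open() runs once per parallel instance before map()
class Prefixer extends RichMapFunction[String, String] {
  @transient private var prefix: String = _

  override def open(parameters: Configuration): Unit =
    prefix = "subtask-" + getRuntimeContext.getIndexOfThisSubtask + "/"

  override def map(value: String): String = prefix + value
}

object FunctionsExample {
  // Sink output lands here; valid only for local execution in this JVM
  val results = new ConcurrentLinkedQueue[String]()

  def run(): List[String] = {
    results.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)

    env.fromElements(("a", "1"), ("a", "2"), ("b", "3"), ("b", "4"), ("a", "5"))
      .keyBy(_._1)
      .countWindow(2)         // fires per key once 2 elements have arrived
      .apply(new JoinValues)
      .map(new Prefixer)
      .addSink(x => results.add(x))

    env.execute("function interfaces example")
    results.asScala.toList
  }
}
```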

### Scala Extensions and Partial Functions

Scala-specific extensions that let DataStream operations accept anonymous pattern-matching functions (`{ case ... => ... }`), which the overloaded core methods such as `map` do not accept directly.

```scala { .api }
import org.apache.flink.streaming.api.scala.extensions._

class OnDataStream[T] {
  def mapWith[R: TypeInformation](fun: PartialFunction[T, R]): DataStream[R]
  def filterWith(fun: PartialFunction[T, Boolean]): DataStream[T]
  def flatMapWith[R: TypeInformation](fun: PartialFunction[T, TraversableOnce[R]]): DataStream[R]
}
```

[Scala Extensions and Partial Functions](./scala-extensions.md)
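
With the extensions imported, the `...With` methods accept pattern-matching anonymous functions that destructure tuples directly. Below is a minimal sketch, assuming Flink 1.3.x and a local environment; `ExtensionsExample` is hypothetical.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._

object ExtensionsExample {
  // Sink output lands here; valid only for local execution in this JVM
  val results = new ConcurrentLinkedQueue[String]()

  def run(): List[String] = {
    results.clear()
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)

    env.fromElements(("a", 1), ("b", 2), ("a", 3))
      // Each `case` destructures the (key, count) tuple without ._1 / ._2
      .filterWith { case (_, n) => n > 1 }
      .mapWith { case (key, n) => s"$key=$n" }
      .addSink(x => results.add(x))

    env.execute("scala extensions example")
    results.asScala.toList
  }
}
```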

## Types

### Core Stream Types

```scala { .api }
class DataStream[T]
class KeyedStream[T, K] extends DataStream[T]
class ConnectedStreams[IN1, IN2]
class WindowedStream[T, K, W <: Window]
class AllWindowedStream[T, W <: Window]
class SplitStream[T] extends DataStream[T]
```

### Environment and Configuration Types

```scala { .api }
class StreamExecutionEnvironment
class ExecutionConfig
class CheckpointConfig
class RestartStrategies
```

### Function Interface Types

```scala { .api }
trait WindowFunction[IN, OUT, KEY, W <: Window]
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
trait AsyncFunction[IN, OUT]
trait AsyncCollector[OUT]
```

### Builder Types

```scala { .api }
class JoinedStreams[T1, T2]
class CoGroupedStreams[T1, T2]
class OutputTag[T]
```