or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md, data-streams.md, execution-environment.md, index.md, keyed-streams.md, processing-functions.md, sinks-output.md, stream-connections.md, window-functions.md, windowing.md

docs/data-streams.md

0

# Data Sources and Streams

1

2

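`DataStream[T]` is the core abstraction for processing streams of data in Flink's Scala API. It provides a rich set of transformation operations while preserving element types through Scala's type system: each transformation that produces a new element type requires an implicit `TypeInformation` for that type.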

3

4

## Capabilities

5

6

### Stream Properties and Configuration

7

8

Access stream metadata and configure operator properties such as parallelism, name, and UID.

9

10

```scala { .api }

11

class DataStream[T] {

12

/**

13

* Get the type information for stream elements

14

* @return TypeInformation for type T

15

*/

16

def dataType: TypeInformation[T]

17

18

/**

19

* Get the execution environment associated with this stream

20

* @return StreamExecutionEnvironment instance

21

*/

22

def executionEnvironment: StreamExecutionEnvironment

23

24

/**

25

* Get the current parallelism for this stream

26

* @return Current parallelism degree

27

*/

28

def parallelism: Int

29

30

/**

31

* Set the parallelism for this operation

32

* @param parallelism Parallelism degree (must be at least 1)

33

* @return The same stream with the parallelism applied (no new operator is created), allowing call chaining

34

*/

35

def setParallelism(parallelism: Int): DataStream[T]

36

37

/**

38

* Set the maximum parallelism for this operation

39

* @param maxParallelism Maximum parallelism degree

40

* @return The same stream with the max parallelism applied (no new operator is created), allowing call chaining

41

*/

42

def setMaxParallelism(maxParallelism: Int): DataStream[T]

43

44

/**

45

* Set a name for this operation

46

* @param name Operator name

47

* @return The same stream with the name applied (no new operator is created), allowing call chaining

48

*/

49

def name(name: String): DataStream[T]

50

51

/**

52

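* Set a unique, stable identifier for this operation. It must be unique per job and is used to map operator state across job submissions (e.g. when restoring from a savepoint)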

53

* @param uid Unique identifier

54

* @return The same stream with the UID applied (no new operator is created), allowing call chaining

55

*/

56

def uid(uid: String): DataStream[T]

57

}

58

```

59

60

### Basic Transformations

61

62

Core element-wise transformations (`map`, `flatMap`, `filter`). Each returns a new DataStream; the input stream itself is not modified.

63

64

```scala { .api }

65

class DataStream[T] {

66

/**

67

* Apply a function to each element in the stream

68

* @param fun Mapping function from T to R

69

* @return DataStream of mapped elements

70

*/

71

def map[R: TypeInformation](fun: T => R): DataStream[R]

72

73

/**

74

* Apply a MapFunction to each element

75

* @param mapper MapFunction implementation

76

* @return DataStream of mapped elements

77

*/

78

def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]

79

80

/**

81

* Apply a function that returns multiple elements for each input

82

* @param fun Function returning TraversableOnce of R

83

* @return DataStream of flattened results

84

*/

85

def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]

86

87

/**

88

* Apply a FlatMapFunction that outputs to a Collector

89

* @param fun Function that outputs to Collector

90

* @return DataStream of collected results

91

*/

92

def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]

93

94

/**

95

* Filter elements based on a predicate

96

* @param fun Predicate function returning Boolean

97

* @return DataStream of filtered elements

98

*/

99

def filter(fun: T => Boolean): DataStream[T]

100

101

/**

102

* Filter elements using a FilterFunction

103

* @param filter FilterFunction implementation

104

* @return DataStream of filtered elements

105

*/

106

def filter(filter: FilterFunction[T]): DataStream[T]

107

}

108

```

109

110

**Usage Examples:**

111

112

```scala

113

import org.apache.flink.streaming.api.scala._

114

115

val env = StreamExecutionEnvironment.getExecutionEnvironment

116

val numbers = env.fromElements(1, 2, 3, 4, 5)

117

118

// Map transformation

119

val doubled = numbers.map(_ * 2)

120

121

// FlatMap transformation

122

val words = env.fromElements("hello world", "scala flink")

123

.flatMap(_.split(" "))

124

125

// Filter transformation

126

val evenNumbers = numbers.filter(_ % 2 == 0)

127

128

// Chaining transformations

129

val result = numbers

130

.filter(_ > 2)

131

.map(_ * 3)

132

.filter(_ < 15)

133

```

134

135

### Stream Partitioning

136

137

Control how stream elements are distributed across parallel instances.

138

139

```scala { .api }

140

class DataStream[T] {

141

/**

142

* Partition by key using a key selector function

143

* @param fun Key selector function

144

* @return KeyedStream partitioned by the key

145

*/

146

def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]

147

148

/**

149

* Partition by key using a KeySelector

150

* @param fun KeySelector implementation

151

* @return KeyedStream partitioned by the key

152

*/

153

def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K]

154

155

/**

156

* Custom partitioning using a Partitioner

157

* @param partitioner Custom partitioner implementation

158

* @param fun Key selector for partitioning

159

* @return DataStream with custom partitioning

160

*/

161

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]

162

163

/**

164

* Broadcast all elements to all downstream operators

165

* @return DataStream with broadcast partitioning

166

*/

167

def broadcast: DataStream[T]

168

169

/**

170

* Round-robin distribution across parallel instances

171

* @return DataStream with rebalanced partitioning

172

*/

173

def rebalance: DataStream[T]

174

175

/**

176

* Round-robin distribution to a subset of downstream parallel instances (often local to the same TaskManager, which avoids network transfer)

177

* @return DataStream with rescaled partitioning

178

*/

179

def rescale: DataStream[T]

180

181

/**

182

* Random distribution across parallel instances

183

* @return DataStream with shuffle partitioning

184

*/

185

def shuffle: DataStream[T]

186

187

/**

188

* Forward each element to the local subtask of the next operator (no redistribution; upstream and downstream parallelism must be equal)

189

* @return DataStream with forward partitioning

190

*/

191

def forward: DataStream[T]

192

193

/**

194

* Send all elements to the first parallel instance

195

* @return DataStream with global partitioning

196

*/

197

def global: DataStream[T]

198

}

199

```

200

201

**Usage Examples:**

202

203

```scala

204

import org.apache.flink.streaming.api.scala._

205

206

case class User(id: Int, name: String, department: String)

207

208

val users = env.fromElements(

209

User(1, "Alice", "Engineering"),

210

User(2, "Bob", "Sales"),

211

User(3, "Charlie", "Engineering")

212

)

213

214

// Key by user department

215

val usersByDept = users.keyBy(_.department)

216

217

// Key by user ID

218

val usersById = users.keyBy(_.id)

219

220

// Rebalance for load distribution

221

val balanced = users.rebalance

222

223

// Broadcast to all downstream operators

224

val broadcast = users.broadcast

225

```

226

227

### Stream Union and Connections

228

229

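Combine multiple streams. `union` merges streams of the same element type into one stream, while `connect` pairs two streams (possibly of different types) so they can be processed together by a single co-operator.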

230

231

```scala { .api }

232

class DataStream[T] {

233

/**

234

* Union this stream with other streams of the same type

235

* @param dataStreams Other streams to union with

236

* @return DataStream containing elements from all input streams

237

*/

238

def union(dataStreams: DataStream[T]*): DataStream[T]

239

240

/**

241

* Connect this stream with another stream of different type

242

* @param dataStream Stream to connect with

243

* @return ConnectedStreams for co-processing

244

*/

245

def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]

246

247

/**

248

* Connect with a broadcast stream for broadcast state

249

* @param broadcastStream Broadcast stream to connect with

250

* @return BroadcastConnectedStream for broadcast processing

251

*/

252

def connect[R](broadcastStream: BroadcastStream[R]): BroadcastConnectedStream[T, R]

253

}

254

```

255

256

### Windowing (All-Window Operations)

257

258

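Apply windowing operations on non-keyed streams. Because the stream is not partitioned by key, all-window operators process every element in a single task (parallelism 1); prefer keyed windows when the data can be partitioned.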

259

260

```scala { .api }

261

class DataStream[T] {

262

/**

263

* Apply time-based tumbling windowing to all elements (deprecated)

264

* @param size Window size

265

* @return AllWindowedStream for aggregations

266

*/

267

@deprecated("Use windowAll(TumblingEventTimeWindows.of(size))", "1.12.0")

268

def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]

269

270

/**

271

* Apply time-based sliding windowing to all elements (deprecated)

272

* @param size Window size

273

* @param slide Slide interval

274

* @return AllWindowedStream for aggregations

275

*/

276

@deprecated("Use windowAll(SlidingEventTimeWindows.of(size, slide))", "1.12.0")

277

def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]

278

279

/**

280

* Apply tumbling count-based windowing to all elements

281

* @param size Window size (number of elements)

282

* @return AllWindowedStream for aggregations

283

*/

284

def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]

285

286

/**

287

* Apply sliding count-based windowing to all elements

288

* @param size Window size (number of elements)

289

* @param slide Slide size (number of elements)

290

* @return AllWindowedStream for aggregations

291

*/

292

def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]

293

294

/**

295

* Apply custom windowing to all elements

296

* @param assigner Window assigner implementation

297

* @return AllWindowedStream for aggregations

298

*/

299

def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]

300

}

301

```

302

303

### Time and Watermarks

304

305

Configure event time processing and watermark generation.

306

307

```scala { .api }

308

class DataStream[T] {

309

/**

310

* Assign timestamps and watermarks using a WatermarkStrategy

311

* @param watermarkStrategy Strategy for timestamp and watermark assignment

312

* @return DataStream with assigned timestamps

313

*/

314

def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T]

315

316

/**

317

* Assign ascending timestamps (deprecated)

318

* @param extractor Function to extract timestamps

319

* @return DataStream with assigned timestamps

320

*/

321

def assignAscendingTimestamps(extractor: T => Long): DataStream[T]

322

}

323

```

324

325

**Usage Examples:**

326

327

```scala

328

import org.apache.flink.streaming.api.scala._

329

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

330

import java.time.Duration

331

332

case class Event(id: String, timestamp: Long, value: Double)

333

334

val events = env.fromElements(

335

Event("A", 1000L, 1.0),

336

Event("B", 2000L, 2.0),

337

Event("C", 3000L, 3.0)

338

)

339

340

// Assign watermarks for event time processing

341

val eventsWithWatermarks = events

342

.assignTimestampsAndWatermarks(

343

WatermarkStrategy

344

.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))

345

.withTimestampAssigner(new SerializableTimestampAssigner[Event] {

346

override def extractTimestamp(element: Event, recordTimestamp: Long): Long =

347

element.timestamp

348

})

349

)

350

```

351

352

### Iterations

353

354

Create iterative processing patterns for complex algorithms.

355

356

```scala { .api }

357

class DataStream[T] {

358

/**

359

* Create an iteration with feedback loop

360

* @param stepFunction Function defining iteration step

361

* @param maxWaitTimeMillis Milliseconds the iteration head waits for new feedback before shutting down; the default of 0 means the iteration never terminates

362

* @return DataStream with iteration results

363

*/

364

def iterate[R](

365

stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),

366

maxWaitTimeMillis: Long = 0

367

): DataStream[R]

368

369

/**

370

* Create an iteration with connected streams

371

* @param stepFunction Function with connected streams step

372

* @param maxWaitTimeMillis Milliseconds the iteration head waits for new feedback before shutting down (0 means the iteration never terminates)

373

* @return DataStream with iteration results

374

*/

375

def iterate[R, F: TypeInformation](

376

stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),

377

maxWaitTimeMillis: Long

378

): DataStream[R]

379

}

380

```

381

382

### Processing Functions

383

384

Apply custom processing logic with access to runtime context.

385

386

```scala { .api }

387

class DataStream[T] {

388

/**

389

* Apply a ProcessFunction for low-level processing

390

* @param processFunction ProcessFunction implementation

391

* @return DataStream with processed results

392

*/

393

def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]

394

}

395

```

396

397

### Side Outputs

398

399

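Retrieve additional output streams that a processing function emitted with `ctx.output(tag, value)`. The `OutputTag` passed to `getSideOutput` must match the one used when emitting.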

400

401

```scala { .api }

402

class DataStream[T] {

403

/**

404

* Get a side output stream by tag

405

* @param tag OutputTag identifying the side output

406

* @return DataStream of side output elements

407

*/

408

def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X]

409

}

410

```

411

412

### Advanced Operations

413

414

Low-level operations for custom stream processing.

415

416

```scala { .api }

417

class DataStream[T] {

418

/**

419

* Apply a custom stream operator

420

* @param operatorName Name for the operator

421

* @param operator Custom operator implementation

422

* @return DataStream with custom transformation

423

*/

424

def transform[R: TypeInformation](

425

operatorName: String,

426

operator: OneInputStreamOperator[T, R]

427

): DataStream[R]

428

429

/**

430

* Cache this stream's intermediate result for reuse in multiple downstream operations (supported only for bounded streams in batch execution mode)

431

* @return CachedDataStream for reuse

432

*/

433

def cache(): CachedDataStream[T]

434

}

435

```

436

437

## Types

438

439

```scala { .api }

440

// Core function interfaces

441

trait MapFunction[T, R] {

442

def map(value: T): R

443

}

444

445

trait FlatMapFunction[T, R] {

446

def flatMap(value: T, out: Collector[R]): Unit

447

}

448

449

trait FilterFunction[T] {

450

def filter(value: T): Boolean

451

}

452

453

// Key selector interface

454

trait KeySelector[T, K] {

455

def getKey(value: T): K

456

}

457

458

// Partitioner interface

459

trait Partitioner[K] {

460

def partition(key: K, numPartitions: Int): Int

461

}

462

463

// Collector interface for output

464

trait Collector[T] {

465

def collect(record: T): Unit

466

def close(): Unit

467

}

468

469

// Output tag for side outputs

470

case class OutputTag[T: TypeInformation](id: String) {

471

def getTypeInfo: TypeInformation[T]

472

}

473

474

// Cached data stream

475

class CachedDataStream[T](dataStream: DataStream[T]) extends DataStream[T] {

476

def invalidateCache(): Unit

477

}

478

```