or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md data-streams.md execution-environment.md index.md keyed-streams.md processing-functions.md sinks-output.md stream-connections.md window-functions.md windowing.md

docs/keyed-streams.md

0

# Keyed Streams and State

1

2

A KeyedStream is a stream partitioned by key: elements that share a key are grouped together, which is what makes per-key state, aggregations, and windowing possible.

3

4

## Capabilities

5

6

### Stream Properties

7

8

Access the type information of the stream's key.

9

10

```scala { .api }

11

class KeyedStream[T, K] {

12

/**

13

* Get the type information for the key

14

* @return TypeInformation for key type K

15

*/

16

def getKeyType: TypeInformation[K]

17

}

18

```

19

20

### Aggregation Operations

21

22

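Built-in rolling aggregations for common operations. Each one emits an updated result for every incoming element of a key rather than a single final value. `max`/`min` track only the extreme value of the chosen field, whereas `maxBy`/`minBy` return the whole element that holds it.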

23

24

```scala { .api }

25

class KeyedStream[T, K] {

26

/**

27

* Reduce elements using a reduction function

28

* @param reducer Function to combine two elements

29

* @return DataStream with reduced elements per key

30

*/

31

def reduce(reducer: ReduceFunction[T]): DataStream[T]

32

33

/**

34

* Reduce elements using a function

35

* @param fun Function to combine two elements

36

* @return DataStream with reduced elements per key

37

*/

38

def reduce(fun: (T, T) => T): DataStream[T]

39

40

/**

41

* Sum numeric field by position

42

* @param position Field position for summation

43

* @return DataStream with summed values per key

44

*/

45

def sum(position: Int): DataStream[T]

46

47

/**

48

* Sum numeric field by name

49

* @param field Field name for summation

50

* @return DataStream with summed values per key

51

*/

52

def sum(field: String): DataStream[T]

53

54

/**

55

* Get maximum value by field position

56

* @param position Field position for maximum

57

* @return DataStream with maximum values per key

58

*/

59

def max(position: Int): DataStream[T]

60

61

/**

62

* Get maximum value by field name

63

* @param field Field name for maximum

64

* @return DataStream with maximum values per key

65

*/

66

def max(field: String): DataStream[T]

67

68

/**

69

* Get minimum value by field position

70

* @param position Field position for minimum

71

* @return DataStream with minimum values per key

72

*/

73

def min(position: Int): DataStream[T]

74

75

/**

76

* Get minimum value by field name

77

* @param field Field name for minimum

78

* @return DataStream with minimum values per key

79

*/

80

def min(field: String): DataStream[T]

81

82

/**

83

* Get element with maximum value by field position

84

* @param position Field position for maximum element

85

* @return DataStream with maximum elements per key

86

*/

87

def maxBy(position: Int): DataStream[T]

88

89

/**

90

* Get element with maximum value by field name

91

* @param field Field name for maximum element

92

* @return DataStream with maximum elements per key

93

*/

94

def maxBy(field: String): DataStream[T]

95

96

/**

97

* Get element with minimum value by field position

98

* @param position Field position for minimum element

99

* @return DataStream with minimum elements per key

100

*/

101

def minBy(position: Int): DataStream[T]

102

103

/**

104

* Get element with minimum value by field name

105

* @param field Field name for minimum element

106

* @return DataStream with minimum elements per key

107

*/

108

def minBy(field: String): DataStream[T]

109

}

110

```

111

112

**Usage Examples:**

113

114

```scala

115

import org.apache.flink.streaming.api.scala._

116

117

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

118

119

val readings = env.fromElements(

120

SensorReading("sensor1", 20.0, 1000L),

121

SensorReading("sensor1", 25.0, 2000L),

122

SensorReading("sensor2", 15.0, 1500L),

123

SensorReading("sensor2", 18.0, 2500L)

124

)

125

126

val keyedReadings = readings.keyBy(_.sensorId)

127

128

// Sum temperatures by sensor

129

val totalTemps = keyedReadings.sum("temperature")

130

131

// Get maximum temperature per sensor

132

val maxTemps = keyedReadings.max("temperature")

133

134

// Get reading with maximum temperature per sensor

135

val maxTempReadings = keyedReadings.maxBy("temperature")

136

137

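// Custom reduction - running pairwise mean: each step averages the previous
// result with the new reading, so this is NOT the arithmetic mean of all readings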

138

val avgTemps = keyedReadings.reduce((r1, r2) =>

139

SensorReading(r1.sensorId, (r1.temperature + r2.temperature) / 2, math.max(r1.timestamp, r2.timestamp))

140

)

141

```

142

143

### Windowing Operations

144

145

Apply time or count-based windows to keyed streams.

146

147

```scala { .api }

148

class KeyedStream[T, K] {

149

/**

150

* Apply time-based tumbling windowing (deprecated)

151

* @param size Window size

152

* @return WindowedStream for aggregations

153

*/

154

@deprecated("Use window(TumblingEventTimeWindows.of(size))", "1.12.0")

155

def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]

156

157

/**

158

* Apply time-based sliding windowing (deprecated)

159

* @param size Window size

160

* @param slide Slide interval

161

* @return WindowedStream for aggregations

162

*/

163

@deprecated("Use window(SlidingEventTimeWindows.of(size, slide))", "1.12.0")

164

def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]

165

166

/**

167

* Apply count-based windowing

168

* @param size Window size (number of elements)

169

* @return WindowedStream for aggregations

170

*/

171

def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]

172

173

/**

174

* Apply sliding count-based windowing

175

* @param size Window size (number of elements)

176

* @param slide Slide size (number of elements)

177

* @return WindowedStream for aggregations

178

*/

179

def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]

180

181

/**

182

* Apply custom windowing

183

* @param assigner Window assigner implementation

184

* @return WindowedStream for aggregations

185

*/

186

def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]

187

}

188

```

189

190

**Usage Examples:**

191

192

```scala

193

import org.apache.flink.streaming.api.scala._

194

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

195

import org.apache.flink.streaming.api.windowing.time.Time

196

197

val keyedReadings = readings.keyBy(_.sensorId)

198

199

// Count-based window - every 5 readings per sensor

200

val countWindow = keyedReadings

201

.countWindow(5)

202

.reduce((r1, r2) => SensorReading(r1.sensorId, math.max(r1.temperature, r2.temperature), r2.timestamp))

203

204

// Time-based window - 1 minute tumbling event-time windows
// (requires timestamps and watermarks to be assigned on the source stream)

205

val timeWindow = keyedReadings

206

.window(TumblingEventTimeWindows.of(Time.minutes(1)))

207

.reduce((r1, r2) => SensorReading(r1.sensorId, (r1.temperature + r2.temperature) / 2, r2.timestamp))

208

```

209

210

### Stateful Operations

211

212

Manage per-key state for complex processing logic.

213

214

```scala { .api }

215

class KeyedStream[T, K] {

216

/**

217

* Map with per-key state management

218

* @param fun Function with state access: (value, state) => (result, newState)

219

* @return DataStream with stateful mapping results

220

*/

221

def mapWithState[R: TypeInformation, S: TypeInformation](

222

fun: (T, Option[S]) => (R, Option[S])

223

): DataStream[R]

224

225

/**

226

* FlatMap with per-key state management

227

* @param fun Function with state access: (value, state) => (results, newState)

228

* @return DataStream with stateful flatMap results

229

*/

230

def flatMapWithState[R: TypeInformation, S: TypeInformation](

231

fun: (T, Option[S]) => (TraversableOnce[R], Option[S])

232

): DataStream[R]

233

234

/**

235

* Filter with per-key state management

236

* @param fun Function with state access: (value, state) => (keep, newState)

237

* @return DataStream with stateful filtering results

238

*/

239

def filterWithState[S: TypeInformation](

240

fun: (T, Option[S]) => (Boolean, Option[S])

241

): DataStream[T]

242

}

243

```

244

245

**Usage Examples:**

246

247

```scala

248

import org.apache.flink.streaming.api.scala._

249

250

case class Event(key: String, value: Int, timestamp: Long)

251

252

val events = env.fromElements(

253

Event("A", 1, 1000L),

254

Event("A", 2, 2000L),

255

Event("B", 5, 1500L),

256

Event("A", 3, 3000L)

257

)

258

259

val keyedEvents = events.keyBy(_.key)

260

261

// Count events per key with state

262

val eventCounts = keyedEvents.mapWithState[Int, Int] { (event, count) =>

263

val newCount = count.getOrElse(0) + 1

264

(newCount, Some(newCount))

265

}

266

267

// Running sum with state

268

val runningSums = keyedEvents.mapWithState[Int, Int] { (event, sum) =>

269

val newSum = sum.getOrElse(0) + event.value

270

(newSum, Some(newSum))

271

}

272

273

// Filter events based on count state (only keep first 3 events per key)

274

val limitedEvents = keyedEvents.filterWithState[Int] { (event, count) =>

275

val currentCount = count.getOrElse(0) + 1

276

(currentCount <= 3, Some(currentCount))

277

}

278

```

279

280

### Processing Functions

281

282

Apply custom processing logic with access to timers and state.

283

284

```scala { .api }

285

class KeyedStream[T, K] {

286

/**

287

* Apply a KeyedProcessFunction for low-level processing

288

* @param keyedProcessFunction KeyedProcessFunction implementation with access to the current key, timers, and state

289

* @return DataStream with processed results

290

*/

291

def process[R: TypeInformation](

292

keyedProcessFunction: KeyedProcessFunction[K, T, R]

293

): DataStream[R]

294

}

295

```

296

297

### Queryable State

298

299

Make keyed state queryable from external applications.

300

301

```scala { .api }

302

class KeyedStream[T, K] {

303

/**

304

* Make stream queryable with default state descriptor

305

* @param queryableStateName Name for queryable state

306

* @return QueryableStateStream for external queries

307

*/

308

def asQueryableState(queryableStateName: String): QueryableStateStream[K, T]

309

310

/**

311

* Make stream queryable with custom ValueStateDescriptor

312

* @param queryableStateName Name for queryable state

313

* @param stateDescriptor State descriptor for value state

314

* @return QueryableStateStream for external queries

315

*/

316

def asQueryableState(

317

queryableStateName: String,

318

stateDescriptor: ValueStateDescriptor[T]

319

): QueryableStateStream[K, T]

320

321

/**

322

* Make stream queryable with ReducingStateDescriptor

323

* @param queryableStateName Name for queryable state

324

* @param stateDescriptor State descriptor for reducing state

325

* @return QueryableStateStream for external queries

326

*/

327

def asQueryableState(

328

queryableStateName: String,

329

stateDescriptor: ReducingStateDescriptor[T]

330

): QueryableStateStream[K, T]

331

}

332

```

333

334

### Interval Joins

335

336

Join with another keyed stream within a time interval.

337

338

```scala { .api }

339

class KeyedStream[T, K] {

340

/**

341

* Create an interval join with another keyed stream

342

* @param otherStream Other keyed stream to join with

343

* @return IntervalJoin for configuring join parameters

344

*/

345

def intervalJoin[OTHER](otherStream: KeyedStream[OTHER, K]): IntervalJoin[T, OTHER, K]

346

}

347

348

// IntervalJoin configuration

349

class IntervalJoin[IN1, IN2, KEY] {

350

/**

351

* Define the time interval for the join

352

* @param lowerBound Lower bound of the time interval

353

* @param upperBound Upper bound of the time interval

354

* @return IntervalJoined for processing configuration

355

*/

356

def between(lowerBound: Time, upperBound: Time): IntervalJoined[IN1, IN2, KEY]

357

}

358

359

class IntervalJoined[IN1, IN2, KEY] {

360

/**

361

* Make lower bound exclusive

362

* @return IntervalJoined with exclusive lower bound

363

*/

364

def lowerBoundExclusive(): IntervalJoined[IN1, IN2, KEY]

365

366

/**

367

* Make upper bound exclusive

368

* @return IntervalJoined with exclusive upper bound

369

*/

370

def upperBoundExclusive(): IntervalJoined[IN1, IN2, KEY]

371

372

/**

373

* Process joined elements with a ProcessJoinFunction

374

* @param processJoinFunction Function to process joined elements

375

* @return DataStream with join results

376

*/

377

def process[OUT: TypeInformation](

378

processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT]

379

): DataStream[OUT]

380

}

381

```

382

383

**Usage Examples:**

384

385

```scala

386

import org.apache.flink.streaming.api.scala._

387

import org.apache.flink.streaming.api.windowing.time.Time

388

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction

389

import org.apache.flink.util.Collector

390

391

case class Order(id: String, customerId: String, amount: Double, timestamp: Long)

392

case class Payment(id: String, customerId: String, amount: Double, timestamp: Long)

393

case class OrderPayment(orderId: String, paymentId: String, customerId: String, timestamp: Long)

394

395

val orders = env.fromElements(

396

Order("o1", "c1", 100.0, 1000L),

397

Order("o2", "c2", 200.0, 2000L)

398

).keyBy(_.customerId)

399

400

val payments = env.fromElements(

401

Payment("p1", "c1", 100.0, 1100L),

402

Payment("p2", "c2", 200.0, 2100L)

403

).keyBy(_.customerId)

404

405

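// Join each order with payments of the same customer whose timestamp lies
// between 5 minutes before and 5 minutes after the order's timestamp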

406

val joined = orders

407

.intervalJoin(payments)

408

.between(Time.minutes(-5), Time.minutes(5))

409

.process(new ProcessJoinFunction[Order, Payment, OrderPayment] {

410

override def processElement(

411

left: Order,

412

right: Payment,

413

ctx: ProcessJoinFunction[Order, Payment, OrderPayment]#Context,

414

out: Collector[OrderPayment]

415

): Unit = {

416

out.collect(OrderPayment(left.id, right.id, left.customerId, math.max(left.timestamp, right.timestamp)))

417

}

418

})

419

```

420

421

## Types

422

423

```scala { .api }

424

// Reduce function interface

425

trait ReduceFunction[T] {

426

def reduce(value1: T, value2: T): T

427

}

428

429

// Process function for keyed streams

430

abstract class KeyedProcessFunction[K, I, O] {

431

def processElement(value: I, ctx: Context, out: Collector[O]): Unit

432

def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

433

434

abstract class Context {

435

def timestamp(): Long

436

def getCurrentKey: K

437

def timerService(): TimerService

438

def output[X](outputTag: OutputTag[X], value: X): Unit

439

}

440

441

abstract class OnTimerContext extends Context {

442

def timeDomain(): TimeDomain

443

}

444

}

445

446

// Process join function for interval joins

447

abstract class ProcessJoinFunction[IN1, IN2, OUT] {

448

def processElement(left: IN1, right: IN2, ctx: Context, out: Collector[OUT]): Unit

449

450

abstract class Context {

451

def getLeftTimestamp: Long

452

def getRightTimestamp: Long

453

def getCurrentWatermark: Long

454

}

455

}

456

457

// Queryable state stream

458

class QueryableStateStream[K, V] {

459

def getQueryableStateName: String

460

def getKeyType: TypeInformation[K]

461

def getValueType: TypeInformation[V]

462

}

463

464

// Timer service for processing functions

465

trait TimerService {

466

def currentProcessingTime(): Long

467

def currentWatermark(): Long

468

def registerProcessingTimeTimer(time: Long): Unit

469

def registerEventTimeTimer(time: Long): Unit

470

def deleteProcessingTimeTimer(time: Long): Unit

471

def deleteEventTimeTimer(time: Long): Unit

472

}

473

474

// Time domain enum

475

sealed trait TimeDomain

476

object TimeDomain {

477

case object EVENT_TIME extends TimeDomain

478

case object PROCESSING_TIME extends TimeDomain

479

}

480

481

// State descriptors for queryable state

482

class ValueStateDescriptor[T](name: String, typeInfo: TypeInformation[T])

483

class ReducingStateDescriptor[T](name: String, reduceFunction: ReduceFunction[T], typeInfo: TypeInformation[T])

484

```