# DataStream Transformations

DataStream transformations are the core operations for processing unbounded streams of data in Apache Flink. Each transformation turns one or more DataStreams into a new DataStream, so transformations can be chained into complex data processing pipelines.

## Capabilities

### Basic Transformations

Transform individual elements in the stream using map, filter, and flatMap operations.

```java { .api }
/**
 * Apply a MapFunction to transform each element
 * @param mapper - the map function to apply
 * @return transformed DataStream
 */
<R> DataStream<R> map(MapFunction<T, R> mapper);

/**
 * Filter elements based on a predicate
 * @param filter - the filter function
 * @return filtered DataStream
 */
DataStream<T> filter(FilterFunction<T> filter);

/**
 * Apply a FlatMapFunction that can produce zero, one, or more elements for each input
 * @param flatMapper - the flatmap function
 * @return transformed DataStream
 */
<R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper);
```

**Usage Examples:**

```java
DataStream<String> text = env.fromElements("hello world", "flink streaming");

// Map transformation - convert to uppercase
DataStream<String> upperCase = text.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        return value.toUpperCase();
    }
});

// The same map using a method reference
DataStream<String> upperCaseRef = text.map(String::toUpperCase);

// Filter transformation - keep only strings longer than 5 characters
DataStream<String> filtered = text.filter(s -> s.length() > 5);

// FlatMap transformation - split sentences into words
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String sentence, Collector<String> out) {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
});

// The same flatMap as a lambda. Because of type erasure, lambdas that use a
// Collector may need an explicit type hint, e.g. .returns(Types.STRING)
DataStream<String> wordsLambda = text.flatMap(
    (String sentence, Collector<String> out) ->
        Arrays.stream(sentence.split(" ")).forEach(out::collect)
);
```

### Stream Partitioning

Control how data is distributed across parallel instances of operators.

```java { .api }
/**
 * Partition the stream by key
 * @param key - the key selector function
 * @return KeyedStream partitioned by the key
 */
<K> KeyedStream<T, K> keyBy(KeySelector<T, K> key);

/**
 * Partition by field positions, for Tuple types
 * (deprecated in recent Flink versions; prefer the KeySelector variant)
 * @param fields - field positions to partition by
 * @return KeyedStream partitioned by the fields
 */
KeyedStream<T, Tuple> keyBy(int... fields);

/**
 * Partition by field names, for POJO types
 * (deprecated in recent Flink versions; prefer the KeySelector variant)
 * @param fields - field names to partition by
 * @return KeyedStream partitioned by the fields
 */
KeyedStream<T, Tuple> keyBy(String... fields);

/**
 * Random partitioning - elements are distributed uniformly at random
 * @return randomly partitioned DataStream
 */
DataStream<T> shuffle();

/**
 * Round-robin partitioning - elements are distributed evenly across all
 * downstream subtasks
 * @return rebalanced DataStream
 */
DataStream<T> rebalance();

/**
 * Rescale partitioning - round-robin only to a local subset of downstream
 * subtasks, avoiding cross-node network traffic
 * @return rescaled DataStream
 */
DataStream<T> rescale();

/**
 * Broadcast - send every element to all parallel instances of the next operator
 * @return broadcasted DataStream
 */
DataStream<T> broadcast();

/**
 * Forward partitioning - send elements to the locally running downstream
 * subtask (requires equal upstream and downstream parallelism)
 * @return forwarded DataStream
 */
DataStream<T> forward();

/**
 * Global partitioning - send all elements to the first (index 0) instance
 * of the next operator
 * @return globally partitioned DataStream
 */
DataStream<T> global();
```

**Usage Examples:**

```java
DataStream<Tuple2<String, Integer>> tuples = env.fromElements(
    Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3)
);

// Key by field position (deprecated; prefer a KeySelector)
KeyedStream<Tuple2<String, Integer>, Tuple> keyedByPosition = tuples.keyBy(0);

// Key by lambda function
KeyedStream<Tuple2<String, Integer>, String> keyedByFunction =
    tuples.keyBy(value -> value.f0);

// For POJO types
DataStream<Person> people = env.fromElements(new Person("John", 25), new Person("Jane", 30));
KeyedStream<Person, String> keyedByName = people.keyBy(person -> person.getName());

// Partitioning strategies
DataStream<String> shuffled = text.shuffle();
DataStream<String> rebalanced = text.rebalance();
DataStream<String> broadcasted = text.broadcast();
```
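
The contract behind keyBy is that all elements with the same key reach the same parallel subtask. As a rough, dependency-free illustration only (Flink's real implementation murmur-hashes keys into key groups, which are then mapped to subtasks), the routing decision boils down to hashing the key modulo the parallelism:

```java
public class KeyRoutingSketch {
    // Simplified stand-in for keyBy routing: same key -> same subtask.
    // NOT Flink's actual algorithm; it only illustrates hash partitioning.
    static int subtaskFor(Object key, int parallelism) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String key : java.util.List.of("a", "b", "a")) {
            System.out.println(key + " -> subtask " + subtaskFor(key, parallelism));
        }
        // "a" is routed to the same subtask both times
    }
}
```

This determinism is what makes keyed state possible: every update for a given key is applied by a single subtask, with no cross-partition coordination.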

154

155

### Stream Composition

156

157

Combine multiple streams into a single stream or create connected streams for joint processing.

158

159

```java { .api }

160

/**

161

* Union with other DataStreams of the same type

162

* @param streams - streams to union with

163

* @return unified DataStream

164

*/

165

DataStream<T> union(DataStream<T>... streams);

166

167

/**

168

* Connect with another DataStream for joint processing

169

* @param dataStream - stream to connect with

170

* @return ConnectedStreams for joint processing

171

*/

172

<R> ConnectedStreams<T, R> connect(DataStream<R> dataStream);

173

```

174

175

**Usage Examples:**

176

177

```java

178

DataStream<String> stream1 = env.fromElements("a", "b");

179

DataStream<String> stream2 = env.fromElements("c", "d");

180

DataStream<String> stream3 = env.fromElements("e", "f");

181

182

// Union streams of the same type

183

DataStream<String> unionedStream = stream1.union(stream2, stream3);

184

185

// Connect streams of different types

186

DataStream<Integer> numbers = env.fromElements(1, 2, 3);

187

ConnectedStreams<String, Integer> connected = stream1.connect(numbers);

188

189

// Process connected streams

190

DataStream<String> result = connected.map(new CoMapFunction<String, Integer, String>() {

191

@Override

192

public String map1(String value) {

193

return "String: " + value;

194

}

195

196

@Override

197

public String map2(Integer value) {

198

return "Number: " + value;

199

}

200

});

201

```

### Rich Transformations

Use rich functions that provide access to runtime context and lifecycle methods.

```java { .api }
/**
 * Apply a RichMapFunction with access to runtime context
 * @param mapper - the rich map function
 * @return transformed DataStream
 */
<R> DataStream<R> map(RichMapFunction<T, R> mapper);

/**
 * Apply a RichFilterFunction with access to runtime context
 * @param filter - the rich filter function
 * @return filtered DataStream
 */
DataStream<T> filter(RichFilterFunction<T> filter);

/**
 * Apply a RichFlatMapFunction with access to runtime context
 * @param flatMapper - the rich flatmap function
 * @return transformed DataStream
 */
<R> DataStream<R> flatMap(RichFlatMapFunction<T, R> flatMapper);
```

**Usage Examples:**

```java
// Rich function with initialization in open()
DataStream<String> enriched = text.map(new RichMapFunction<String, String>() {
    private String prefix;

    @Override
    public void open(Configuration parameters) {
        // Global job parameters are exposed as a Map, not via a get(key, default) accessor
        Map<String, String> params = getRuntimeContext()
            .getExecutionConfig().getGlobalJobParameters().toMap();
        prefix = params.getOrDefault("prefix", "default");
    }

    @Override
    public String map(String value) {
        return prefix + ": " + value;
    }
});
```

### Process Functions

Use process functions for complex processing logic with access to timers and state. Note that timers require a keyed stream; registering a timer on a non-keyed stream throws an UnsupportedOperationException.

```java { .api }
/**
 * Apply a ProcessFunction for complex stream processing
 * @param processFunction - the process function
 * @return processed DataStream
 */
<R> DataStream<R> process(ProcessFunction<T, R> processFunction);
```

**Usage Examples:**

```java
// keyedText is a KeyedStream, e.g. text.keyBy(value -> value); timers only work on keyed streams
DataStream<String> processed = keyedText.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Custom processing logic
        if (value.length() > 0) {
            out.collect("Processed: " + value);

            // Set a timer for 60 seconds from now
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("Timer fired at: " + timestamp);
    }
});
```

### Stream Splitting (Deprecated)

Note: Stream splitting using split() and select() is deprecated (and removed entirely in recent Flink releases). Use side outputs instead.

```java { .api }
/**
 * Split the stream based on an OutputSelector (DEPRECATED)
 * @param outputSelector - selector to determine output streams
 * @return SplitStream for selecting split streams
 */
@Deprecated
SplitStream<T> split(OutputSelector<T> outputSelector);
```

### Side Outputs

Use side outputs to emit data to multiple output streams from a single operator.

```java { .api }
// Side outputs are emitted from within a ProcessFunction
public void processElement(T element, Context ctx, Collector<R> out) {
    // Emit to main output
    out.collect(mainResult);

    // Emit to side output
    ctx.output(sideOutputTag, sideResult);
}

// Retrieve a side output from a SingleOutputStreamOperator
DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);
```

**Usage Examples:**

```java
// Define the side output tag; the anonymous subclass ({}) lets Flink capture the element type
final OutputTag<String> lateDataTag = new OutputTag<String>("late-data"){};

// Process function with side output
SingleOutputStreamOperator<String> mainStream = input.process(
    new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (isLate(value)) {   // isLate is an application-defined check
                // Emit to side output
                ctx.output(lateDataTag, value);
            } else {
                // Emit to main output
                out.collect(value);
            }
        }
    }
);

// Get the side output stream
DataStream<String> lateData = mainStream.getSideOutput(lateDataTag);
```

### Iteration

Create iterative streaming programs for use cases such as machine learning and graph processing.

```java { .api }
/**
 * Create an iterative stream
 * @return IterativeStream for iteration processing
 */
IterativeStream<T> iterate();

/**
 * Create an iterative stream with a timeout
 * @param maxWaitTimeMillis - maximum time to wait for feedback elements before terminating
 * @return IterativeStream for iteration processing
 */
IterativeStream<T> iterate(long maxWaitTimeMillis);
```

**Usage Examples:**

```java
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});

// Elements still greater than zero are fed back into the iteration
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});

iteration.closeWith(stillGreaterThanZero);

// Elements that have reached zero or below leave the loop as output
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
```
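
Conceptually, the feedback loop above keeps decrementing each element until it exits through the non-feedback filter. A dependency-free simulation of that dataflow (a sketch of the semantics, not Flink's distributed execution) makes the loop easy to trace:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class IterationSketch {
    // Simulates the loop above: map (value - 1), feed values > 0 back
    // into the iteration, emit values <= 0 as output.
    static List<Long> run(List<Long> input) {
        Deque<Long> pending = new ArrayDeque<>(input);
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long minusOne = pending.poll() - 1;       // the map step
            if (minusOne > 0) pending.add(minusOne);  // closeWith: feed back
            else output.add(minusOne);                // leaves the loop
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(3L, 0L)));  // prints [-1, 0]
        // 0 -> -1 exits immediately; 3 -> 2 -> 1 -> 0 exits after three passes
    }
}
```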

## Types

### Transformation Function Interfaces

```java { .api }
// Basic transformation functions
interface MapFunction<T, O> extends Function {
    O map(T value) throws Exception;
}

interface FilterFunction<T> extends Function {
    boolean filter(T value) throws Exception;
}

interface FlatMapFunction<T, O> extends Function {
    void flatMap(T value, Collector<O> out) throws Exception;
}

// Rich transformation functions
abstract class RichMapFunction<T, O> extends AbstractRichFunction implements MapFunction<T, O> {
    // Provides access to RuntimeContext and lifecycle methods
}

abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
    // Provides access to RuntimeContext and lifecycle methods
}

abstract class RichFlatMapFunction<T, O> extends AbstractRichFunction implements FlatMapFunction<T, O> {
    // Provides access to RuntimeContext and lifecycle methods
}

// Key selector for partitioning
interface KeySelector<IN, KEY> extends Function {
    KEY getKey(IN value) throws Exception;
}

// Output selector for splitting (deprecated)
@Deprecated
interface OutputSelector<OUT> extends Function {
    Iterable<String> select(OUT value);
}
```
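
These are single-abstract-method interfaces, which is why the earlier examples can pass lambdas. The zero-one-or-many contract of flatMap can be exercised without a Flink cluster using local stand-ins for Collector and FlatMapFunction (the nested interfaces below mirror the ones above but are not Flink's own classes):

```java
import java.util.ArrayList;
import java.util.List;

public class FlatMapContractSketch {
    // Local stand-ins mirroring the interfaces documented above
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out) throws Exception; }

    // Drives a FlatMapFunction over a list, collecting everything it emits
    static <T, O> List<O> applyFlatMap(FlatMapFunction<T, O> fn, List<T> input) {
        List<O> results = new ArrayList<>();
        try {
            for (T value : input) {
                fn.flatMap(value, results::add);  // the Collector just appends
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        // One input sentence can yield many output words, or none for an empty string
        FlatMapFunction<String, String> splitter = (sentence, out) -> {
            for (String word : sentence.split(" ")) {
                if (!word.isEmpty()) out.collect(word);
            }
        };
        System.out.println(applyFlatMap(splitter, List.of("hello world", "", "flink")));
        // prints [hello, world, flink]
    }
}
```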

### Stream Types

```java { .api }
// Main stream type
class DataStream<T> {
    // All transformation methods as documented above
}

// Result of transformations
class SingleOutputStreamOperator<T> extends DataStream<T> {
    // Additional operator configuration methods
    SingleOutputStreamOperator<T> name(String name);
    SingleOutputStreamOperator<T> uid(String uid);
    SingleOutputStreamOperator<T> setParallelism(int parallelism);
    <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);
}

// Keyed stream for stateful operations
class KeyedStream<T, KEY> {
    // Stateful operations (documented in keyed-streams-state.md)
}

// Connected streams for joint processing
class ConnectedStreams<T1, T2> {
    // Joint processing operations (documented in connected-streams.md)
}

// Split stream (deprecated)
@Deprecated
class SplitStream<T> extends DataStream<T> {
    DataStream<T> select(String... outputNames);
}

// Iterative stream
class IterativeStream<T> extends DataStream<T> {
    DataStream<T> closeWith(DataStream<T> feedbackStream);
}
```

### Utility Types

```java { .api }
// Collector for emitting results
interface Collector<T> {
    void collect(T record);
    void close();
}

// Output tag for side outputs; typically instantiated as an anonymous
// subclass (new OutputTag<X>("id"){}) so the element type can be captured
class OutputTag<T> {
    public OutputTag(String id) {}
    public OutputTag(String id, TypeInformation<T> typeInfo) {}
}

// Runtime context for rich functions
interface RuntimeContext {
    String getTaskName();
    int getNumberOfParallelSubtasks();
    int getIndexOfThisSubtask();
    ExecutionConfig getExecutionConfig();
    // State access methods
}
```