
# DataStream Operations

DataStream is the core abstraction in Flink representing a stream of data elements. It provides a rich set of transformation operations to process and manipulate streaming data.

## DataStream<T>

The fundamental stream abstraction providing transformation operations.

```java { .api }
public class DataStream<T> {
    // Basic transformations
    public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<T, R> mapper);
    public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<T, R> flatMapper);
    public SingleOutputStreamOperator<T, ?> filter(FilterFunction<T> filter);
    public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer);
    public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder);
    public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes);

    // Aggregation operations (available on non-keyed streams for single parallelism)
    public SingleOutputStreamOperator<T, ?> sum(int positionToSum);
    public SingleOutputStreamOperator<T, ?> sum(String field);
    public SingleOutputStreamOperator<T, ?> min(int positionToMin);
    public SingleOutputStreamOperator<T, ?> min(String field);
    public SingleOutputStreamOperator<T, ?> max(int positionToMax);
    public SingleOutputStreamOperator<T, ?> max(String field);
    public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy);
    public SingleOutputStreamOperator<T, ?> minBy(String field);
    public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first);
    public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first);
    public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy);
    public SingleOutputStreamOperator<T, ?> maxBy(String field);
    public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first);
    public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first);
    public SingleOutputStreamOperator<Long, ?> count();

    // Keyed operations
    public GroupedDataStream<T> groupBy(KeySelector<T, ?> key);
    public GroupedDataStream<T> groupBy(int... fields);
    public GroupedDataStream<T> groupBy(String... fields);

    // Stream composition and connectivity
    public DataStream<T> union(DataStream<T>... streams);
    public <R> ConnectedDataStream<T, R> connect(DataStream<R> dataStream);
    public SplitDataStream<T> split(OutputSelector<T> outputSelector);

    // Partitioning strategies
    public DataStream<T> shuffle();
    public DataStream<T> forward();
    public DataStream<T> rebalance();
    public DataStream<T> global();
    public DataStream<T> broadcast();
    public DataStream<T> partitionByHash(int... fields);
    public DataStream<T> partitionByHash(String... fields);
    public DataStream<T> partitionByHash(KeySelector<T, ?> keySelector);

    // Temporal operations (cross and join)
    public <IN2> StreamCrossOperator<T, IN2> cross(DataStream<IN2> dataStreamToCross);
    public <IN2> StreamJoinOperator<T, IN2> join(DataStream<IN2> dataStreamToJoin);

    // Iteration support
    public IterativeDataStream<T> iterate();
    public IterativeDataStream<T> iterate(long maxWaitTimeMillis);

    // Windowing operations
    public WindowedDataStream<T> window(WindowingHelper policyHelper);
    public WindowedDataStream<T> window(TriggerPolicy<T> trigger, EvictionPolicy<T> eviction);
    public WindowedDataStream<T> every(WindowingHelper policyHelper);

    // Output operations - Console
    public DataStreamSink<T> print();
    public DataStreamSink<T> printToErr();

    // Output operations - File (Text)
    public DataStreamSink<T> writeAsText(String path);
    public DataStreamSink<T> writeAsText(String path, long millis);
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode);
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis);

    // Output operations - File (CSV)
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, long millis);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, long millis);

    // Output operations - Network and Generic
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema);
    public DataStreamSink<T> write(OutputFormat<T> format, long millis);
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);

    // Advanced operations
    public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator);

    // Basic properties and configuration
    public Integer getId();
    public int getParallelism();
    public TypeInformation<T> getType();
    public StreamExecutionEnvironment getExecutionEnvironment();
    public DataStream<T> copy();
}
```

## DataStreamSource<T>

A DataStream created from a source function.

```java { .api }
public class DataStreamSource<T> extends DataStream<T> {
    public DataStreamSource<T> setParallelism(int parallelism);
    public DataStreamSource<T> name(String name);
}
```

## GroupedDataStream<T>

A DataStream that has been partitioned by key for keyed operations.

```java { .api }
public class GroupedDataStream<T> {
    // Aggregations
    public DataStream<T> reduce(ReduceFunction<T> reducer);
    public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> folder);

    // Field-based aggregations
    public DataStream<T> sum(int positionToSum);
    public DataStream<T> sum(String field);
    public DataStream<T> min(int positionToMin);
    public DataStream<T> min(String field);
    public DataStream<T> max(int positionToMax);
    public DataStream<T> max(String field);
    public DataStream<T> minBy(int positionToMinBy);
    public DataStream<T> minBy(String field);
    public DataStream<T> maxBy(int positionToMaxBy);
    public DataStream<T> maxBy(String field);

    // Windowing
    public WindowedDataStream<T> window(WindowingHelper<T> helper);
    public WindowedDataStream<T> every(WindowingHelper<T> helper);

    // Configuration
    public GroupedDataStream<T> setParallelism(int parallelism);
    public GroupedDataStream<T> name(String name);
}
```

## ConnectedDataStream<T1, T2>

Two connected streams that can be processed jointly.

```java { .api }
public class ConnectedDataStream<T1, T2> {
    // Joint transformations
    public <R> DataStream<R> map(CoMapFunction<T1, T2, R> coMapper);
    public <R> DataStream<R> flatMap(CoFlatMapFunction<T1, T2, R> coFlatMapper);
    public <R> DataStream<R> reduce(CoReduceFunction<T1, T2, R> coReducer);

    // Key both streams for keyed operations
    public ConnectedDataStream<T1, T2> groupBy(KeySelector<T1, ?> keySelector1, KeySelector<T2, ?> keySelector2);
    public ConnectedDataStream<T1, T2> groupBy(int key1, int key2);
    public ConnectedDataStream<T1, T2> groupBy(String key1, String key2);

    // Configuration
    public ConnectedDataStream<T1, T2> setParallelism(int parallelism);
    public ConnectedDataStream<T1, T2> name(String name);
}
```

## DataStreamSink<T>

Terminal operation that consumes stream data.

```java { .api }
public class DataStreamSink<T> {
    // Configuration
    public DataStreamSink<T> setParallelism(int parallelism);
    public int getParallelism();
    public DataStreamSink<T> name(String name);
    public DataStreamSink<T> disableChaining();
    public DataStreamSink<T> setBufferTimeout(long timeout);
    public DataStreamSink<T> slotSharingGroup(String slotSharingGroup);
}
```

## Usage Examples

### Basic Transformations

```java
DataStream<String> text = env.fromElements("hello world", "how are you");

// Map transformation
DataStream<String> upper = text.map(String::toUpperCase);

// FlatMap transformation
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String sentence, Collector<String> out) {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
});

// Filter transformation
DataStream<String> filtered = words.filter(word -> word.length() > 3);
```

### Keyed Operations

```java
DataStream<Tuple2<String, Integer>> counts = words
    .map(word -> new Tuple2<>(word, 1))
    .groupBy(0) // Group by first field (word)
    .sum(1); // Sum second field (count)

// Using KeySelector
DataStream<Tuple2<String, Integer>> counts2 = words
    .map(word -> new Tuple2<>(word, 1))
    .groupBy(tuple -> tuple.f0) // Group by word
    .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
```

### Stream Union

```java
DataStream<String> stream1 = env.fromElements("a", "b", "c");
DataStream<String> stream2 = env.fromElements("d", "e", "f");
DataStream<String> stream3 = env.fromElements("g", "h", "i");

// Union multiple streams
DataStream<String> unionStream = stream1.union(stream2, stream3);
```

### Connected Streams

```java
DataStream<String> stream1 = env.fromElements("hello", "world");
DataStream<Integer> stream2 = env.fromElements(1, 2, 3);

ConnectedDataStream<String, Integer> connected = stream1.connect(stream2);

DataStream<String> result = connected.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String value) {
        return "String: " + value;
    }

    @Override
    public String map2(Integer value) {
        return "Integer: " + value;
    }
});
```

### Partitioning

```java
DataStream<Tuple2<String, Integer>> data = env.fromElements(
    new Tuple2<>("key1", 1), new Tuple2<>("key2", 2));

// Different partitioning strategies
DataStream<Tuple2<String, Integer>> shuffled = data.shuffle();
DataStream<Tuple2<String, Integer>> rebalanced = data.rebalance();
DataStream<Tuple2<String, Integer>> broadcasted = data.broadcast();
```

### Output Operations

```java
DataStream<String> processed = words.map(String::toUpperCase);

// Print to standard output
processed.print();

// Write to file
processed.writeAsText("/path/to/output.txt");

// Write as CSV
DataStream<Tuple2<String, Integer>> tuples = processed
    .map(word -> new Tuple2<>(word, word.length()));
tuples.writeAsCsv("/path/to/output.csv");

// Custom sink
processed.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) {
        System.out.println("Custom sink: " + value);
    }
});
```

### Configuration

```java
DataStream<String> configured = text
    .map(String::toUpperCase)
    .name("UpperCase Transformation")
    .setParallelism(4)
    .setBufferTimeout(100)
    .disableChaining();
```

## Types

```java { .api }
// Function interfaces for transformations
public interface MapFunction<T, O> extends Function {
    O map(T value) throws Exception;
}

public interface FlatMapFunction<T, O> extends Function {
    void flatMap(T value, Collector<O> out) throws Exception;
}

public interface FilterFunction<T> extends Function {
    boolean filter(T value) throws Exception;
}

public interface ReduceFunction<T> extends Function {
    T reduce(T value1, T value2) throws Exception;
}

public interface FoldFunction<T, O> extends Function {
    O fold(O accumulator, T value) throws Exception;
}

public interface KeySelector<IN, KEY> extends Function {
    KEY getKey(IN value) throws Exception;
}

// CoFunction interfaces for connected streams
public interface CoMapFunction<IN1, IN2, OUT> extends Function {
    OUT map1(IN1 value) throws Exception;
    OUT map2(IN2 value) throws Exception;
}

public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function {
    void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
    void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}

public interface CoReduceFunction<IN1, IN2, OUT> extends Function {
    OUT reduce1(IN1 value, OUT accumulator) throws Exception;
    OUT reduce2(IN2 value, OUT accumulator) throws Exception;
}

// Partitioner interface
public abstract class Partitioner<T> implements Serializable {
    public abstract int partition(T key, int numPartitions);
}

// Collector interface
public interface Collector<T> {
    void collect(T record);
    void close();
}

// Additional types for advanced operations
public class SingleOutputStreamOperator<T, O extends StreamOperator<T>> extends DataStream<T> {
    // Stream operator with configuration methods
    public SingleOutputStreamOperator<T, O> name(String name);
    public SingleOutputStreamOperator<T, O> setParallelism(int parallelism);
    public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis);
    public SingleOutputStreamOperator<T, O> disableChaining();
    public SingleOutputStreamOperator<T, O> startNewChain();
    public SingleOutputStreamOperator<T, O> slotSharingGroup(String slotSharingGroup);
}

public class SplitDataStream<T> {
    // Split stream that can be selected by name
    public DataStream<T> select(String... outputNames);
}

public interface OutputSelector<T> extends Serializable {
    Iterable<String> select(T value);
}

public class IterativeDataStream<T> extends DataStream<T> {
    // Iterative data stream for feedback loops
    public DataStream<T> closeWith(DataStream<T> feedbackStream);
}

public class StreamCrossOperator<I1, I2> {
    // Cross operation between two streams
    public <OUT> DataStream<OUT> with(CrossFunction<I1, I2, OUT> crossFunction);
    public StreamCrossOperator<I1, I2> where(KeySelector<I1, ?> keySelector);
    public StreamCrossOperator<I1, I2> equalTo(KeySelector<I2, ?> keySelector);
}

public class StreamJoinOperator<I1, I2> {
    // Join operation between two streams
    public <OUT> DataStream<OUT> with(JoinFunction<I1, I2, OUT> joinFunction);
    public StreamJoinOperator<I1, I2> where(KeySelector<I1, ?> keySelector);
    public StreamJoinOperator<I1, I2> equalTo(KeySelector<I2, ?> keySelector);
}

// Cross and Join function interfaces
public interface CrossFunction<IN1, IN2, OUT> extends Function {
    OUT cross(IN1 first, IN2 second) throws Exception;
}

public interface JoinFunction<IN1, IN2, OUT> extends Function {
    OUT join(IN1 first, IN2 second) throws Exception;
}

// Windowing types referenced in API
public abstract class WindowingHelper<T> implements Serializable {
    // Base class for windowing helpers
}

public interface TriggerPolicy<T> extends Serializable {
    // Trigger policy for windowing
}

public interface EvictionPolicy<T> extends Serializable {
    // Eviction policy for windowing
}

// Serialization schema for network output
public interface SerializationSchema<T, S> extends Serializable {
    S serialize(T element);
}

// Write mode enum for file outputs
public enum WriteMode {
    NO_OVERWRITE,
    OVERWRITE
}
```