or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconnectors.mdcore-functions.mddatastream-traditional.mddatastream-v2.mdindex.mdstate-management.mdtable-api.mdwindowing.md

datastream-traditional.mddocs/

0

# DataStream API (Traditional)

1

2

Traditional DataStream API providing comprehensive stream processing capabilities with windowing, state management, and complex event processing features. This is the stable, production-ready API used in most Flink applications.

3

4

## Capabilities

5

6

### Execution Environment

7

8

Main entry point for traditional DataStream programs.

9

10

```java { .api }

11

/**

12

* Main entry point for streaming programs

13

*/

14

class StreamExecutionEnvironment {

15

/**

16

* Get execution environment with default configuration

17

* @return Execution environment

18

*/

19

public static StreamExecutionEnvironment getExecutionEnvironment();

20

21

/**

22

* Create local execution environment

23

* @return Local execution environment

24

*/

25

public static StreamExecutionEnvironment createLocalEnvironment();

26

27

/**

28

* Add source function to create data stream

29

* @param function Source function

30

* @param <T> Element type

31

* @return DataStream with elements from source

32

*/

33

public <T> DataStream<T> addSource(SourceFunction<T> function);

34

35

/**

36

* Create data stream from collection

37

* @param data Collection of elements

38

* @param <T> Element type

39

* @return DataStream with elements

40

*/

41

public <T> DataStream<T> fromCollection(Collection<T> data);

42

43

/**

44

* Create data stream from elements

45

* @param data Varargs elements

46

* @param <T> Element type

47

* @return DataStream with elements

48

*/

49

public <T> DataStream<T> fromElements(@SuppressWarnings("unchecked") T... data);

50

51

/**

52

* Execute the streaming program

53

* @return Job execution result

54

* @throws Exception

55

*/

56

public JobExecutionResult execute() throws Exception;

57

58

/**

59

* Execute with job name

60

* @param jobName Job name

61

* @return Job execution result

62

* @throws Exception

63

*/

64

public JobExecutionResult execute(String jobName) throws Exception;

65

66

/**

67

* Set parallelism for operations

68

* @param parallelism Parallelism level

69

*/

70

public void setParallelism(int parallelism);

71

72

/**

73

* Enable checkpointing

74

* @param interval Checkpoint interval in milliseconds

75

*/

76

public void enableCheckpointing(long interval);

77

}

78

```

79

80

### DataStream Operations

81

82

Core stream transformation operations.

83

84

```java { .api }

85

/**

86

* Core stream abstraction for traditional API

87

* @param <T> Element type

88

*/

89

class DataStream<T> {

90

/**

91

* Apply map transformation

92

* @param mapper Map function

93

* @param <R> Result type

94

* @return Transformed stream

95

*/

96

public <R> DataStream<R> map(MapFunction<T, R> mapper);

97

98

/**

99

* Apply flatMap transformation

100

* @param flatMapper FlatMap function

101

* @param <R> Result type

102

* @return Transformed stream

103

*/

104

public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper);

105

106

/**

107

* Filter elements

108

* @param filter Filter function

109

* @return Filtered stream

110

*/

111

public DataStream<T> filter(FilterFunction<T> filter);

112

113

/**

114

* Partition stream by key

115

* @param keySelector Key selector function

116

* @param <K> Key type

117

* @return Keyed stream

118

*/

119

public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> keySelector);

120

121

/**

122

* Connect with another stream

123

* @param dataStream Other stream

124

* @param <T2> Other stream type

125

* @return Connected streams

126

*/

127

public <T2> ConnectedStreams<T, T2> connect(DataStream<T2> dataStream);

128

129

/**

130

* Union with other streams

131

* @param streams Other streams

132

* @return Union of streams

133

*/

134

public DataStream<T> union(DataStream<T>... streams);

135

136

/**

137

* Apply process function

138

* @param processFunction Process function

139

* @param <R> Result type

140

* @return Processed stream

141

*/

142

public <R> DataStream<R> process(ProcessFunction<T, R> processFunction);

143

144

/**

145

* Add sink

146

* @param sinkFunction Sink function

147

* @return Data stream sink

148

*/

149

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);

150

151

/**

152

* Print elements to output

153

* @return Data stream sink

154

*/

155

public DataStreamSink<T> print();

156

}

157

```

158

159

### Keyed Stream Operations

160

161

Operations available on keyed streams for stateful processing.

162

163

```java { .api }

164

/**

165

* Keyed stream for stateful operations

166

* @param <T> Element type

167

* @param <K> Key type

168

*/

169

class KeyedStream<T, K> {

170

/**

171

* Reduce elements by key

172

* @param reducer Reduce function

173

* @return Stream with reduced elements

174

*/

175

public DataStream<T> reduce(ReduceFunction<T> reducer);

176

177

/**

178

* Apply process function with key context

179

* @param keyedProcessFunction Keyed process function

180

* @param <R> Result type

181

* @return Processed stream

182

*/

183

public <R> DataStream<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction);

184

185

/**

186

* Create windowed stream

187

* @param assigner Window assigner

188

* @param <W> Window type

189

* @return Windowed stream

190

*/

191

public <W extends Window> WindowedStream<T, K, W> window(WindowAssigner<? super T, W> assigner);

192

193

/**

194

* Create time windowed stream

195

* @param assigner Time window assigner

196

* @return Time windowed stream

197

*/

198

public WindowedStream<T, K, TimeWindow> timeWindow(Time size);

199

200

/**

201

* Create sliding time windowed stream

202

* @param size Window size

203

* @param slide Slide interval

204

* @return Time windowed stream

205

*/

206

public WindowedStream<T, K, TimeWindow> timeWindow(Time size, Time slide);

207

}

208

```

209

210

### Windowed Stream Operations

211

212

Operations on windowed streams.

213

214

```java { .api }

215

/**

216

* Windowed stream operations

217

* @param <T> Element type

218

* @param <K> Key type

219

* @param <W> Window type

220

*/

221

class WindowedStream<T, K, W extends Window> {

222

/**

223

* Reduce elements within windows

224

* @param function Reduce function

225

* @return Stream with window results

226

*/

227

public DataStream<T> reduce(ReduceFunction<T> function);

228

229

/**

230

* Aggregate elements within windows

231

* @param function Aggregate function

232

* @param <ACC> Accumulator type

233

* @param <R> Result type

234

* @return Stream with aggregated results

235

*/

236

public <ACC, R> DataStream<R> aggregate(AggregateFunction<T, ACC, R> function);

237

238

/**

239

* Apply window function

240

* @param function Window function

241

* @param <R> Result type

242

* @return Stream with window results

243

*/

244

public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function);

245

246

/**

247

* Apply process window function

248

* @param function Process window function

249

* @param <R> Result type

250

* @return Stream with processed results

251

*/

252

public <R> DataStream<R> process(ProcessWindowFunction<T, R, K, W> function);

253

254

/**

255

* Set window trigger

256

* @param trigger Window trigger

257

* @return Windowed stream with trigger

258

*/

259

public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger);

260

261

/**

262

* Set window evictor

263

* @param evictor Window evictor

264

* @return Windowed stream with evictor

265

*/

266

public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor);

267

}

268

```

269

270

### Process Functions

271

272

Process function interfaces for custom processing logic.

273

274

```java { .api }

275

/**

276

* Process function for single input streams

277

* @param <I> Input type

278

* @param <O> Output type

279

*/

280

abstract class ProcessFunction<I, O> {

281

/**

282

* Process element

283

* @param value Input element

284

* @param ctx Process context

285

* @param out Output collector

286

* @throws Exception

287

*/

288

public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

289

290

/**

291

* Process timer

292

* @param timestamp Timer timestamp

293

* @param ctx OnTimer context

294

* @param out Output collector

295

* @throws Exception

296

*/

297

public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

298

299

/**

300

* Process context

301

*/

302

public abstract class Context {

303

/**

304

* Get current element timestamp

305

* @return Element timestamp

306

*/

307

public abstract Long timestamp();

308

309

/**

310

* Register processing time timer

311

* @param timestamp Timer timestamp

312

*/

313

public abstract void timerService().registerProcessingTimeTimer(long timestamp);

314

315

/**

316

* Output to side output

317

* @param outputTag Output tag

318

* @param value Value to output

319

* @param <X> Value type

320

*/

321

public abstract <X> void output(OutputTag<X> outputTag, X value);

322

}

323

}

324

325

/**

326

* Keyed process function

327

* @param <K> Key type

328

* @param <I> Input type

329

* @param <O> Output type

330

*/

331

abstract class KeyedProcessFunction<K, I, O> {

332

/**

333

* Process element with key context

334

* @param value Input element

335

* @param ctx Keyed context

336

* @param out Output collector

337

* @throws Exception

338

*/

339

public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

340

341

/**

342

* Process timer with key context

343

* @param timestamp Timer timestamp

344

* @param ctx OnTimer context

345

* @param out Output collector

346

* @throws Exception

347

*/

348

public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

349

350

/**

351

* Keyed process context

352

*/

353

public abstract class Context {

354

/**

355

* Get current key

356

* @return Current key

357

*/

358

public abstract K getCurrentKey();

359

360

/**

361

* Get timer service

362

* @return Timer service

363

*/

364

public abstract TimerService timerService();

365

}

366

}

367

```

368

369

### Supporting Types

370

371

Supporting types and interfaces for the traditional DataStream API.

372

373

```java { .api }

374

/**

375

* Source function interface for creating data streams

376

* @param <T> Element type

377

*/

378

interface SourceFunction<T> {

379

/**

380

* Run source function

381

* @param ctx Source context

382

* @throws Exception

383

*/

384

void run(SourceContext<T> ctx) throws Exception;

385

386

/**

387

* Cancel source function

388

*/

389

void cancel();

390

391

/**

392

* Source context for emitting elements

393

* @param <T> Element type

394

*/

395

interface SourceContext<T> {

396

/**

397

* Collect element

398

* @param element Element to collect

399

*/

400

void collect(T element);

401

402

/**

403

* Collect element with timestamp

404

* @param element Element to collect

405

* @param timestamp Element timestamp

406

*/

407

void collectWithTimestamp(T element, long timestamp);

408

409

/**

410

* Emit watermark

411

* @param mark Watermark

412

*/

413

void emitWatermark(Watermark mark);

414

415

/**

416

* Mark source as temporarily idle

417

*/

418

void markAsTemporarilyIdle();

419

420

/**

421

* Get checkpoint lock

422

* @return Checkpoint lock

423

*/

424

Object getCheckpointLock();

425

426

/**

427

* Close the source context

428

*/

429

void close();

430

}

431

}

432

433

/**

434

* Sink function interface for consuming data streams

435

* @param <IN> Input element type

436

*/

437

interface SinkFunction<IN> {

438

/**

439

* Invoke sink function with element

440

* @param value Input element

441

* @param context Sink context

442

* @throws Exception

443

*/

444

void invoke(IN value, Context context) throws Exception;

445

446

/**

447

* Sink context

448

*/

449

interface Context {

450

/**

451

* Get current processing time

452

* @return Current processing time

453

*/

454

long currentProcessingTime();

455

456

/**

457

* Get current watermark

458

* @return Current watermark

459

*/

460

long currentWatermark();

461

462

/**

463

* Get element timestamp

464

* @return Element timestamp

465

*/

466

Long timestamp();

467

}

468

}

469

470

/**

471

* Data stream sink

472

* @param <T> Element type

473

*/

474

class DataStreamSink<T> {

475

/**

476

* Set sink parallelism

477

* @param parallelism Parallelism level

478

* @return Data stream sink

479

*/

480

public DataStreamSink<T> setParallelism(int parallelism);

481

482

/**

483

* Disable chaining for this sink

484

* @return Data stream sink

485

*/

486

public DataStreamSink<T> disableChaining();

487

488

/**

489

* Set slot sharing group

490

* @param slotSharingGroup Slot sharing group

491

* @return Data stream sink

492

*/

493

public DataStreamSink<T> slotSharingGroup(String slotSharingGroup);

494

495

/**

496

* Set sink name

497

* @param name Sink name

498

* @return Data stream sink

499

*/

500

public DataStreamSink<T> name(String name);

501

}

502

503

/**

504

* Job execution result

505

*/

506

class JobExecutionResult {

507

/**

508

* Get job execution time

509

* @return Execution time in milliseconds

510

*/

511

public long getNetRuntime();

512

513

/**

514

* Get accumulator result

515

* @param accumulatorName Accumulator name

516

* @param <T> Result type

517

* @return Accumulator result

518

*/

519

public <T> T getAccumulatorResult(String accumulatorName);

520

521

/**

522

* Get all accumulator results

523

* @return Map of accumulator results

524

*/

525

public Map<String, Object> getAllAccumulatorResults();

526

}

527

528

/**

529

* Timer service for managing timers

530

*/

531

interface TimerService {

532

/**

533

* Get current processing time

534

* @return Current processing time

535

*/

536

long currentProcessingTime();

537

538

/**

539

* Get current watermark

540

* @return Current watermark

541

*/

542

long currentWatermark();

543

544

/**

545

* Register processing time timer

546

* @param timestamp Timer timestamp

547

*/

548

void registerProcessingTimeTimer(long timestamp);

549

550

/**

551

* Register event time timer

552

* @param timestamp Timer timestamp

553

*/

554

void registerEventTimeTimer(long timestamp);

555

556

/**

557

* Delete processing time timer

558

* @param timestamp Timer timestamp

559

*/

560

void deleteProcessingTimeTimer(long timestamp);

561

562

/**

563

* Delete event time timer

564

* @param timestamp Timer timestamp

565

*/

566

void deleteEventTimeTimer(long timestamp);

567

}

568

569

/**

570

* Connected streams for processing two input streams

571

* @param <IN1> First input type

572

* @param <IN2> Second input type

573

*/

574

class ConnectedStreams<IN1, IN2> {

575

/**

576

* Apply co-map function

577

* @param coMapper Co-map function

578

* @param <R> Result type

579

* @return Data stream

580

*/

581

public <R> DataStream<R> map(CoMapFunction<IN1, IN2, R> coMapper);

582

583

/**

584

* Apply co-flat-map function

585

* @param coFlatMapper Co-flat-map function

586

* @param <R> Result type

587

* @return Data stream

588

*/

589

public <R> DataStream<R> flatMap(CoFlatMapFunction<IN1, IN2, R> coFlatMapper);

590

591

/**

592

* Apply co-process function

593

* @param coProcessFunction Co-process function

594

* @param <R> Result type

595

* @return Data stream

596

*/

597

public <R> DataStream<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction);

598

}

599

```

600

601

### Function Interfaces

602

603

Additional function interfaces for stream processing.

604

605

```java { .api }

606

/**

607

* Co-map function for connected streams

608

* @param <IN1> First input type

609

* @param <IN2> Second input type

610

* @param <OUT> Output type

611

*/

612

interface CoMapFunction<IN1, IN2, OUT> extends Function {

613

/**

614

* Map function for first input

615

* @param value Value from first input

616

* @return Mapped value

617

* @throws Exception

618

*/

619

OUT map1(IN1 value) throws Exception;

620

621

/**

622

* Map function for second input

623

* @param value Value from second input

624

* @return Mapped value

625

* @throws Exception

626

*/

627

OUT map2(IN2 value) throws Exception;

628

}

629

630

/**

631

* Co-flat-map function for connected streams

632

* @param <IN1> First input type

633

* @param <IN2> Second input type

634

* @param <OUT> Output type

635

*/

636

interface CoFlatMapFunction<IN1, IN2, OUT> extends Function {

637

/**

638

* Flat-map function for first input

639

* @param value Value from first input

640

* @param out Output collector

641

* @throws Exception

642

*/

643

void flatMap1(IN1 value, Collector<OUT> out) throws Exception;

644

645

/**

646

* Flat-map function for second input

647

* @param value Value from second input

648

* @param out Output collector

649

* @throws Exception

650

*/

651

void flatMap2(IN2 value, Collector<OUT> out) throws Exception;

652

}

653

654

/**

655

* Co-process function for connected streams

656

* @param <IN1> First input type

657

* @param <IN2> Second input type

658

* @param <OUT> Output type

659

*/

660

abstract class CoProcessFunction<IN1, IN2, OUT> {

661

/**

662

* Process element from first input

663

* @param value Element from first input

664

* @param ctx Process context

665

* @param out Output collector

666

* @throws Exception

667

*/

668

public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

669

670

/**

671

* Process element from second input

672

* @param value Element from second input

673

* @param ctx Process context

674

* @param out Output collector

675

* @throws Exception

676

*/

677

public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

678

679

/**

680

* Process timer

681

* @param timestamp Timer timestamp

682

* @param ctx OnTimer context

683

* @param out Output collector

684

* @throws Exception

685

*/

686

public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

687

688

/**

689

* Process context

690

*/

691

public abstract class Context {

692

/**

693

* Get timer service

694

* @return Timer service

695

*/

696

public abstract TimerService timerService();

697

698

/**

699

* Get current timestamp

700

* @return Current timestamp

701

*/

702

public abstract Long timestamp();

703

704

/**

705

* Output to side output

706

* @param outputTag Output tag

707

* @param value Value to output

708

* @param <X> Value type

709

*/

710

public abstract <X> void output(OutputTag<X> outputTag, X value);

711

}

712

713

/**

714

* OnTimer context

715

*/

716

public abstract class OnTimerContext extends Context {

717

/**

718

* Get timer timestamp

719

* @return Timer timestamp

720

*/

721

public abstract Long timestamp();

722

723

/**

724

* Get timer domain

725

* @return Timer domain

726

*/

727

public abstract TimeDomain timeDomain();

728

}

729

}

730

731

/**

732

* Time domain enumeration

733

*/

734

enum TimeDomain {

735

/** Event time domain */

736

EVENT_TIME,

737

/** Processing time domain */

738

PROCESSING_TIME

739

}

740

```

741

742

### Asynchronous Operations

743

744

Support for asynchronous I/O operations in stream processing.

745

746

```java { .api }

747

/**

748

* Function interface for asynchronous operations

749

* @param <IN> Input type

750

* @param <OUT> Output type

751

*/

752

interface AsyncFunction<IN, OUT> extends Function {

753

/**

754

* Trigger async operation for input element

755

* @param input Input element

756

* @param resultFuture Future to complete with results

757

* @throws Exception

758

*/

759

void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

760

761

/**

762

* Handle timeout for async operation (optional)

763

* @param input Input element that timed out

764

* @param resultFuture Future to complete with results

765

* @throws Exception

766

*/

767

default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {

768

resultFuture.completeExceptionally(

769

new TimeoutException("Async operation timed out"));

770

}

771

}

772

773

/**

774

* Rich async function with runtime context access

775

* @param <IN> Input type

776

* @param <OUT> Output type

777

*/

778

abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {}

779

780

/**

781

* Utility class for applying async operations to data streams

782

*/

783

class AsyncDataStream {

784

/**

785

* Apply async function to stream with unordered output

786

* @param input Input stream

787

* @param function Async function

788

* @param timeout Operation timeout

789

* @param timeUnit Timeout time unit

790

* @param capacity Maximum number of concurrent async operations

791

* @param <IN> Input type

792

* @param <OUT> Output type

793

* @return Stream with async results

794

*/

795

public static <IN, OUT> DataStream<OUT> unorderedWait(

796

DataStream<IN> input,

797

AsyncFunction<IN, OUT> function,

798

long timeout,

799

TimeUnit timeUnit,

800

int capacity);

801

802

/**

803

* Apply async function to stream with unordered output (default capacity)

804

* @param input Input stream

805

* @param function Async function

806

* @param timeout Operation timeout

807

* @param timeUnit Timeout time unit

808

* @param <IN> Input type

809

* @param <OUT> Output type

810

* @return Stream with async results

811

*/

812

public static <IN, OUT> DataStream<OUT> unorderedWait(

813

DataStream<IN> input,

814

AsyncFunction<IN, OUT> function,

815

long timeout,

816

TimeUnit timeUnit);

817

818

/**

819

* Apply async function to stream with ordered output

820

* @param input Input stream

821

* @param function Async function

822

* @param timeout Operation timeout

823

* @param timeUnit Timeout time unit

824

* @param capacity Maximum number of concurrent async operations

825

* @param <IN> Input type

826

* @param <OUT> Output type

827

* @return Stream with async results (order preserved)

828

*/

829

public static <IN, OUT> DataStream<OUT> orderedWait(

830

DataStream<IN> input,

831

AsyncFunction<IN, OUT> function,

832

long timeout,

833

TimeUnit timeUnit,

834

int capacity);

835

836

/**

837

* Apply async function to stream with ordered output (default capacity)

838

* @param input Input stream

839

* @param function Async function

840

* @param timeout Operation timeout

841

* @param timeUnit Timeout time unit

842

* @param <IN> Input type

843

* @param <OUT> Output type

844

* @return Stream with async results (order preserved)

845

*/

846

public static <IN, OUT> DataStream<OUT> orderedWait(

847

DataStream<IN> input,

848

AsyncFunction<IN, OUT> function,

849

long timeout,

850

TimeUnit timeUnit);

851

}

852

853

/**

854

* Future for collecting async operation results

855

* @param <OUT> Result type

856

*/

857

interface ResultFuture<OUT> {

858

/**

859

* Complete future with single result

860

* @param result Result value

861

*/

862

void complete(Collection<OUT> result);

863

864

/**

865

* Complete future with single result

866

* @param result Result value

867

*/

868

default void complete(OUT result) {

869

complete(Collections.singletonList(result));

870

}

871

872

/**

873

* Complete future exceptionally

874

* @param error Exception

875

*/

876

void completeExceptionally(Throwable error);

877

}

878

```