or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconnectors.mdcore-functions.mddatastream-traditional.mddatastream-v2.mdindex.mdstate-management.mdtable-api.mdwindowing.md

datastream-v2.mddocs/

# DataStream API (New v2)

The next-generation DataStream API, designed for improved type safety, better performance, and enhanced functionality. This API is experimental. It provides a streamlined programming model for stream processing applications, with cleaner abstractions and more intuitive operation semantics.

## Capabilities

### Execution Environment

The main entry point for creating DataStream programs with the v2 API.

```java { .api }

/**

12

* Main entry point for DataStream programs using the new v2 API

13

*/

14

interface ExecutionEnvironment {

15

/**

16

* Create data stream from source

17

* @param source Data source

18

* @param <T> Element type

19

* @return DataStream with elements from source

20

*/

21

<T> DataStream<T> fromSource(Source<T> source);

22

23

/**

24

* Create data stream from collection

25

* @param collection Collection of elements

26

* @param <T> Element type

27

* @return DataStream with elements from collection

28

*/

29

<T> DataStream<T> fromCollection(Collection<T> collection);

30

31

/**

32

* Create data stream from elements

33

* @param elements Varargs elements

34

* @param <T> Element type

35

* @return DataStream with provided elements

36

*/

37

<T> DataStream<T> fromElements(T... elements);

38

39

/**

40

* Execute the streaming program

41

* @return Job execution result

42

*/

43

CompletableFuture<JobExecutionResult> execute();

44

45

/**

46

* Execute the streaming program with job name

47

* @param jobName Name of the job

48

* @return Job execution result

49

*/

50

CompletableFuture<JobExecutionResult> execute(String jobName);

51

}

52

```

### Stream Types

Core stream abstractions of the v2 API. Each stream type provides different partitioning and processing semantics.

```java { .api }

/**

60

* Fundamental data stream interface

61

* @param <T> Element type

62

*/

63

interface DataStream<T> {

64

/**

65

* Apply process function to transform elements

66

* @param processFunction Process function

67

* @param <OUT> Output type

68

* @return Transformed stream

69

*/

70

<OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);

71

72

/**

73

* Partition stream by key

74

* @param keySelector Key extraction function

75

* @param <K> Key type

76

* @return Keyed partitioned stream

77

*/

78

<K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

79

80

/**

81

* Connect with another stream for dual-input processing

82

* @param other Other stream

83

* @param <T2> Other stream element type

84

* @return Connected streams

85

*/

86

<T2> TwoInputConnectedStreams<T, T2> connectWith(DataStream<T2> other);

87

88

/**

89

* Broadcast stream to all parallel instances

90

* @return Broadcast stream

91

*/

92

BroadcastStream<T> broadcast();

93

94

/**

95

* Send all elements to single parallel instance

96

* @return Global stream

97

*/

98

GlobalStream<T> global();

99

100

/**

101

* Add sink to consume stream elements

102

* @param sink Data sink

103

* @return Sink transformation

104

*/

105

DataStreamSink<T> sinkTo(Sink<T> sink);

106

}

107

108

/**

109

* Keyed and partitioned stream for stateful operations

110

* @param <K> Key type

111

* @param <T> Element type

112

*/

113

interface KeyedPartitionStream<K, T> {

114

/**

115

* Apply keyed process function

116

* @param processFunction Keyed process function

117

* @param <OUT> Output type

118

* @return Transformed stream

119

*/

120

<OUT> DataStream<OUT> process(KeyedProcessFunction<K, T, OUT> processFunction);

121

122

/**

123

* Reduce elements by key using reduce function

124

* @param reduceFunction Reduce function

125

* @return Stream with reduced elements

126

*/

127

DataStream<T> reduce(ReduceFunction<T> reduceFunction);

128

129

/**

130

* Aggregate elements by key using aggregate function

131

* @param aggregateFunction Aggregate function

132

* @param <ACC> Accumulator type

133

* @param <OUT> Output type

134

* @return Stream with aggregated elements

135

*/

136

<ACC, OUT> DataStream<OUT> aggregate(AggregateFunction<T, ACC, OUT> aggregateFunction);

137

}

138

139

/**

140

* Non-keyed partitioned stream

141

* @param <T> Element type

142

*/

143

interface NonKeyedPartitionStream<T> {

144

/**

145

* Apply process function to stream

146

* @param processFunction Process function

147

* @param <OUT> Output type

148

* @return Transformed stream

149

*/

150

<OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);

151

}

152

153

/**

154

* Broadcast stream that sends elements to all parallel instances

155

* @param <T> Element type

156

*/

157

interface BroadcastStream<T> {

158

/**

159

* Connect with keyed stream for broadcast processing

160

* @param keyedStream Keyed stream

161

* @param <K> Key type

162

* @param <KS> Keyed stream element type

163

* @return Broadcast connected stream

164

*/

165

<K, KS> BroadcastConnectedStream<KS, T> connectWith(KeyedPartitionStream<K, KS> keyedStream);

166

167

/**

168

* Connect with non-keyed stream for broadcast processing

169

* @param stream Non-keyed stream

170

* @param <S> Stream element type

171

* @return Broadcast connected stream

172

*/

173

<S> BroadcastConnectedStream<S, T> connectWith(NonKeyedPartitionStream<S> stream);

174

}

175

176

/**

177

* Global stream processed by single parallel instance

178

* @param <T> Element type

179

*/

180

interface GlobalStream<T> {

181

/**

182

* Apply process function to global stream

183

* @param processFunction Process function

184

* @param <OUT> Output type

185

* @return Transformed stream

186

*/

187

<OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);

188

}

189

```

### Process Functions

Process function interfaces for the v2 API. User code implements these interfaces to process stream elements.

```java { .api }

/**
 * Common base interface of all v2 process functions.
 *
 * <p>It declares no methods of its own. The interfaces that extend it define the processing
 * callbacks.
 */
interface ProcessFunction {}

/**

202

* Single input stream processing function

203

* @param <IN> Input type

204

* @param <OUT> Output type

205

*/

206

interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {

207

/**

208

* Process single element

209

* @param element Input element

210

* @param output Output collector

211

* @param ctx Runtime context

212

* @throws Exception

213

*/

214

void processElement(IN element, Collector<OUT> output, RuntimeContext ctx) throws Exception;

215

216

/**

217

* Process timer event

218

* @param timestamp Timer timestamp

219

* @param output Output collector

220

* @param ctx Runtime context

221

* @throws Exception

222

*/

223

default void onTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) throws Exception {}

224

}

225

226

/**

227

* Key-aware process function for keyed streams

228

* @param <K> Key type

229

* @param <IN> Input type

230

* @param <OUT> Output type

231

*/

232

interface KeyedProcessFunction<K, IN, OUT> extends ProcessFunction {

233

/**

234

* Process element with key context

235

* @param element Input element

236

* @param output Output collector

237

* @param ctx Partitioned context with key access

238

* @throws Exception

239

*/

240

void processElement(IN element, Collector<OUT> output, PartitionedContext<K> ctx) throws Exception;

241

242

/**

243

* Process timer with key context

244

* @param timestamp Timer timestamp

245

* @param output Output collector

246

* @param ctx Partitioned context with key access

247

* @throws Exception

248

*/

249

default void onTimer(long timestamp, Collector<OUT> output, PartitionedContext<K> ctx) throws Exception {}

250

}

251

252

/**

253

* Dual input stream processing function (non-broadcast)

254

* @param <IN1> First input type

255

* @param <IN2> Second input type

256

* @param <OUT> Output type

257

*/

258

interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {

259

/**

260

* Process element from first input

261

* @param element Element from first input

262

* @param output Output collector

263

* @param ctx Runtime context

264

* @throws Exception

265

*/

266

void processElement1(IN1 element, Collector<OUT> output, RuntimeContext ctx) throws Exception;

267

268

/**

269

* Process element from second input

270

* @param element Element from second input

271

* @param output Output collector

272

* @param ctx Runtime context

273

* @throws Exception

274

*/

275

void processElement2(IN2 element, Collector<OUT> output, RuntimeContext ctx) throws Exception;

276

}

277

278

/**

279

* Multi-output stream processing function

280

* @param <IN> Input type

281

* @param <OUT1> First output type

282

* @param <OUT2> Second output type

283

*/

284

interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {

285

/**

286

* Process element producing multiple outputs

287

* @param element Input element

288

* @param output1 First output collector

289

* @param output2 Second output collector

290

* @param ctx Runtime context

291

* @throws Exception

292

*/

293

void processElement(IN element, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx) throws Exception;

294

}

295

```

### Context Interfaces

Context interfaces that give process functions access to runtime information and services, such as state and processing time, in the v2 API.

```java { .api }

/**

303

* Runtime context for accessing runtime information

304

*/

305

interface RuntimeContext {

306

/**

307

* Get task name

308

* @return Task name

309

*/

310

String getTaskName();

311

312

/**

313

* Get parallelism of current operator

314

* @return Parallelism

315

*/

316

int getParallelism();

317

318

/**

319

* Get index of current parallel subtask

320

* @return Subtask index

321

*/

322

int getIndexOfThisSubtask();

323

324

/**

325

* Get processing time manager

326

* @return Processing time manager

327

*/

328

ProcessingTimeManager getProcessingTimeManager();

329

330

/**

331

* Get state manager

332

* @return State manager

333

*/

334

StateManager getStateManager();

335

}

336

337

/**

338

* Context for partitioned (keyed) processing

339

* @param <K> Key type

340

*/

341

interface PartitionedContext<K> extends RuntimeContext {

342

/**

343

* Get current key

344

* @return Current processing key

345

*/

346

K getCurrentKey();

347

348

/**

349

* Get keyed state manager

350

* @return Keyed state manager

351

*/

352

KeyedStateManager getKeyedStateManager();

353

}

354

355

/**

356

* Interface for state management

357

*/

358

interface StateManager {

359

/**

360

* Get value state

361

* @param descriptor State descriptor

362

* @param <T> Value type

363

* @return Value state

364

*/

365

<T> ValueState<T> getState(ValueStateDescriptor<T> descriptor);

366

367

/**

368

* Get list state

369

* @param descriptor State descriptor

370

* @param <T> Element type

371

* @return List state

372

*/

373

<T> ListState<T> getListState(ListStateDescriptor<T> descriptor);

374

375

/**

376

* Get map state

377

* @param descriptor State descriptor

378

* @param <UK> User key type

379

* @param <UV> User value type

380

* @return Map state

381

*/

382

<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> descriptor);

383

}

384

385

/**

386

* Interface for keyed state management

387

*/

388

interface KeyedStateManager extends StateManager {

389

/**

390

* Get reducing state

391

* @param descriptor State descriptor

392

* @param <T> Element type

393

* @return Reducing state

394

*/

395

<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> descriptor);

396

397

/**

398

* Get aggregating state

399

* @param descriptor State descriptor

400

* @param <IN> Input type

401

* @param <ACC> Accumulator type

402

* @param <OUT> Output type

403

* @return Aggregating state

404

*/

405

<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> descriptor);

406

}

407

408

/**
 * Access to processing time and processing-time timers.
 *
 * <p>NOTE(review): the unit of the timestamps (presumably epoch milliseconds) is not stated
 * here. Confirm.
 */
interface ProcessingTimeManager {
    /**
     * Returns the current processing time.
     *
     * @return the current processing-time timestamp
     */
    long getCurrentProcessingTime();

    /**
     * Registers a processing-time timer.
     *
     * @param timestamp the timer timestamp
     */
    void registerTimer(long timestamp);

    /**
     * Deletes a processing-time timer.
     *
     * @param timestamp the timer timestamp
     */
    void deleteTimer(long timestamp);
}

```

### Windowing Extension

Windowing support for the v2 API, provided as an extension.

```java { .api }

/**

438

* Base window strategy interface

439

* @param <W> Window type

440

*/

441

interface WindowStrategy<W> {

442

/**

443

* Assign windows to element

444

* @param element Element to assign windows to

445

* @param timestamp Element timestamp

446

* @return Collection of windows

447

*/

448

Collection<W> assignWindows(Object element, long timestamp);

449

450

/**

451

* Get window serializer

452

* @return Window serializer

453

*/

454

TypeSerializer<W> getWindowSerializer();

455

}

456

457

/**

458

* Tumbling time window strategy

459

*/

460

class TumblingTimeWindowStrategy implements WindowStrategy<TimeWindow> {

461

/**

462

* Create tumbling time windows with specified size

463

* @param size Window size

464

* @return Tumbling window strategy

465

*/

466

public static TumblingTimeWindowStrategy of(Duration size);

467

468

/**

469

* Create tumbling time windows with size and offset

470

* @param size Window size

471

* @param offset Window offset

472

* @return Tumbling window strategy

473

*/

474

public static TumblingTimeWindowStrategy of(Duration size, Duration offset);

475

}

476

477

/**

478

* Sliding time window strategy

479

*/

480

class SlidingTimeWindowStrategy implements WindowStrategy<TimeWindow> {

481

/**

482

* Create sliding time windows with size and slide

483

* @param size Window size

484

* @param slide Slide interval

485

* @return Sliding window strategy

486

*/

487

public static SlidingTimeWindowStrategy of(Duration size, Duration slide);

488

489

/**

490

* Create sliding time windows with size, slide, and offset

491

* @param size Window size

492

* @param slide Slide interval

493

* @param offset Window offset

494

* @return Sliding window strategy

495

*/

496

public static SlidingTimeWindowStrategy of(Duration size, Duration slide, Duration offset);

497

}

498

499

/**

500

* Session window strategy

501

*/

502

class SessionWindowStrategy implements WindowStrategy<TimeWindow> {

503

/**

504

* Create session windows with inactivity gap

505

* @param gap Inactivity gap

506

* @return Session window strategy

507

*/

508

public static SessionWindowStrategy withGap(Duration gap);

509

}

510

511

/**

512

* Time window implementation

513

*/

514

class TimeWindow {

515

/**

516

* Get window start time

517

* @return Start timestamp

518

*/

519

public long getStart();

520

521

/**

522

* Get window end time

523

* @return End timestamp

524

*/

525

public long getEnd();

526

527

/**

528

* Get window maximum timestamp

529

* @return Maximum timestamp

530

*/

531

public long maxTimestamp();

532

533

/**

534

* Check if window contains timestamp

535

* @param timestamp Timestamp to check

536

* @return true if timestamp is in window

537

*/

538

public boolean contains(long timestamp);

539

}

540

```

### Event Time Extension

Event-time processing support for the v2 API, provided as an extension.

```java { .api }

/**

548

* Event time extension for DataStream API v2

549

*/

550

class EventTimeExtension {

551

/**

552

* Enable event time processing for stream

553

* @param stream Input stream

554

* @param <T> Element type

555

* @return Stream with event time support

556

*/

557

public static <T> DataStream<T> withEventTime(DataStream<T> stream);

558

559

/**

560

* Assign timestamps and watermarks

561

* @param stream Input stream

562

* @param timestampAssigner Timestamp assigner

563

* @param <T> Element type

564

* @return Stream with timestamps and watermarks

565

*/

566

public static <T> DataStream<T> assignTimestampsAndWatermarks(

567

DataStream<T> stream,

568

TimestampAssigner<T> timestampAssigner

569

);

570

}

571

572

/**
 * Access to event time and event-time timers.
 */
interface EventTimeManager {
    /**
     * Returns the current event time.
     *
     * @return the current event time
     */
    long getCurrentEventTime();

    /**
     * Registers an event-time timer.
     *
     * @param timestamp the timer timestamp
     */
    void registerTimer(long timestamp);

    /**
     * Deletes an event-time timer.
     *
     * @param timestamp the timer timestamp
     */
    void deleteTimer(long timestamp);
}

/**
 * Assigns timestamps to stream elements.
 *
 * <p>It has a single abstract method, so it can be implemented with a lambda.
 *
 * @param <T> element type
 */
@FunctionalInterface
interface TimestampAssigner<T> {
    /**
     * Extracts the timestamp of an element.
     *
     * @param element the element
     * @param recordTimestamp the record's existing timestamp
     * @return the element's timestamp
     */
    long extractTimestamp(T element, long recordTimestamp);
}

```