or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-system.md · datastream-integration.md · index.md · sql-execution.md · table-environment.md · table-operations.md · type-system.md · user-defined-functions.md

docs/datastream-integration.md

# DataStream Integration

StreamTableEnvironment provides seamless integration between Flink's Table API and DataStream API, enabling conversion between tables and data streams for complex stream processing pipelines that combine both APIs.

## Capabilities

### Environment Creation

Creates streaming table environments with DataStream integration capabilities.

```java { .api }
/**
 * Creates a StreamTableEnvironment from a StreamExecutionEnvironment
 * @param executionEnvironment The StreamExecutionEnvironment for stream processing
 * @return StreamTableEnvironment with DataStream integration
 */
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

/**
 * Creates a StreamTableEnvironment with custom settings
 * @param executionEnvironment The StreamExecutionEnvironment for stream processing
 * @param settings Environment settings for the table environment
 * @return StreamTableEnvironment with specified settings
 */
static StreamTableEnvironment create(
    StreamExecutionEnvironment executionEnvironment,
    EnvironmentSettings settings
);
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Create from execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// With custom settings
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);
```

### DataStream to Table Conversion

Converts DataStream objects to Table objects for SQL and Table API operations.

```java { .api }
/**
 * Creates a Table from a DataStream with automatic schema inference
 * @param dataStream The DataStream to convert
 * @return Table representing the DataStream
 */
<T> Table fromDataStream(DataStream<T> dataStream);

/**
 * Creates a Table from a DataStream with explicit schema
 * @param dataStream The DataStream to convert
 * @param schema Schema definition for the resulting Table
 * @return Table with specified schema
 */
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

/**
 * Creates a Table from a DataStream with field expressions
 * @param dataStream The DataStream to convert
 * @param fields Expressions defining field mappings and types
 * @return Table with specified field mappings
 */
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
```

**Usage Examples:**

```java
// Simple POJO DataStream
DataStream<Order> orderStream = env.fromSource(orderSource, watermarkStrategy, "orders");

// Automatic schema inference
Table ordersTable = tableEnv.fromDataStream(orderStream);

// With explicit schema
Schema orderSchema = Schema.newBuilder()
    .column("orderId", DataTypes.BIGINT())
    .column("customerId", DataTypes.BIGINT())
    .column("amount", DataTypes.DECIMAL(10, 2))
    .column("orderTime", DataTypes.TIMESTAMP(3))
    .watermark("orderTime", $("orderTime").minus(lit(5).seconds()))
    .build();

Table ordersWithSchema = tableEnv.fromDataStream(orderStream, orderSchema);

// With field expressions
Table ordersWithFields = tableEnv.fromDataStream(
    orderStream,
    $("orderId"),
    $("customerId"),
    $("amount"),
    $("orderTime").rowtime()
);
```

### Table to DataStream Conversion

Converts Table objects back to DataStream objects for further stream processing.

```java { .api }
/**
 * Converts a Table to a DataStream of Row objects
 * @param table The Table to convert
 * @return DataStream<Row> representing the Table data
 */
DataStream<Row> toDataStream(Table table);

/**
 * Converts a Table to a DataStream with explicit target class
 * @param table The Table to convert
 * @param targetClass Target class for DataStream elements
 * @return DataStream with specified element type
 */
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

/**
 * Converts a Table to a DataStream with specific data type
 * @param table The Table to convert
 * @param targetDataType Target data type for DataStream elements
 * @return DataStream with specified data type
 */
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
```

**Usage Examples:**

```java
// Table with aggregated results
Table aggregatedOrders = tableEnv
    .from("orders")
    .groupBy($("customerId"))
    .select($("customerId"), $("amount").sum().as("totalAmount"));

// Convert to DataStream with automatic inference
DataStream<Row> resultStream = tableEnv.toDataStream(aggregatedOrders);

// Convert to specific POJO class
@Data
public class CustomerTotal {
    public Long customerId;
    public BigDecimal totalAmount;
}

DataStream<CustomerTotal> pojoStream = tableEnv.toDataStream(
    aggregatedOrders,
    CustomerTotal.class
);

// Convert with explicit data type
DataStream<Row> typedStream = tableEnv.toDataStream(
    aggregatedOrders,
    DataTypes.ROW(
        DataTypes.FIELD("customerId", DataTypes.BIGINT()),
        DataTypes.FIELD("totalAmount", DataTypes.DECIMAL(10, 2))
    )
);
```

### Changelog Stream Operations

Handles change data capture (CDC) and changelog streams for maintaining state consistency.

```java { .api }
/**
 * Converts a Table to a changelog DataStream
 * @param table The Table to convert (should support changelog)
 * @return DataStream of Row with RowKind information
 */
DataStream<Row> toChangelogStream(Table table);

/**
 * Converts a Table to a changelog DataStream with explicit schema
 * @param table The Table to convert
 * @param targetSchema Schema for the resulting changelog stream
 * @return DataStream of Row with RowKind information
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

/**
 * Converts a Table to a changelog DataStream with explicit schema and changelog mode
 * @param table The Table to convert
 * @param targetSchema Schema for the resulting changelog stream
 * @param changelogMode Changelog mode specifying supported operations
 * @return DataStream of Row with RowKind information
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
```

**Usage Examples:**

```java
// Table with updates and deletes
Table updatingTable = tableEnv
    .from("user_updates")
    .groupBy($("userId"))
    .select($("userId"), $("lastUpdate").max().as("latestUpdate"));

// Convert to changelog stream
DataStream<Row> changelogStream = tableEnv.toChangelogStream(updatingTable);

// Process changelog events
changelogStream.map(new MapFunction<Row, String>() {
    @Override
    public String map(Row row) throws Exception {
        RowKind kind = row.getKind();
        switch (kind) {
            case INSERT:
                return "New user: " + row.getField(0);
            case UPDATE_AFTER:
                return "Updated user: " + row.getField(0);
            case DELETE:
                return "Deleted user: " + row.getField(0);
            default:
                return "Unknown change: " + row.getField(0);
        }
    }
});
```

### Changelog to Table Conversion

Converts changelog DataStreams back to Table objects for further table operations.

```java { .api }
/**
 * Converts a changelog DataStream to a Table
 * @param dataStream Changelog DataStream with RowKind information
 * @return Table representing the changelog stream
 */
Table fromChangelogStream(DataStream<Row> dataStream);

/**
 * Converts a changelog DataStream to a Table with explicit schema
 * @param dataStream Changelog DataStream with RowKind information
 * @param schema Schema for the resulting table
 * @return Table with specified schema
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

/**
 * Converts a changelog DataStream to a Table with schema and changelog mode
 * @param dataStream Changelog DataStream with RowKind information
 * @param schema Schema for the resulting table
 * @param changelogMode Changelog mode specifying supported operations
 * @return Table with specified schema and changelog mode
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
```

**Usage Examples:**

```java
// DataStream with changelog semantics
DataStream<Row> changelogStream = env.addSource(new ChangelogSource());

// Convert to table with automatic schema inference
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// Convert with explicit schema
Schema explicitSchema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("user_name", DataTypes.STRING())
    .column("last_login", DataTypes.TIMESTAMP(3))
    .build();

Table typedChangelogTable = tableEnv.fromChangelogStream(changelogStream, explicitSchema);

// With changelog mode specification
ChangelogMode updateMode = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build();

Table modeAwareTable = tableEnv.fromChangelogStream(
    changelogStream,
    explicitSchema,
    updateMode
);
```

### Temporary View Creation

Creates temporary views from DataStream objects for SQL query access.

```java { .api }
/**
 * Creates a temporary view from a DataStream
 * @param path View name/path
 * @param dataStream DataStream to create view from
 */
<T> void createTemporaryView(String path, DataStream<T> dataStream);

/**
 * Creates a temporary view from a DataStream with explicit schema
 * @param path View name/path
 * @param dataStream DataStream to create view from
 * @param schema Schema for the view
 */
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
```

**Usage Examples:**

```java
DataStream<Order> orderStream = env.addSource(new OrderSource());

// Create temporary view with automatic schema inference
tableEnv.createTemporaryView("orders", orderStream);

// Create view with explicit schema
Schema orderSchema = Schema.newBuilder()
    .column("order_id", DataTypes.BIGINT())
    .column("customer_id", DataTypes.BIGINT())
    .column("amount", DataTypes.DECIMAL(10, 2))
    .column("order_time", DataTypes.TIMESTAMP(3))
    .watermark("order_time", $("order_time").minus(lit(5).seconds()))
    .build();

tableEnv.createTemporaryView("orders_with_watermark", orderStream, orderSchema);

// Use in SQL queries
Table results = tableEnv.sqlQuery(
    "SELECT customer_id, SUM(amount) as total_amount " +
    "FROM orders_with_watermark " +
    "GROUP BY customer_id"
);
```

### Statement Set Creation

Creates statement sets for executing multiple streaming operations together.

```java { .api }
/**
 * Creates a StreamStatementSet for batch execution of multiple streaming operations
 * @return StreamStatementSet for adding multiple statements
 */
StreamStatementSet createStatementSet();
```

**Usage Examples:**

```java
// Create statement set for batch execution
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple insert statements
Table processedOrders = tableEnv.from("orders")
    .filter($("status").isEqual("processed"));

statementSet.addInsert("processed_orders_sink", processedOrders);

Table failedOrders = tableEnv.from("orders")
    .filter($("status").isEqual("failed"));

statementSet.addInsert("failed_orders_sink", failedOrders);

// Execute all statements together
TableResult result = statementSet.execute();

// Or add SQL statements
statementSet.addInsertSql(
    "INSERT INTO daily_summary " +
    "SELECT DATE(order_time) as order_date, COUNT(*) as total_orders " +
    "FROM orders " +
    "GROUP BY DATE(order_time)"
);
```

### Legacy Conversion Methods (Deprecated)

Older methods for DataStream conversion that are now deprecated but still available for backward compatibility.

```java { .api }
/**
 * @deprecated Use fromDataStream() instead
 * Registers a DataStream as a temporary table
 */
@Deprecated
<T> void registerDataStream(String name, DataStream<T> dataStream);

/**
 * @deprecated Use toDataStream() instead
 * Converts a Table to an append-only DataStream
 */
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

/**
 * @deprecated Use toChangelogStream() instead
 * Converts a Table to a retract DataStream
 */
@Deprecated
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
```

### Schema Definition for Conversion

Explicit schema definitions for controlling DataStream to Table conversions.

```java { .api }
class Schema {
    /**
     * Creates a new schema builder
     * @return Builder for constructing schemas
     */
    static Builder newBuilder();

    interface Builder {
        Builder column(String columnName, AbstractDataType<?> dataType);
        Builder columnByExpression(String columnName, String expression);
        Builder columnByMetadata(String columnName, AbstractDataType<?> dataType);
        Builder columnByMetadata(String columnName, AbstractDataType<?> dataType, String metadataKey);
        Builder watermark(String columnName, Expression watermarkExpression);
        Builder primaryKey(String... columnNames);
        Schema build();
    }
}
```

**Usage Examples:**

```java
// Complex schema with computed columns and watermarks
Schema complexSchema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("event_time", DataTypes.TIMESTAMP(3))
    .column("event_data", DataTypes.STRING())
    .columnByExpression("hour_of_day", "EXTRACT(HOUR FROM event_time)")
    .columnByMetadata("kafka_offset", DataTypes.BIGINT(), "offset")
    .watermark("event_time", $("event_time").minus(lit(30).seconds()))
    .primaryKey("user_id", "event_time")
    .build();

// Apply schema during conversion
Table enrichedEvents = tableEnv.fromDataStream(eventStream, complexSchema);
```

## Types

### Stream Table Environment

```java { .api }
interface StreamTableEnvironment extends TableEnvironment {
    // Inherits all TableEnvironment methods
    // Plus DataStream integration methods listed above
}
```

### Temporal Table Function

```java { .api }
class TemporalTableFunction extends TableFunction<Row> {
    // Used for temporal joins in streaming scenarios
    // Automatically created by createTemporalTableFunction()
}
```

### Row and RowKind

```java { .api }
class Row {
    Object getField(int pos);
    Object getField(String name);
    int getArity();
    RowKind getKind();
    void setKind(RowKind kind);
}

enum RowKind {
    INSERT,        // +I: Insert operation
    UPDATE_BEFORE, // -U: Update before (old value)
    UPDATE_AFTER,  // +U: Update after (new value)
    DELETE         // -D: Delete operation
}
```