or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

built-in-connectors.md · connector-framework.md · datastream-integration.md · index.md · procedure-context.md · statement-set.md · table-environment.md

docs/datastream-integration.md

# DataStream Integration

Bi-directional conversion between DataStream and Table APIs with support for custom schemas, changelog processing, and type mapping. This enables seamless integration between stream processing and SQL operations.

## Capabilities

### DataStream to Table Conversion

Converts DataStream instances to Table objects for SQL processing.

```java { .api }
/**
 * Converts DataStream to Table with automatic schema derivation
 * @param dataStream The input DataStream to convert
 * @return Table representation of the DataStream
 */
<T> Table fromDataStream(DataStream<T> dataStream);

/**
 * Converts DataStream to Table with custom schema
 * @param dataStream The input DataStream to convert
 * @param schema Custom schema for column mapping and metadata
 * @return Table representation with specified schema
 */
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;

// Automatic schema derivation
DataStream<Row> orderStream = env.fromElements(
    Row.of(1L, "Alice", 100.0),
    Row.of(2L, "Bob", 200.0)
);
Table orderTable = tableEnv.fromDataStream(orderStream);

// Custom schema with computed columns
Schema orderSchema = Schema.newBuilder()
    .column("order_id", DataTypes.BIGINT())
    .column("customer", DataTypes.STRING())
    .column("amount", DataTypes.DOUBLE())
    .columnByExpression("tax", "amount * 0.1")
    .columnByExpression("total", "amount + tax")
    .build();

Table enrichedTable = tableEnv.fromDataStream(orderStream, orderSchema);
```

### Changelog Stream Processing

Processes DataStream of Row records with RowKind information for handling insertions, updates, and deletions.

```java { .api }
/**
 * Converts changelog DataStream to Table with all RowKind changes
 * @param dataStream DataStream of Row records with RowKind information
 * @return Table supporting all changelog operations
 */
Table fromChangelogStream(DataStream<Row> dataStream);

/**
 * Converts changelog DataStream to Table with custom schema
 * @param dataStream DataStream of Row records with RowKind information
 * @param schema Custom schema for the resulting table
 * @return Table with specified schema supporting changelog operations
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

/**
 * Converts changelog DataStream to Table with explicit changelog mode
 * @param dataStream DataStream of Row records with RowKind information
 * @param schema Custom schema for the resulting table
 * @param changelogMode Explicit changelog mode defining allowed operations
 * @return Table with specified schema and changelog constraints
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
```

**Usage Examples:**

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Create changelog stream
DataStream<Row> changelogStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, 1L, "Alice", 100.0),
    Row.ofKind(RowKind.UPDATE_AFTER, 1L, "Alice", 150.0),
    Row.ofKind(RowKind.DELETE, 2L, "Bob", 200.0)
);

// Convert with automatic changelog mode
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// Convert with upsert mode (no UPDATE_BEFORE required)
Table upsertTable = tableEnv.fromChangelogStream(
    changelogStream,
    schema,
    ChangelogMode.upsert()
);
```

### Table to DataStream Conversion

Converts Table objects back to DataStream for further stream processing.

```java { .api }
/**
 * Converts insert-only Table to DataStream of Row
 * @param table The Table to convert (must be insert-only)
 * @return DataStream of Row records
 */
DataStream<Row> toDataStream(Table table);

/**
 * Converts insert-only Table to DataStream of specific type
 * @param table The Table to convert (must be insert-only)
 * @param targetClass Target class for type mapping
 * @return DataStream of specified type
 */
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

/**
 * Converts insert-only Table to DataStream with custom data type
 * @param table The Table to convert (must be insert-only)
 * @param targetDataType Target data type for conversion
 * @return DataStream of specified data type
 */
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
```

**Usage Examples:**

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;

// Convert to Row DataStream
Table resultTable = tableEnv.sqlQuery("SELECT customer, SUM(amount) as total FROM orders GROUP BY customer");
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// Convert to POJO class
public static class CustomerTotal {
    public String customer;
    public Double total;

    public CustomerTotal() {}
    public CustomerTotal(String customer, Double total) {
        this.customer = customer;
        this.total = total;
    }
}

DataStream<CustomerTotal> pojoStream = tableEnv.toDataStream(resultTable, CustomerTotal.class);

// Convert with explicit data type
DataStream<CustomerTotal> typedStream = tableEnv.toDataStream(
    resultTable,
    DataTypes.of(CustomerTotal.class)
);
```

### Changelog DataStream Generation

Converts updating tables to changelog DataStream with RowKind information.

```java { .api }
/**
 * Converts Table to changelog DataStream with all RowKind changes
 * @param table The Table to convert (can be updating or insert-only)
 * @return DataStream of Row with RowKind information
 */
DataStream<Row> toChangelogStream(Table table);

/**
 * Converts Table to changelog DataStream with custom schema
 * @param table The Table to convert
 * @param targetSchema Schema for output format customization
 * @return DataStream of Row with custom schema and RowKind
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

/**
 * Converts Table to changelog DataStream with explicit changelog mode
 * @param table The Table to convert
 * @param targetSchema Schema for output format
 * @param changelogMode Required changelog mode for validation
 * @return DataStream with specified changelog constraints
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
```

**Usage Examples:**

```java
// Convert updating table to changelog stream
Table aggregatedTable = tableEnv.sqlQuery(
    "SELECT customer, COUNT(*) as order_count FROM orders GROUP BY customer"
);
DataStream<Row> changelogStream = tableEnv.toChangelogStream(aggregatedTable);

// Process changelog events
changelogStream.map(row -> {
    RowKind kind = row.getKind();
    switch (kind) {
        case INSERT:
            return "New customer: " + row.getField(0);
        case UPDATE_AFTER:
            return "Updated customer: " + row.getField(0);
        case DELETE:
            return "Customer removed: " + row.getField(0);
        default:
            return "Unknown change: " + row.getField(0);
    }
});
```

### Legacy Conversion Methods (Deprecated)

Historical conversion methods maintained for backward compatibility.

```java { .api }
/**
 * @deprecated Use fromDataStream(DataStream, Schema) instead
 */
@Deprecated
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

/**
 * @deprecated Use toDataStream(Table, Class) instead
 */
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

/**
 * @deprecated Use toDataStream(Table, AbstractDataType) instead
 */
@Deprecated
<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);

/**
 * @deprecated Use toChangelogStream(Table, Schema) instead
 */
@Deprecated
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
```

## Type Definitions

### Schema Configuration

```java { .api }
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.api.java.tuple.Tuple2;

// Advanced schema examples
Schema schema = Schema.newBuilder()
    // Physical columns from DataStream
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())

    // Computed columns
    .columnByExpression("name_length", "CHAR_LENGTH(name)")

    // Metadata columns (timestamp extraction)
    .columnByMetadata("proc_time", DataTypes.TIMESTAMP_LTZ(3), "timestamp")
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))

    // Watermark strategy
    .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")

    // Primary key definition
    .primaryKey("id")
    .build();
```

### Changelog Modes

```java { .api }
import org.apache.flink.table.connector.ChangelogMode;

// Available changelog modes
ChangelogMode.all()        // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
ChangelogMode.insertOnly() // INSERT only
ChangelogMode.upsert()     // INSERT, UPDATE_AFTER, DELETE (no UPDATE_BEFORE)
```

### Row Kind Operations

```java { .api }
import org.apache.flink.types.RowKind;

// RowKind enumeration
RowKind.INSERT        // New record insertion
RowKind.UPDATE_BEFORE // Previous version of updated record
RowKind.UPDATE_AFTER  // New version of updated record
RowKind.DELETE        // Record deletion
```

### Type Mapping

DataStream to Table type mapping follows these rules:

- **Primitive types**: Direct mapping (int → INT, String → STRING)
- **Composite types**: Flattened to top-level columns
- **Row types**: Field-by-field mapping with position or name matching
- **POJO types**: Field name-based mapping with type validation
- **Tuple types**: Position-based mapping (f0, f1, f2, ...)

Table to DataStream type mapping:

- **Row class**: Direct field access by position
- **POJO classes**: Field name matching with type conversion
- **Tuple types**: Position-based assignment with type checking
- **Primitive types**: Single-column table conversion

### Legacy Source/Sink Base Classes (Deprecated)

Abstract base classes for implementing legacy table sources and sinks with InputFormat and OutputFormat.

```java { .api }
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

/**
 * Base class for bounded table sources using InputFormat
 * @deprecated Use DynamicTableSource instead
 */
@Deprecated
public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {
    /**
     * Returns an InputFormat for reading bounded table data
     * @return InputFormat instance for data reading
     */
    public abstract InputFormat<T, ?> getInputFormat();

    /**
     * Always returns true indicating this is a bounded source
     * @return true for bounded sources
     */
    public final boolean isBounded();

    /**
     * Creates DataStream using InputFormat
     * @param execEnv StreamExecutionEnvironment for DataStream creation
     * @return DataStream of table data
     */
    public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}

/**
 * Base class for bounded table sinks using OutputFormat
 * @deprecated Use DynamicTableSink instead
 */
@Deprecated
public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
    /**
     * Returns an OutputFormat for writing bounded table data
     * @return OutputFormat instance for data writing
     */
    public abstract OutputFormat<T> getOutputFormat();

    /**
     * Consumes DataStream using OutputFormat
     * @param dataStream Input DataStream to consume
     * @return DataStreamSink transformation
     */
    public final DataStreamSink<T> consumeDataStream(DataStream<T> dataStream);
}
```

These legacy base classes are marked as both `@Deprecated` and `@Experimental`, indicating they are maintained for backward compatibility but should not be used in new implementations.