
# DataStream Integration

1

2

This document covers the seamless conversion between Flink Tables and DataStreams, enabling hybrid processing workflows that combine Table API with DataStream API capabilities.

3

4

## StreamTableEnvironment

5

6

The StreamTableEnvironment provides integration between Table API and DataStream API.

7

8

### Environment Creation

9

10

```java { .api }

11

class StreamTableEnvironment {

12

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

13

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

14

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig);

15

}

16

```

17

18

**Usage:**

19

20

```java

21

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

22

23

// Create with default settings

24

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

25

26

// Create with custom settings

27

EnvironmentSettings settings = EnvironmentSettings.newInstance()

28

.useBlinkPlanner() // NOTE: deprecated since Flink 1.14 (Blink is the default and only planner); omit on newer versions

29

.inStreamingMode()

30

.build();

31

StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);

32

```

33

34

## DataStream to Table Conversion

35

36

### Basic Conversion

37

38

```java { .api }

39

interface StreamTableEnvironment {

40

Table fromDataStream(DataStream<?> dataStream);

41

Table fromDataStream(DataStream<?> dataStream, Expression... fields);

42

Table fromDataStream(DataStream<?> dataStream, Schema schema);

43

44

// Changelog stream conversion

45

Table fromChangelogStream(DataStream<Row> dataStream);

46

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

47

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

48

}

49

```

50

51

**Usage:**

52

53

```java

54

DataStream<Tuple3<String, Integer, Long>> dataStream = env.fromElements(

55

Tuple3.of("Alice", 25, 1000L),

56

Tuple3.of("Bob", 30, 2000L)

57

);

58

59

// Convert with automatic field inference

60

Table table1 = tableEnv.fromDataStream(dataStream);

61

62

// Convert with field mapping

63

Table table2 = tableEnv.fromDataStream(dataStream, $("name"), $("age"), $("salary"));

64

65

// Convert with explicit schema

66

Schema schema = Schema.newBuilder()

67

.column("name", DataTypes.STRING())

68

.column("age", DataTypes.INT())

69

.column("salary", DataTypes.BIGINT())

70

.build();

71

Table table3 = tableEnv.fromDataStream(dataStream, schema);

72

```

73

74

### Complex Type Conversion

75

76

```java

77

// POJO conversion

78

public static class User {

79

public String name;

80

public int age;

81

public long timestamp;

82

// constructors, getters, setters

83

}

84

85

DataStream<User> userStream = env.addSource(new UserSource());

86

Table userTable = tableEnv.fromDataStream(userStream);

87

88

// Row type conversion

89

DataStream<Row> rowStream = env.addSource(new RowSource());

90

Table rowTable = tableEnv.fromDataStream(rowStream,

91

$("user_id").bigint(),

92

$("event_time").timestamp(3),

93

$("event_type").string()

94

);

95

```

96

97

### Time Attribute Mapping

98

99

```java

100

DataStream<Tuple3<String, Long, String>> eventStream = env.addSource(new EventSource());

101

102

// Processing time

103

Table table = tableEnv.fromDataStream(eventStream,

104

$("user_id"),

105

$("event_time").rowtime(), // Event time from field

106

$("event_type"),

107

$("proc_time").proctime() // Processing time

108

);

109

110

// Event time with watermarks

111

DataStream<Event> watermarkedStream = eventStream

112

.assignTimestampsAndWatermarks(

113

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))

114

.withTimestampAssigner((event, timestamp) -> event.timestamp)

115

);

116

117

Table eventTable = tableEnv.fromDataStream(watermarkedStream,

118

$("user_id"),

119

$("timestamp").rowtime(),

120

$("event_type")

121

);

122

```

123

124

## Table to DataStream Conversion

125

126

### Basic Conversion

127

128

```java { .api }

129

interface StreamTableEnvironment {

130

<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

131

<T> DataStream<T> toDataStream(Table table, AbstractDataType<T> targetDataType);

132

DataStream<Row> toDataStream(Table table);

133

134

// Legacy methods (deprecated)

135

<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

136

<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);

137

<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);

138

<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);

139

}

140

```

141

142

**Usage:**

143

144

```java

145

Table table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 18");

146

147

// Convert to Row (default)

148

DataStream<Row> rowStream = tableEnv.toDataStream(table);

149

150

// Convert to POJO

151

DataStream<User> userStream = tableEnv.toDataStream(table, User.class);

152

153

// Convert to Tuple

154

DataStream<Tuple2<String, Integer>> tupleStream = tableEnv.toAppendStream(table,
    Types.TUPLE(Types.STRING, Types.INT)); // toDataStream does not accept TypeInformation; use the (legacy) toAppendStream overload

156

```

157

158

### Changelog Stream Conversion

159

160

```java { .api }

161

interface StreamTableEnvironment {

162

DataStream<Row> toChangelogStream(Table table);

163

DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

164

DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

165

}

166

```

167

168

**Usage:**

169

170

```java

171

Table aggregatedTable = tableEnv.sqlQuery(

172

"SELECT user_id, COUNT(*) as cnt FROM events GROUP BY user_id"

173

);

174

175

// Convert to changelog stream (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)

176

DataStream<Row> changelogStream = tableEnv.toChangelogStream(aggregatedTable);

177

178

changelogStream.process(new ProcessFunction<Row, String>() {

179

@Override

180

public void processElement(Row row, Context ctx, Collector<String> out) {

181

RowKind kind = row.getKind();

182

String message = String.format("Kind: %s, Data: %s", kind, row);

183

out.collect(message);

184

}

185

});

186

```

187

188

## Temporary Views from DataStreams

189

190

```java { .api }

191

interface StreamTableEnvironment {

192

void createTemporaryView(String path, DataStream<?> dataStream);

193

void createTemporaryView(String path, DataStream<?> dataStream, Expression... fields);

194

void createTemporaryView(String path, DataStream<?> dataStream, Schema schema);

195

}

196

```

197

198

**Usage:**

199

200

```java

201

DataStream<Order> orderStream = env.addSource(new OrderSource());

202

203

// Create temporary view

204

tableEnv.createTemporaryView("orders", orderStream,

205

$("order_id"),

206

$("customer_id"),

207

$("amount"),

208

$("order_time").rowtime()

209

);

210

211

// Use in SQL

212

Table result = tableEnv.sqlQuery(

213

"SELECT customer_id, SUM(amount) " +

214

"FROM orders " +

215

"GROUP BY customer_id, TUMBLE(order_time, INTERVAL '1' HOUR)"

216

);

217

```

218

219

## Batch Integration (Legacy)

220

221

For batch processing with DataSet API (deprecated in newer versions):

222

223

```java { .api }

224

class BatchTableEnvironment {

225

static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment);

226

227

Table fromDataSet(DataSet<?> dataSet);

228

Table fromDataSet(DataSet<?> dataSet, Expression... fields);

229

230

<T> DataSet<T> toDataSet(Table table, Class<T> clazz);

231

DataSet<Row> toDataSet(Table table);

232

}

233

```

234

235

## Scala Integration

236

237

### StreamTableEnvironment (Scala)

238

239

```scala { .api }

240

object StreamTableEnvironment {

241

def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment

242

def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment

243

}

244

245

class StreamTableEnvironment {

246

def fromDataStream[T](dataStream: DataStream[T]): Table

247

def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table

248

249

def toAppendStream[T: TypeInformation](table: Table): DataStream[T]

250

def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)]

251

}

252

```

253

254

**Scala Usage:**

255

256

```scala

257

import org.apache.flink.streaming.api.scala._

258

import org.apache.flink.table.api.bridge.scala._

259

260

val env = StreamExecutionEnvironment.getExecutionEnvironment

261

val tableEnv = StreamTableEnvironment.create(env)

262

263

// DataStream to Table

264

val dataStream: DataStream[(String, Int)] = env.fromElements(("Alice", 25), ("Bob", 30))

265

val table = tableEnv.fromDataStream(dataStream, 'name, 'age)

266

267

// Table to DataStream

268

val resultStream: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)

269

```

270

271

## Type System Integration

272

273

### Type Inference

274

275

```java { .api }

276

class TypeInference {

277

static DataType inferDataType(Class<?> clazz);

278

static DataType inferDataType(TypeInformation<?> typeInfo);

279

}

280

```

281

282

### Row Kind Handling

283

284

```java { .api }

285

enum RowKind {

286

INSERT("+I"),

287

UPDATE_BEFORE("-U"),

288

UPDATE_AFTER("+U"),

289

DELETE("-D");

290

}

291

292

class Row {

293

RowKind getKind();

294

void setKind(RowKind kind);

295

Object getField(int pos);

296

Object getField(String name);

297

}

298

```

299

300

## Configuration

301

302

### Execution Configuration

303

304

```java

305

// Configure execution environment

306

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // deprecated since Flink 1.12 — event time is the default; shown for legacy setups

307

env.getConfig().setAutoWatermarkInterval(1000);

308

309

// Configure table environment

310

tableEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));

311

tableEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");

312

```

313

314

### Memory Management

315

316

```java

317

// Configure state backend

318

env.setStateBackend(new HashMapStateBackend());

319

env.getCheckpointConfig().setCheckpointStorage("file:///checkpoints");

320

321

// Configure memory

322

Configuration config = new Configuration();

323

config.setString("taskmanager.memory.process.size", "1024m");

324

config.setString("taskmanager.memory.flink.size", "768m");

325

```

326

327

## Common Patterns

328

329

### Hybrid Processing Pipeline

330

331

```java

332

// Start with DataStream processing

333

DataStream<Event> eventStream = env

334

.addSource(new KafkaSource())

335

.filter(event -> event.isValid())

336

.keyBy(Event::getUserId);

337

338

// Convert to Table for SQL processing

339

tableEnv.createTemporaryView("events", eventStream);

340

341

Table aggregated = tableEnv.sqlQuery(

342

"SELECT user_id, COUNT(*) as event_count, " +

343

" TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start " +

344

"FROM events " +

345

"GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)"

346

);

347

348

// Convert back to DataStream for complex processing

349

DataStream<UserStats> statsStream = tableEnv.toDataStream(aggregated, UserStats.class);

350

351

statsStream

352

.keyBy(UserStats::getUserId)

353

.process(new ComplexStatefulFunction())

354

.addSink(new ElasticsearchSink<>());

355

```

356

357

### Real-time Feature Engineering

358

359

```java

360

// Raw events from multiple sources

361

DataStream<ClickEvent> clicks = env.addSource(new ClickEventSource());

362

DataStream<PurchaseEvent> purchases = env.addSource(new PurchaseEventSource());

363

364

// Register as tables

365

tableEnv.createTemporaryView("clicks", clicks);

366

tableEnv.createTemporaryView("purchases", purchases);

367

368

// Feature engineering with SQL

369

Table features = tableEnv.sqlQuery(

370

"SELECT " +

371

" c.user_id," +

372

" COUNT(c.click_id) as clicks_last_hour," +

373

" COALESCE(SUM(p.amount), 0) as purchases_last_hour " +

374

"FROM clicks c " +

375

"LEFT JOIN purchases p ON c.user_id = p.user_id " +

376

" AND p.purchase_time BETWEEN c.click_time - INTERVAL '1' HOUR AND c.click_time " +

377

"WHERE c.click_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +

378

"GROUP BY c.user_id"

379

);

380

381

// Output as enriched stream

382

DataStream<UserFeatures> featureStream = tableEnv.toDataStream(features, UserFeatures.class);

383

```

384

385

## Error Handling

386

387

```java { .api }

388

class StreamTableException extends TableException;

389

class DataStreamConversionException extends StreamTableException;

390

```

391

392

## Types

393

394

```java { .api }

395

class Schema {

396

static Schema.Builder newBuilder();

397

398

interface Builder {

399

Builder column(String columnName, DataType dataType);

400

Builder columnByExpression(String columnName, String expression);

401

Builder columnByMetadata(String columnName, DataType dataType);

402

Builder watermark(String columnName, String watermarkExpression);

403

Builder primaryKey(String... columnNames);

404

Schema build();

405

}

406

}

407

408

enum ChangelogMode {

409

INSERT_ONLY,

410

UPSERT,

411

ALL

412

}

413

414

interface AbstractDataType<T>;

415

class DataTypes {

416

static AbstractDataType<Row> ROW(AbstractDataType<?>... fieldDataTypes);

417

static <T> AbstractDataType<T> of(Class<T> expectedClass);

418

}

419

```