or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connectors.md · data-types.md · datastream-bridge.md · expressions.md · functions.md · index.md · sql-gateway.md · table-operations.md

docs/datastream-bridge.md

0

# DataStream Integration

1

2

Bridge between Table API and DataStream API enabling conversion between tables and data streams for hybrid stream/batch processing applications.

3

4

## Capabilities

5

6

### StreamTableEnvironment

7

8

Specialized TableEnvironment that provides seamless integration between Table API and DataStream API for streaming applications.

9

10

```java { .api }

11

/**

12

* Create StreamTableEnvironment from StreamExecutionEnvironment

13

* @param streamEnv Existing StreamExecutionEnvironment

14

* @return StreamTableEnvironment instance

15

*/

16

public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv);

17

18

/**

19

* Create StreamTableEnvironment with specific settings

20

* @param streamEnv Existing StreamExecutionEnvironment

21

* @param settings Table environment configuration

22

* @return StreamTableEnvironment instance

23

*/

24

public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv,

25

EnvironmentSettings settings);

26

27

/**

28

* Get the underlying StreamExecutionEnvironment

29

* @return StreamExecutionEnvironment instance

30

*/

31

public StreamExecutionEnvironment getStreamExecutionEnvironment();

32

```

33

34

**Basic Setup:**

35

36

```java

37

// Create streaming environment

38

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

39

env.setParallelism(4);

40

env.enableCheckpointing(10000);

41

42

// Create table environment

43

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

44

45

// Configure table environment

46

tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");

47

```

48

49

### DataStream to Table Conversion

50

51

Convert DataStream instances to Table for SQL and Table API operations.

52

53

```java { .api }

54

/**

55

* Convert DataStream to Table using automatic schema inference

56

* @param dataStream DataStream to convert

57

* @return Table representing the stream data

58

*/

59

public <T> Table fromDataStream(DataStream<T> dataStream);

60

61

/**

62

* Convert DataStream to Table with explicit field selection

63

* @param dataStream DataStream to convert

64

* @param fields Expressions defining the table schema

65

* @return Table with specified schema

66

*/

67

public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

68

69

/**

70

* Convert DataStream to Table with explicit schema

71

* @param dataStream DataStream to convert

72

* @param schema Complete schema definition including watermarks

73

* @return Table with specified schema

74

*/

75

public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

76

77

/**

78

* Create a temporary view from DataStream

79

* @param name View name for SQL queries

80

* @param dataStream DataStream to expose as view

81

*/

82

public <T> void createTemporaryView(String name, DataStream<T> dataStream);

83

84

/**

85

* Create a temporary view from DataStream with schema

86

* @param name View name for SQL queries

87

* @param dataStream DataStream to expose as view

88

* @param schema Schema definition for the view

89

*/

90

public <T> void createTemporaryView(String name, DataStream<T> dataStream, Schema schema);

91

```

92

93

**DataStream to Table Examples:**

94

95

```java

96

// Simple POJO conversion

97

DataStream<Order> orderStream = env.addSource(new OrderSource());

98

99

// Automatic schema inference from POJO

100

Table orders = tEnv.fromDataStream(orderStream);

101

102

// Explicit field selection and aliasing

103

Table ordersWithAlias = tEnv.fromDataStream(orderStream,

104

$("orderId").as("id"),

105

$("customerId"),

106

$("amount"),

107

$("orderTime"));

108

109

// Complex schema with watermarks for event time

110

Schema orderSchema = Schema.newBuilder()

111

.column("orderId", DataTypes.BIGINT())

112

.column("customerId", DataTypes.BIGINT())

113

.column("amount", DataTypes.DECIMAL(10, 2))

114

.column("orderTime", DataTypes.TIMESTAMP_LTZ(3))

115

.watermark("orderTime", $("orderTime").minus(lit(5).seconds()))

116

.build();

117

118

Table ordersWithWatermark = tEnv.fromDataStream(orderStream, orderSchema);

119

120

// Create temporary view for SQL access

121

tEnv.createTemporaryView("orders", orderStream, orderSchema);

122

tEnv.executeSql("SELECT customerId, SUM(amount) FROM orders " +

123

"WHERE orderTime > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +

124

"GROUP BY customerId").print();

125

```

126

127

### Table to DataStream Conversion

128

129

Convert Table instances back to DataStream for stream processing operations.

130

131

```java { .api }

132

/**

133

* Convert Table to DataStream with automatic type inference

134

* @param table Table to convert

135

* @return DataStream with Row type

136

*/

137

public DataStream<Row> toDataStream(Table table);

138

139

/**

140

* Convert Table to DataStream with specific target type

141

* @param table Table to convert

142

* @param targetClass Target Java class for stream elements

143

* @return DataStream with specified type

144

*/

145

public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

146

147

/**

148

* Convert Table to DataStream with type information

149

* @param table Table to convert

150

* @param targetType TypeInformation for stream elements

151

* @return DataStream with specified type

152

*/

153

public <T> DataStream<T> toDataStream(Table table, TypeInformation<T> targetType);

154

155

/**

156

* Convert changing table to changelog stream

157

* @param table Table to convert (may contain updates/deletes)

158

* @return DataStream of Row with change flags

159

*/

160

public DataStream<Row> toChangelogStream(Table table);

161

162

/**

163

* Convert changing table to changelog stream with specific type

164

* @param table Table to convert

165

* @param targetClass Target Java class for stream elements

166

* @return DataStream with change information

167

*/

168

public <T> DataStream<T> toChangelogStream(Table table, Class<T> targetClass);

169

170

/**

171

* Convert changing table to retract stream

172

* @param table Table to convert

173

* @return DataStream of Tuple2<Boolean, Row> where Boolean indicates add/retract

174

*/

175

public DataStream<Tuple2<Boolean, Row>> toRetractStream(Table table);

176

177

/**

178

* Convert changing table to retract stream with specific type

179

* @param table Table to convert

180

* @param targetClass Target Java class for stream elements

181

* @return DataStream of Tuple2<Boolean, T> with retract information

182

*/

183

public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> targetClass);

184

```

185

186

**Table to DataStream Examples:**

187

188

```java

189

// Basic conversion to Row DataStream

190

Table filteredOrders = tEnv.from("orders")

191

.filter($("amount").isGreater(lit(100)));

192

DataStream<Row> resultStream = tEnv.toDataStream(filteredOrders);

193

194

// Convert to specific POJO type.
// NOTE(review): this query aggregates without a window, so the result table is
// updating; toDataStream requires an insert-only table and would throw here.
// This example likely needs toChangelogStream instead — confirm against the
// target Flink version.

195

DataStream<OrderSummary> summaryStream = tEnv.toDataStream(

196

tEnv.from("orders")

197

.groupBy($("customerId"))

198

.select($("customerId"), $("amount").sum().as("totalAmount")),

199

OrderSummary.class

200

);

201

202

// Handle updates with changelog stream

203

Table customerTotals = tEnv.from("orders")

204

.groupBy($("customerId"))

205

.select($("customerId"), $("amount").sum().as("total"));

206

207

DataStream<Row> changelogStream = tEnv.toChangelogStream(customerTotals);

208

changelogStream.process(new ProcessFunction<Row, String>() {

209

@Override

210

public void processElement(Row row, Context ctx, Collector<String> out) {

211

RowKind kind = row.getKind();

212

switch (kind) {

213

case INSERT:

214

out.collect("New customer total: " + row);

215

break;

216

case UPDATE_AFTER:

217

out.collect("Updated customer total: " + row);

218

break;

219

case DELETE:

220

out.collect("Removed customer: " + row);

221

break;

222

}

223

}

224

});

225

226

// Use retract stream for legacy compatibility

227

DataStream<Tuple2<Boolean, OrderSummary>> retractStream =

228

tEnv.toRetractStream(customerTotals, OrderSummary.class);

229

```

230

231

### Event Time and Watermarks

232

233

Handle event time processing and watermark propagation between DataStream and Table API.

234

235

```java { .api }

236

// Event time assignment in DataStream before conversion

237

DataStream<Order> ordersWithEventTime = orderStream

238

.assignTimestampsAndWatermarks(

239

WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))

240

.withTimestampAssigner((order, timestamp) -> order.getOrderTime())

241

);

242

243

// Convert to Table preserving event time

244

Table ordersTable = tEnv.fromDataStream(ordersWithEventTime,

245

Schema.newBuilder()

246

.column("orderId", DataTypes.BIGINT())

247

.column("customerId", DataTypes.BIGINT())

248

.column("amount", DataTypes.DECIMAL(10, 2))

249

.column("orderTime", DataTypes.TIMESTAMP_LTZ(3))

250

.watermark("orderTime", $("orderTime").minus(lit(5).seconds()))

251

.build()

252

);

253

```

254

255

### Type System Integration

256

257

Handle type conversions and mappings between DataStream and Table type systems.

258

259

```java { .api }

260

/**

261

* Type mapping utilities for DataStream-Table integration

262

*/

263

public class TypeConversions {

264

/**

265

* Convert DataStream TypeInformation to Table DataType

266

* @param typeInfo DataStream type information

267

* @return Equivalent Table API data type

268

*/

269

public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo);

270

271

/**

272

* Convert Table DataType to DataStream TypeInformation

273

* @param dataType Table API data type

274

* @return Equivalent DataStream type information

275

*/

276

public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType);

277

}

278

```

279

280

**Type Mapping Examples:**

281

282

```java

283

// Complex POJO with nested fields

284

@Data

285

public class ComplexOrder {

286

public Long orderId;

287

public CustomerInfo customer;

288

public List<OrderItem> items;

289

public Instant orderTime;

290

291

@Data

292

public static class CustomerInfo {

293

public Long customerId;

294

public String name;

295

public String email;

296

}

297

298

@Data

299

public static class OrderItem {

300

public String productId;

301

public Integer quantity;

302

public BigDecimal price;

303

}

304

}

305

306

// DataStream with complex type

307

DataStream<ComplexOrder> complexOrderStream = env.addSource(new ComplexOrderSource());

308

309

// Convert with nested field access

310

Table complexOrders = tEnv.fromDataStream(complexOrderStream,

311

$("orderId"),

312

$("customer.customerId").as("customerId"),

313

$("customer.name").as("customerName"),

314

$("items").cardinality().as("itemCount"),

315

$("orderTime"));

316

317

// Flatten nested structure with Table API.
// NOTE(review): "EXPLODE" is not a built-in Flink system function (UNNEST exists
// in SQL only); this assumes a table function named EXPLODE has been registered —
// verify, or register one before running this example.

318

Table flattenedOrders = complexOrders

319

.joinLateral(call("EXPLODE", $("items")).as("item"))

320

.select($("orderId"),

321

$("customerId"),

322

$("item.productId").as("productId"),

323

$("item.quantity").as("quantity"));

324

```

325

326

### Stream Processing Patterns

327

328

Common patterns for combining DataStream and Table API operations.

329

330

```java { .api }

331

// Pattern 1: Stream -> Table -> Stream pipeline

332

DataStream<RawEvent> rawEvents = env.addSource(new EventSource());

333

334

// Data cleaning and enrichment with Table API

335

Table cleanedEvents = tEnv.fromDataStream(rawEvents)

336

.filter($("value").isNotNull())

337

.select($("eventId"),

338

$("value").upperCase().as("cleanValue"),

339

$("timestamp"))

340

.join(tEnv.from("reference_data"),

341

$("eventId").isEqual($("reference_data.id")));

342

343

// Continue processing with DataStream API

344

DataStream<EnrichedEvent> enrichedStream = tEnv.toDataStream(cleanedEvents, EnrichedEvent.class);

345

enrichedStream

346

.keyBy(EnrichedEvent::getCategory)

347

.window(TumblingEventTimeWindows.of(Time.minutes(1)))

348

.aggregate(new EventAggregator())

349

.addSink(new ResultSink());

350

351

// Pattern 2: Hybrid aggregation with state

352

Table continuousAggregates = tEnv.from("event_stream")

353

.window(Tumble.over(lit(1).minutes()).on($("eventTime")).as("w"))

354

.groupBy($("w"), $("category"))

355

.select($("category"),

356

$("w").start().as("windowStart"),

357

$("value").sum().as("total"));

358

359

DataStream<WindowResult> aggregateStream = tEnv.toDataStream(continuousAggregates, WindowResult.class);

360

361

// Add custom stateful processing

362

aggregateStream

363

.keyBy(WindowResult::getCategory)

364

.process(new StatefulProcessor())

365

.addSink(new AlertSink());

366

```

367

368

### Configuration and Optimization

369

370

Configuration options for optimizing DataStream-Table integration.

371

372

```java { .api }

373

// Configure table environment for stream processing

374

Configuration config = tEnv.getConfig().getConfiguration();

375

376

// Enable mini-batch optimization for better throughput

377

config.setString("table.exec.mini-batch.enabled", "true");

378

config.setString("table.exec.mini-batch.allow-latency", "1s");

379

config.setString("table.exec.mini-batch.size", "1000");

380

381

// Configure state backend for table operations.
// NOTE(review): confirm the "table.exec.state.*" keys below exist in the target
// Flink version — state backend and checkpointing are normally configured via
// "state.backend" and "execution.checkpointing.interval" on the cluster or
// StreamExecutionEnvironment, not through table.exec options.

382

config.setString("table.exec.state.backend", "rocksdb");

383

config.setString("table.exec.state.checkpoint-interval", "10s");

384

385

// Optimize for low latency vs high throughput

386

config.setString("table.exec.streaming.prefer-append-only", "true");

387

config.setString("table.exec.emit.early-fire.enabled", "true");

388

config.setString("table.exec.emit.early-fire.delay", "1000ms");

389

```

390

391

### Error Handling and Monitoring

392

393

Best practices for handling errors and monitoring DataStream-Table integration.

394

395

```java { .api }

396

// Error handling in conversions

397

try {

398

Table result = tEnv.fromDataStream(dataStream);

399

DataStream<Row> output = tEnv.toDataStream(result);

400

} catch (ValidationException e) {

401

// Handle schema validation errors

402

log.error("Schema validation failed: " + e.getMessage());

403

} catch (TableException e) {

404

// Handle table operation errors

405

log.error("Table operation failed: " + e.getMessage());

406

}

407

408

// Add monitoring to DataStream operations

409

DataStream<Order> monitoredStream = orderStream

410

.map(new MapFunction<Order, Order>() {

411

private transient Counter recordCounter;

412

413

@Override

414

public void open(Configuration parameters) {

415

recordCounter = getRuntimeContext()

416

.getMetricGroup()

417

.counter("records_processed");

418

}

419

420

@Override

421

public Order map(Order order) {

422

recordCounter.inc();

423

return order;

424

}

425

});

426

```