# DataStream Conversions

The Flink Table API Java Bridge provides seamless conversion between DataStream and Table representations, enabling developers to combine the expressiveness of SQL/Table API with the flexibility of DataStream API.
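The examples throughout this page assume an existing `StreamExecutionEnvironment` (`env`) and `StreamTableEnvironment` (`tableEnv`). A minimal setup sketch:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Obtain the DataStream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Wrap it in a StreamTableEnvironment, the bridge between the two APIs
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
```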

## Conversion Overview

The bridge supports bidirectional conversions:

- **DataStream → Table**: Convert streaming data to table format for SQL operations
- **Table → DataStream**: Convert table results back to streaming format for further processing

## DataStream to Table Conversions

### Basic Conversion

Convert a `DataStream` to a `Table` using automatic schema inference:

```java { .api }
<T> Table fromDataStream(DataStream<T> dataStream);
```

**Usage Example:**

```java
// Create a DataStream from POJO objects
DataStream<Person> personStream = env.fromElements(
    new Person("Alice", 25, "Engineer"),
    new Person("Bob", 30, "Manager"),
    new Person("Charlie", 28, "Developer")
);

// Convert to a Table (schema inferred from the POJO structure)
Table personTable = tableEnv.fromDataStream(personStream);

// Register the table as a view so SQL can reference it by name
tableEnv.createTemporaryView("people", personTable);
Table result = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age > 26");
```

### Conversion with Custom Schema

Convert a `DataStream` to a `Table` with an explicit schema definition:

```java { .api }
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
```

**Usage Example:**

```java
DataStream<Row> rowStream = env.fromElements(
    Row.of("Alice", 25, "2023-01-15"),
    Row.of("Bob", 30, "2023-02-20"),
    Row.of("Charlie", 28, "2023-03-10")
);

// Define a custom schema with explicit data types
Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .column("hire_date", DataTypes.STRING())
    .build();

Table employeeTable = tableEnv.fromDataStream(rowStream, schema);
```

### Changelog Stream Conversion

Convert changelog streams (insert/update/delete operations) to tables:

```java { .api }
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
```

**Usage Example:**

```java
// Changelog stream carrying RowKind information
DataStream<Row> changelogStream = env.addSource(new ChangelogSourceFunction());

// Basic changelog conversion
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// With a custom schema
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("value", DataTypes.DOUBLE())
    .build();

Table schemaTable = tableEnv.fromChangelogStream(changelogStream, schema);

// With a specific changelog mode
ChangelogMode upsertMode = ChangelogMode.upsert();
Table modeTable = tableEnv.fromChangelogStream(changelogStream, schema, upsertMode);
```
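For experimentation, a changelog stream can also be built inline from `Row` objects stamped with an explicit `RowKind`, without a custom source function. A sketch (the `.returns(...)` hint is needed because `Row` carries no generic type information):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Each Row declares its change semantics via RowKind
DataStream<Row> manualChangelog = env.fromElements(
        Row.ofKind(RowKind.INSERT, 1L, "Alice", 10.0),
        Row.ofKind(RowKind.UPDATE_BEFORE, 1L, "Alice", 10.0),
        Row.ofKind(RowKind.UPDATE_AFTER, 1L, "Alice", 20.0))
    // Declare the row's field types explicitly
    .returns(Types.ROW(Types.LONG, Types.STRING, Types.DOUBLE));

Table manualTable = tableEnv.fromChangelogStream(manualChangelog);
```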

## Table to DataStream Conversions

### Basic Conversion to Row

Convert a `Table` to a `DataStream<Row>`:

```java { .api }
DataStream<Row> toDataStream(Table table);
```

**Usage Example:**

```java
// Create a table from a SQL query
Table resultTable = tableEnv.sqlQuery(
    "SELECT name, age, salary FROM employees WHERE department = 'Engineering'"
);

// Convert to a DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// Process the stream further
resultStream
    .map(row -> "Employee: " + row.getField(0) + ", Age: " + row.getField(1))
    .print();
```

### Conversion to a Specific Type

Convert a `Table` to a `DataStream` of a specific Java class:

```java { .api }
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
```

**Usage Example:**

```java
// Define a result POJO; Flink maps columns to fields by name and requires
// a public no-argument constructor
public static class EmployeeSummary {
    public String name;
    public Integer age;
    public Double salary;

    public EmployeeSummary() {}
}

Table summaryTable = tableEnv.sqlQuery("SELECT name, age, salary FROM employees");

// Convert directly to the POJO type
DataStream<EmployeeSummary> summaryStream =
    tableEnv.toDataStream(summaryTable, EmployeeSummary.class);

summaryStream
    .filter(emp -> emp.salary > 50000.0)
    .map(emp -> emp.name + " earns " + emp.salary)
    .print();
```

### Conversion with a Data Type

Convert a `Table` to a `DataStream` using an explicit data type specification:

```java { .api }
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
```

**Usage Example:**

```java
// Note: toDataStream requires an insert-only table. In streaming mode a
// non-windowed GROUP BY produces updates and must go through
// toChangelogStream instead; in batch mode this query works as written.
Table aggregatedTable = tableEnv.sqlQuery(
    "SELECT department, COUNT(*) AS emp_count FROM employees GROUP BY department"
);

// Define the target data type
AbstractDataType<?> rowType = DataTypes.ROW(
    DataTypes.FIELD("department", DataTypes.STRING()),
    DataTypes.FIELD("emp_count", DataTypes.BIGINT())
);

DataStream<Row> typedStream = tableEnv.toDataStream(aggregatedTable, rowType);
```

### Changelog Stream Conversion

Convert tables to changelog streams to capture insert/update/delete operations:

```java { .api }
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
```

**Usage Example:**

```java
// Create a table with updates (e.g., from a GROUP BY query)
Table dynamicTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS event_count FROM user_events GROUP BY user_id"
);

// Convert to a changelog stream
DataStream<Row> changelogStream = tableEnv.toChangelogStream(dynamicTable);

// Process changelog events according to their RowKind
changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        RowKind kind = row.getKind();
        switch (kind) {
            case INSERT:
                out.collect("New user: " + row.getField(0) + " with " + row.getField(1) + " events");
                break;
            case UPDATE_AFTER:
                out.collect("Updated user: " + row.getField(0) + " now has " + row.getField(1) + " events");
                break;
            case DELETE:
                out.collect("Removed user: " + row.getField(0));
                break;
            default:
                // UPDATE_BEFORE carries the retracted old value; ignored here
                break;
        }
    }
}).print();

// With a target schema
Schema targetSchema = Schema.newBuilder()
    .column("user_id", DataTypes.STRING())
    .column("event_count", DataTypes.BIGINT())
    .build();

DataStream<Row> schemaChangelogStream = tableEnv.toChangelogStream(dynamicTable, targetSchema);

// With a specific changelog mode (upsert additionally requires a primary
// key to be defined on the schema)
ChangelogMode upsertMode = ChangelogMode.upsert();
DataStream<Row> upsertStream = tableEnv.toChangelogStream(dynamicTable, targetSchema, upsertMode);
```

## Schema Definition

When working with custom schemas, use the `Schema` builder:

```java { .api }
public class Schema {
    public static Builder newBuilder();

    public static class Builder {
        public Builder column(String name, AbstractDataType<?> dataType);
        public Builder columnByExpression(String name, String expression);
        public Builder columnByMetadata(String name, AbstractDataType<?> dataType);
        public Builder primaryKey(String... columnNames);
        public Builder watermark(String columnName, String watermarkExpression);
        public Schema build();
    }
}
```

**Usage Example:**

```java
Schema complexSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("timestamp_col", DataTypes.TIMESTAMP(3))
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    .columnByExpression("proc_time", "PROCTIME()")
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .primaryKey("id")
    .build();
```

## Changelog Modes

The bridge supports different changelog modes for streaming tables. Note that `ChangelogMode` is a regular class, not an enum; the common modes are exposed as static factory methods, and custom combinations of row kinds can be assembled with its builder:

```java { .api }
public final class ChangelogMode {
    public static ChangelogMode insertOnly();
    public static ChangelogMode upsert();
    public static ChangelogMode all();
    public static Builder newBuilder();
}
```

**Mode Descriptions:**

- **insertOnly()**: Only INSERT operations (append-only stream)
- **upsert()**: INSERT, UPDATE_AFTER, and DELETE operations interpreted against a primary key; no UPDATE_BEFORE rows are emitted
- **all()**: All possible row kinds, including UPDATE_BEFORE (retract semantics)
- **newBuilder()**: Assemble a custom mode from an explicit set of `RowKind`s
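Beyond the predefined modes, `ChangelogMode.newBuilder()` lets you declare exactly which row kinds a stream may contain. A sketch (`table` and `schema` are placeholders for a previously defined table and target schema):

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// A retract-style mode listing every row kind explicitly
// (equivalent in effect to ChangelogMode.all())
ChangelogMode retractMode = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_BEFORE)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build();

DataStream<Row> retractStream = tableEnv.toChangelogStream(table, schema, retractMode);
```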

## Common Patterns

### Stream Processing with SQL

Combine DataStream processing with SQL queries:

```java
// Start with a DataStream
DataStream<Event> eventStream = env.addSource(new EventSource());

// Convert to a Table for SQL processing
Table eventTable = tableEnv.fromDataStream(eventStream);
tableEnv.createTemporaryView("events", eventTable);

// Apply SQL transformations
Table filteredEvents = tableEnv.sqlQuery(
    "SELECT user_id, event_type, event_time " +
    "FROM events " +
    "WHERE event_type IN ('click', 'purchase') " +
    "AND event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR"
);

// Convert back to a DataStream for further processing
DataStream<Row> processedStream = tableEnv.toDataStream(filteredEvents);

// Continue with DataStream operations
processedStream
    .keyBy(row -> row.getField(0))
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .apply(new MyWindowFunction())
    .addSink(new MySink());
```

### Aggregation with State

Use changelog streams for stateful aggregations:

```java
Table aggregationTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS event_count, MAX(event_time) AS last_event " +
    "FROM events " +
    "GROUP BY user_id"
);

DataStream<Row> aggregationChangelog = tableEnv.toChangelogStream(aggregationTable);

// Handle aggregation changes. The stream must be keyed before a
// KeyedProcessFunction can be applied, and keyed state has to be
// initialized in open().
aggregationChangelog
    .keyBy(row -> row.<String>getFieldAs(0))
    .process(new KeyedProcessFunction<String, Row, Alert>() {
        private transient ValueState<Long> lastCount;

        @Override
        public void open(Configuration parameters) {
            lastCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastCount", Long.class));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Alert> out) throws Exception {
            String userId = row.getFieldAs(0);
            Long newCount = row.getFieldAs(1);
            Long previousCount = lastCount.value();

            if (previousCount != null && newCount > previousCount * 2) {
                out.collect(new Alert(userId, "Activity spike detected"));
            }

            lastCount.update(newCount);
        }
    });
```

## Error Handling

Handle common conversion errors:

```java
try {
    // Schema that may not match the stream's physical type
    Schema schema = Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .build();

    Table table = tableEnv.fromDataStream(invalidDataStream, schema);

} catch (ValidationException e) {
    // Handle schema validation errors
    logger.error("Schema validation failed: {}", e.getMessage());
} catch (TableException e) {
    // Handle table conversion errors
    logger.error("Table conversion failed: {}", e.getMessage());
} catch (IllegalArgumentException e) {
    // Handle invalid arguments
    logger.error("Invalid argument: {}", e.getMessage());
}
```

## Performance Considerations

1. **Schema Inference**: Defining the schema explicitly is cheaper and more predictable than automatic inference
2. **Type Conversion**: Converting directly to POJOs avoids per-record `Row` field access overhead
3. **Changelog Overhead**: Changelog streams carry extra `RowKind` metadata and may emit several rows per logical update
4. **Watermark Propagation**: Ensure watermarks are carried through conversions so event-time operations keep working downstream
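For the last point, one common pattern: when the incoming DataStream already has watermarks assigned, declaring `SOURCE_WATERMARK()` in the conversion schema tells the planner to adopt them instead of recomputing. A sketch, assuming `eventStream` carries event-time timestamps and watermarks:

```java
// Adopt the DataStream's existing timestamps and watermarks during conversion
Table withWatermarks = tableEnv.fromDataStream(
    eventStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build());
```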