or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

builtin-connectors.md · changelog-processing.md · datastream-connectors.md · index.md · procedures.md · statement-sets.md · stream-table-environment.md · watermark-strategies.md

docs/changelog-processing.md

0

# Changelog Processing

1

2

Advanced stream processing with support for changelog semantics including inserts, updates, and deletes. This enables handling of updating tables and complex event-driven scenarios with full CRUD operation support.

3

4

## Capabilities

5

6

### Changelog Stream to Table Conversion

7

8

Convert changelog DataStreams containing Row objects with RowKind flags into Tables.

9

10

```java { .api }

11

/**

12

* Converts changelog stream to Table with automatic schema derivation

13

* Supports all RowKind changes (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)

14

* @param dataStream Changelog stream of Row objects with RowKind flags

15

* @return Table supporting changelog operations

16

*/

17

Table fromChangelogStream(DataStream<Row> dataStream);

18

19

/**

20

* Converts changelog stream to Table with custom schema

21

* @param dataStream Changelog stream of Row objects with RowKind flags

22

* @param schema Custom schema for the resulting table

23

* @return Table with specified schema supporting changelog operations

24

*/

25

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

26

27

/**

28

* Converts changelog stream to Table with specific changelog mode

29

* @param dataStream Changelog stream of Row objects with RowKind flags

30

* @param schema Custom schema for the resulting table

31

* @param changelogMode Expected kinds of changes in the changelog

32

* @return Table with specified schema and changelog mode

33

*/

34

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

35

```

36

37

**Usage Examples:**

38

39

```java

40

import org.apache.flink.streaming.api.datastream.DataStream;

41

import org.apache.flink.table.api.Table;

42

import org.apache.flink.table.api.Schema;

43

import org.apache.flink.table.connector.ChangelogMode;

44

import org.apache.flink.types.Row;

45

import org.apache.flink.types.RowKind;

46

47

// Create changelog stream with various row kinds

48

DataStream<Row> changelogStream = env.fromElements(

49

Row.ofKind(RowKind.INSERT, "Alice", 25),

50

Row.ofKind(RowKind.INSERT, "Bob", 30),

51

Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25),

52

Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26),

53

Row.ofKind(RowKind.DELETE, "Bob", 30)

54

);

55

56

// Convert with automatic schema

57

Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

58

59

// Convert with custom schema

60

Schema schema = Schema.newBuilder()

61

.column("name", "STRING")

62

.column("age", "INT")

63

.primaryKey("name")

64

.build();

65

Table customChangelogTable = tableEnv.fromChangelogStream(changelogStream, schema);

66

67

// Convert with specific changelog mode (upsert mode - no UPDATE_BEFORE)

68

ChangelogMode upsertMode = ChangelogMode.upsert();

69

Table upsertTable = tableEnv.fromChangelogStream(changelogStream, schema, upsertMode);

70

```

71

72

### Table to Changelog Stream Conversion

73

74

Convert Tables to changelog DataStreams with RowKind flags for downstream processing.

75

76

```java { .api }

77

/**

78

* Converts Table to changelog stream with all supported row kinds

79

* @param table The Table to convert (can be updating or insert-only)

80

* @return Changelog stream of Row objects with RowKind flags

81

*/

82

DataStream<Row> toChangelogStream(Table table);

83

84

/**

85

* Converts Table to changelog stream with custom target schema

86

* @param table The Table to convert (can be updating or insert-only)

87

* @param targetSchema Schema for the output stream

88

* @return Changelog stream with specified schema

89

*/

90

DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

91

92

/**

93

* Converts Table to changelog stream with specific changelog mode

94

* @param table The Table to convert (can be updating or insert-only)

95

* @param targetSchema Schema for the output stream

96

* @param changelogMode Required kinds of changes in result changelog

97

* @return Changelog stream with specified mode

98

* @throws TableException if table cannot be represented in the specified mode

99

*/

100

DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

101

```

102

103

**Usage Examples:**

104

105

```java

106

// Create an updating table

107

Table updatingTable = tableEnv.sqlQuery(

108

"SELECT user_id, COUNT(*) as order_count FROM orders GROUP BY user_id"

109

);

110

111

// Convert to changelog stream (supports all row kinds)

112

DataStream<Row> changelogStream = tableEnv.toChangelogStream(updatingTable);

113

114

// Convert with custom schema

115

Schema outputSchema = Schema.newBuilder()

116

.column("user_id", "STRING")

117

.column("order_count", "BIGINT")

118

.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")

119

.build();

120

DataStream<Row> customChangelogStream = tableEnv.toChangelogStream(updatingTable, outputSchema);

121

122

// Convert to upsert stream (no UPDATE_BEFORE)

123

ChangelogMode upsertMode = ChangelogMode.upsert();

124

DataStream<Row> upsertStream = tableEnv.toChangelogStream(updatingTable, outputSchema, upsertMode);

125

126

// Process changelog stream

127

changelogStream.process(new ProcessFunction<Row, String>() {

128

@Override

129

public void processElement(Row row, Context ctx, Collector<String> out) {

130

RowKind kind = row.getKind();

131

switch (kind) {

132

case INSERT:

133

out.collect("New record: " + row);

134

break;

135

case UPDATE_AFTER:

136

out.collect("Updated record: " + row);

137

break;

138

case DELETE:

139

out.collect("Deleted record: " + row);

140

break;

141

// Handle other row kinds...

142

}

143

}

144

});

145

```

146

147

### Schema Configuration for Changelog Streams

148

149

Advanced schema configuration including metadata columns and watermark propagation.

150

151

```java { .api }

152

// Schema with metadata column for timestamp propagation

153

Schema timestampSchema = Schema.newBuilder()

154

.column("id", "STRING")

155

.column("name", "STRING")

156

.column("age", "INT")

157

.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")

158

.watermark("rowtime", "SOURCE_WATERMARK()")

159

.build();

160

161

// Schema with computed columns

162

Schema computedSchema = Schema.newBuilder()

163

.column("user_id", "STRING")

164

.column("score", "INT")

165

.columnByExpression("score_category",

166

"CASE WHEN score >= 90 THEN 'A' " +

167

"WHEN score >= 80 THEN 'B' " +

168

"ELSE 'C' END")

169

.build();

170

171

// Schema with primary key for upsert semantics

172

Schema upsertSchema = Schema.newBuilder()

173

.column("product_id", "STRING")

174

.column("product_name", "STRING")

175

.column("price", "DECIMAL(10, 2)")

176

.primaryKey("product_id")

177

.build();

178

```

179

180

## Changelog Modes

181

182

### Standard Changelog Mode

183

184

Supports all row kinds for complete CRUD operations.

185

186

```java { .api }

187

// Default changelog mode - supports all row kinds

188

ChangelogMode standardMode = ChangelogMode.all();

189

190

// Manually specify supported row kinds

191

ChangelogMode customMode = ChangelogMode.newBuilder()

192

.addContainedKind(RowKind.INSERT)

193

.addContainedKind(RowKind.UPDATE_AFTER)

194

.addContainedKind(RowKind.DELETE)

195

.build();

196

```

197

198

### Upsert Mode

199

200

Optimized mode without UPDATE_BEFORE for key-based updates.

201

202

```java { .api }

203

// Upsert mode - no UPDATE_BEFORE, only INSERT, UPDATE_AFTER, DELETE

204

ChangelogMode upsertMode = ChangelogMode.upsert();

205

```

206

207

### Insert-Only Mode

208

209

For append-only streams without updates or deletes.

210

211

```java { .api }

212

// Insert-only mode - only INSERT row kind

213

ChangelogMode insertOnlyMode = ChangelogMode.insertOnly();

214

```

215

216

## Row Kind Processing

217

218

### Working with RowKind

219

220

Understanding and processing different types of changelog events.

221

222

```java { .api }

223

import org.apache.flink.types.Row;

224

import org.apache.flink.types.RowKind;

225

226

// Create rows with specific kinds

227

Row insertRow = Row.ofKind(RowKind.INSERT, "Alice", 25);

228

Row updateBeforeRow = Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25);

229

Row updateAfterRow = Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26);

230

Row deleteRow = Row.ofKind(RowKind.DELETE, "Bob", 30);

231

232

// Check row kind

233

RowKind kind = row.getKind();

234

if (kind == RowKind.INSERT) {

235

// Handle insert

236

} else if (kind == RowKind.UPDATE_AFTER) {

237

// Handle update

238

} else if (kind == RowKind.DELETE) {

239

// Handle delete

240

}

241

```

242

243

### Processing Changelog Streams

244

245

Common patterns for processing changelog data.

246

247

```java

248

// Process changelog with stateful operations

249

changelogStream

250

.keyBy(row -> row.getField(0)) // Key by first field

251

.process(new KeyedProcessFunction<String, Row, String>() {

252

private ValueState<String> currentValue;

253

254

@Override

255

public void open(Configuration parameters) {

256

currentValue = getRuntimeContext().getState(

257

new ValueStateDescriptor<>("current", String.class));

258

}

259

260

@Override

261

public void processElement(Row row, Context ctx, Collector<String> out)

262

throws Exception {

263

RowKind kind = row.getKind();

264

String key = (String) row.getField(0);

265

266

switch (kind) {

267

case INSERT:

268

case UPDATE_AFTER:

269

currentValue.update(row.toString());

270

out.collect("Current state for " + key + ": " + row);

271

break;

272

case DELETE:

273

currentValue.clear();

274

out.collect("Deleted state for " + key);

275

break;

276

}

277

}

278

});

279

```

280

281

## Types

282

283

### Changelog Processing Types

284

285

```java { .api }

286

import org.apache.flink.table.connector.ChangelogMode;

287

import org.apache.flink.types.Row;

288

import org.apache.flink.types.RowKind;

289

import org.apache.flink.table.api.Schema;

290

```

291

292

### Row Creation Utilities

293

294

```java { .api }

295

// Row creation with RowKind

296

Row insertRow = Row.ofKind(RowKind.INSERT, field1, field2, field3);

297

Row updateRow = Row.ofKind(RowKind.UPDATE_AFTER, field1, field2, field3);

298

Row deleteRow = Row.ofKind(RowKind.DELETE, field1, field2, field3);

299

300

// Row kind manipulation

301

Row row = Row.of(field1, field2);

302

row.setKind(RowKind.UPDATE_AFTER);

303

RowKind kind = row.getKind();

304

```