Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.
—
Advanced stream processing with support for changelog semantics including inserts, updates, and deletes. This enables handling of updating tables and complex event-driven scenarios with full CRUD operation support.
Convert changelog DataStreams containing Row objects with RowKind flags into Tables.
/**
* Converts changelog stream to Table with automatic schema derivation
* Supports all RowKind changes (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
* @param dataStream Changelog stream of Row objects with RowKind flags
* @return Table supporting changelog operations
*/
Table fromChangelogStream(DataStream<Row> dataStream);
/**
* Converts changelog stream to Table with custom schema
* @param dataStream Changelog stream of Row objects with RowKind flags
* @param schema Custom schema for the resulting table
* @return Table with specified schema supporting changelog operations
*/
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
/**
* Converts changelog stream to Table with specific changelog mode
* @param dataStream Changelog stream of Row objects with RowKind flags
* @param schema Custom schema for the resulting table
* @param changelogMode Expected kinds of changes in the changelog
* @return Table with specified schema and changelog mode
*/
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

Usage Examples:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
// Create changelog stream with various row kinds
DataStream<Row> changelogStream = env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 25),
Row.ofKind(RowKind.INSERT, "Bob", 30),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26),
Row.ofKind(RowKind.DELETE, "Bob", 30)
);
// Convert with automatic schema
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);
// Convert with custom schema
Schema schema = Schema.newBuilder()
.column("name", "STRING")
.column("age", "INT")
.primaryKey("name")
.build();
Table customChangelogTable = tableEnv.fromChangelogStream(changelogStream, schema);
// Convert with specific changelog mode (upsert mode - no UPDATE_BEFORE)
ChangelogMode upsertMode = ChangelogMode.upsert();
Table upsertTable = tableEnv.fromChangelogStream(changelogStream, schema, upsertMode);

Convert Tables to changelog DataStreams with RowKind flags for downstream processing.
/**
* Converts Table to changelog stream with all supported row kinds
* @param table The Table to convert (can be updating or insert-only)
* @return Changelog stream of Row objects with RowKind flags
*/
DataStream<Row> toChangelogStream(Table table);
/**
* Converts Table to changelog stream with custom target schema
* @param table The Table to convert (can be updating or insert-only)
* @param targetSchema Schema for the output stream
* @return Changelog stream with specified schema
*/
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
/**
* Converts Table to changelog stream with specific changelog mode
* @param table The Table to convert (can be updating or insert-only)
* @param targetSchema Schema for the output stream
* @param changelogMode Required kinds of changes in result changelog
* @return Changelog stream with specified mode
* @throws TableException if table cannot be represented in the specified mode
*/
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

Usage Examples:
// Create an updating table
Table updatingTable = tableEnv.sqlQuery(
"SELECT user_id, COUNT(*) as order_count FROM orders GROUP BY user_id"
);
// Convert to changelog stream (supports all row kinds)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(updatingTable);
// Convert with custom schema
Schema outputSchema = Schema.newBuilder()
.column("user_id", "STRING")
.column("order_count", "BIGINT")
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.build();
DataStream<Row> customChangelogStream = tableEnv.toChangelogStream(updatingTable, outputSchema);
// Convert to upsert stream (no UPDATE_BEFORE)
ChangelogMode upsertMode = ChangelogMode.upsert();
DataStream<Row> upsertStream = tableEnv.toChangelogStream(updatingTable, outputSchema, upsertMode);
// Process changelog stream
changelogStream.process(new ProcessFunction<Row, String>() {
@Override
public void processElement(Row row, Context ctx, Collector<String> out) {
RowKind kind = row.getKind();
switch (kind) {
case INSERT:
out.collect("New record: " + row);
break;
case UPDATE_AFTER:
out.collect("Updated record: " + row);
break;
case DELETE:
out.collect("Deleted record: " + row);
break;
// Handle other row kinds...
}
}
});

Advanced schema configuration including metadata columns and watermark propagation.
// Schema with metadata column for timestamp propagation
Schema timestampSchema = Schema.newBuilder()
.column("id", "STRING")
.column("name", "STRING")
.column("age", "INT")
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build();
// Schema with computed columns
Schema computedSchema = Schema.newBuilder()
.column("user_id", "STRING")
.column("score", "INT")
.columnByExpression("score_category",
"CASE WHEN score >= 90 THEN 'A' " +
"WHEN score >= 80 THEN 'B' " +
"ELSE 'C' END")
.build();
// Schema with primary key for upsert semantics
Schema upsertSchema = Schema.newBuilder()
.column("product_id", "STRING")
.column("product_name", "STRING")
.column("price", "DECIMAL(10, 2)")
.primaryKey("product_id")
.build();

Supports all row kinds for complete CRUD operations.
// Default changelog mode - supports all row kinds
ChangelogMode standardMode = ChangelogMode.all();
// Manually specify supported row kinds
ChangelogMode customMode = ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();

Optimized mode without UPDATE_BEFORE for key-based updates.
// Upsert mode - no UPDATE_BEFORE, only INSERT, UPDATE_AFTER, DELETE
ChangelogMode upsertMode = ChangelogMode.upsert();

For append-only streams without updates or deletes.
// Insert-only mode - only INSERT row kind
ChangelogMode insertOnlyMode = ChangelogMode.insertOnly();

Understanding and processing different types of changelog events.
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
// Create rows with specific kinds
Row insertRow = Row.ofKind(RowKind.INSERT, "Alice", 25);
Row updateBeforeRow = Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25);
Row updateAfterRow = Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26);
Row deleteRow = Row.ofKind(RowKind.DELETE, "Bob", 30);
// Check row kind
RowKind kind = row.getKind();
if (kind == RowKind.INSERT) {
// Handle insert
} else if (kind == RowKind.UPDATE_AFTER) {
// Handle update
} else if (kind == RowKind.DELETE) {
// Handle delete
}

Common patterns for processing changelog data.
// Process changelog with stateful operations.
// NOTE: getField(0) returns Object, so the key selector must cast to String
// for the key type to match KeyedProcessFunction<String, Row, String>.
changelogStream
    .keyBy(row -> (String) row.getField(0)) // Key by first field
    .process(new KeyedProcessFunction<String, Row, String>() {
        // Per-key snapshot of the most recent row, kept in keyed state.
        private ValueState<String> currentValue;

        @Override
        public void open(Configuration parameters) {
            currentValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("current", String.class));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<String> out)
                throws Exception {
            RowKind kind = row.getKind();
            String key = (String) row.getField(0);
            switch (kind) {
                case INSERT:
                case UPDATE_AFTER:
                    // Both kinds carry the new image of the row: overwrite state.
                    currentValue.update(row.toString());
                    out.collect("Current state for " + key + ": " + row);
                    break;
                case DELETE:
                    currentValue.clear();
                    out.collect("Deleted state for " + key);
                    break;
                // UPDATE_BEFORE intentionally ignored: the UPDATE_AFTER that
                // follows it carries the authoritative new value.
            }
        }
    });

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.table.api.Schema;

// Row creation with RowKind
Row insertRow = Row.ofKind(RowKind.INSERT, field1, field2, field3);
Row updateRow = Row.ofKind(RowKind.UPDATE_AFTER, field1, field2, field3);
Row deleteRow = Row.ofKind(RowKind.DELETE, field1, field2, field3);
// Row kind manipulation
Row row = Row.of(field1, field2);
row.setKind(RowKind.UPDATE_AFTER);
RowKind kind = row.getKind();

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge