CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-bridge

Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.

Pending
Overview
Eval results
Files

changelog-processing.mddocs/

Changelog Processing

Advanced stream processing with support for changelog semantics including inserts, updates, and deletes. This enables handling of updating tables and complex event-driven scenarios with full CRUD operation support.

Capabilities

Changelog Stream to Table Conversion

Convert changelog DataStreams containing Row objects with RowKind flags into Tables.

/**
 * Converts changelog stream to Table with automatic schema derivation
 * Supports all RowKind changes (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
 * @param dataStream Changelog stream of Row objects with RowKind flags
 * @return Table supporting changelog operations
 */
Table fromChangelogStream(DataStream<Row> dataStream);

/**
 * Converts changelog stream to Table with custom schema
 * @param dataStream Changelog stream of Row objects with RowKind flags
 * @param schema Custom schema for the resulting table
 * @return Table with specified schema supporting changelog operations
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

/**
 * Converts changelog stream to Table with specific changelog mode
 * @param dataStream Changelog stream of Row objects with RowKind flags
 * @param schema Custom schema for the resulting table
 * @param changelogMode Expected kinds of changes in the changelog
 * @return Table with specified schema and changelog mode
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

Usage Examples:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Create changelog stream with various row kinds
DataStream<Row> changelogStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", 25),
    Row.ofKind(RowKind.INSERT, "Bob", 30),
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26),
    Row.ofKind(RowKind.DELETE, "Bob", 30)
);

// Convert with automatic schema
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// Convert with custom schema
Schema schema = Schema.newBuilder()
    .column("name", "STRING")
    .column("age", "INT")
    .primaryKey("name")
    .build();
Table customChangelogTable = tableEnv.fromChangelogStream(changelogStream, schema);

// Convert with specific changelog mode (upsert mode - no UPDATE_BEFORE)
ChangelogMode upsertMode = ChangelogMode.upsert();
Table upsertTable = tableEnv.fromChangelogStream(changelogStream, schema, upsertMode);

Table to Changelog Stream Conversion

Convert Tables to changelog DataStreams with RowKind flags for downstream processing.

/**
 * Converts Table to changelog stream with all supported row kinds
 * @param table The Table to convert (can be updating or insert-only)
 * @return Changelog stream of Row objects with RowKind flags
 */
DataStream<Row> toChangelogStream(Table table);

/**
 * Converts Table to changelog stream with custom target schema
 * @param table The Table to convert (can be updating or insert-only)
 * @param targetSchema Schema for the output stream
 * @return Changelog stream with specified schema
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

/**
 * Converts Table to changelog stream with specific changelog mode
 * @param table The Table to convert (can be updating or insert-only)
 * @param targetSchema Schema for the output stream
 * @param changelogMode Required kinds of changes in result changelog
 * @return Changelog stream with specified mode
 * @throws TableException if table cannot be represented in the specified mode
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

Usage Examples:

// Create an updating table
Table updatingTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as order_count FROM orders GROUP BY user_id"
);

// Convert to changelog stream (supports all row kinds)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(updatingTable);

// Convert with custom schema  
Schema outputSchema = Schema.newBuilder()
    .column("user_id", "STRING")
    .column("order_count", "BIGINT")
    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    .build();
DataStream<Row> customChangelogStream = tableEnv.toChangelogStream(updatingTable, outputSchema);

// Convert to upsert stream (no UPDATE_BEFORE)
ChangelogMode upsertMode = ChangelogMode.upsert();
DataStream<Row> upsertStream = tableEnv.toChangelogStream(updatingTable, outputSchema, upsertMode);

// Process changelog stream
changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        RowKind kind = row.getKind();
        switch (kind) {
            case INSERT:
                out.collect("New record: " + row);
                break;
            case UPDATE_AFTER:
                out.collect("Updated record: " + row);
                break;
            case DELETE:
                out.collect("Deleted record: " + row);
                break;
            // Handle other row kinds...
        }
    }
});

Schema Configuration for Changelog Streams

Advanced schema configuration including metadata columns and watermark propagation.

// Schema with metadata column for timestamp propagation
Schema timestampSchema = Schema.newBuilder()
    .column("id", "STRING")
    .column("name", "STRING")
    .column("age", "INT")
    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    .watermark("rowtime", "SOURCE_WATERMARK()")
    .build();

// Schema with computed columns
Schema computedSchema = Schema.newBuilder()
    .column("user_id", "STRING") 
    .column("score", "INT")
    .columnByExpression("score_category", 
        "CASE WHEN score >= 90 THEN 'A' " +
        "WHEN score >= 80 THEN 'B' " +
        "ELSE 'C' END")
    .build();

// Schema with primary key for upsert semantics
Schema upsertSchema = Schema.newBuilder()
    .column("product_id", "STRING")
    .column("product_name", "STRING")
    .column("price", "DECIMAL(10, 2)")
    .primaryKey("product_id")
    .build();

Changelog Modes

Standard Changelog Mode

Supports all row kinds for complete CRUD operations.

// Default changelog mode - supports all row kinds
ChangelogMode standardMode = ChangelogMode.all();

// Manually specify supported row kinds
ChangelogMode customMode = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build();

Upsert Mode

Optimized mode without UPDATE_BEFORE for key-based updates.

// Upsert mode - no UPDATE_BEFORE, only INSERT, UPDATE_AFTER, DELETE
ChangelogMode upsertMode = ChangelogMode.upsert();

Insert-Only Mode

For append-only streams without updates or deletes.

// Insert-only mode - only INSERT row kind
ChangelogMode insertOnlyMode = ChangelogMode.insertOnly();

Row Kind Processing

Working with RowKind

Understanding and processing different types of changelog events.

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Create rows with specific kinds
Row insertRow = Row.ofKind(RowKind.INSERT, "Alice", 25);
Row updateBeforeRow = Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25);  
Row updateAfterRow = Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26);
Row deleteRow = Row.ofKind(RowKind.DELETE, "Bob", 30);

// Check row kind
RowKind kind = row.getKind();
if (kind == RowKind.INSERT) {
    // Handle insert
} else if (kind == RowKind.UPDATE_AFTER) {
    // Handle update
} else if (kind == RowKind.DELETE) {
    // Handle delete
}

Processing Changelog Streams

Common patterns for processing changelog data.

// Process changelog with stateful operations
changelogStream
    .keyBy(row -> row.getField(0)) // Key by first field
    .process(new KeyedProcessFunction<String, Row, String>() {
        private ValueState<String> currentValue;
        
        @Override
        public void open(Configuration parameters) {
            currentValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("current", String.class));
        }
        
        @Override
        public void processElement(Row row, Context ctx, Collector<String> out) 
                throws Exception {
            RowKind kind = row.getKind();
            String key = (String) row.getField(0);
            
            switch (kind) {
                case INSERT:
                case UPDATE_AFTER:
                    currentValue.update(row.toString());
                    out.collect("Current state for " + key + ": " + row);
                    break;
                case DELETE:
                    currentValue.clear();
                    out.collect("Deleted state for " + key);
                    break;
            }
        }
    });

Types

Changelog Processing Types

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.table.api.Schema;

Row Creation Utilities

// Row creation with RowKind
Row insertRow = Row.ofKind(RowKind.INSERT, field1, field2, field3);
Row updateRow = Row.ofKind(RowKind.UPDATE_AFTER, field1, field2, field3);
Row deleteRow = Row.ofKind(RowKind.DELETE, field1, field2, field3);

// Row kind manipulation
Row row = Row.of(field1, field2);
row.setKind(RowKind.UPDATE_AFTER);
RowKind kind = row.getKind();

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge

docs

builtin-connectors.md

changelog-processing.md

datastream-connectors.md

index.md

procedures.md

statement-sets.md

stream-table-environment.md

watermark-strategies.md

tile.json