tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11

Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.

DataStream Conversions

The Flink Table API Java Bridge provides seamless conversion between DataStream and Table representations, enabling developers to combine the expressiveness of SQL/Table API with the flexibility of DataStream API.

Conversion Overview

The bridge supports bidirectional conversions:

  • DataStream → Table: Convert streaming data to table format for SQL operations
  • Table → DataStream: Convert table results back to streaming format for further processing
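
All examples below assume an existing StreamExecutionEnvironment and a StreamTableEnvironment built on top of it. A minimal setup sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);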

DataStream to Table Conversions

Basic Conversion

Convert a DataStream to a Table using automatic schema inference:

<T> Table fromDataStream(DataStream<T> dataStream);

Usage Example:

// Create DataStream from POJO objects
DataStream<Person> personStream = env.fromElements(
    new Person("Alice", 25, "Engineer"),
    new Person("Bob", 30, "Manager"),
    new Person("Charlie", 28, "Developer")
);

// Convert to Table (schema inferred from POJO structure)
Table personTable = tableEnv.fromDataStream(personStream);

// Now you can use SQL on the table; concatenating a Table into a SQL string
// implicitly registers it under a generated name
Table result = tableEnv.sqlQuery("SELECT name, age FROM " + personTable + " WHERE age > 26");
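
The example assumes a Person POJO along these lines; the public fields and public no-argument constructor are what allow Flink to infer the schema automatically:

public static class Person {
    public String name;
    public Integer age;
    public String role;

    public Person() {}

    public Person(String name, Integer age, String role) {
        this.name = name;
        this.age = age;
        this.role = role;
    }
}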

Conversion with Custom Schema

Convert a DataStream to a Table with explicit schema definition:

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

Usage Example:

DataStream<Row> rowStream = env.fromElements(
    Row.of("Alice", 25, "2023-01-15"),
    Row.of("Bob", 30, "2023-02-20"),
    Row.of("Charlie", 28, "2023-03-10")
);

// Define custom schema with proper data types
Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .column("hire_date", DataTypes.STRING())
    .build();

Table employeeTable = tableEnv.fromDataStream(rowStream, schema);
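
Note that a DataStream<Row> built with fromElements typically ends up with generic type information, so in practice the stream usually needs an explicit type hint before conversion; a sketch using Types.ROW_NAMED:

import org.apache.flink.api.common.typeinfo.Types;

DataStream<Row> typedRowStream = env.fromElements(
        Row.of("Alice", 25, "2023-01-15"),
        Row.of("Bob", 30, "2023-02-20"))
    .returns(Types.ROW_NAMED(
        new String[] {"name", "age", "hire_date"},
        Types.STRING, Types.INT, Types.STRING));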

Changelog Stream Conversion

Convert changelog streams (insert/update/delete operations) to tables:

Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

Usage Example:

// Changelog stream with Row kind information (ChangelogSourceFunction stands in
// for any source that emits Rows carrying an explicit RowKind)
DataStream<Row> changelogStream = env.addSource(new ChangelogSourceFunction());

// Basic changelog conversion
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// With custom schema
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("value", DataTypes.DOUBLE())
    .build();

Table schemaTable = tableEnv.fromChangelogStream(changelogStream, schema);

// With a specific changelog mode; all() accepts every row kind, including UPDATE_BEFORE
Table modeTable = tableEnv.fromChangelogStream(changelogStream, schema, ChangelogMode.all());
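
Each element of a changelog stream is a Row whose RowKind marks the kind of change. A sketch of what such source elements might look like (ids and values are illustrative):

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

Row insert       = Row.ofKind(RowKind.INSERT, 1L, "Alice", 12.5);
Row updateBefore = Row.ofKind(RowKind.UPDATE_BEFORE, 1L, "Alice", 12.5);
Row updateAfter  = Row.ofKind(RowKind.UPDATE_AFTER, 1L, "Alice", 99.9);
Row delete       = Row.ofKind(RowKind.DELETE, 1L, "Alice", 99.9);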

Table to DataStream Conversions

Basic Conversion to Row

Convert a Table to a DataStream<Row>:

DataStream<Row> toDataStream(Table table);

Usage Example:

// Create table from SQL query
Table resultTable = tableEnv.sqlQuery(
    "SELECT name, age, salary FROM employees WHERE department = 'Engineering'"
);

// Convert to DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// Process the stream further
resultStream.map(row -> "Employee: " + row.getField(0) + ", Age: " + row.getField(1))
            .print();

Conversion to Specific Type

Convert a Table to a DataStream of a specific Java class:

<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

Usage Example:

// Define result POJO (public fields plus a public no-argument constructor
// make this a valid Flink POJO target)
public static class EmployeeSummary {
    public String name;
    public Integer age;
    public Double salary;

    public EmployeeSummary() {}
}

Table summaryTable = tableEnv.sqlQuery("SELECT name, age, salary FROM employees");

// Convert directly to POJO
DataStream<EmployeeSummary> summaryStream = tableEnv.toDataStream(summaryTable, EmployeeSummary.class);

summaryStream.filter(emp -> emp.salary > 50000.0)
             .map(emp -> emp.name + " earns " + emp.salary)
             .print();
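
toDataStream maps table columns to POJO fields by name, so if a query produces different column names, alias them first. A sketch with hypothetical source columns emp_name/emp_age/emp_salary:

Table renamed = tableEnv.sqlQuery(
    "SELECT emp_name AS name, emp_age AS age, emp_salary AS salary FROM employees");
DataStream<EmployeeSummary> renamedStream = tableEnv.toDataStream(renamed, EmployeeSummary.class);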

Conversion with Data Type

Convert a Table to a DataStream using explicit data type specification:

<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

Usage Example:

Table aggregatedTable = tableEnv.sqlQuery("SELECT department, COUNT(*) as emp_count FROM employees GROUP BY department");

// Define target data type
AbstractDataType<?> rowType = DataTypes.ROW(
    DataTypes.FIELD("department", DataTypes.STRING()),
    DataTypes.FIELD("emp_count", DataTypes.BIGINT())
);

DataStream<Row> typedStream = tableEnv.toDataStream(aggregatedTable, rowType);
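
Rows converted this way keep their column names, so fields can be read with the typed, name-based accessor instead of casting getField results; for example:

typedStream.map(row -> {
    String department = row.getFieldAs("department");
    Long count = row.getFieldAs("emp_count");
    return department + ": " + count;
}).print();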

Changelog Stream Conversion

Convert tables to changelog streams for capturing insert/update/delete operations:

DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

Usage Example:

// Create a table with updates (e.g., from a GROUP BY query)
Table dynamicTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id"
);

// Convert to changelog stream
DataStream<Row> changelogStream = tableEnv.toChangelogStream(dynamicTable);

// Process changelog events
changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        RowKind kind = row.getKind();
        switch (kind) {
            case INSERT:
                out.collect("New user: " + row.getField(0) + " with " + row.getField(1) + " events");
                break;
            case UPDATE_AFTER:
                out.collect("Updated user: " + row.getField(0) + " now has " + row.getField(1) + " events");
                break;
            case DELETE:
                out.collect("Removed user: " + row.getField(0));
                break;
            default:
                // UPDATE_BEFORE rows (retractions) are ignored here
                break;
        }
    }
}).print();

// With target schema
Schema targetSchema = Schema.newBuilder()
    .column("user_id", DataTypes.STRING().notNull())
    .column("event_count", DataTypes.BIGINT())
    .primaryKey("user_id")
    .build();

DataStream<Row> schemaChangelogStream = tableEnv.toChangelogStream(dynamicTable, targetSchema);

// With a specific changelog mode; upsert() suppresses UPDATE_BEFORE rows and
// relies on the primary key declared above
DataStream<Row> upsertStream = tableEnv.toChangelogStream(dynamicTable, targetSchema, ChangelogMode.upsert());
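
As with any DataStream program, the conversions above only build the pipeline; nothing runs until the job is submitted. Printed changelog rows show their RowKind as a short prefix such as +I, -U, +U, or -D:

upsertStream.print();
env.execute("changelog-conversion-demo");  // job name is arbitrary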

Schema Definition

When working with custom schemas, use the Schema builder:

public class Schema {
    public static Builder newBuilder();
    
    public static class Builder {
        public Builder column(String name, AbstractDataType<?> dataType);
        public Builder columnByExpression(String name, String expression);
        public Builder columnByMetadata(String name, AbstractDataType<?> dataType);
        public Builder primaryKey(String... columnNames);
        public Builder watermark(String columnName, String watermarkExpression);
        public Schema build();
    }
}

Usage Example:

Schema complexSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("timestamp_col", DataTypes.TIMESTAMP(3))
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    .columnByExpression("proc_time", "PROCTIME()")
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .primaryKey("id")
    .build();

Changelog Modes

The bridge supports different changelog modes for streaming tables:

public final class ChangelogMode {
    // Predefined modes
    public static ChangelogMode insertOnly();
    public static ChangelogMode upsert();
    public static ChangelogMode all();

    // Custom modes
    public static Builder newBuilder();
}

Mode Descriptions:

  • insertOnly(): only INSERT operations (append-only stream)
  • upsert(): INSERT, UPDATE_AFTER, and DELETE operations; requires a primary key and never contains UPDATE_BEFORE
  • all(): all possible row kinds, including UPDATE_BEFORE
  • newBuilder(): assembles a custom mode from individual row kinds (see the sketch after this list)
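
Beyond the predefined modes, a custom mode can be assembled from individual row kinds; a sketch of an insert-plus-delete-only mode:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

ChangelogMode insertDeleteOnly = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.DELETE)
    .build();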

Common Patterns

Stream Processing with SQL

Combine DataStream processing with SQL queries:

// Start with DataStream
DataStream<Event> eventStream = env.addSource(new EventSource());

// Convert to Table for SQL processing
Table eventTable = tableEnv.fromDataStream(eventStream);
tableEnv.createTemporaryView("events", eventTable);

// Apply SQL transformations
Table filteredEvents = tableEnv.sqlQuery(
    "SELECT user_id, event_type, event_time " +
    "FROM events " + 
    "WHERE event_type IN ('click', 'purchase') " +
    "AND event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR"
);

// Convert back to DataStream for further processing
DataStream<Row> processedStream = tableEnv.toDataStream(filteredEvents);

// Continue with DataStream operations
processedStream.keyBy(row -> (String) row.getField(0))  // key by user_id
               .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
               .apply(new MyWindowFunction())
               .addSink(new MySink());

Aggregation with State

Use changelog streams for stateful aggregations:

Table aggregationTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count, MAX(event_time) as last_event " +
    "FROM events " +
    "GROUP BY user_id"
);

DataStream<Row> aggregationChangelog = tableEnv.toChangelogStream(aggregationTable);

// Handle aggregation changes
aggregationChangelog
    .keyBy(row -> (String) row.getField(0))  // keyed state below is scoped per user_id
    .process(new KeyedProcessFunction<String, Row, Alert>() {
        private transient ValueState<Long> lastCount;

        @Override
        public void open(Configuration parameters) {
            lastCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastCount", Long.class));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Alert> out) throws Exception {
            String userId = row.getFieldAs(0);
            Long newCount = row.getFieldAs(1);
            Long previousCount = lastCount.value();

            if (previousCount != null && newCount > previousCount * 2) {
                out.collect(new Alert(userId, "Activity spike detected"));
            }

            lastCount.update(newCount);
        }
    });
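
Alert is assumed to be a simple user-defined holder, for example:

// Hypothetical Alert type assumed by the sketch above
public static class Alert {
    public String userId;
    public String message;

    public Alert() {}

    public Alert(String userId, String message) {
        this.userId = userId;
        this.message = message;
    }
}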

Error Handling

Handle common conversion errors:

try {
    // Schema mismatch
    Schema schema = Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .build();
    
    Table table = tableEnv.fromDataStream(invalidDataStream, schema);
    
} catch (ValidationException e) {
    // Handle schema validation errors
    logger.error("Schema validation failed: {}", e.getMessage());
    
} catch (TableException e) {
    // Handle table conversion errors
    logger.error("Table conversion failed: {}", e.getMessage());
    
} catch (IllegalArgumentException e) {
    // Handle invalid arguments
    logger.error("Invalid argument: {}", e.getMessage());
}

Performance Considerations

  1. Schema Inference: defining an explicit schema avoids reflection-based type inference and ambiguous type mappings
  2. Type Conversion: converting directly to a POJO avoids per-record Row field access overhead
  3. Changelog Overhead: changelog streams carry additional RowKind metadata with every record
  4. Watermark Propagation: ensure event-time attributes and watermarks survive the conversion (see the sketch below)
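
For point 4, fromDataStream can adopt the timestamps and watermarks already present in the DataStream by declaring a metadata rowtime column together with the SOURCE_WATERMARK() expression; a sketch, assuming eventStream already carries timestamps and watermarks:

Table withEventTime = tableEnv.fromDataStream(
    eventStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build());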

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11
