Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.
The Flink Table API Java Bridge provides seamless conversion between DataStream and Table representations, enabling developers to combine the expressiveness of SQL/Table API with the flexibility of DataStream API.
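The examples that follow reference `env` and `tableEnv` without showing their setup. A minimal sketch of that boilerplate (class name is illustrative; the APIs are the standard Flink entry points):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BridgeSetup {
    public static void main(String[] args) {
        // Entry point of the DataStream API
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bridging table environment that wraps the stream environment;
        // referred to as "tableEnv" throughout the examples below
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    }
}
```

Both conversions directions (`fromDataStream`/`fromChangelogStream` and `toDataStream`/`toChangelogStream`) are methods on this `StreamTableEnvironment`.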
The bridge supports bidirectional conversions:
Convert a DataStream to a Table using automatic schema inference:
<T> Table fromDataStream(DataStream<T> dataStream);

Usage Example:
// Create DataStream from POJO objects
DataStream<Person> personStream = env.fromElements(
new Person("Alice", 25, "Engineer"),
new Person("Bob", 30, "Manager"),
new Person("Charlie", 28, "Developer")
);
// Convert to Table (schema inferred from POJO structure)
Table personTable = tableEnv.fromDataStream(personStream);
// Now you can use SQL on the table
Table result = tableEnv.sqlQuery("SELECT name, age FROM " + personTable + " WHERE age > 26");

Convert a DataStream to a Table with explicit schema definition:
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

Usage Example:
DataStream<Row> rowStream = env.fromElements(
Row.of("Alice", 25, "2023-01-15"),
Row.of("Bob", 30, "2023-02-20"),
Row.of("Charlie", 28, "2023-03-10")
);
// Define custom schema with proper data types
Schema schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("hire_date", DataTypes.STRING())
.build();
Table employeeTable = tableEnv.fromDataStream(rowStream, schema);

Convert changelog streams (insert/update/delete operations) to tables:
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

Usage Example:
// Changelog stream with Row kind information
DataStream<Row> changelogStream = env.addSource(new ChangelogSourceFunction());
// Basic changelog conversion
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);
// With custom schema
Schema schema = Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("value", DataTypes.DOUBLE())
.build();
Table schemaTable = tableEnv.fromChangelogStream(changelogStream, schema);
// With specific changelog mode
ChangelogMode allKindsMode = ChangelogMode.all();
Table modeTable = tableEnv.fromChangelogStream(changelogStream, schema, allKindsMode);

Convert a Table to a DataStream<Row>:
DataStream<Row> toDataStream(Table table);

Usage Example:
// Create table from SQL query
Table resultTable = tableEnv.sqlQuery(
"SELECT name, age, salary FROM employees WHERE department = 'Engineering'"
);
// Convert to DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
// Process the stream further
resultStream.map(row -> "Employee: " + row.getField(0) + ", Age: " + row.getField(1))
.print();

Convert a Table to a DataStream of a specific Java class:
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

Usage Example:
// Define result POJO
public static class EmployeeSummary {
public String name;
public Integer age;
public Double salary;
// Constructors, getters, setters
}
Table summaryTable = tableEnv.sqlQuery("SELECT name, age, salary FROM employees");
// Convert directly to POJO
DataStream<EmployeeSummary> summaryStream = tableEnv.toDataStream(summaryTable, EmployeeSummary.class);
summaryStream.filter(emp -> emp.salary > 50000.0)
.map(emp -> emp.name + " earns " + emp.salary)
.print();

Convert a Table to a DataStream using explicit data type specification:
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

Usage Example:
Table aggregatedTable = tableEnv.sqlQuery("SELECT department, COUNT(*) as emp_count FROM employees GROUP BY department");
// Define target data type
AbstractDataType<?> rowType = DataTypes.ROW(
DataTypes.FIELD("department", DataTypes.STRING()),
DataTypes.FIELD("emp_count", DataTypes.BIGINT())
);
DataStream<Row> typedStream = tableEnv.toDataStream(aggregatedTable, rowType);

Convert tables to changelog streams for capturing insert/update/delete operations:
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

Usage Example:
// Create a table with updates (e.g., from a GROUP BY query)
Table dynamicTable = tableEnv.sqlQuery(
"SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id"
);
// Convert to changelog stream
DataStream<Row> changelogStream = tableEnv.toChangelogStream(dynamicTable);
// Process changelog events
changelogStream.process(new ProcessFunction<Row, String>() {
@Override
public void processElement(Row row, Context ctx, Collector<String> out) {
RowKind kind = row.getKind();
switch (kind) {
case INSERT:
out.collect("New user: " + row.getField(0) + " with " + row.getField(1) + " events");
break;
case UPDATE_AFTER:
out.collect("Updated user: " + row.getField(0) + " now has " + row.getField(1) + " events");
break;
case DELETE:
out.collect("Removed user: " + row.getField(0));
break;
}
}
}).print();
// With target schema
Schema targetSchema = Schema.newBuilder()
.column("user_id", DataTypes.STRING().notNull())
.column("event_count", DataTypes.BIGINT())
.primaryKey("user_id")
.build();
DataStream<Row> schemaChangelogStream = tableEnv.toChangelogStream(dynamicTable, targetSchema);
// With a specific changelog mode (upsert requires the schema to declare a primary key)
ChangelogMode upsertMode = ChangelogMode.upsert();
DataStream<Row> upsertStream = tableEnv.toChangelogStream(dynamicTable, targetSchema, upsertMode);

When working with custom schemas, use the Schema builder:
public class Schema {
public static Builder newBuilder();
public static class Builder {
public Builder column(String name, AbstractDataType<?> dataType);
public Builder columnByExpression(String name, String expression);
public Builder columnByMetadata(String name, AbstractDataType<?> dataType);
public Builder primaryKey(String... columnNames);
public Builder watermark(String columnName, String watermarkExpression);
public Schema build();
}
}

Usage Example:
Schema complexSchema = Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("timestamp_col", DataTypes.TIMESTAMP(3))
.column("event_time", DataTypes.TIMESTAMP_LTZ(3))
.columnByExpression("proc_time", "PROCTIME()")
.watermark("event_time", "event_time - INTERVAL '5' SECOND")
.primaryKey("id")
.build();

The bridge supports different changelog modes for streaming tables:
public final class ChangelogMode {
// Predefined modes
public static ChangelogMode insertOnly();
public static ChangelogMode upsert();
public static ChangelogMode all();
// Custom modes assembled from individual row kinds
public static Builder newBuilder();
}

Note that ChangelogMode is a final class, not an enum, and only the factory methods above exist.

Mode Descriptions:
insertOnly(): the stream carries only INSERT rows (append-only).
upsert(): the stream carries INSERT, UPDATE_AFTER, and DELETE rows interpreted per primary key; no UPDATE_BEFORE rows are emitted.
all(): the stream may carry all row kinds (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
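Beyond the predefined modes, `ChangelogMode.newBuilder()` can assemble a custom mode from individual `RowKind` values. A small sketch (class name is illustrative; the builder and `contains` methods are standard Flink API):

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class CustomChangelogMode {
    public static void main(String[] args) {
        // A mode that carries inserts and the "after" image of updates,
        // but no retractions (UPDATE_BEFORE) or deletes
        ChangelogMode insertAndUpdateAfter = ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();

        System.out.println(insertAndUpdateAfter.contains(RowKind.INSERT));  // true
        System.out.println(insertAndUpdateAfter.contains(RowKind.DELETE));  // false
    }
}
```

Such a mode can be passed to fromChangelogStream or toChangelogStream when the source or sink only supports a subset of row kinds.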
Combine DataStream processing with SQL queries:
// Start with DataStream
DataStream<Event> eventStream = env.addSource(new EventSource());
// Convert to Table for SQL processing
Table eventTable = tableEnv.fromDataStream(eventStream);
tableEnv.createTemporaryView("events", eventTable);
// Apply SQL transformations
Table filteredEvents = tableEnv.sqlQuery(
"SELECT user_id, event_type, event_time " +
"FROM events " +
"WHERE event_type IN ('click', 'purchase') " +
"AND event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR"
);
// Convert back to DataStream for further processing
DataStream<Row> processedStream = tableEnv.toDataStream(filteredEvents);
// Continue with DataStream operations
processedStream.keyBy(row -> row.getField(0))
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.apply(new MyWindowFunction())
.addSink(new MySink());

Use changelog streams for stateful aggregations:
Table aggregationTable = tableEnv.sqlQuery(
"SELECT user_id, COUNT(*) as event_count, MAX(event_time) as last_event " +
"FROM events " +
"GROUP BY user_id"
);
DataStream<Row> aggregationChangelog = tableEnv.toChangelogStream(aggregationTable);
// Handle aggregation changes (key the stream first so keyed state is available)
aggregationChangelog
.keyBy(row -> (String) row.getField(0))
.process(new KeyedProcessFunction<String, Row, Alert>() {
private transient ValueState<Long> lastCount;
@Override
public void open(Configuration parameters) {
lastCount = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastCount", Long.class));
}
@Override
public void processElement(Row row, Context ctx, Collector<Alert> out) throws Exception {
String userId = row.getFieldAs(0);
Long newCount = row.getFieldAs(1);
Long previousCount = lastCount.value();
if (previousCount != null && newCount > previousCount * 2) {
out.collect(new Alert(userId, "Activity spike detected"));
}
lastCount.update(newCount);
}
});

Handle common conversion errors:
try {
// Schema mismatch
Schema schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
Table table = tableEnv.fromDataStream(invalidDataStream, schema);
} catch (ValidationException e) {
// Handle schema validation errors
logger.error("Schema validation failed: {}", e.getMessage());
} catch (TableException e) {
// Handle table conversion errors
logger.error("Table conversion failed: {}", e.getMessage());
} catch (IllegalArgumentException e) {
// Handle invalid arguments
logger.error("Invalid argument: {}", e.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11