This document covers the seamless conversion between Flink Tables and DataStreams, enabling hybrid processing workflows that combine Table API with DataStream API capabilities.
The StreamTableEnvironment provides integration between Table API and DataStream API.
class StreamTableEnvironment {
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig);
}
Usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create with default settings
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create with custom settings
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner() // deprecated in newer Flink versions; the Blink planner is the default
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
interface StreamTableEnvironment {
Table fromDataStream(DataStream<?> dataStream);
Table fromDataStream(DataStream<?> dataStream, Expression... fields);
Table fromDataStream(DataStream<?> dataStream, Schema schema);
// Changelog stream conversion
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
}
Usage:
DataStream<Tuple3<String, Integer, Long>> dataStream = env.fromElements(
Tuple3.of("Alice", 25, 1000L),
Tuple3.of("Bob", 30, 2000L)
);
// Convert with automatic field inference
Table table1 = tableEnv.fromDataStream(dataStream);
// Convert with field mapping
Table table2 = tableEnv.fromDataStream(dataStream, $("name"), $("age"), $("salary"));
// Convert with an explicit schema (declared columns must reference the physical
// fields of the stream type; for tuples these are f0, f1, f2)
Schema schema = Schema.newBuilder()
.column("f0", DataTypes.STRING())
.column("f1", DataTypes.INT())
.column("f2", DataTypes.BIGINT())
.build();
Table table3 = tableEnv.fromDataStream(dataStream, schema);
// POJO conversion
public static class User {
public String name;
public int age;
public long timestamp;
// constructors, getters, setters
}
DataStream<User> userStream = env.addSource(new UserSource());
Table userTable = tableEnv.fromDataStream(userStream);
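fromChangelogStream interprets the RowKind of each Row as an insert, update, or delete. A minimal sketch with hard-coded rows (the values are made up for illustration):
// Changelog conversion: the RowKind of each Row marks it as +I, -U, +U or -D
DataStream<Row> changelogRows = env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
Table changelogTable = tableEnv.fromChangelogStream(changelogRows);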
// Row type conversion (field types are taken from the stream's TypeInformation)
DataStream<Row> rowStream = env.addSource(new RowSource());
Table rowTable = tableEnv.fromDataStream(rowStream,
$("user_id"),
$("event_time"),
$("event_type")
);
// Event POJO with fields: user_id, event_time (epoch millis), event_type
DataStream<Event> eventStream = env.addSource(new EventSource());
// Declare event-time and processing-time attributes during conversion
Table table = tableEnv.fromDataStream(eventStream,
$("user_id"),
$("event_time").rowtime(), // Event time from field
$("event_type"),
$("proc_time").proctime() // Processing time
);
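The declared attributes are what enable time-based operations downstream; a small sketch using the processing-time attribute in a tumbling window (the view name events_with_time is illustrative):
tableEnv.createTemporaryView("events_with_time", table);
Table windowCounts = tableEnv.sqlQuery(
"SELECT user_id, COUNT(*) AS cnt " +
"FROM events_with_time " +
"GROUP BY user_id, TUMBLE(proc_time, INTERVAL '10' SECOND)");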
// Event time with watermarks
DataStream<Event> watermarkedStream = eventStream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.event_time)
);
Table eventTable = tableEnv.fromDataStream(watermarkedStream,
$("user_id"),
$("timestamp").rowtime(),
$("event_type")
);
interface StreamTableEnvironment {
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
<T> DataStream<T> toDataStream(Table table, AbstractDataType<T> targetDataType);
DataStream<Row> toDataStream(Table table);
// Legacy methods (deprecated)
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
}
Usage:
Table table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 18");
// Convert to Row (default)
DataStream<Row> rowStream = tableEnv.toDataStream(table);
// Convert to POJO
DataStream<User> userStream = tableEnv.toDataStream(table, User.class);
// Convert to Tuple via the legacy API, which accepts TypeInformation
DataStream<Tuple2<String, Integer>> tupleStream = tableEnv.toAppendStream(table,
Types.TUPLE(Types.STRING, Types.INT));
interface StreamTableEnvironment {
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
}
Usage:
Table aggregatedTable = tableEnv.sqlQuery(
"SELECT user_id, COUNT(*) as cnt FROM events GROUP BY user_id"
);
// Convert to changelog stream (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(aggregatedTable);
changelogStream.process(new ProcessFunction<Row, String>() {
@Override
public void processElement(Row row, Context ctx, Collector<String> out) {
RowKind kind = row.getKind();
String message = String.format("Kind: %s, Data: %s", kind, row);
out.collect(message);
}
});
interface StreamTableEnvironment {
void createTemporaryView(String path, DataStream<?> dataStream);
void createTemporaryView(String path, DataStream<?> dataStream, Expression... fields);
void createTemporaryView(String path, DataStream<?> dataStream, Schema schema);
}
Usage:
DataStream<Order> orderStream = env.addSource(new OrderSource());
// Create temporary view
tableEnv.createTemporaryView("orders", orderStream,
$("order_id"),
$("customer_id"),
$("amount"),
$("order_time").rowtime()
);
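Alternatively, the Schema-based overload can derive a rowtime column and watermark declaratively; a sketch assuming Order.order_time holds epoch milliseconds (the view name orders_v2 is illustrative):
tableEnv.createTemporaryView("orders_v2", orderStream,
Schema.newBuilder()
.columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(order_time, 3)")
.watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
.build());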
// Use in SQL
Table result = tableEnv.sqlQuery(
"SELECT customer_id, SUM(amount) " +
"FROM orders " +
"GROUP BY customer_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
);
For batch processing with DataSet API (deprecated in newer versions):
class BatchTableEnvironment {
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment);
Table fromDataSet(DataSet<?> dataSet);
Table fromDataSet(DataSet<?> dataSet, Expression... fields);
<T> DataSet<T> toDataSet(Table table, Class<T> clazz);
DataSet<Row> toDataSet(Table table);
}
Scala API:
object StreamTableEnvironment {
def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}
class StreamTableEnvironment {
def fromDataStream[T](dataStream: DataStream[T]): Table
def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table
def toAppendStream[T: TypeInformation](table: Table): DataStream[T]
def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)]
}
Scala Usage:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// DataStream to Table
val dataStream: DataStream[(String, Int)] = env.fromElements(("Alice", 25), ("Bob", 30))
val table = tableEnv.fromDataStream(dataStream, 'name, 'age)
// Table to DataStream
val resultStream: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)
class TypeInference {
static DataType inferDataType(Class<?> clazz);
static DataType inferDataType(TypeInformation<?> typeInfo);
}
enum RowKind {
INSERT("+I"),
UPDATE_BEFORE("-U"),
UPDATE_AFTER("+U"),
DELETE("-D");
}
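For example, a job feeding an idempotent upsert sink can drop the redundant UPDATE_BEFORE rows of a changelog stream (a sketch reusing changelogStream from the conversion example above):
// Keep +I, +U and -D rows; -U rows carry no extra information for upsert sinks
DataStream<Row> withoutUpdateBefore = changelogStream
.filter(row -> row.getKind() != RowKind.UPDATE_BEFORE);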
class Row {
RowKind getKind();
void setKind(RowKind kind);
Object getField(int pos);
Object getField(String name);
}
// Configure execution environment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // deprecated: event time is the default since Flink 1.12
env.getConfig().setAutoWatermarkInterval(1000);
// Configure table environment
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
tableEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
// Configure state backend
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoints");
// Configure memory
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "1024m");
// configuring both the total process size and the total Flink size is not recommended:
// config.setString("taskmanager.memory.flink.size", "768m");
// Start with DataStream processing
DataStream<Event> eventStream = env
.addSource(new EventSource()) // e.g. a Kafka-backed SourceFunction
.filter(event -> event.isValid())
.keyBy(Event::getUserId);
// Convert to Table for SQL processing
tableEnv.createTemporaryView("events", eventStream);
Table aggregated = tableEnv.sqlQuery(
"SELECT user_id, COUNT(*) as event_count, " +
" TUMBLE_START(event_time, INTERVAL '5' MINUTES) as window_start " +
"FROM events " +
"GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTES)"
);
// Convert back to DataStream for complex processing
DataStream<UserStats> statsStream = tableEnv.toDataStream(aggregated, UserStats.class);
statsStream
.keyBy(UserStats::getUserId)
.process(new ComplexStatefulFunction())
.addSink(new ElasticsearchSink<>());
// Raw events from multiple sources
DataStream<ClickEvent> clicks = env.addSource(new ClickEventSource());
DataStream<PurchaseEvent> purchases = env.addSource(new PurchaseEventSource());
// Register as tables
tableEnv.createTemporaryView("clicks", clicks);
tableEnv.createTemporaryView("purchases", purchases);
// Feature engineering with SQL
Table features = tableEnv.sqlQuery(
"SELECT " +
" c.user_id," +
" COUNT(c.click_id) as clicks_last_hour," +
" COALESCE(SUM(p.amount), 0) as purchases_last_hour " +
"FROM clicks c " +
"LEFT JOIN purchases p ON c.user_id = p.user_id " +
" AND p.purchase_time BETWEEN c.click_time - INTERVAL '1' HOUR AND c.click_time " +
"WHERE c.click_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
"GROUP BY c.user_id"
);
// Output as enriched stream
DataStream<UserFeatures> featureStream = tableEnv.toDataStream(features, UserFeatures.class);
class StreamTableException extends TableException;
class DataStreamConversionException extends StreamTableException;
class Schema {
static Schema.Builder newBuilder();
interface Builder {
Builder column(String columnName, DataType dataType);
Builder columnByExpression(String columnName, String expression);
Builder columnByMetadata(String columnName, DataType dataType);
Builder watermark(String columnName, String watermarkExpression);
Builder primaryKey(String... columnNames);
Schema build();
}
}
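A schema can mix physical, computed, and metadata columns with a watermark and primary key; an illustrative sketch (column names and the metadata field are made up):
Schema schema = Schema.newBuilder()
.column("user_id", DataTypes.STRING().notNull())
.column("amount", DataTypes.DECIMAL(10, 2))
.columnByExpression("amount_cents", "CAST(amount * 100 AS BIGINT)")
.columnByMetadata("event_time", DataTypes.TIMESTAMP_LTZ(3))
.watermark("event_time", "event_time - INTERVAL '5' SECOND")
.primaryKey("user_id")
.build();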
class ChangelogMode {
static ChangelogMode insertOnly();
static ChangelogMode upsert();
static ChangelogMode all();
}
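For example, a Row stream containing only insert and update-after rows (no UPDATE_BEFORE) can be interpreted as an upsert changelog keyed by its first field; upsertRows below is an assumed DataStream<Row>:
// Interpret the rows as an upsert changelog; f0 acts as the primary key
Table upserts = tableEnv.fromChangelogStream(
upsertRows,
Schema.newBuilder().primaryKey("f0").build(),
ChangelogMode.upsert());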
interface AbstractDataType<T>;
class DataTypes {
static AbstractDataType<Row> ROW(AbstractDataType<?>... fieldDataTypes);
static <T> AbstractDataType<T> of(Class<T> expectedClass);
}