DataStream Integration

This document covers the seamless conversion between Flink Tables and DataStreams, enabling hybrid processing workflows that combine Table API with DataStream API capabilities.

StreamTableEnvironment

The StreamTableEnvironment provides integration between Table API and DataStream API.

Environment Creation

class StreamTableEnvironment {
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig);
}

Usage:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create with default settings
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create with custom settings (useBlinkPlanner() is deprecated:
// the Blink planner has been the default since Flink 1.11)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);

DataStream to Table Conversion

Basic Conversion

interface StreamTableEnvironment {
    Table fromDataStream(DataStream<?> dataStream);
    Table fromDataStream(DataStream<?> dataStream, Expression... fields);
    Table fromDataStream(DataStream<?> dataStream, Schema schema);
    
    // Changelog stream conversion
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
}

Usage:

DataStream<Tuple3<String, Integer, Long>> dataStream = env.fromElements(
    Tuple3.of("Alice", 25, 1000L),
    Tuple3.of("Bob", 30, 2000L)
);

// Convert with automatic field inference (tuple fields become columns f0, f1, f2)
Table table1 = tableEnv.fromDataStream(dataStream);

// Convert with field mapping
Table table2 = tableEnv.fromDataStream(dataStream, $("name"), $("age"), $("salary"));

// Convert with explicit schema; column names must match the physical
// fields of the stream type (f0, f1, f2 for a Tuple3)
Schema schema = Schema.newBuilder()
    .column("f0", DataTypes.STRING())
    .column("f1", DataTypes.INT())
    .column("f2", DataTypes.BIGINT())
    .build();
Table table3 = tableEnv.fromDataStream(dataStream, schema)
    .as("name", "age", "salary");  // rename after conversion
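
The changelog variants accept a DataStream<Row> whose rows carry a RowKind (see Row Kind Handling below). A minimal sketch of an upsert conversion, keyed on the first column:

DataStream<Row> changelog = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", 12),
    Row.ofKind(RowKind.INSERT, "Bob", 5),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)
);

// Upsert mode needs a primary key; later rows with the same key update earlier ones
Table upserts = tableEnv.fromChangelogStream(
    changelog,
    Schema.newBuilder().primaryKey("f0").build(),
    ChangelogMode.upsert()
);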

Complex Type Conversion

// POJO conversion
public static class User {
    public String name;
    public int age;
    public long timestamp;
    // constructors, getters, setters
}

DataStream<User> userStream = env.addSource(new UserSource());
Table userTable = tableEnv.fromDataStream(userStream);

// Row type conversion: field names and types come from the stream's
// RowTypeInfo, so a Row stream must carry explicit type information
DataStream<Row> rowStream = env
    .addSource(new RowSource())
    .returns(Types.ROW_NAMED(
        new String[] {"user_id", "event_time", "event_type"},
        Types.LONG, Types.SQL_TIMESTAMP, Types.STRING));
Table rowTable = tableEnv.fromDataStream(rowStream,
    $("user_id"),
    $("event_time"),
    $("event_type")
);

Time Attribute Mapping

DataStream<Tuple3<String, Long, String>> eventStream = env.addSource(new EventSource());

// Declare event-time and processing-time attributes
Table table = tableEnv.fromDataStream(eventStream,
    $("user_id"),
    $("event_time").rowtime(),  // event-time attribute (requires assigned timestamps)
    $("event_type"),
    $("proc_time").proctime()   // appended processing-time attribute
);

// Event time with watermarks (Event is a POJO with a long "timestamp" field)
DataStream<Event> rawEvents = env.addSource(new PojoEventSource());  // placeholder source
DataStream<Event> watermarkedStream = rawEvents
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.timestamp)
    );

Table eventTable = tableEnv.fromDataStream(watermarkedStream,
    $("user_id"),
    $("timestamp").rowtime(),
    $("event_type")
);

Table to DataStream Conversion

Basic Conversion

interface StreamTableEnvironment {
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<T> targetDataType);
    DataStream<Row> toDataStream(Table table);
    
    // Legacy methods (deprecated)
    <T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
    <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
}

Usage:

Table table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 18");

// Convert to Row (default)
DataStream<Row> rowStream = tableEnv.toDataStream(table);

// Convert to POJO
DataStream<User> userStream = tableEnv.toDataStream(table, User.class);

// Convert to Tuple via the legacy API (toDataStream does not accept TypeInformation)
DataStream<Tuple2<String, Integer>> tupleStream = tableEnv.toAppendStream(table,
    Types.TUPLE(Types.STRING, Types.INT));
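
When a downstream operator needs explicit retractions, the deprecated retract variant still applies; a sketch using the table above:

// The Boolean flag is true for additions and false for retractions
DataStream<Tuple2<Boolean, Row>> retractStream =
    tableEnv.toRetractStream(table, Row.class);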

Changelog Stream Conversion

interface StreamTableEnvironment {
    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
}

Usage:

Table aggregatedTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as cnt FROM events GROUP BY user_id"
);

// Convert to changelog stream (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(aggregatedTable);

changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        RowKind kind = row.getKind();
        String message = String.format("Kind: %s, Data: %s", kind, row);
        out.collect(message);
    }
});

Temporary Views from DataStreams

interface StreamTableEnvironment {
    void createTemporaryView(String path, DataStream<?> dataStream);
    void createTemporaryView(String path, DataStream<?> dataStream, Expression... fields);
    void createTemporaryView(String path, DataStream<?> dataStream, Schema schema);
}

Usage:

DataStream<Order> orderStream = env.addSource(new OrderSource());

// Create temporary view
tableEnv.createTemporaryView("orders", orderStream,
    $("order_id"),
    $("customer_id"), 
    $("amount"),
    $("order_time").rowtime()
);

// Use in SQL
Table result = tableEnv.sqlQuery(
    "SELECT customer_id, SUM(amount) " +
    "FROM orders " +
    "GROUP BY customer_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
);

Batch Integration (Legacy)

For batch processing with the DataSet API (deprecated since Flink 1.12; BatchTableEnvironment was removed together with the legacy planner in 1.14):

class BatchTableEnvironment {
    static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment);
    
    Table fromDataSet(DataSet<?> dataSet);
    Table fromDataSet(DataSet<?> dataSet, Expression... fields);
    
    <T> DataSet<T> toDataSet(Table table, Class<T> clazz);
    DataSet<Row> toDataSet(Table table);
}
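
A minimal sketch of the legacy pattern (pre-1.14 only; newer code should use a unified TableEnvironment created with EnvironmentSettings.inBatchMode()):

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);

DataSet<Tuple2<String, Integer>> dataSet = batchEnv.fromElements(
    Tuple2.of("Alice", 25), Tuple2.of("Bob", 30));

// DataSet to Table and back
Table batchTable = batchTableEnv.fromDataSet(dataSet, $("name"), $("age"));
DataSet<Row> result = batchTableEnv.toDataSet(batchTable, Row.class);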

Scala Integration

StreamTableEnvironment (Scala)

object StreamTableEnvironment {
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}

class StreamTableEnvironment {
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table
  
  def toAppendStream[T: TypeInformation](table: Table): DataStream[T]
  def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)]
}

Scala Usage:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// DataStream to Table
val dataStream: DataStream[(String, Int)] = env.fromElements(("Alice", 25), ("Bob", 30))
val table = tableEnv.fromDataStream(dataStream, 'name, 'age)

// Table to DataStream
val resultStream: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)

Type System Integration

Type Inference

class TypeInference {
    static DataType inferDataType(Class<?> clazz);
    static DataType inferDataType(TypeInformation<?> typeInfo);
}
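
In practice, inference happens during conversion: the planner extracts a data type from the target class reflectively, or an explicit data type can be passed. A sketch, reusing the table and User POJO from earlier:

// Reflective extraction from the target class
DataStream<User> users = tableEnv.toDataStream(table, User.class);

// Equivalent explicit variant
DataStream<User> users2 = tableEnv.toDataStream(table, DataTypes.of(User.class));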

Row Kind Handling

enum RowKind {
    INSERT("+I"),
    UPDATE_BEFORE("-U"),
    UPDATE_AFTER("+U"), 
    DELETE("-D");
}

class Row {
    RowKind getKind();
    void setKind(RowKind kind);
    Object getField(int pos);
    Object getField(String name);
}
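
A sketch of filtering on the row kind, e.g. dropping deletions from the changelog stream produced earlier:

// Keep only rows that insert or update state
DataStream<Row> withoutDeletes = changelogStream
    .filter(row -> row.getKind() != RowKind.DELETE);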

Configuration

Execution Configuration

// Configure execution environment (setStreamTimeCharacteristic is
// deprecated since Flink 1.12; event time is now the default)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);

// Configure table environment
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
tableEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");

Memory Management

// Configure state backend
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoints");

// Configure memory (cluster-level options, normally set in flink-conf.yaml;
// configure either total process size or total Flink size, not both)
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "1024m");
// config.setString("taskmanager.memory.flink.size", "768m");  // alternative to process.size

Common Patterns

Hybrid Processing Pipeline

// Start with DataStream processing
DataStream<Event> eventStream = env
    .addSource(new KafkaSource())
    .filter(event -> event.isValid())
    .keyBy(Event::getUserId);

// Convert to Table for SQL processing (the stream must carry watermarks and
// event_time must be exposed as a rowtime attribute for the window to work)
tableEnv.createTemporaryView("events", eventStream);

Table aggregated = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count, " +
    "       TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start " +
    "FROM events " +
    "GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)"
);

// Convert back to DataStream for complex processing
DataStream<UserStats> statsStream = tableEnv.toDataStream(aggregated, UserStats.class);

statsStream
    .keyBy(UserStats::getUserId)
    .process(new ComplexStatefulFunction())
    .addSink(new ElasticsearchSink<>());

Real-time Feature Engineering

// Raw events from multiple sources
DataStream<ClickEvent> clicks = env.addSource(new ClickEventSource());
DataStream<PurchaseEvent> purchases = env.addSource(new PurchaseEventSource());

// Register as tables
tableEnv.createTemporaryView("clicks", clicks);
tableEnv.createTemporaryView("purchases", purchases);

// Feature engineering with SQL
Table features = tableEnv.sqlQuery(
    "SELECT " +
    "  c.user_id," +
    "  COUNT(c.click_id) as clicks_last_hour," +
    "  COALESCE(SUM(p.amount), 0) as purchases_last_hour " +
    "FROM clicks c " +
    "LEFT JOIN purchases p ON c.user_id = p.user_id " +
    "    AND p.purchase_time BETWEEN c.click_time - INTERVAL '1' HOUR AND c.click_time " +
    "WHERE c.click_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
    "GROUP BY c.user_id"
);

// Output as enriched stream
DataStream<UserFeatures> featureStream = tableEnv.toDataStream(features, UserFeatures.class);

Error Handling

class StreamTableException extends TableException;
class DataStreamConversionException extends StreamTableException;
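
A minimal defensive pattern; note that schema or type mismatches in the conversions above typically surface as org.apache.flink.table.api.ValidationException:

try {
    DataStream<User> users = tableEnv.toDataStream(table, User.class);
} catch (ValidationException e) {
    // e.g. a table column that cannot be mapped onto a User field
    throw new IllegalStateException("Table-to-stream conversion failed", e);
}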

Types

class Schema {
    static Schema.Builder newBuilder();
    
    interface Builder {
        Builder column(String columnName, DataType dataType);
        Builder columnByExpression(String columnName, String expression);
        Builder columnByMetadata(String columnName, DataType dataType);
        Builder watermark(String columnName, String watermarkExpression);
        Builder primaryKey(String... columnNames);
        Schema build();
    }
}
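
The watermark and metadata builders are typically combined when converting an already-watermarked stream; a sketch that exposes the stream-record timestamp as a rowtime column and reuses the stream's watermarks:

Table timedTable = tableEnv.fromDataStream(
    watermarkedStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build());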

class ChangelogMode {
    static ChangelogMode insertOnly();
    static ChangelogMode upsert();
    static ChangelogMode all();
}

interface AbstractDataType<T>;
class DataTypes {
    static AbstractDataType<Row> ROW(AbstractDataType<?>... fieldDataTypes);
    static <T> AbstractDataType<T> of(Class<T> expectedClass);
}