Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.
—
Bridge between Table API and DataStream API enabling conversion between tables and data streams for hybrid stream/batch processing applications.
Specialized TableEnvironment that provides seamless integration between Table API and DataStream API for streaming applications.
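A condensed sketch of how the pieces documented below fit together. The class and job names are illustrative; the example uses a literal String stream (exposed as a single column `f0`) so it is self-contained:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RoundTrip {
    public static void main(String[] args) throws Exception {
        // One StreamExecutionEnvironment backs both APIs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // DataStream -> Table (a raw String stream becomes a single column f0)
        DataStream<String> words = env.fromElements("flink", "table", "api");
        Table t = tEnv.fromDataStream(words);

        // Relational processing; interpolating a Table into SQL registers it under a generated name
        Table upper = tEnv.sqlQuery("SELECT UPPER(f0) AS word FROM " + t);

        // Table -> DataStream, back into DataStream operators
        DataStream<Row> out = tEnv.toDataStream(upper);
        out.print();

        env.execute("table-roundtrip");
    }
}
```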
/**
* Create StreamTableEnvironment from StreamExecutionEnvironment
* @param streamEnv Existing StreamExecutionEnvironment
* @return StreamTableEnvironment instance
*/
public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv);
/**
* Create StreamTableEnvironment with specific settings
* @param streamEnv Existing StreamExecutionEnvironment
* @param settings Table environment configuration
* @return StreamTableEnvironment instance
*/
public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv,
EnvironmentSettings settings);
/**
* Get the underlying StreamExecutionEnvironment
* @return StreamExecutionEnvironment instance
*/
public StreamExecutionEnvironment getStreamExecutionEnvironment();
Basic Setup:
// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(10000);
// Create table environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Configure table environment
tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
Convert DataStream instances to Table for SQL and Table API operations.
/**
* Convert DataStream to Table using automatic schema inference
* @param dataStream DataStream to convert
* @return Table representing the stream data
*/
public <T> Table fromDataStream(DataStream<T> dataStream);
/**
* Convert DataStream to Table with explicit field selection
* @param dataStream DataStream to convert
* @param fields Expressions defining the table schema
* @return Table with specified schema
*/
public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
/**
* Convert DataStream to Table with explicit schema
* @param dataStream DataStream to convert
* @param schema Complete schema definition including watermarks
* @return Table with specified schema
*/
public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
/**
* Create a temporary view from DataStream
* @param name View name for SQL queries
* @param dataStream DataStream to expose as view
*/
public <T> void createTemporaryView(String name, DataStream<T> dataStream);
/**
* Create a temporary view from DataStream with schema
* @param name View name for SQL queries
* @param dataStream DataStream to expose as view
* @param schema Schema definition for the view
*/
public <T> void createTemporaryView(String name, DataStream<T> dataStream, Schema schema);
DataStream to Table Examples:
// Simple POJO conversion
DataStream<Order> orderStream = env.addSource(new OrderSource());
// Automatic schema inference from POJO
Table orders = tEnv.fromDataStream(orderStream);
// Explicit field selection and aliasing
Table ordersWithAlias = tEnv.fromDataStream(orderStream,
$("orderId").as("id"),
$("customerId"),
$("amount"),
$("orderTime"));
// Complex schema with watermarks for event time
Schema orderSchema = Schema.newBuilder()
.column("orderId", DataTypes.BIGINT())
.column("customerId", DataTypes.BIGINT())
.column("amount", DataTypes.DECIMAL(10, 2))
.column("orderTime", DataTypes.TIMESTAMP_LTZ(3))
.watermark("orderTime", $("orderTime").minus(lit(5).seconds()))
.build();
Table ordersWithWatermark = tEnv.fromDataStream(orderStream, orderSchema);
// Create temporary view for SQL access
tEnv.createTemporaryView("orders", orderStream, orderSchema);
tEnv.executeSql("SELECT customerId, SUM(amount) FROM orders " +
"WHERE orderTime > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
"GROUP BY customerId").print();
Convert Table instances back to DataStream for stream processing operations.
/**
* Convert Table to DataStream with automatic type inference
* @param table Table to convert
* @return DataStream with Row type
*/
public DataStream<Row> toDataStream(Table table);
/**
* Convert Table to DataStream with specific target type
* @param table Table to convert
* @param targetClass Target Java class for stream elements
* @return DataStream with specified type
*/
public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
/**
 * Convert Table to DataStream with an explicit target data type
 * @param table Table to convert
 * @param targetDataType Data type for stream elements
 * @return DataStream with specified type
 */
public <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
/**
* Convert changing table to changelog stream
* @param table Table to convert (may contain updates/deletes)
* @return DataStream of Row with change flags
*/
public DataStream<Row> toChangelogStream(Table table);
/**
 * Convert changing table to changelog stream with an explicit schema
 * @param table Table to convert
 * @param targetSchema Schema for the stream elements
 * @return DataStream of Row with change information
 */
public DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
/**
 * Convert changing table to retract stream with type information
 * @param table Table to convert
 * @param typeInfo TypeInformation for stream elements
 * @return DataStream of Tuple2<Boolean, T> where Boolean indicates add/retract
 */
public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
/**
* Convert changing table to retract stream with specific type
* @param table Table to convert
* @param targetClass Target Java class for stream elements
* @return DataStream of Tuple2<Boolean, T> with retract information
*/
public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> targetClass);
Table to DataStream Examples:
// Basic conversion to Row DataStream
Table filteredOrders = tEnv.from("orders")
.filter($("amount").isGreater(lit(100)));
DataStream<Row> resultStream = tEnv.toDataStream(filteredOrders);
// Convert to a specific POJO type; toDataStream requires an insert-only table,
// so project here (an updating aggregate would need toChangelogStream instead)
DataStream<OrderSummary> summaryStream = tEnv.toDataStream(
    tEnv.from("orders")
        .select($("customerId"), $("amount").as("totalAmount")),
    OrderSummary.class
);
// Handle updates with changelog stream
Table customerTotals = tEnv.from("orders")
.groupBy($("customerId"))
.select($("customerId"), $("amount").sum().as("total"));
DataStream<Row> changelogStream = tEnv.toChangelogStream(customerTotals);
changelogStream.process(new ProcessFunction<Row, String>() {
@Override
public void processElement(Row row, Context ctx, Collector<String> out) {
RowKind kind = row.getKind();
switch (kind) {
case INSERT:
out.collect("New customer total: " + row);
break;
case UPDATE_AFTER:
out.collect("Updated customer total: " + row);
break;
case DELETE:
out.collect("Removed customer: " + row);
break;
}
}
});
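Beyond inspecting row kinds, toChangelogStream also has a Schema-plus-ChangelogMode overload. The sketch below continues with the customerTotals table from above and assumes customerId can serve as a primary key; upsert mode suppresses UPDATE_BEFORE messages for sinks that only understand upserts:

```java
// Upsert mode needs a primary key so updates and deletes can be keyed;
// note that SUM over DECIMAL(10, 2) widens the result type to DECIMAL(38, 2)
DataStream<Row> upsertStream = tEnv.toChangelogStream(
    customerTotals,
    Schema.newBuilder()
        .column("customerId", DataTypes.BIGINT().notNull())
        .column("total", DataTypes.DECIMAL(38, 2))
        .primaryKey("customerId")
        .build(),
    ChangelogMode.upsert());
```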
// Use retract stream for legacy compatibility
DataStream<Tuple2<Boolean, OrderSummary>> retractStream =
    tEnv.toRetractStream(customerTotals, OrderSummary.class);
Handle event time processing and watermark propagation between DataStream and Table API.
// Event time assignment in DataStream before conversion
DataStream<Order> ordersWithEventTime = orderStream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((order, timestamp) -> order.getOrderTime())
);
// Convert to Table, propagating the watermarks assigned in the DataStream
Table ordersTable = tEnv.fromDataStream(ordersWithEventTime,
    Schema.newBuilder()
        .column("orderId", DataTypes.BIGINT())
        .column("customerId", DataTypes.BIGINT())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("orderTime", DataTypes.TIMESTAMP_LTZ(3))
        // sourceWatermark() reuses the DataStream watermarks instead of declaring new ones
        .watermark("orderTime", sourceWatermark())
        .build()
);
Handle type conversions and mappings between DataStream and Table type systems.
/**
* Type mapping utilities for DataStream-Table integration
*/
public class TypeConversions {
/**
* Convert DataStream TypeInformation to Table DataType
* @param typeInfo DataStream type information
* @return Equivalent Table API data type
*/
public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo);
/**
* Convert Table DataType to DataStream TypeInformation
* @param dataType Table API data type
* @return Equivalent DataStream type information
*/
public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType);
}
Type Mapping Examples:
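A quick sketch of the TypeConversions utilities above (the class lives in org.apache.flink.table.types.utils; Types is the DataStream-side type factory):

```java
// TypeInformation -> DataType, e.g. when migrating a legacy source schema
DataType stringType = TypeConversions.fromLegacyInfoToDataType(Types.STRING);
// DataType -> TypeInformation, e.g. when handing a column type to DataStream operators
TypeInformation<?> stringInfo = TypeConversions.fromDataTypeToLegacyInfo(stringType);
```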
// Complex POJO with nested fields
@Data
public class ComplexOrder {
public Long orderId;
public CustomerInfo customer;
public List<OrderItem> items;
public Instant orderTime;
@Data
public static class CustomerInfo {
public Long customerId;
public String name;
public String email;
}
@Data
public static class OrderItem {
public String productId;
public Integer quantity;
public BigDecimal price;
}
}
// DataStream with complex type
DataStream<ComplexOrder> complexOrderStream = env.addSource(new ComplexOrderSource());
// Convert first, then project nested fields with the Table API
// (nested access uses get(); $() only addresses top-level columns)
Table complexOrders = tEnv.fromDataStream(complexOrderStream)
    .select($("orderId"),
        $("customer").get("customerId").as("customerId"),
        $("customer").get("name").as("customerName"),
        $("items").cardinality().as("itemCount"),
        $("orderTime"));
// Flatten the nested items with a table function; EXPLODE is not built in,
// so it stands for a UDTF registered beforehand (SQL offers UNNEST for this)
Table flattenedOrders = tEnv.fromDataStream(complexOrderStream)
    .joinLateral(call("EXPLODE", $("items")).as("productId", "quantity"))
    .select($("orderId"),
        $("customer").get("customerId").as("customerId"),
        $("productId"),
        $("quantity"));
Common patterns for combining DataStream and Table API operations.
// Pattern 1: Stream -> Table -> Stream pipeline
DataStream<RawEvent> rawEvents = env.addSource(new EventSource());
// Data cleaning and enrichment with Table API
Table cleanedEvents = tEnv.fromDataStream(rawEvents)
.filter($("value").isNotNull())
.select($("eventId"),
$("value").upperCase().as("cleanValue"),
$("timestamp"))
.join(tEnv.from("reference_data"),
$("eventId").isEqual($("reference_data.id")));
// Continue processing with DataStream API
DataStream<EnrichedEvent> enrichedStream = tEnv.toDataStream(cleanedEvents, EnrichedEvent.class);
enrichedStream
.keyBy(EnrichedEvent::getCategory)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new EventAggregator())
.addSink(new ResultSink());
// Pattern 2: Hybrid aggregation with state
Table continuousAggregates = tEnv.from("event_stream")
.window(Tumble.over(lit(1).minutes()).on($("eventTime")).as("w"))
.groupBy($("w"), $("category"))
.select($("category"),
$("w").start().as("windowStart"),
$("value").sum().as("total"));
DataStream<WindowResult> aggregateStream = tEnv.toDataStream(continuousAggregates, WindowResult.class);
// Add custom stateful processing
aggregateStream
.keyBy(WindowResult::getCategory)
.process(new StatefulProcessor())
    .addSink(new AlertSink());
Configuration options for optimizing DataStream-Table integration.
// Configure table environment for stream processing
Configuration config = tEnv.getConfig().getConfiguration();
// Enable mini-batch optimization for better throughput
config.setString("table.exec.mini-batch.enabled", "true");
config.setString("table.exec.mini-batch.allow-latency", "1s");
config.setString("table.exec.mini-batch.size", "1000");
// State backend and checkpointing are cluster-level settings, configured on
// the StreamExecutionEnvironment rather than through table options
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.enableCheckpointing(10000);
// Bound state growth for unbounded aggregations
config.setString("table.exec.state.ttl", "1 h");
// Emit early results before long windows close (experimental options)
config.setString("table.exec.emit.early-fire.enabled", "true");
config.setString("table.exec.emit.early-fire.delay", "1000ms");
Best practices for handling errors and monitoring DataStream-Table integration.
// Error handling in conversions
try {
Table result = tEnv.fromDataStream(dataStream);
DataStream<Row> output = tEnv.toDataStream(result);
} catch (ValidationException e) {
// Handle schema validation errors
log.error("Schema validation failed: " + e.getMessage());
} catch (TableException e) {
// Handle table operation errors
log.error("Table operation failed: " + e.getMessage());
}
// Add monitoring to DataStream operations
DataStream<Order> monitoredStream = orderStream
.map(new RichMapFunction<Order, Order>() { // RichMapFunction is required for open() and metrics
private transient Counter recordCounter;
@Override
public void open(Configuration parameters) {
recordCounter = getRuntimeContext()
.getMetricGroup()
.counter("records_processed");
}
@Override
public Order map(Order order) {
recordCounter.inc();
return order;
}
});
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-uber