tessl/maven-org-apache-flink--flink-table-api-java-uber

Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.


DataStream Integration

The bridge between the Table API and the DataStream API, enabling conversion between tables and data streams for hybrid stream/batch processing applications.

Capabilities

StreamTableEnvironment

Specialized TableEnvironment that provides seamless integration between Table API and DataStream API for streaming applications.

/**
 * Create StreamTableEnvironment from StreamExecutionEnvironment
 * @param streamEnv Existing StreamExecutionEnvironment
 * @return StreamTableEnvironment instance
 */
public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv);

/**
 * Create StreamTableEnvironment with specific settings
 * @param streamEnv Existing StreamExecutionEnvironment
 * @param settings Table environment configuration
 * @return StreamTableEnvironment instance
 */
public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv, 
                                           EnvironmentSettings settings);

/**
 * Get the underlying StreamExecutionEnvironment
 * @return StreamExecutionEnvironment instance
 */
public StreamExecutionEnvironment getStreamExecutionEnvironment();

Basic Setup:

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(10000);

// Create table environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Configure table environment
tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");

DataStream to Table Conversion

Convert DataStream instances to Table for SQL and Table API operations.

/**
 * Convert DataStream to Table using automatic schema inference
 * @param dataStream DataStream to convert
 * @return Table representing the stream data
 */
public <T> Table fromDataStream(DataStream<T> dataStream);

/**
 * Convert DataStream to Table with explicit field selection
 * @param dataStream DataStream to convert
 * @param fields Expressions defining the table schema
 * @return Table with specified schema
 */
public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

/**
 * Convert DataStream to Table with explicit schema
 * @param dataStream DataStream to convert
 * @param schema Complete schema definition including watermarks
 * @return Table with specified schema
 */
public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

/**
 * Create a temporary view from DataStream
 * @param name View name for SQL queries
 * @param dataStream DataStream to expose as view
 */
public <T> void createTemporaryView(String name, DataStream<T> dataStream);

/**
 * Create a temporary view from DataStream with schema
 * @param name View name for SQL queries
 * @param dataStream DataStream to expose as view
 * @param schema Schema definition for the view
 */
public <T> void createTemporaryView(String name, DataStream<T> dataStream, Schema schema);

DataStream to Table Examples:

// Simple POJO conversion
DataStream<Order> orderStream = env.addSource(new OrderSource());

// Automatic schema inference from POJO
Table orders = tEnv.fromDataStream(orderStream);

// Explicit field selection and aliasing
Table ordersWithAlias = tEnv.fromDataStream(orderStream, 
    $("orderId").as("id"),
    $("customerId"),
    $("amount"),
    $("orderTime"));

// Complex schema with watermarks for event time
Schema orderSchema = Schema.newBuilder()
    .column("orderId", DataTypes.BIGINT())
    .column("customerId", DataTypes.BIGINT()) 
    .column("amount", DataTypes.DECIMAL(10, 2))
    .column("orderTime", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("orderTime", $("orderTime").minus(lit(5).seconds()))
    .build();

Table ordersWithWatermark = tEnv.fromDataStream(orderStream, orderSchema);

// Create temporary view for SQL access
tEnv.createTemporaryView("orders", orderStream, orderSchema);
tEnv.executeSql("SELECT customerId, SUM(amount) FROM orders " +
               "WHERE orderTime > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
               "GROUP BY customerId").print();

Table to DataStream Conversion

Convert Table instances back to DataStream for stream processing operations.

/**
 * Convert Table to DataStream with automatic type inference
 * @param table Table to convert
 * @return DataStream with Row type
 */
public DataStream<Row> toDataStream(Table table);

/**
 * Convert Table to DataStream with specific target type
 * @param table Table to convert
 * @param targetClass Target Java class for stream elements
 * @return DataStream with specified type
 */
public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

/**
 * Convert Table to DataStream with type information
 * @param table Table to convert
 * @param targetType TypeInformation for stream elements
 * @return DataStream with specified type
 */
public <T> DataStream<T> toDataStream(Table table, TypeInformation<T> targetType);

/**
 * Convert changing table to changelog stream
 * @param table Table to convert (may contain updates/deletes)
 * @return DataStream of Row with change flags
 */
public DataStream<Row> toChangelogStream(Table table);

/**
 * Convert changing table to changelog stream with specific type
 * @param table Table to convert
 * @param targetClass Target Java class for stream elements
 * @return DataStream with change information
 */
public <T> DataStream<T> toChangelogStream(Table table, Class<T> targetClass);

/**
 * Convert changing table to retract stream
 * @param table Table to convert
 * @return DataStream of Tuple2<Boolean, Row> where Boolean indicates add/retract
 */
public DataStream<Tuple2<Boolean, Row>> toRetractStream(Table table);

/**
 * Convert changing table to retract stream with specific type
 * @param table Table to convert
 * @param targetClass Target Java class for stream elements
 * @return DataStream of Tuple2<Boolean, T> with retract information
 */
public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> targetClass);

Table to DataStream Examples:

// Basic conversion to Row DataStream
Table filteredOrders = tEnv.from("orders")
    .filter($("amount").isGreater(lit(100)));
DataStream<Row> resultStream = tEnv.toDataStream(filteredOrders);

// Convert to specific POJO type
DataStream<OrderSummary> summaryStream = tEnv.toDataStream(
    tEnv.from("orders")
        .groupBy($("customerId"))
        .select($("customerId"), $("amount").sum().as("totalAmount")),
    OrderSummary.class
);

// Handle updates with changelog stream
Table customerTotals = tEnv.from("orders")
    .groupBy($("customerId"))
    .select($("customerId"), $("amount").sum().as("total"));

DataStream<Row> changelogStream = tEnv.toChangelogStream(customerTotals);
changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        switch (row.getKind()) {
            case INSERT:
                out.collect("New customer total: " + row);
                break;
            case UPDATE_BEFORE:
                // Retraction of the previous value; ignored when only the new total matters
                break;
            case UPDATE_AFTER:
                out.collect("Updated customer total: " + row);
                break;
            case DELETE:
                out.collect("Removed customer: " + row);
                break;
        }
    }
});

// Use retract stream for legacy compatibility
DataStream<Tuple2<Boolean, OrderSummary>> retractStream = 
    tEnv.toRetractStream(customerTotals, OrderSummary.class);
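A retract stream encodes every change as a pair: a `false` flag retracts a previously emitted value, and a `true` flag adds the current one, so an update arrives as a retraction followed by an insertion. A Flink-free sketch of how a consumer might materialize the latest per-customer totals from such pairs (all names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class RetractMaterializer {
    /**
     * Apply one (flag, key, value) retract message to a materialized view:
     * flag == true adds/overwrites the value, flag == false retracts it.
     */
    public static void apply(Map<Long, Long> view, boolean add, long customerId, long total) {
        if (add) {
            view.put(customerId, total);
        } else {
            view.remove(customerId, total); // retract only if the stored value still matches
        }
    }

    public static void main(String[] args) {
        Map<Long, Long> totals = new HashMap<>();
        apply(totals, true, 1L, 100L);   // insert: customer 1 -> 100
        apply(totals, false, 1L, 100L);  // retraction of the old total
        apply(totals, true, 1L, 250L);   // insertion of the updated total
        System.out.println(totals);      // {1=250}
    }
}
```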

Event Time and Watermarks

Handle event time processing and watermark propagation between DataStream and Table API.

// Event time assignment in DataStream before conversion
DataStream<Order> ordersWithEventTime = orderStream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((order, timestamp) -> order.getOrderTime())
    );

// Convert to Table preserving event time
Table ordersTable = tEnv.fromDataStream(ordersWithEventTime,
    Schema.newBuilder()
        .column("orderId", DataTypes.BIGINT())
        .column("customerId", DataTypes.BIGINT())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("orderTime", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("orderTime", $("orderTime").minus(lit(5).seconds()))
        .build()
);
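Conceptually, the bounded-out-of-orderness strategy used above tracks the maximum event timestamp seen so far and emits a watermark lagging it by the configured bound, so events up to that much late are still handled. A minimal, Flink-free sketch of that bookkeeping:

```java
public class BoundedOutOfOrdernessSketch {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
    }

    /** Record an event timestamp and return the resulting watermark. */
    public long onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(5000);
        System.out.println(wm.onEvent(10_000)); // watermark advances to 5000
        System.out.println(wm.onEvent(8_000));  // out-of-order event; watermark stays 5000
    }
}
```

This mirrors why the `Schema` watermark above subtracts 5 seconds from `orderTime`: both declare the same bound on allowed lateness.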

Type System Integration

Handle type conversions and mappings between DataStream and Table type systems.

/**
 * Type mapping utilities for DataStream-Table integration
 */
public class TypeConversions {
    /**
     * Convert DataStream TypeInformation to Table DataType
     * @param typeInfo DataStream type information
     * @return Equivalent Table API data type
     */
    public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo);
    
    /**
     * Convert Table DataType to DataStream TypeInformation
     * @param dataType Table API data type
     * @return Equivalent DataStream type information
     */
    public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType);
}
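As a rough illustration of what such a conversion maps between, here is a simplified, Flink-free lookup for a handful of common types. The real conversion also carries precision, nullability, and nested structure, so treat this as a sketch only:

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Map;

public class TypeMappingSketch {
    // Illustrative subset of the Java-class-to-SQL-type correspondence
    private static final Map<Class<?>, String> JAVA_TO_SQL = Map.of(
        Long.class, "BIGINT",
        Integer.class, "INT",
        String.class, "STRING",
        BigDecimal.class, "DECIMAL",
        Instant.class, "TIMESTAMP_LTZ"
    );

    /** Map a Java class to a SQL type name, falling back to RAW for unknown types. */
    public static String toSqlType(Class<?> javaClass) {
        return JAVA_TO_SQL.getOrDefault(javaClass, "RAW");
    }

    public static void main(String[] args) {
        System.out.println(toSqlType(Long.class)); // BIGINT
    }
}
```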

Type Mapping Examples:

// Complex POJO with nested fields (Lombok's @Data is optional here: Flink POJOs
// only need public fields or getters/setters and a public no-arg constructor)
@Data
public class ComplexOrder {
    public Long orderId;
    public CustomerInfo customer;
    public List<OrderItem> items;
    public Instant orderTime;
    
    @Data
    public static class CustomerInfo {
        public Long customerId;
        public String name;
        public String email;
    }
    
    @Data
    public static class OrderItem {
        public String productId;
        public Integer quantity;
        public BigDecimal price;
    }
}

// DataStream with complex type
DataStream<ComplexOrder> complexOrderStream = env.addSource(new ComplexOrderSource());

// Convert with nested field access
Table complexOrders = tEnv.fromDataStream(complexOrderStream,
    $("orderId"),
    $("customer.customerId").as("customerId"), 
    $("customer.name").as("customerName"),
    $("items").cardinality().as("itemCount"),
    $("orderTime"));

// Flatten the nested structure with a lateral join. Flink has no built-in
// EXPLODE function; this assumes a user-defined table function registered
// under that name (in SQL, CROSS JOIN UNNEST(items) achieves the same)
Table flattenedOrders = complexOrders
    .joinLateral(call("EXPLODE", $("items")).as("item"))
    .select($("orderId"), 
            $("customerId"),
            $("item.productId").as("productId"),
            $("item.quantity").as("quantity"));
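The effect of the lateral join is to emit one output row per element of the nested `items` list, repeating the order-level columns. A Flink-free sketch of that expansion (types and names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class ExplodeSketch {
    // Hypothetical item type standing in for OrderItem above
    public static class Item {
        public final String productId;
        public final int quantity;
        public Item(String productId, int quantity) {
            this.productId = productId;
            this.quantity = quantity;
        }
    }

    /** One "orderId,productId,quantity" row per item, as the lateral join produces. */
    public static List<String> explode(long orderId, List<Item> items) {
        List<String> rows = new ArrayList<>();
        for (Item item : items) {
            rows.add(orderId + "," + item.productId + "," + item.quantity);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Item> items = List.of(new Item("p1", 2), new Item("p2", 1));
        System.out.println(explode(42L, items)); // [42,p1,2, 42,p2,1]
    }
}
```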

Stream Processing Patterns

Common patterns for combining DataStream and Table API operations.

// Pattern 1: Stream -> Table -> Stream pipeline
DataStream<RawEvent> rawEvents = env.addSource(new EventSource());

// Data cleaning and enrichment with Table API
Table cleanedEvents = tEnv.fromDataStream(rawEvents)
    .filter($("value").isNotNull())
    .select($("eventId"),
            $("value").upperCase().as("cleanValue"),
            $("timestamp"))
    .join(tEnv.from("reference_data"), 
          $("eventId").isEqual($("reference_data.id")));

// Continue processing with DataStream API
DataStream<EnrichedEvent> enrichedStream = tEnv.toDataStream(cleanedEvents, EnrichedEvent.class);
enrichedStream
    .keyBy(EnrichedEvent::getCategory)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new EventAggregator())
    .addSink(new ResultSink());

// Pattern 2: Hybrid aggregation with state
Table continuousAggregates = tEnv.from("event_stream")
    .window(Tumble.over(lit(1).minutes()).on($("eventTime")).as("w"))
    .groupBy($("w"), $("category"))
    .select($("category"), 
            $("w").start().as("windowStart"),
            $("value").sum().as("total"));

DataStream<WindowResult> aggregateStream = tEnv.toDataStream(continuousAggregates, WindowResult.class);

// Add custom stateful processing
aggregateStream
    .keyBy(WindowResult::getCategory)
    .process(new StatefulProcessor())
    .addSink(new AlertSink());

Configuration and Optimization

Configuration options for optimizing DataStream-Table integration.

// Configure table environment for stream processing
Configuration config = tEnv.getConfig().getConfiguration();

// Enable mini-batch optimization for better throughput
config.setString("table.exec.mini-batch.enabled", "true");
config.setString("table.exec.mini-batch.allow-latency", "1s");
config.setString("table.exec.mini-batch.size", "1000");

// State backend and checkpointing are cluster-level settings; configure them on
// the StreamExecutionEnvironment (or in flink-conf.yaml), not through TableConfig
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.enableCheckpointing(10_000);

// Trade result completeness for latency with early firing of window results
// (experimental options; subject to change between Flink versions)
config.setString("table.exec.emit.early-fire.enabled", "true");
config.setString("table.exec.emit.early-fire.delay", "1000ms");
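Mini-batching trades latency for throughput by buffering records and flushing either when the batch is full or when the oldest buffered record has waited longer than the allowed latency, which is exactly the pair of knobs set above. A simplified, Flink-free sketch of the mechanism:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MiniBatchBuffer<T> {
    private final int maxSize;            // analogous to table.exec.mini-batch.size
    private final long maxLatencyMillis;  // analogous to table.exec.mini-batch.allow-latency
    private final Consumer<List<T>> downstream;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1;

    public MiniBatchBuffer(int maxSize, long maxLatencyMillis, Consumer<List<T>> downstream) {
        this.maxSize = maxSize;
        this.maxLatencyMillis = maxLatencyMillis;
        this.downstream = downstream;
    }

    /** Buffer a record; flush when the batch is full or the oldest record has waited too long. */
    public void add(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(record);
        if (buffer.size() >= maxSize || nowMillis - firstElementTime >= maxLatencyMillis) {
            flush();
        }
    }

    public void flush() {
        if (!buffer.isEmpty()) {
            downstream.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

Larger batches amortize state-backend accesses across more records; the latency bound caps how stale a buffered update can become.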

Error Handling and Monitoring

Best practices for handling errors and monitoring DataStream-Table integration.

// Error handling in conversions
try {
    Table result = tEnv.fromDataStream(dataStream);
    DataStream<Row> output = tEnv.toDataStream(result);
} catch (ValidationException e) {
    // Handle schema validation errors (e.g. field mismatch between POJO and schema)
    log.error("Schema validation failed", e);
} catch (TableException e) {
    // Handle table operation errors
    log.error("Table operation failed", e);
}

// Add monitoring to DataStream operations
DataStream<Order> monitoredStream = orderStream
    .map(new RichMapFunction<Order, Order>() { // RichMapFunction is required for open() and metrics access
        private transient Counter recordCounter;
        
        @Override
        public void open(Configuration parameters) {
            recordCounter = getRuntimeContext()
                .getMetricGroup()
                .counter("records_processed");
        }
        
        @Override
        public Order map(Order order) {
            recordCounter.inc();
            return order;
        }
    });

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-uber
