tessl/maven-flink-table-api-java-bridge

Java bridge API for Apache Flink's Table/SQL functionality, enabling seamless integration between table operations and DataStream APIs

Describes: pkg:maven/org.apache.flink/flink-table-api-java-bridge@1.20.x

To install, run

npx @tessl/cli install tessl/maven-flink-table-api-java-bridge@1.20.0


Apache Flink Table API Java Bridge

Apache Flink's Table API Java Bridge provides seamless integration between Flink's Table/SQL API and the DataStream API. It enables developers to convert between DataStream and Table representations, register custom connectors, and execute table operations within streaming applications.

Package Information

  • Package Name: flink-table-api-java-bridge
  • Package Type: maven
  • Language: Java
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>1.20.2</version>
    </dependency>

Core Imports

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.table.types.AbstractDataType;

Basic Usage

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Build a small demo source; replace with your own stream. Explicit row
// type information lets the planner derive column names and types.
DataStream<Row> dataStream = env
    .fromElements(Row.of(1L, 250.0), Row.of(2L, 80.0))
    .returns(Types.ROW_NAMED(new String[] {"id", "amount"}, Types.LONG, Types.DOUBLE));

// Convert DataStream to Table
Table table = tableEnv.fromDataStream(dataStream);

// Register the table as a view and execute SQL queries against it
tableEnv.createTemporaryView("orders", table);
Table result = tableEnv.sqlQuery("SELECT * FROM orders WHERE amount > 100");

// Convert back to DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(result);

Architecture

The Flink Table API Java Bridge is organized around several key components:

  • StreamTableEnvironment: Main entry point providing factory methods and conversion utilities
  • Connector Framework: Interfaces for implementing custom table sources and sinks
  • Built-in Connectors: Ready-to-use connectors for testing and common use cases (DataGen, Print, BlackHole)
  • Legacy APIs: Deprecated interfaces maintained for backward compatibility
  • Procedure Context: Framework for stored procedure execution

Capabilities

Table Environment Management

Core functionality for creating and managing table environments that bridge DataStream and Table APIs.

public interface StreamTableEnvironment extends TableEnvironment {
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
}
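
For example, a minimal sketch of creating an environment pinned to streaming mode via the EnvironmentSettings builder:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Bind a table environment to an existing DataStream environment,
// explicitly selecting streaming mode
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);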

See docs/table-environment.md.

DataStream Integration

Bi-directional conversion between DataStream and Table with support for custom schemas and changelog processing.

// DataStream to Table conversion
<T> Table fromDataStream(DataStream<T> dataStream);
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

// Table to DataStream conversion
DataStream<Row> toDataStream(Table table);
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

// Temporary view creation
<T> void createTemporaryView(String path, DataStream<T> dataStream);
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

// Statement set creation
StreamStatementSet createStatementSet();

// Deprecated methods (maintained for backward compatibility)
@Deprecated <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
@Deprecated <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);
@Deprecated <T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
@Deprecated <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
@Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
@Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
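
Assuming the env and tableEnv from Basic Usage, a hedged sketch of changelog round-tripping; the view name and row values are illustrative:

// Row.ofKind attaches a change flag (INSERT, UPDATE_BEFORE, ...) to each row
DataStream<Row> changelog = env
    .fromElements(
        Row.ofKind(RowKind.INSERT, "alice", 12),
        Row.ofKind(RowKind.UPDATE_BEFORE, "alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "alice", 100))
    .returns(Types.ROW_NAMED(new String[] {"name", "score"}, Types.STRING, Types.INT));

// Interpret the stream as a changing table, aggregate it, and emit the
// result as a changelog again
Table scores = tableEnv.fromChangelogStream(changelog);
tableEnv.createTemporaryView("scores", scores);
Table totals = tableEnv.sqlQuery("SELECT name, SUM(score) AS total FROM scores GROUP BY name");
DataStream<Row> out = tableEnv.toChangelogStream(totals);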

See docs/datastream-integration.md.

Connector Framework

Provider interfaces for implementing custom table sources and sinks that integrate with the DataStream API.

public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);
    Optional<Integer> getParallelism();
}

public interface ProviderContext {
    Optional<String> generateUid(String name);
}

public interface ParallelismProvider {
    Optional<Integer> getParallelism();
}
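
A hedged sketch of a scan provider; the row contents and uid name are hypothetical, with GenericRowData/StringData used as Flink's standard concrete RowData implementations:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

DataStreamScanProvider provider = new DataStreamScanProvider() {
    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        // Build the stream however the connector requires; fixed demo rows here
        DataStream<RowData> stream = execEnv.fromElements(
            (RowData) GenericRowData.of(1L, StringData.fromString("a")),
            GenericRowData.of(2L, StringData.fromString("b")));
        // A stable uid lets Flink map operator state across job restarts
        providerContext.generateUid("demo-scan")
            .ifPresent(uid -> stream.getTransformation().setUid(uid));
        return stream;
    }

    @Override
    public boolean isBounded() {
        return true; // finite demo data
    }
};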

See docs/connector-framework.md.

Built-in Connectors

Ready-to-use connectors for test data generation (DataGen), development output (Print), and performance testing (BlackHole).

// DataGen connector for test data generation
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
    public String factoryIdentifier(); // Returns "datagen"
}

// Print connector for development output
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
    public String factoryIdentifier(); // Returns "print"
}

// BlackHole connector for performance testing
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
    public String factoryIdentifier(); // Returns "blackhole"
}
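
These identifiers are used through the connector option in SQL DDL; a sketch with illustrative table names and columns:

// Generate rows continuously, then print them to stdout
tableEnv.executeSql(
    "CREATE TABLE source_numbers (id BIGINT, amount DOUBLE) " +
    "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");

tableEnv.executeSql(
    "CREATE TABLE sink_print (id BIGINT, amount DOUBLE) " +
    "WITH ('connector' = 'print')");

// Streams generated rows to the print sink until cancelled
tableEnv.executeSql("INSERT INTO sink_print SELECT * FROM source_numbers");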

See docs/built-in-connectors.md.

Statement Set Operations

Batch execution of multiple table operations for optimized query planning and execution.

public interface StreamStatementSet extends StatementSet {
    StreamStatementSet add(TablePipeline tablePipeline);
    StreamStatementSet addInsertSql(String statement);
    StreamStatementSet addInsert(String targetPath, Table table);
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
    void attachAsDataStream();
    StreamStatementSet printExplain(ExplainDetail... extraDetails);
}
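
A sketch combining two inserts into one optimized job and attaching it to the surrounding DataStream program (reusing the illustrative tables from the connector example above):

// Batch two INSERT pipelines so the planner optimizes them as one job
StreamStatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink_print SELECT * FROM source_numbers WHERE amount > 100");
stmtSet.addInsert("sink_print", tableEnv.sqlQuery("SELECT id, amount FROM source_numbers"));

// Plans the statements into the StreamExecutionEnvironment so a single
// env.execute() runs them alongside any other DataStream pipelines
stmtSet.attachAsDataStream();
env.execute("combined-job");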

See docs/statement-set.md.

Procedure Context

Framework for stored procedure execution with access to StreamExecutionEnvironment.

public interface ProcedureContext {
    StreamExecutionEnvironment getExecutionEnvironment();
}

public class DefaultProcedureContext implements ProcedureContext {
    public DefaultProcedureContext(StreamExecutionEnvironment executionEnvironment);
}
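
A hedged sketch of a stored procedure; the class name and return value are hypothetical. Procedures implement org.apache.flink.table.procedures.Procedure and by convention expose public call(...) methods taking a ProcedureContext first:

import org.apache.flink.table.procedures.Procedure;

// Illustrative procedure that inspects the environment it runs in
public class ShowParallelismProcedure implements Procedure {
    public String[] call(ProcedureContext context) {
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        return new String[] {"parallelism=" + env.getParallelism()};
    }
}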

See docs/procedure-context.md.

Type Definitions

Core Types

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TablePipeline;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Optional;

Schema Building

// Schema builder for custom table schemas
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .columnByExpression("computed", "id * 2")
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
    .build();

Changelog Modes

// Changelog mode options
ChangelogMode.all()           // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
ChangelogMode.insertOnly()    // INSERT only
ChangelogMode.upsert()        // INSERT, UPDATE_AFTER, DELETE
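
A sketch of applying a mode when emitting a changelog; the schema and column names are illustrative (reusing the aggregated table from the DataStream Integration sketch), and upsert mode requires a primary key in the target schema:

// Emit only INSERT/UPDATE_AFTER/DELETE rows, suppressing UPDATE_BEFORE
DataStream<Row> upserts = tableEnv.toChangelogStream(
    totals,
    Schema.newBuilder()
        .column("name", DataTypes.STRING().notNull())
        .column("total", DataTypes.INT())
        .primaryKey("name")
        .build(),
    ChangelogMode.upsert());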