
# Apache Flink Table API Java Bridge


The Apache Flink Table API Java Bridge provides seamless integration between Flink's Table/SQL API and the DataStream API for Java applications. This module enables developers to convert between DataStreams and Tables, create StreamTableEnvironments for unified batch and stream processing, and leverage SQL queries on streaming data with comprehensive connector support.


## Package Information


- **Package Name**: flink-table-api-java-bridge
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>2.1.0</version>
</dependency>
```


## Core Imports

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
```


## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Create execution environment and table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Convert DataStream to Table (columns default to f0, f1)
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 25),
    Row.of("Bob", 30)
);
Table table = tableEnv.fromDataStream(dataStream);

// Register the table under a name and execute SQL on it
tableEnv.createTemporaryView("people", table);
Table result = tableEnv.sqlQuery("SELECT * FROM people WHERE f1 > 25");

// Convert Table back to DataStream and attach a sink
DataStream<Row> resultStream = tableEnv.toDataStream(result);
resultStream.print();

// Execute the pipeline
env.execute("Table API Bridge Example");
```


## Architecture

The Flink Table API Java Bridge is built around several key components:

- **StreamTableEnvironment**: Central context for bridging Table API and DataStream API operations
- **Stream-Table Conversion**: Bidirectional conversion utilities between DataStreams and Tables
- **Schema Management**: Type-safe schema definitions and transformations with watermark support
- **Built-in Connectors**: Testing and development connectors (blackhole, datagen, print)
- **Legacy Support**: Backward compatibility for existing table sources and sinks
- **Watermark Strategies**: Time-based event processing with configurable watermark assignment


## Capabilities


### Stream Table Environment


Core environment for creating and managing tables that integrate with DataStream API. Provides unified context for both batch and streaming operations.

```java { .api }
public interface StreamTableEnvironment extends TableEnvironment {
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields); // @Deprecated

    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields); // @Deprecated

    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

    <T> DataStream<T> toAppendStream(Table table, Class<T> clazz); // @Deprecated
    <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo); // @Deprecated
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz); // @Deprecated
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo); // @Deprecated

    StreamStatementSet createStatementSet();
}
```


[Stream Table Environment](./stream-table-environment.md)
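As a sketch of the `fromDataStream(DataStream, Schema)` overload above, an explicit `Schema` can pin down the data types of the stream's default columns (`f0`, `f1`); the stream contents here are illustrative assumptions:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<Row> stream = env.fromElements(Row.of("Alice", 12L), Row.of("Bob", 10L));

// Declare explicit data types for the physical columns instead of relying on
// the types derived from the stream's TypeInformation
Table table = tableEnv.fromDataStream(
    stream,
    Schema.newBuilder()
        .column("f0", DataTypes.STRING())
        .column("f1", DataTypes.BIGINT())
        .build());
```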


### Changelog Stream Processing


Advanced stream processing with support for changelog semantics including inserts, updates, and deletes.

```java { .api }
public interface StreamTableEnvironment extends TableEnvironment {
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
}
```


[Changelog Processing](./changelog-processing.md)
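A minimal sketch of the changelog round trip, assuming `env` and `tableEnv` are set up as in the Basic Usage section (the column values are illustrative):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Each Row carries a RowKind flag describing the kind of change it represents
DataStream<Row> changelog = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", 12),
    Row.ofKind(RowKind.INSERT, "Bob", 5),
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// Interpret the stream as a changelog table; updates are applied, not appended
Table table = tableEnv.fromChangelogStream(changelog);

// Convert back; the resulting stream carries +I/-U/+U/-D row kinds
DataStream<Row> out = tableEnv.toChangelogStream(table);
out.print();
```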


### DataStream Connector Providers


Provider interfaces for advanced connector development that integrate directly with DataStream API.

```java { .api }
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);
    Optional<Integer> getParallelism();
}
```


[DataStream Connectors](./datastream-connectors.md)


### Built-in Test Connectors


Ready-to-use connectors for development, testing, and debugging table applications.

```sql { .api }
-- BlackHole connector: discards all data, for performance testing
CREATE TABLE sink_table (...) WITH ('connector' = 'blackhole');

-- DataGen connector: generates test data
CREATE TABLE source_table (...) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.user_id.kind' = 'sequence',
    'fields.name.kind' = 'random'
);

-- Print connector: outputs data to the console for debugging
CREATE TABLE debug_table (...) WITH ('connector' = 'print');
```


[Built-in Connectors](./builtin-connectors.md)
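A sketch wiring datagen to print from Java via `executeSql`, assuming a `tableEnv` as created in Basic Usage; the schema and option values are illustrative (a `sequence` field requires explicit `start`/`end` bounds):

```java
// Generate rows and print them; schema and options are illustrative
tableEnv.executeSql(
    "CREATE TABLE source_table (" +
    "  user_id BIGINT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'datagen'," +
    "  'rows-per-second' = '10'," +
    "  'fields.user_id.kind' = 'sequence'," +
    "  'fields.user_id.start' = '1'," +
    "  'fields.user_id.end' = '1000'," +
    "  'fields.name.length' = '8'" +
    ")");

tableEnv.executeSql(
    "CREATE TABLE debug_table (user_id BIGINT, name STRING) " +
    "WITH ('connector' = 'print')");

// INSERT INTO submits a job that streams generated rows into the print sink
tableEnv.executeSql("INSERT INTO debug_table SELECT * FROM source_table");
```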


### Watermark Strategies


Time-based event processing with configurable watermark assignment strategies for handling out-of-order events.

```java { .api }
public abstract class PeriodicWatermarkAssigner {
    public abstract void nextTimestamp(long timestamp);
    public abstract Watermark getWatermark();
    public abstract Map<String, String> toProperties();
}

public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
    public BoundedOutOfOrderTimestamps(long delay);
}

public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
    public AscendingTimestamps();
}
```


[Watermark Strategies](./watermark-strategies.md)
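The assigner classes above belong to the legacy descriptor API. When converting a DataStream, the same bounded-out-of-orderness behavior can also be expressed declaratively through a `Schema` watermark; the column names and the `eventStream` variable below are illustrative assumptions:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;

Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    // Tolerate up to 5 seconds of out-of-order events,
    // analogous to BoundedOutOfOrderTimestamps(5000)
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .build();

Table table = tableEnv.fromDataStream(eventStream, schema);
```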


### Statement Set Operations


Batch execution optimization for multiple table operations with shared planning and execution.

```java { .api }
public interface StreamStatementSet extends StatementSet {
    StreamStatementSet add(TablePipeline tablePipeline);
    StreamStatementSet addInsertSql(String statement);
    StreamStatementSet addInsert(String targetPath, Table table);
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
    StreamStatementSet printExplain(ExplainDetail... extraDetails);

    void attachAsDataStream();

    // Inherited from StatementSet:
    // TableResult execute();
    // CompiledPlan compilePlan(); // @Experimental
    // String explain(ExplainDetail... extraDetails);
    // String explain(ExplainFormat format, ExplainDetail... extraDetails);
}
```


[Statement Sets](./statement-sets.md)
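A sketch of batching two inserts into a single job; `sink_a`, `sink_b`, and `source_table` are assumed to have been created beforehand (for example, with the built-in connectors):

```java
import org.apache.flink.table.api.bridge.java.StreamStatementSet;

StreamStatementSet statementSet = tableEnv.createStatementSet();

// Both inserts are planned together and can share common subgraphs
statementSet.addInsertSql("INSERT INTO sink_a SELECT * FROM source_table");
statementSet.addInsert("sink_b", tableEnv.sqlQuery("SELECT name FROM source_table"));

// Submit everything as one job ...
statementSet.execute();

// ... or, alternatively, attach the pipelines to the StreamExecutionEnvironment
// so they run together with other DataStream transformations on env.execute():
// statementSet.attachAsDataStream();
```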


### Procedure Context


Context for stored procedure execution with access to StreamExecutionEnvironment.

```java { .api }
public interface ProcedureContext {
    StreamExecutionEnvironment getExecutionEnvironment();
}

public class DefaultProcedureContext implements ProcedureContext {
    // Default implementation
}
```


[Procedures](./procedures.md)


## Types


### Core Schema Types

```java { .api }
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ExplainFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
```


### DataStream Integration Types

```java { .api }
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.expressions.Expression;
```


### Connector Provider Types

```java { .api }
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.data.RowData;

import java.util.Optional;
```