or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-flink-table-api-java-bridge

Java bridge API for Apache Flink's Table/SQL functionality, enabling seamless integration between table operations and DataStream APIs

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-table-api-java-bridge@1.20.x

To install, run

npx @tessl/cli install tessl/maven-flink-table-api-java-bridge@1.20.0

0

# Apache Flink Table API Java Bridge

1

2

Apache Flink's Table API Java Bridge provides seamless integration between Flink's Table/SQL API and the DataStream API. It enables developers to convert between DataStream and Table representations, register custom connectors, and execute table operations within streaming applications.

3

4

## Package Information

5

6

- **Package Name**: flink-table-api-java-bridge

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Installation**:

10

```xml

11

<dependency>

12

<groupId>org.apache.flink</groupId>

13

<artifactId>flink-table-api-java-bridge</artifactId>

14

<version>1.20.2</version>

15

</dependency>

16

```

17

18

## Core Imports

19

20

```java

21

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

22

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

23

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

24

import org.apache.flink.table.api.Table;

25

import org.apache.flink.table.api.Schema;

26

import org.apache.flink.streaming.api.datastream.DataStream;

27

import org.apache.flink.types.Row;

28

import org.apache.flink.table.types.AbstractDataType;

29

```

30

31

## Basic Usage

32

33

```java

34

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

35

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

36

import org.apache.flink.table.api.Table;

37

import org.apache.flink.streaming.api.datastream.DataStream;

38

import org.apache.flink.types.Row;

39

40

// Create execution environment

41

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

42

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

43

44

// Convert DataStream to Table

45

DataStream<Row> dataStream = // ... your data stream

46

Table table = tableEnv.fromDataStream(dataStream);

47

48

// Execute SQL queries

49

Table result = tableEnv.sqlQuery("SELECT * FROM " + table + " WHERE amount > 100");

50

51

// Convert back to DataStream

52

DataStream<Row> resultStream = tableEnv.toDataStream(result);

53

```

54

55

## Architecture

56

57

The Flink Table API Java Bridge is organized around several key components:

58

59

- **StreamTableEnvironment**: Main entry point providing factory methods and conversion utilities

60

- **Connector Framework**: Interfaces for implementing custom table sources and sinks

61

- **Built-in Connectors**: Ready-to-use connectors for testing and common use cases (DataGen, Print, BlackHole)

62

- **Legacy APIs**: Deprecated interfaces maintained for backward compatibility

63

- **Procedure Context**: Framework for stored procedure execution

64

65

## Capabilities

66

67

### Table Environment Management

68

69

Core functionality for creating and managing table environments that bridge DataStream and Table APIs.

70

71

```java { .api }

72

public interface StreamTableEnvironment extends TableEnvironment {

73

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

74

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

75

}

76

```

77

78

[Table Environment](./table-environment.md)

79

80

### DataStream Integration

81

82

Bi-directional conversion between DataStream and Table with support for custom schemas and changelog processing.

83

84

```java { .api }

85

// DataStream to Table conversion

86

<T> Table fromDataStream(DataStream<T> dataStream);

87

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

88

Table fromChangelogStream(DataStream<Row> dataStream);

89

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

90

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

91

92

// Table to DataStream conversion

93

DataStream<Row> toDataStream(Table table);

94

<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

95

<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

96

DataStream<Row> toChangelogStream(Table table);

97

DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

98

DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

99

100

// Temporary view creation

101

<T> void createTemporaryView(String path, DataStream<T> dataStream);

102

<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

103

104

// Statement set creation

105

StreamStatementSet createStatementSet();

106

107

// Deprecated methods (maintained for backward compatibility)

108

@Deprecated <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

109

@Deprecated <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);

110

@Deprecated <T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

111

@Deprecated <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);

112

@Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);

113

@Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);

114

```

115

116

[DataStream Integration](./datastream-integration.md)

117

118

### Connector Framework

119

120

Provider interfaces for implementing custom table sources and sinks that integrate with DataStream API.

121

122

```java { .api }

123

public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {

124

DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);

125

}

126

127

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {

128

DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);

129

Optional<Integer> getParallelism();

130

}

131

132

public interface ProviderContext {

133

String generateUid(String name);

134

}

135

136

public interface ParallelismProvider {

137

Optional<Integer> getParallelism();

138

}

139

```

140

141

[Connector Framework](./connector-framework.md)

142

143

### Built-in Connectors

144

145

Production-ready connectors for common use cases including test data generation, development output, and performance testing.

146

147

```java { .api }

148

// DataGen connector for test data generation

149

public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

150

public String factoryIdentifier(); // Returns "datagen"

151

}

152

153

// Print connector for development output

154

public class PrintTableSinkFactory implements DynamicTableSinkFactory {

155

public String factoryIdentifier(); // Returns "print"

156

}

157

158

// BlackHole connector for performance testing

159

public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {

160

public String factoryIdentifier(); // Returns "blackhole"

161

}

162

```

163

164

[Built-in Connectors](./built-in-connectors.md)

165

166

### Statement Set Operations

167

168

Batch execution of multiple table operations for optimized query planning and execution.

169

170

```java { .api }

171

public interface StreamStatementSet extends StatementSet {

172

StreamStatementSet add(TablePipeline tablePipeline);

173

StreamStatementSet addInsertSql(String statement);

174

StreamStatementSet addInsert(String targetPath, Table table);

175

StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);

176

StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);

177

StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);

178

void attachAsDataStream();

179

StreamStatementSet printExplain(ExplainDetail... extraDetails);

180

}

181

```

182

183

[Statement Set Operations](./statement-set.md)

184

185

### Procedure Context

186

187

Framework for stored procedure execution with access to StreamExecutionEnvironment.

188

189

```java { .api }

190

public interface ProcedureContext {

191

StreamExecutionEnvironment getExecutionEnvironment();

192

}

193

194

public class DefaultProcedureContext implements ProcedureContext {

195

public DefaultProcedureContext(StreamExecutionEnvironment executionEnvironment);

196

}

197

```

198

199

[Procedure Context](./procedure-context.md)

200

201

## Type Definitions

202

203

### Core Types

204

205

```java { .api }

206

import org.apache.flink.table.api.Schema;

207

import org.apache.flink.table.api.Table;

208

import org.apache.flink.table.api.EnvironmentSettings;

209

import org.apache.flink.table.api.TableDescriptor;

210

import org.apache.flink.table.api.ExplainDetail;

211

import org.apache.flink.table.api.TablePipeline;

212

import org.apache.flink.table.connector.ChangelogMode;

213

import org.apache.flink.table.connector.ProviderContext;

214

import org.apache.flink.table.connector.ParallelismProvider;

215

import org.apache.flink.table.connector.source.ScanTableSource;

216

import org.apache.flink.table.connector.sink.DynamicTableSink;

217

import org.apache.flink.table.factories.DynamicTableSourceFactory;

218

import org.apache.flink.table.factories.DynamicTableSinkFactory;

219

import org.apache.flink.types.Row;

220

import org.apache.flink.types.RowKind;

221

import org.apache.flink.streaming.api.datastream.DataStream;

222

import org.apache.flink.streaming.api.datastream.DataStreamSink;

223

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

224

import org.apache.flink.table.data.RowData;

225

import org.apache.flink.table.expressions.Expression;

226

import org.apache.flink.api.common.typeinfo.TypeInformation;

227

import org.apache.flink.api.java.tuple.Tuple2;

228

import java.util.Optional;

229

```

230

231

### Schema Building

232

233

```java { .api }

234

// Schema builder for custom table schemas

235

Schema schema = Schema.newBuilder()

236

.column("id", DataTypes.BIGINT())

237

.column("name", DataTypes.STRING())

238

.columnByExpression("computed", "id * 2")

239

.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))

240

.watermark("rowtime", "rowtime - INTERVAL '5' SECOND")

241

.build();

242

```

243

244

### Changelog Modes

245

246

```java { .api }

247

// Changelog mode options

248

ChangelogMode.all() // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE

249

ChangelogMode.insertOnly() // INSERT only

250

ChangelogMode.upsert() // INSERT, UPDATE_AFTER, DELETE

251

```