or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

built-in-connectors.md, datastream-conversions.md, index.md, legacy-connector-support.md, modern-connector-framework.md, stream-table-environment.md, watermark-strategies.md

docs/index.md

0

# Apache Flink Table API Java Bridge

1

2

Apache Flink Table API Java Bridge enables Java developers to write table programs that interact with Flink's streaming and batch processing APIs. It provides the integration layer between Flink's high-level Table API and the core Java DataStream API, offering a unified programming model for both stream and batch table operations.

3

4

## Package Information

5

6

- **Package Name**: org.apache.flink:flink-table-api-java-bridge_2.11

7

- **Package Type**: Maven

8

- **Language**: Java

9

- **Version**: 1.14.6

10

- **Installation**: Add to your Maven dependencies, or declare it as a `provided` dependency in Flink applications

11

12

```xml

13

<dependency>

14

<groupId>org.apache.flink</groupId>

15

<artifactId>flink-table-api-java-bridge_2.11</artifactId>

16

<version>1.14.6</version>

17

</dependency>

18

```

19

20

## Core Imports

21

22

```java

23

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

24

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

25

import org.apache.flink.table.api.Table;

26

import org.apache.flink.table.api.Schema;

27

import org.apache.flink.table.api.EnvironmentSettings;

28

import org.apache.flink.table.api.TableConfig;

29

import org.apache.flink.streaming.api.datastream.DataStream;

30

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

31

import org.apache.flink.types.Row;

32

import org.apache.flink.table.connector.ChangelogMode;

33

import org.apache.flink.table.types.AbstractDataType;

34

import org.apache.flink.api.common.typeinfo.TypeInformation;

35

import org.apache.flink.api.java.tuple.Tuple2;

36

```

37

38

## Basic Usage

39

40

```java

41

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

42

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

43

import org.apache.flink.table.api.Table;

44

import org.apache.flink.streaming.api.datastream.DataStream;

45

import org.apache.flink.types.Row;

46

import org.apache.flink.table.api.Schema;

47

48

// Create execution environment

49

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

50

51

// Create table environment

52

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

53

54

// Convert DataStream to Table with automatic schema derivation

55

DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("John", 25), new MyPojo("Jane", 30));

56

Table table = tableEnv.fromDataStream(dataStream);

57

58

// Or with explicit schema

59

Schema schema = Schema.newBuilder()

60

.column("name", "STRING")

61

.column("age", "INT")

62

.build();

63

Table tableWithSchema = tableEnv.fromDataStream(dataStream, schema);

64

65

// Perform table operations

66

Table result = table.select($("name"), $("age").plus(1).as("age_plus_one"));

67

68

// Convert back to DataStream

69

DataStream<Row> resultStream = tableEnv.toDataStream(result);

70

71

// Execute

72

env.execute("Table Bridge Example");

73

```

74

75

## Architecture

76

77

The Flink Table API Java Bridge consists of several key components:

78

79

1. **Core Bridge API**: Main integration interfaces (`StreamTableEnvironment`, `StreamStatementSet`)

80

2. **Modern Connector Framework**: New connector interfaces for sources and sinks

81

3. **Legacy Connector Support**: Backward compatibility with deprecated connector interfaces

82

4. **Built-in Connectors**: DataGen, Print, and BlackHole connectors for testing

83

5. **Watermark Strategies**: Time-based watermarking for event-time processing

84

85

## Capabilities

86

87

### Stream Table Environment

88

89

The main entry point for creating and managing table environments that integrate with the DataStream API.

90

91

```java { .api }

92

public interface StreamTableEnvironment extends TableEnvironment {

93

// Factory methods

94

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

95

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

96

@Deprecated

97

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig);

98

99

// Function registration (deprecated)

100

@Deprecated

101

<T> void registerFunction(String name, TableFunction<T> tableFunction);

102

@Deprecated

103

<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);

104

@Deprecated

105

<T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction);

106

107

// DataStream to Table conversions

108

<T> Table fromDataStream(DataStream<T> dataStream);

109

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

110

@Deprecated

111

<T> Table fromDataStream(DataStream<T> dataStream, String fields);

112

@Deprecated

113

<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

114

115

Table fromChangelogStream(DataStream<Row> dataStream);

116

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

117

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

118

119

// Temporary view creation

120

<T> void createTemporaryView(String path, DataStream<T> dataStream);

121

<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

122

@Deprecated

123

<T> void createTemporaryView(String path, DataStream<T> dataStream, String fields);

124

@Deprecated

125

<T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);

126

@Deprecated

127

<T> void registerDataStream(String name, DataStream<T> dataStream);

128

@Deprecated

129

<T> void registerDataStream(String name, DataStream<T> dataStream, String fields);

130

131

// Table to DataStream conversions

132

DataStream<Row> toDataStream(Table table);

133

<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

134

<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

135

@Deprecated

136

<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

137

@Deprecated

138

<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);

139

140

DataStream<Row> toChangelogStream(Table table);

141

DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

142

DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

143

@Deprecated

144

<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);

145

@Deprecated

146

<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);

147

148

// Statement set creation

149

StreamStatementSet createStatementSet();

150

151

// Job execution (deprecated)

152

@Deprecated

153

JobExecutionResult execute(String jobName) throws Exception;

154

}

155

```

156

157

[Stream Table Environment](./stream-table-environment.md)

158

159

### DataStream Conversions

160

161

Convert between DataStream and Table representations for seamless integration.

162

163

```java { .api }

164

// DataStream to Table conversions

165

<T> Table fromDataStream(DataStream<T> dataStream);

166

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

167

Table fromChangelogStream(DataStream<Row> dataStream);

168

169

// Table to DataStream conversions

170

DataStream<Row> toDataStream(Table table);

171

<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

172

DataStream<Row> toChangelogStream(Table table);

173

```

174

175

[DataStream Conversions](./datastream-conversions.md)

176

177

### Modern Connector Framework

178

179

New connector interfaces that follow the FLIP-95 design for better integration with the DataStream API.

180

181

```java { .api }

182

public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {

183

DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);

184

}

185

186

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {

187

DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);

188

189

@Override

190

default Optional<Integer> getParallelism() {

191

return Optional.empty();

192

}

193

}

194

195

public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {

196

static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded);

197

SourceFunction<RowData> createSourceFunction();

198

boolean isBounded();

199

}

200

201

public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {

202

static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction);

203

static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, @Nullable Integer parallelism);

204

SinkFunction<RowData> createSinkFunction();

205

Optional<Integer> getParallelism();

206

}

207

```

208

209

[Modern Connector Framework](./modern-connector-framework.md)

210

211

### Built-in Connectors

212

213

Ready-to-use connectors for development and testing scenarios.

214

215

```java { .api }

216

// DataGen Connector for generating test data

217

public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

218

// Factory methods and configuration

219

}

220

221

// Print Connector for outputting results

222

public class PrintTableSinkFactory implements DynamicTableSinkFactory {

223

// Factory methods and configuration

224

}

225

226

// BlackHole Connector for performance testing

227

public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {

228

// Factory methods and configuration

229

}

230

```

231

232

[Built-in Connectors](./built-in-connectors.md)

233

234

### Watermark Strategies

235

236

Time-based watermarking strategies for event-time processing in streaming applications. These are legacy watermark strategy classes for table sources.

237

238

```java { .api }

239

public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {

240

public abstract void nextTimestamp(long timestamp);

241

public abstract Watermark getWatermark();

242

public abstract Map<String, String> toProperties();

243

}

244

245

public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {

246

public abstract Watermark getWatermark(Row row, long timestamp);

247

public abstract Map<String, String> toProperties();

248

}

249

250

public final class AscendingTimestamps extends PeriodicWatermarkAssigner {

251

@Override

252

public void nextTimestamp(long timestamp);

253

@Override

254

public Watermark getWatermark();

255

@Override

256

public Map<String, String> toProperties();

257

}

258

259

public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {

260

public BoundedOutOfOrderTimestamps(long maxOutOfOrderness);

261

@Override

262

public void nextTimestamp(long timestamp);

263

@Override

264

public Watermark getWatermark();

265

@Override

266

public Map<String, String> toProperties();

267

}

268

```

269

270

[Watermark Strategies](./watermark-strategies.md)

271

272

### Legacy Connector Support

273

274

Deprecated but maintained interfaces for backward compatibility with existing connector implementations.

275

276

```java { .api }

277

@Deprecated

278

public interface StreamTableSource<T> extends TableSource<T> {

279

default boolean isBounded() { return false; }

280

DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);

281

}

282

283

@Deprecated

284

public interface StreamTableSink<T> extends TableSink<T> {

285

DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);

286

}

287

```

288

289

[Legacy Connector Support](./legacy-connector-support.md)

290

291

## Types

292

293

```java { .api }

294

public interface StreamStatementSet extends StatementSet {

295

@Override

296

StreamStatementSet addInsertSql(String statement);

297

@Override

298

StreamStatementSet addInsert(String targetPath, Table table);

299

@Override

300

StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);

301

@Override

302

StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);

303

@Override

304

StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);

305

void attachAsDataStream();

306

}

307

308

public class Schema {

309

public static Schema.Builder newBuilder();

310

// Schema definition for DataStream to Table conversions

311

}

312

313

public enum ChangelogMode {

314

INSERT_ONLY,

315

UPSERT,

316

ALL;

317

318

public static ChangelogMode insertOnly();

319

public static ChangelogMode upsert();

320

public static ChangelogMode all();

321

}

322

323

public abstract class WatermarkStrategy {

324

public abstract Map<String, String> toProperties();

325

}

326

327

public interface ParallelismProvider {

328

Optional<Integer> getParallelism();

329

}

330

```