or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-management.mdcomplex-event-processing.mdcore-table-operations.mddatastream-integration.mdindex.mdsql-processing.mdtype-system.mduser-defined-functions.mdwindow-operations.md

core-table-operations.mddocs/

0

# Core Table Operations

1

2

This document covers the essential table environment setup, table creation, and basic table operations in Apache Flink Table Uber Blink.

3

4

## Table Environment Creation

5

6

The TableEnvironment is the main entry point for all table operations.

7

8

### Pure Table Environment

9

10

```java { .api }

11

static TableEnvironment create(EnvironmentSettings settings);

12

static TableEnvironment create(Configuration configuration);

13

```

14

15

**Usage:**

16

17

```java

18

// Create with settings builder

19

EnvironmentSettings settings = EnvironmentSettings.newInstance()

20

.useBlinkPlanner()

21

.inStreamingMode()

22

.build();

23

TableEnvironment tEnv = TableEnvironment.create(settings);

24

25

// Create with configuration

26

Configuration config = new Configuration();

27

config.setString("table.sql-dialect", "default");

28

TableEnvironment tEnv = TableEnvironment.create(config);

29

```

30

31

## Environment Settings

32

33

```java { .api }

34

class EnvironmentSettings {

35

static EnvironmentSettings.Builder newInstance();

36

37

interface Builder {

38

Builder useBlinkPlanner();

39

Builder inStreamingMode();

40

Builder inBatchMode();

41

Builder withBuiltInCatalogName(String catalogName);

42

Builder withBuiltInDatabaseName(String databaseName);

43

Builder withConfiguration(Configuration configuration);

44

EnvironmentSettings build();

45

}

46

}

47

```

48

49

## SQL Execution

50

51

### Direct SQL Execution

52

53

```java { .api }

54

TableResult executeSql(String statement);

55

Table sqlQuery(String query);

56

```

57

58

**Usage:**

59

60

```java

61

// Execute DDL/DML statements

62

TableResult result = tEnv.executeSql(

63

"CREATE TABLE users (id INT, name STRING, age INT) WITH ('connector' = 'filesystem', 'path' = '/data/users', 'format' = 'json')"

64

);

65

66

// Execute queries returning tables

67

Table userTable = tEnv.sqlQuery("SELECT * FROM users WHERE age > 18");

68

```

69

70

## Table Creation and Management

71

72

### Creating Tables

73

74

```java { .api }

75

void createTable(String path, TableDescriptor descriptor);

76

void createTemporaryTable(String path, TableDescriptor descriptor);

77

void createTemporaryView(String path, Table view);

78

void dropTable(String path);

79

void dropTemporaryTable(String path);

80

void dropTemporaryView(String path);

81

boolean tableExists(String path);

82

```

83

84

### Table Access

85

86

```java { .api }

87

Table from(String path);

88

Table scan(String... tablePath);

89

Table fromValues(Object... values);

90

Table fromValues(AbstractDataType<?> rowType, Object... values);

91

Table fromValues(DataType rowType, Object... values);

92

```

93

94

**Usage:**

95

96

```java

97

// Create table with descriptor

98

TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")

99

.schema(Schema.newBuilder()

100

.column("id", DataTypes.INT())

101

.column("name", DataTypes.STRING())

102

.column("age", DataTypes.INT())

103

.build())

104

.option("path", "/data/users")

105

.option("format", "json")

106

.build();

107

108

tEnv.createTable("users", descriptor);

109

110

// Access existing table

111

Table userTable = tEnv.from("users");

112

113

// Create table from values

114

Table valuesTable = tEnv.fromValues(

115

DataTypes.ROW(

116

DataTypes.FIELD("id", DataTypes.INT()),

117

DataTypes.FIELD("name", DataTypes.STRING())

118

),

119

Row.of(1, "Alice"),

120

Row.of(2, "Bob")

121

);

122

123

// Create temporary view

124

tEnv.createTemporaryView("user_view", userTable.where($"age".isGreater(18)));

125

```

126

127

## Basic Table Operations

128

129

The Table interface provides fluent API for data transformations.

130

131

### Selection and Projection

132

133

```java { .api }

134

interface Table {

135

Table select(Expression... fields);

136

Table addColumns(Expression... fields);

137

Table addOrReplaceColumns(Expression... fields);

138

Table renameColumns(Expression... fields);

139

Table dropColumns(Expression... fields);

140

}

141

```

142

143

### Filtering and Sorting

144

145

```java { .api }

146

interface Table {

147

Table where(Expression predicate);

148

Table filter(Expression predicate);

149

Table distinct();

150

Table orderBy(Expression... fields);

151

Table offset(int offset);

152

Table fetch(int fetch);

153

Table limit(int fetch);

154

}

155

```

156

157

### Grouping and Aggregation

158

159

```java { .api }

160

interface Table {

161

GroupedTable groupBy(Expression... fields);

162

WindowGroupedTable window(GroupWindow window);

163

}

164

165

interface GroupedTable {

166

Table select(Expression... fields);

167

AggregatedTable aggregate(Expression aggregateFunction);

168

FlatAggregateTable flatAggregate(Expression tableAggregateFunction);

169

}

170

```

171

172

**Usage:**

173

174

```java

175

Table users = tEnv.from("users");

176

177

// Select specific columns

178

Table result = users.select($("name"), $("age"));

179

180

// Filter data

181

Table adults = users.where($("age").isGreaterOrEqual(18));

182

183

// Group and aggregate

184

Table ageGroups = users

185

.groupBy($("age"))

186

.select($("age"), $("age").count().as("count"));

187

```

188

189

## Joins

190

191

```java { .api }

192

interface Table {

193

Table join(Table right);

194

Table join(Table right, Expression joinPredicate);

195

Table leftOuterJoin(Table right);

196

Table leftOuterJoin(Table right, Expression joinPredicate);

197

Table rightOuterJoin(Table right);

198

Table rightOuterJoin(Table right, Expression joinPredicate);

199

Table fullOuterJoin(Table right);

200

Table fullOuterJoin(Table right, Expression joinPredicate);

201

}

202

```

203

204

**Usage:**

205

206

```java

207

Table users = tEnv.from("users");

208

Table orders = tEnv.from("orders");

209

210

// Inner join

211

Table userOrders = users.join(orders, $("users.id").isEqual($("orders.user_id")));

212

213

// Left outer join

214

Table usersWithOrders = users.leftOuterJoin(orders, $("users.id").isEqual($("orders.user_id")));

215

```

216

217

## Set Operations

218

219

```java { .api }

220

interface Table {

221

Table unionAll(Table right);

222

Table union(Table right);

223

Table intersect(Table right);

224

Table intersectAll(Table right);

225

Table minus(Table right);

226

Table minusAll(Table right);

227

}

228

```

229

230

## Table Execution

231

232

```java { .api }

233

interface Table {

234

TableResult execute();

235

void executeInsert(String tablePath);

236

TableResult executeInsert(String tablePath, boolean overwrite);

237

CloseableIterator<Row> execute().collect();

238

}

239

```

240

241

## Statement Sets (Multi-Sink Operations)

242

243

For executing multiple DML statements as a single job:

244

245

```java { .api }

246

interface TableEnvironment {

247

StatementSet createStatementSet();

248

}

249

250

interface StatementSet {

251

StatementSet addInsertSql(String statement);

252

StatementSet addInsert(String targetPath, Table table);

253

StatementSet addInsert(String targetPath, Table table, boolean overwrite);

254

String explain(ExplainDetail... extraDetails);

255

TableResult execute();

256

}

257

```

258

259

**Usage:**

260

261

```java

262

// Create statement set for multi-sink execution

263

StatementSet statementSet = tEnv.createStatementSet();

264

265

// Add multiple insert operations

266

statementSet.addInsert("sink_table_1", processedData);

267

statementSet.addInsertSql("INSERT INTO sink_table_2 SELECT * FROM source WHERE condition = true");

268

269

// Execute all operations as single job

270

TableResult result = statementSet.execute();

271

```

272

273

```java

274

Table result = users.select($("name"), $("age")).where($("age").isGreater(21));

275

276

// Execute and print results

277

result.execute().print();

278

279

// Collect results as iterator

280

try (CloseableIterator<Row> iterator = result.execute().collect()) {

281

while (iterator.hasNext()) {

282

Row row = iterator.next();

283

System.out.println(row);

284

}

285

}

286

287

// Insert into another table

288

result.executeInsert("young_adults");

289

```

290

291

## Schema Information

292

293

```java { .api }

294

interface Table {

295

ResolvedSchema getResolvedSchema();

296

TableSchema getSchema(); // Deprecated

297

String explain();

298

String explain(ExplainDetail... extraDetails);

299

}

300

```

301

302

## Table Configuration

303

304

```java { .api }

305

interface TableEnvironment {

306

TableConfig getConfig();

307

Configuration getConfiguration();

308

}

309

310

class TableConfig {

311

Configuration getConfiguration();

312

ZoneId getLocalTimeZone();

313

void setLocalTimeZone(ZoneId zoneId);

314

}

315

```

316

317

## Error Handling

318

319

Common exceptions when working with table operations:

320

321

```java { .api }

322

class TableException extends RuntimeException;

323

class SqlParserException extends TableException;

324

class ValidationException extends TableException;

325

class CodeGenException extends TableException;

326

```

327

328

## Types

329

330

```java { .api }

331

interface Table extends Serializable;

332

333

class ResolvedSchema {

334

List<Column> getColumns();

335

List<String> getColumnNames();

336

List<DataType> getColumnDataTypes();

337

Optional<UniqueConstraint> getPrimaryKey();

338

List<UniqueConstraint> getUniqueConstraints();

339

}

340

341

enum ExplainDetail {

342

CHANGELOG_MODE,

343

ESTIMATED_COST,

344

JSON_EXECUTION_PLAN

345

}

346

347

interface CloseableIterator<T> extends Iterator<T>, AutoCloseable;

348

```