# Table Sources and Sinks

High-level Table API integration for reading from and writing to Hive tables with advanced optimizations including partition pruning, projection pushdown, limit pushdown, and streaming support.

## Capabilities

### HiveTableSource

Table source for reading data from Hive tables with comprehensive optimization support.

```java { .api }
/**
 * Table source for reading data from Hive tables with optimization support
 */
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown {

    /** Get runtime provider for scanning operations */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /** Get supported changelog mode */
    public ChangelogMode getChangelogMode();

    /** Apply partition pruning optimization */
    public void applyPartitions(List<Map<String, String>> remainingPartitions);

    /** Apply column projection optimization */
    public void applyProjection(int[][] projectedFields);

    /** Apply limit pushdown optimization */
    public void applyLimit(long limit);

    /** Create copy of the source */
    public DynamicTableSource copy();

    /** Get source summary for planning */
    public String asSummaryString();
}
```

**Usage Examples:**

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;

// Register Hive catalog
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Read from Hive table with SQL
Table result = tableEnv.sqlQuery("SELECT id, name FROM my_table WHERE year = '2023'");

// Read with partition pruning
Table filtered = tableEnv.sqlQuery(
    "SELECT * FROM partitioned_table WHERE partition_col = 'value'"
);

// Read with column projection and limit
Table limited = tableEnv.sqlQuery(
    "SELECT col1, col2 FROM large_table LIMIT 1000"
);
```

### HiveTableSink

Table sink for writing data to Hive tables with partitioning and overwrite support.

```java { .api }
/**
 * Table sink for writing data to Hive tables with partitioning support
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

    /** Get runtime provider for writing operations */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /** Apply static partitioning */
    public void applyStaticPartition(Map<String, String> partition);

    /** Apply overwrite mode */
    public void applyOverwrite(boolean overwrite);

    /** Check if partition grouping is required */
    public boolean requiresPartitionGrouping(boolean supportsGrouping);

    /** Create copy of the sink */
    public DynamicTableSink copy();

    /** Get sink summary for planning */
    public String asSummaryString();
}
```

**Usage Examples:**

```java
// Write to Hive table with SQL
tableEnv.executeSql("INSERT INTO my_table SELECT * FROM source_table");

// Write with static partitioning
tableEnv.executeSql(
    "INSERT INTO partitioned_table PARTITION (year = '2023', month = '01') " +
    "SELECT id, name FROM source_table"
);

// Overwrite existing data
tableEnv.executeSql("INSERT OVERWRITE my_table SELECT * FROM updated_data");

// Write with dynamic partitioning
tableEnv.executeSql(
    "INSERT INTO partitioned_table " +
    "SELECT id, name, year, month FROM source_with_partitions"
);
```

### HiveLookupTableSource

Specialized table source for temporal joins and lookup operations with Hive tables.

```java { .api }
/**
 * Lookup table source for temporal joins with Hive tables.
 * Extends HiveTableSource and implements the LookupTableSource interface.
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

    /** Get lookup runtime provider */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

    /** Create copy of the lookup source */
    public DynamicTableSource copy();
}
```

**Usage Examples:**

```java
// Temporal join with Hive dimension table
Table orders = tableEnv.from("Orders");
Table result = tableEnv.sqlQuery(
    "SELECT o.*, d.description " +
    "FROM Orders AS o " +
    "JOIN dimension_table FOR SYSTEM_TIME AS OF o.proc_time AS d " +
    "ON o.product_id = d.id"
);
```

### HiveDynamicTableFactory

Factory for creating Hive table sources and sinks from configuration.

```java { .api }
/**
 * Factory for creating Hive dynamic table sources and sinks
 */
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** Create dynamic table source */
    public DynamicTableSource createDynamicTableSource(Context context);

    /** Create dynamic table sink */
    public DynamicTableSink createDynamicTableSink(Context context);

    /** Get factory identifier */
    public String factoryIdentifier();

    /** Get required configuration options */
    public Set<ConfigOption<?>> requiredOptions();

    /** Get optional configuration options */
    public Set<ConfigOption<?>> optionalOptions();
}
```

### Legacy Table Factory

Backward compatibility support for older Flink versions.

```java { .api }
/**
 * Legacy table factory for backward compatibility.
 * @deprecated Use HiveDynamicTableFactory instead
 */
@Deprecated
public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {

    /** Create table source */
    public TableSource<Row> createTableSource(Context context);

    /** Create table sink */
    public TableSink<Row> createTableSink(Context context);

    /** Get required properties */
    public Map<String, String> requiredContext();

    /** Get supported properties */
    public List<String> supportedProperties();
}
```

## Optimization Features

### Partition Pushdown

The connector automatically prunes partitions based on SQL WHERE clauses, significantly reducing data scanning:

```sql
-- Only scans partitions matching the filter
SELECT * FROM sales WHERE year = 2023 AND month = 'January'
```

### Projection Pushdown

Column projection reduces data transfer by only reading required columns:

```sql
-- Only reads 'id' and 'name' columns
SELECT id, name FROM wide_table WHERE active = true
```

### Limit Pushdown

Limit operations are pushed to the source to reduce data processing:

```sql
-- Stops reading after 100 records
SELECT * FROM large_table LIMIT 100
```

### Streaming Support

Tables can be read in streaming mode for continuous processing:

```java
// Configure for streaming reads
tableEnv.getConfig().getConfiguration().setString(
    "table.exec.source.idle-timeout", "10s"
);
```