# Table Sources and Sinks

High-level Table API integration for reading from and writing to Hive tables with advanced optimizations including partition pruning, projection pushdown, limit pushdown, and streaming support.

## Capabilities

### HiveTableSource

Table source for reading data from Hive tables with comprehensive optimization support.
```java { .api }
/**
 * Table source for reading data from Hive tables with optimization support
 */
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown {

    /** Get runtime provider for scanning operations */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /** Get supported changelog mode */
    public ChangelogMode getChangelogMode();

    /** Apply partition pruning optimization */
    public void applyPartitions(List<Map<String, String>> remainingPartitions);

    /** Apply column projection optimization */
    public void applyProjection(int[][] projectedFields);

    /** Apply limit pushdown optimization */
    public void applyLimit(long limit);

    /** Create copy of the source */
    public DynamicTableSource copy();

    /** Get source summary for planning */
    public String asSummaryString();
}
```
**Usage Examples:**

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;

// Register Hive catalog
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Read from Hive table with SQL
Table result = tableEnv.sqlQuery("SELECT id, name FROM my_table WHERE year = '2023'");

// Read with partition pruning
Table filtered = tableEnv.sqlQuery(
    "SELECT * FROM partitioned_table WHERE partition_col = 'value'"
);

// Read with column projection and limit
Table limited = tableEnv.sqlQuery(
    "SELECT col1, col2 FROM large_table LIMIT 1000"
);
```
### HiveTableSink

Table sink for writing data to Hive tables with partitioning and overwrite support.
```java { .api }
/**
 * Table sink for writing data to Hive tables with partitioning support
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

    /** Get runtime provider for writing operations */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /** Apply static partitioning */
    public void applyStaticPartition(Map<String, String> partition);

    /** Apply overwrite mode */
    public void applyOverwrite(boolean overwrite);

    /** Check if partition grouping is required */
    public boolean requiresPartitionGrouping(boolean supportsGrouping);

    /** Create copy of the sink */
    public DynamicTableSink copy();

    /** Get sink summary for planning */
    public String asSummaryString();
}
```
**Usage Examples:**

```java
// Write to Hive table with SQL
tableEnv.executeSql("INSERT INTO my_table SELECT * FROM source_table");

// Write with static partitioning
tableEnv.executeSql(
    "INSERT INTO partitioned_table PARTITION (year = '2023', month = '01') " +
    "SELECT id, name FROM source_table"
);

// Overwrite existing data
tableEnv.executeSql("INSERT OVERWRITE my_table SELECT * FROM updated_data");

// Write with dynamic partitioning
tableEnv.executeSql(
    "INSERT INTO partitioned_table " +
    "SELECT id, name, year, month FROM source_with_partitions"
);
```
### HiveLookupTableSource

Specialized table source for temporal joins and lookup operations with Hive tables.
```java { .api }
/**
 * Lookup table source for temporal joins with Hive tables.
 * Extends HiveTableSource and implements the LookupTableSource interface.
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

    /** Get lookup runtime provider */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

    /** Create copy of the lookup source */
    public DynamicTableSource copy();
}
```
**Usage Examples:**

```java
// Temporal join with Hive dimension table
Table orders = tableEnv.from("Orders");
Table result = tableEnv.sqlQuery(
    "SELECT o.*, d.description " +
    "FROM Orders AS o " +
    "JOIN dimension_table FOR SYSTEM_TIME AS OF o.proc_time AS d " +
    "ON o.product_id = d.id"
);
```
### HiveDynamicTableFactory

Factory for creating Hive table sources and sinks from configuration.
```java { .api }
/**
 * Factory for creating Hive dynamic table sources and sinks
 */
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** Create dynamic table source */
    public DynamicTableSource createDynamicTableSource(Context context);

    /** Create dynamic table sink */
    public DynamicTableSink createDynamicTableSink(Context context);

    /** Get factory identifier */
    public String factoryIdentifier();

    /** Get required configuration options */
    public Set<ConfigOption<?>> requiredOptions();

    /** Get optional configuration options */
    public Set<ConfigOption<?>> optionalOptions();
}
```
### Legacy Table Factory

Backward compatibility support for older Flink versions.
```java { .api }
/**
 * Legacy table factory for backward compatibility.
 *
 * @deprecated Use HiveDynamicTableFactory instead
 */
@Deprecated
public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {

    /** Create table source */
    public TableSource<Row> createTableSource(Context context);

    /** Create table sink */
    public TableSink<Row> createTableSink(Context context);

    /** Get required properties */
    public Map<String, String> requiredContext();

    /** Get supported properties */
    public List<String> supportedProperties();
}
```
## Optimization Features

### Partition Pushdown

The connector automatically prunes partitions based on SQL WHERE clauses, significantly reducing data scanning:

```sql
-- Only scans partitions matching the filter
SELECT * FROM sales WHERE year = 2023 AND month = 'January'
```
### Projection Pushdown

Column projection reduces data transfer by only reading required columns:

```sql
-- Only reads 'id' and 'name' columns
SELECT id, name FROM wide_table WHERE active = true
```
### Limit Pushdown

Limit operations are pushed to the source to reduce data processing:

```sql
-- Stops reading after 100 records
SELECT * FROM large_table LIMIT 100
```
### Streaming Support

Tables can be read in streaming mode for continuous processing:

```java
// Configure for streaming reads
tableEnv.getConfig().getConfiguration().setString(
    "table.exec.source.idle-timeout", "10s"
);
```