# Table Sources

Reads data from Hive tables with support for both batch and streaming modes, partition pruning, projection pushdown, and lookup joins. Provides comprehensive integration with Flink's table ecosystem while leveraging Hive's storage capabilities.

## Capabilities

### HiveTableSource

Primary table source for reading Hive tables, either as a bounded batch scan or in streaming mode with continuous partition monitoring.

```java { .api }
/**
 * Table source for reading data from Hive tables.
 * Supports continuous partition monitoring and various pushdown optimizations.
 */
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    /**
     * Creates a HiveTableSource for Hive table access.
     * @param jobConf Hadoop job configuration
     * @param conf Flink configuration
     * @param tablePath path to the Hive table
     * @param catalogTable catalog table metadata
     */
    public HiveTableSource(JobConf jobConf, ReadableConfig conf, ObjectPath tablePath, CatalogTable catalogTable);

    /**
     * Gets the scan runtime provider for reading data.
     * @param scanContext context for the scan operation
     * @return ScanRuntimeProvider for data stream creation
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /**
     * Gets the changelog mode supported by this source.
     * @return ChangelogMode indicating the supported change types
     */
    public ChangelogMode getChangelogMode();

    /**
     * Creates a copy of this source during planning.
     * @return new HiveTableSource instance
     */
    public DynamicTableSource copy();

    /**
     * Gets a string summary of this table source.
     * @return human-readable description
     */
    public String asSummaryString();
}
```

### HiveLookupTableSource

Table source with both scan and lookup capabilities for dimension table use cases.

```java { .api }
/**
 * Table source with both scan and lookup capabilities for Hive tables.
 * Ideal for dimension tables used in joins.
 */
public class HiveLookupTableSource implements LookupTableSource, ScanTableSource {
    /**
     * Creates a HiveLookupTableSource for scan and lookup operations.
     * @param jobConf Hadoop job configuration
     * @param conf Flink configuration
     * @param tablePath path to the Hive table
     * @param catalogTable catalog table metadata
     */
    public HiveLookupTableSource(JobConf jobConf, ReadableConfig conf, ObjectPath tablePath, CatalogTable catalogTable);

    /**
     * Gets the lookup runtime provider for join operations.
     * @param context context for the lookup operation
     * @return LookupRuntimeProvider for lookup function creation
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

    /**
     * Gets the scan runtime provider for reading data.
     * @param scanContext context for the scan operation
     * @return ScanRuntimeProvider for data stream creation
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /**
     * Gets the changelog mode supported by this source.
     * @return ChangelogMode indicating the supported change types
     */
    public ChangelogMode getChangelogMode();

    /**
     * Creates a copy of this source during planning.
     * @return new HiveLookupTableSource instance
     */
    public DynamicTableSource copy();
}
```
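
Conceptually, the lookup path loads the relevant Hive partitions into an in-memory index keyed by the join key, so probe-side rows can be resolved without a full scan. The following is a minimal, hypothetical sketch of that idea in plain Java; the class and method names are illustrative, not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analog of a lookup function: dimension rows are
// indexed by their key column so each probe key resolves to matching rows.
class DimTableLookup {
    private final Map<String, List<String[]>> index = new HashMap<>();

    // Load dimension rows; keyColumn is the index of the join key in each row
    DimTableLookup(List<String[]> rows, int keyColumn) {
        for (String[] row : rows) {
            index.computeIfAbsent(row[keyColumn], k -> new ArrayList<>()).add(row);
        }
    }

    // Analogous to a lookup function's eval(key): return all matching rows
    List<String[]> lookup(String key) {
        return index.getOrDefault(key, List.of());
    }
}

public class LookupSketch {
    public static void main(String[] args) {
        List<String[]> dim = List.of(
            new String[] {"u1", "alice"},
            new String[] {"u2", "bob"});
        DimTableLookup lookup = new DimTableLookup(dim, 0);
        System.out.println(lookup.lookup("u1").get(0)[1]); // prints "alice"
        System.out.println(lookup.lookup("u3").size());    // prints "0"
    }
}
```

The real source additionally bounds staleness with a reload interval; this sketch only shows the key-indexed resolution step.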

### Partition Pushdown Support

Interface methods for optimizing queries by pushing partition filters down to the source (from `SupportsPartitionPushDown`).

```java { .api }
/**
 * Applies partition pushdown optimization.
 * Restricts reading to the given partitions at the source level.
 * @param remainingPartitions list of partition specs that remain after filtering
 */
public void applyPartitions(List<Map<String, String>> remainingPartitions);
```

### Projection Pushdown Support

Interface methods for optimizing queries by pushing column projections down to the source (from `SupportsProjectionPushDown`).

```java { .api }
/**
 * Applies projection pushdown optimization.
 * Only reads the specified columns from the underlying storage.
 * @param projectedFields array of field index paths to project, in nested format
 */
public void applyProjection(int[][] projectedFields);

/**
 * Checks whether projection of nested fields is supported.
 * @return true if nested field projection is supported
 */
public boolean supportsNestedProjection();
```
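
To make the two optimizations above concrete, here is a small, self-contained sketch in plain Java (illustrative only, not the connector's code): partition specs that fail the filter are dropped before any file is opened, and each `int[]` in the projection is a path of field indices into a possibly nested row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PushdownSketch {
    // Partition pruning: keep only the partition specs accepted by the filter;
    // the surviving list is what applyPartitions(...) would receive.
    static List<Map<String, String>> prunePartitions(
            List<Map<String, String>> allPartitions,
            Predicate<Map<String, String>> filter) {
        List<Map<String, String>> remaining = new ArrayList<>();
        for (Map<String, String> spec : allPartitions) {
            if (filter.test(spec)) {
                remaining.add(spec);
            }
        }
        return remaining;
    }

    // Nested projection: each int[] is a path of field indices,
    // e.g. {2, 0} selects field 0 inside top-level field 2.
    static Object[] project(Object[] row, int[][] projectedFields) {
        Object[] out = new Object[projectedFields.length];
        for (int i = 0; i < projectedFields.length; i++) {
            Object value = row;
            for (int idx : projectedFields[i]) {
                value = ((Object[]) value)[idx];
            }
            out[i] = value;
        }
        return out;
    }

    public static void main(String[] args) {
        // Simulates pushing "partition_date >= '2023-01-01'" into the source
        List<Map<String, String>> partitions = List.of(
            Map.of("partition_date", "2022-12-31"),
            Map.of("partition_date", "2023-01-15"));
        List<Map<String, String>> remaining = prunePartitions(
            partitions, p -> p.get("partition_date").compareTo("2023-01-01") >= 0);
        System.out.println(remaining.size()); // prints "1"

        // Row layout: (id, name, address(city, zip)); project id and address.city
        Object[] row = {42, "alice", new Object[] {"Berlin", "10115"}};
        Object[] projected = project(row, new int[][] {{0}, {2, 0}});
        System.out.println(projected[0] + "," + projected[1]); // prints "42,Berlin"
    }
}
```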

### Limit Pushdown Support

Interface method for optimizing queries by pushing LIMIT operations down to the source (from `SupportsLimitPushDown`).

```java { .api }
/**
 * Applies limit pushdown optimization.
 * Limits the number of records read at the source level.
 * @param limit maximum number of records to read
 */
public void applyLimit(long limit);
```
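
The effect of `applyLimit` can be sketched as a reader wrapper that stops emitting once the pushed-down bound is reached. This is an illustrative analog in plain Java, not the connector's implementation:

```java
import java.util.Iterator;
import java.util.List;

public class LimitSketch {
    // Wraps a record iterator so at most `limit` records are produced,
    // mirroring what a source does after applyLimit(limit) is called.
    static <T> Iterator<T> limited(Iterator<T> records, long limit) {
        return new Iterator<T>() {
            long emitted = 0;

            @Override
            public boolean hasNext() {
                return emitted < limit && records.hasNext();
            }

            @Override
            public T next() {
                emitted++;
                return records.next();
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> it = limited(List.of("a", "b", "c", "d").iterator(), 2);
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        System.out.println(count); // prints "2"
    }
}
```

Stopping at the source avoids reading and deserializing records the query would discard anyway.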

### Reading Context and Configuration

Context classes for configuring Hive table reading behavior.

```java { .api }
/**
 * Context for Hive partition operations.
 */
public class HivePartitionContext {
    public HivePartitionContext(List<HiveTablePartition> allPartitions, List<HiveTablePartition> remainingPartitions);
    public List<HiveTablePartition> getAllPartitions();
    public List<HiveTablePartition> getRemainingPartitions();
}

/**
 * Context for continuous Hive partition monitoring.
 */
public class HiveContinuousPartitionContext extends HivePartitionFetcherContextBase<HiveTablePartition> {
    public HiveContinuousPartitionContext(ObjectPath tablePath, CatalogTable catalogTable, List<String> partitionKeys);
    public List<HiveTablePartition> getPartitions(List<String> partitionValues);
}
```

### Input Format Classes

Low-level input format classes for reading Hive table data.

```java { .api }
/**
 * Input format for reading Hive table data.
 * Handles various Hive file formats and SerDes.
 */
public class HiveTableInputFormat extends RichInputFormat<RowData, HiveTableInputSplit> {
    /**
     * Opens the input format for reading.
     * @param split input split to read
     * @throws IOException if opening fails
     */
    public void open(HiveTableInputSplit split) throws IOException;

    /**
     * Checks whether the end of input has been reached.
     * @return true if no more records are available
     * @throws IOException if the check fails
     */
    public boolean reachedEnd() throws IOException;

    /**
     * Reads the next record.
     * @param reuse reusable RowData object
     * @return next RowData record
     * @throws IOException if reading fails
     */
    public RowData nextRecord(RowData reuse) throws IOException;

    /**
     * Closes the input format.
     * @throws IOException if closing fails
     */
    public void close() throws IOException;
}

/**
 * Input split for Hive table reading.
 */
public class HiveTableInputSplit implements InputSplit {
    public HiveTableInputSplit(int splitNumber, Path path, long start, long length, String[] hosts);
    public int getSplitNumber();
    public String[] getHostnames();
}
```
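
The open/reachedEnd/nextRecord/close methods above follow Flink's pull-based input format contract. A minimal in-memory analog (illustrative only, with a list standing in for a file split) shows the driver loop a caller runs per split:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for an input format: same lifecycle, trivial "split"
class ListInputFormat {
    private List<String> records;
    private int pos;

    void open(List<String> split) throws IOException {
        this.records = split;
        this.pos = 0;
    }

    boolean reachedEnd() throws IOException {
        return pos >= records.size();
    }

    String nextRecord(String reuse) throws IOException {
        // A real format may fill and return the reuse object; we ignore it here
        return records.get(pos++);
    }

    void close() throws IOException {
        records = null;
    }
}

public class InputFormatLoop {
    // The standard driver loop: open, pull until reachedEnd(), then close
    static List<String> readSplit(ListInputFormat format, List<String> split) throws IOException {
        List<String> out = new ArrayList<>();
        format.open(split);
        try {
            while (!format.reachedEnd()) {
                out.add(format.nextRecord(null));
            }
        } finally {
            format.close();
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        List<String> result = readSplit(new ListInputFormat(), List.of("r1", "r2"));
        System.out.println(result); // prints "[r1, r2]"
    }
}
```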

### Split Reader Implementations

Specialized readers for different Hive file formats.

```java { .api }
/**
 * Split reader interface for Hive formats.
 */
public interface SplitReader<T> {
    /**
     * Reads the next record from the split.
     * @return next record, or null if the end is reached
     * @throws IOException if reading fails
     */
    T read() throws IOException;

    /**
     * Closes the reader.
     * @throws IOException if closing fails
     */
    void close() throws IOException;
}

/**
 * Split reader for ORC files with vectorized reading.
 */
public class HiveVectorizedOrcSplitReader implements SplitReader<RowData> {
    public HiveVectorizedOrcSplitReader(HiveShim hiveShim, JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split);
}

/**
 * Split reader for Parquet files with vectorized reading.
 */
public class HiveVectorizedParquetSplitReader implements SplitReader<RowData> {
    public HiveVectorizedParquetSplitReader(HiveShim hiveShim, JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split);
}

/**
 * Split reader for MapReduce-based input formats.
 */
public class HiveMapredSplitReader implements SplitReader<RowData> {
    public HiveMapredSplitReader(JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split, HiveShim hiveShim);
}
```
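
All three readers share the same contract: `read()` returns records one at a time and null once the split is exhausted. A minimal, hypothetical implementation over in-memory data (not one of the real readers above) illustrates the read-until-null loop:

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// The read-until-null contract shared by split readers (illustrative)
interface SimpleSplitReader<T> {
    T read() throws IOException;  // next record, or null at end of split
    void close() throws IOException;
}

// Hypothetical reader backed by an in-memory list instead of an ORC/Parquet file
class ListSplitReader implements SimpleSplitReader<String> {
    private final Iterator<String> it;

    ListSplitReader(List<String> split) {
        this.it = split.iterator();
    }

    @Override
    public String read() throws IOException {
        return it.hasNext() ? it.next() : null;
    }

    @Override
    public void close() throws IOException {
        // nothing to release for in-memory data
    }
}

public class SplitReaderDemo {
    public static void main(String[] args) throws IOException {
        SimpleSplitReader<String> reader = new ListSplitReader(List.of("a", "b"));
        int count = 0;
        for (String rec = reader.read(); rec != null; rec = reader.read()) {
            count++;
        }
        reader.close();
        System.out.println(count); // prints "2"
    }
}
```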

**Usage Examples:**

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Set up a table environment with a Hive catalog
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Query a Hive table with automatic source optimization
Table result = tableEnv.sqlQuery(
    "SELECT customer_id, order_total " +
    "FROM hive.sales.orders " +
    "WHERE partition_date >= '2023-01-01' " +
    "AND order_total > 100.0 " +
    "LIMIT 1000"
);

// The HiveTableSource will automatically apply:
// - Partition pushdown (partition_date >= '2023-01-01')
// - Projection pushdown (only the customer_id and order_total columns)
// - Limit pushdown (LIMIT 1000)

result.execute().print();
```

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

// Set up a streaming environment for continuous monitoring
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Stream from a Hive table with continuous partition monitoring.
// streaming-source.* are per-table options, supplied here as dynamic table
// option hints (requires table.dynamic-table-options.enabled).
Table stream = tableEnv.sqlQuery(
    "SELECT event_time, user_id, action " +
    "FROM hive.events.user_actions " +
    "/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include'='all') */"
);

// Convert to a DataStream for further processing
DataStream<Row> dataStream = tableEnv.toDataStream(stream);
dataStream.print();

env.execute("Hive Streaming Source Example");
```

## Types

```java { .api }
public class HiveTablePartition {
    public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, String> partitionSpec);
    public StorageDescriptor getStorageDescriptor();
    public Map<String, String> getPartitionSpec();
    public String getLocation();
}

public class HiveSourceSplit implements SourceSplit {
    public String splitId();
    public HiveTableInputSplit getHiveTableInputSplit();
}

public interface ScanContext {
    DataTypeFactory getDataTypeFactory();
}

public interface LookupContext {
    int[][] getKeys(); // index paths of the lookup key fields
    DataTypeFactory getDataTypeFactory();
}

public interface ScanRuntimeProvider extends DynamicTableSource.RuntimeProvider {
    // Marker interface for scan providers
}

public interface LookupRuntimeProvider extends DynamicTableSource.RuntimeProvider {
    // Marker interface for lookup providers
}
```