# Table Source and Sink Operations

Core functionality for reading from and writing to Hive tables using Flink's Table API and DataStream API. Supports both batch and streaming modes, with partition pruning, projection pushdown, and limit pushdown for efficient data processing.

## Capabilities

### HiveSource

Unified data source for reading Hive tables in both bounded (batch) and unbounded (streaming) modes.

```java { .api }
/**
 * Unified data source for reading Hive tables (bounded/unbounded).
 * Implements Flink's Source interface for integration with the DataStream API.
 */
@PublicEvolving
public class HiveSource<T> implements Source<T, HiveSourceSplit, HivePendingSplitsCheckpoint> {

    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();

    public SplitEnumerator<HiveSourceSplit, HivePendingSplitsCheckpoint> createEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext) throws Exception;

    public SplitEnumerator<HiveSourceSplit, HivePendingSplitsCheckpoint> restoreEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext,
            HivePendingSplitsCheckpoint checkpoint) throws Exception;

    public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext) throws Exception;
}
```
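
HiveSource is rarely instantiated directly; the enumerator and reader methods above are invoked by the Flink runtime, while applications construct the source through HiveSourceBuilder (next section). A minimal end-to-end bounded read, assuming a reachable metastore and an existing `default.user_events` table:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;
import java.util.Collections;

JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf, new Configuration(), null /* auto-detect Hive version */,
        "default", "user_events", Collections.emptyMap())
        .buildWithDefaultBulkFormat();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source").print();
env.execute("hive-bounded-read");
```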

### HiveSourceBuilder

Builder pattern implementation for constructing HiveSource instances with flexible configuration options.

```java { .api }
/**
 * Builder for constructing HiveSource instances with configuration options.
 * Provides a fluent API for setting partitions, projections, and limits.
 */
@PublicEvolving
public class HiveSourceBuilder {

    /**
     * Create a new HiveSourceBuilder
     * @param jobConf Hadoop JobConf with Hive configuration
     * @param flinkConf Flink configuration options
     * @param hiveVersion Hive version for compatibility (null for auto-detection)
     * @param dbName Database name
     * @param tableName Table name
     * @param tableOptions Additional table options (take precedence over metastore properties)
     */
    public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf,
                             String hiveVersion, String dbName,
                             String tableName, Map<String, String> tableOptions);

    /**
     * Build HiveSource with the default RowData bulk format
     * @return HiveSource configured for RowData processing
     */
    public HiveSource<RowData> buildWithDefaultBulkFormat();

    /**
     * Build HiveSource with a custom bulk format
     * @param bulkFormat Custom bulk format for reading data
     * @return HiveSource configured with the custom format
     */
    public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);

    /**
     * Set specific partitions to read (for batch mode)
     * @param partitions List of partitions to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

    /**
     * Set the maximum number of records to read
     * @param limit Maximum record count
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setLimit(Long limit);

    /**
     * Set field projection for reading a subset of columns
     * @param projectedFields Array of field indices to project
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);
}
```

**Usage Examples:**

```java
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

// Basic source creation
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf,
        new Configuration(),
        "2.3.9", // hiveVersion
        "default",
        "user_events",
        Collections.emptyMap()
).buildWithDefaultBulkFormat();

// Source restricted to specific partitions and columns. Partition specs
// must list the partition columns in their declared order.
HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
LinkedHashMap<String, String> janSpec = new LinkedHashMap<>();
janSpec.put("year", "2024");
janSpec.put("month", "01");
LinkedHashMap<String, String> febSpec = new LinkedHashMap<>();
febSpec.put("year", "2024");
febSpec.put("month", "02");

List<HiveTablePartition> partitions = Arrays.asList(
        HiveTablePartition.ofPartition(hiveConf, "2.3.9", "analytics", "sales_data", janSpec),
        HiveTablePartition.ofPartition(hiveConf, "2.3.9", "analytics", "sales_data", febSpec));

HiveSource<RowData> filteredSource = new HiveSourceBuilder(
        jobConf,
        new Configuration(),
        "2.3.9", // hiveVersion
        "analytics",
        "sales_data",
        Collections.emptyMap())
        .setPartitions(partitions)
        .setProjectedFields(new int[] {0, 2, 5}) // select specific columns
        .setLimit(100000L)                       // cap the number of records read
        .buildWithDefaultBulkFormat();

// Use in a DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> stream = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "hive-source");
```

### HiveTableSource

Dynamic table source implementation for Table API integration with advanced pushdown capabilities.

```java { .api }
/**
 * Dynamic table source for Hive integration with the Table API.
 * Supports partition pushdown, projection pushdown, limit pushdown,
 * statistics reporting, and dynamic filtering.
 */
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown,
        SupportsStatisticReport, SupportsDynamicFiltering {

    /**
     * Get the scan runtime provider for table scanning
     * @param scanContext Scan context with runtime information
     * @return Provider for the scan runtime
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /**
     * Apply partition pruning to reduce data scanning
     * @param remainingPartitions Partitions remaining after pruning
     */
    public void applyPartitions(List<Map<String, String>> remainingPartitions);

    /**
     * Apply field projection to read only the required columns
     * @param projectedFields Array of projected field index paths
     * @param producedDataType Data type after projection
     */
    public void applyProjection(int[][] projectedFields, DataType producedDataType);

    /**
     * Apply limit pushdown for result size optimization
     * @param limit Maximum number of records to read
     */
    public void applyLimit(long limit);

    /**
     * Get the changelog mode of the produced data (insert-only for Hive tables)
     * @return ChangelogMode of the emitted records
     */
    public ChangelogMode getChangelogMode();

    /**
     * Report table statistics for cost-based optimization
     * @return TableStats with row count and column statistics
     */
    public TableStats reportStatistics();
}
```
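
The pushdown methods above are called by the planner, not by user code; a partition predicate, a narrow column list, and a `LIMIT` in the query are enough to trigger them. A minimal sketch (table and column names are illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Partition predicate -> applyPartitions, column list -> applyProjection,
// LIMIT -> applyLimit. EXPLAIN shows the pruned source in the optimized plan.
System.out.println(tableEnv.explainSql(
        "SELECT user_id, event_type FROM user_events "
                + "WHERE event_date = '2024-01-01' "
                + "LIMIT 100"));
```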

### HiveLookupTableSource

Specialized table source for lookup join operations with caching support.

```java { .api }
/**
 * Lookup table source extending HiveTableSource for join operations.
 * Provides temporal table lookup functionality with optional caching.
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

    /**
     * Get the lookup runtime provider for join operations
     * @param context Lookup context with join information
     * @return Provider for the lookup runtime
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}
```
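
In SQL, the lookup path is exercised through a processing-time temporal join. A sketch following the pattern in the Flink Hive connector docs (table names are illustrative; the `lookup.join.cache.ttl` option controls how long the cached table is reused):

```java
// Processing-time temporal join against a Hive dimension table.
tableEnv.executeSql(
        "SELECT o.order_id, d.user_name "
                + "FROM orders AS o "
                + "JOIN user_dim /*+ OPTIONS('lookup.join.cache.ttl'='60 min') */ "
                + "FOR SYSTEM_TIME AS OF o.proc_time AS d "
                + "ON o.user_id = d.user_id");
```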

### HiveTableSink

Dynamic table sink for writing data to Hive tables with partitioning and overwrite support.

```java { .api }
/**
 * Dynamic table sink for writing to Hive tables.
 * Supports static partitioning, overwrite mode, and partition commit policies.
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

    /**
     * Get the sink runtime provider for data writing
     * @param context Sink context with runtime information
     * @return Provider for the sink runtime
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /**
     * Apply a static partition specification for writing
     * @param partition Static partition key-value pairs
     */
    public void applyStaticPartition(Map<String, String> partition);

    /**
     * Enable or disable overwrite mode for existing data
     * @param overwrite Whether to overwrite existing data
     */
    public void applyOverwrite(boolean overwrite);

    /**
     * Negotiate partition grouping with the runtime
     * @param supportsGrouping Whether the runtime can group incoming data by partition
     * @return Whether the sink requires grouped input
     */
    public boolean requiresPartitionGrouping(boolean supportsGrouping);

    /**
     * Create a copy of the sink for plan optimization
     * @return Copy of the sink
     */
    public DynamicTableSink copy();

    /**
     * Get a human-readable summary of the sink configuration
     * @return Summary string
     */
    public String asSummaryString();
}
```

**Usage Examples:**

```java
// Table API source usage
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/etc/hive/conf");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Query with automatic source optimization
Table result = tableEnv.sqlQuery("""
        SELECT user_id, event_type, COUNT(*) as event_count
        FROM user_events
        WHERE event_date >= '2024-01-01'
          AND event_type IN ('login', 'purchase')
        GROUP BY user_id, event_type
        LIMIT 1000
        """);

// Write to a Hive table with partitioning
Table processedData = tableEnv.fromValues(
        DataTypes.ROW(
                DataTypes.FIELD("user_id", DataTypes.STRING()),
                DataTypes.FIELD("revenue", DataTypes.DECIMAL(10, 2)),
                DataTypes.FIELD("month", DataTypes.STRING())),
        Row.of("user123", new BigDecimal("99.99"), "2024-01"),
        Row.of("user456", new BigDecimal("149.99"), "2024-01"));

// Insert with automatic sink configuration
processedData.executeInsert("analytics.monthly_revenue");
```

### Integration with Flink's Connector Framework

The Hive connector integrates with Flink's unified connector framework:

```java
// Factory registration happens automatically via SPI:
// HiveDynamicTableFactory is discovered and used by Flink.

// Manual source creation for the DataStream API
HiveSource<RowData> source = new HiveSourceBuilder(jobConf, config, "2.3.9", "db", "table", Map.of())
        .setPartitions(specificPartitions)
        .setProjectedFields(new int[] {0, 1, 3})
        .buildWithDefaultBulkFormat();

// Use with Flink's Source API. Pair forBoundedOutOfOrderness with a
// timestamp assigner if event-time processing is needed.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> hiveStream = env.fromSource(
        source,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
        "hive-source");
```

## Advanced Features

### Streaming Mode Support

The connector supports continuous monitoring of Hive tables for streaming scenarios. The streaming options are table options, so the most reliable place to set them programmatically is the builder's tableOptions map:

```java
// Enable streaming mode via table options
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("streaming-source.enable", "true");
tableOptions.put("streaming-source.monitor-interval", "1 m");

HiveSource<RowData> streamingSource = new HiveSourceBuilder(
        jobConf, new Configuration(), "2.3.9", "db", "streaming_table", tableOptions
).buildWithDefaultBulkFormat();
```
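
The same options can be applied per query through dynamic table hints (requires `table.dynamic-table-options.enabled`); the keys follow the Flink Hive connector docs, the values are illustrative:

```java
// Streaming read of a Hive table, starting from a given partition offset.
tableEnv.executeSql(
        "SELECT * FROM streaming_table "
                + "/*+ OPTIONS("
                + "'streaming-source.enable'='true', "
                + "'streaming-source.monitor-interval'='1 m', "
                + "'streaming-source.consume-start-offset'='2024-01-01') */");
```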

### Partition Commit Policies

Configure how partitions are committed after writing. These are `sink.partition-commit.*` table options:

```java
// Commit policies control when a written partition becomes visible:
// "metastore" registers the partition in the Hive metastore,
// "success-file" writes a _SUCCESS marker into the partition directory.
Configuration sinkConfig = new Configuration();
sinkConfig.setString("sink.partition-commit.policy.kind", "metastore,success-file");
```
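
In practice these options live in the table definition. A sketch following the Flink Hive docs (table and column names are illustrative; the DDL uses the Hive dialect):

```java
import org.apache.flink.table.api.SqlDialect;

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
        "CREATE TABLE hive_sink (user_id STRING, revenue DECIMAL(10, 2)) "
                + "PARTITIONED BY (dt STRING) TBLPROPERTIES ("
                + "  'sink.partition-commit.trigger'='partition-time',"
                + "  'sink.partition-commit.delay'='1 h',"
                + "  'sink.partition-commit.policy.kind'='metastore,success-file')");
```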

### Performance Optimization

```java
// Let the planner infer source parallelism from the number of splits
config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Use Flink's vectorized readers and native Parquet/ORC writers;
// set to true only to fall back to the Hive mapred implementations
config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
```
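
These are regular Flink configuration options, so in a Table API job they can be set on the TableEnvironment; a minimal sketch:

```java
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
```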