# Table Sources and Sinks

Core table connector implementations providing comprehensive read and write capabilities for Hive tables. These classes support both batch and streaming processing modes, with advanced features including partition management, performance optimizations, and streaming ingestion capabilities.

## Capabilities

### Hive Table Source

Primary table source implementation for reading data from Hive tables, supporting both batch and streaming modes with advanced pushdown optimizations.

```java { .api }
/**
 * Table source for reading Hive tables with comprehensive optimization support
 * Supports: partition pushdown, projection pushdown, limit pushdown, statistics reporting
 */
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown, SupportsStatisticReport {

    /**
     * Creates table source scan runtime provider for execution
     * @param scanContext Context containing runtime information
     * @return ScanRuntimeProvider for execution
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /**
     * Applies partition pruning to reduce data scanning
     * @param remainingPartitions Partitions that remain after pruning
     */
    public void applyPartitions(List<Map<String, String>> remainingPartitions);

    /**
     * Applies column projection to minimize data transfer
     * @param projectedFields Array of projected field indices
     * @param nestedFields Nested field projections
     */
    public void applyProjection(int[][] projectedFields, DataType[] nestedFields);

    /**
     * Applies limit pushdown for query optimization
     * @param limit Maximum number of records to read
     */
    public void applyLimit(long limit);

    /**
     * Reports table statistics for cost-based optimization
     * @param reportContext Context for statistics reporting
     * @return Table statistics including row count and column statistics
     */
    public TableStats reportStatistics(StatisticReportContext reportContext);

    /**
     * Creates a copy of this table source for planning
     * @return Deep copy of the table source
     */
    public DynamicTableSource copy();

    /**
     * Returns string summary of the table source
     * @return Human-readable description
     */
    public String asSummaryString();
}
```

**Usage Examples:**

```sql
-- Batch reading with partition pruning
SELECT id, name, amount
FROM hive_table
WHERE partition_date BETWEEN '2023-01-01' AND '2023-01-31'
  AND region = 'us-west';

-- Streaming source configuration
CREATE TABLE hive_stream_source (
  id BIGINT,
  event_data STRING,
  event_time TIMESTAMP(3),
  partition_hour STRING
) PARTITIONED BY (partition_hour)
WITH (
  'connector' = 'hive',
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '10 min',
  'streaming-source.consume-start-offset' = '2023-01-01 00:00:00'
);
```
### Hive Table Sink

Primary table sink implementation for writing data to Hive tables with comprehensive partitioning and commit policy support.

```java { .api }
/**
 * Table sink for writing data to Hive tables with partition support
 * Supports: dynamic partitioning, overwrite modes, custom commit policies
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

    /**
     * Creates sink runtime provider for execution
     * @param sinkContext Context containing runtime information
     * @return SinkRuntimeProvider for execution
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext sinkContext);

    /**
     * Applies static partition specifications
     * @param partitions Map of partition column to value
     */
    public void applyStaticPartition(Map<String, String> partitions);

    /**
     * Configures overwrite mode for the sink
     * @param overwrite Whether to overwrite existing data
     */
    public void applyOverwrite(boolean overwrite);

    /**
     * Creates a copy of this table sink for planning
     * @return Deep copy of the table sink
     */
    public DynamicTableSink copy();

    /**
     * Returns string summary of the table sink
     * @return Human-readable description
     */
    public String asSummaryString();
}
```

**Usage Examples:**

```sql
-- Writing to partitioned table
INSERT INTO hive_partitioned_table
PARTITION (year='2023', month='01')
SELECT id, name, amount FROM source_table;

-- Dynamic partitioning with overwrite
INSERT OVERWRITE hive_table
SELECT id, name, amount, DATE_FORMAT(event_time, 'yyyy-MM-dd') AS partition_date
FROM streaming_source;

-- Sink configuration with commit policies
CREATE TABLE hive_sink (
  id BIGINT,
  data STRING,
  partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
  'connector' = 'hive',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'table.exec.hive.sink.statistic-auto-gather.enable' = 'true'
);
```
### Hive Source (DataStream API)

Low-level source implementation using Flink's new Source API, providing fine-grained control over data ingestion and parallelism.

```java { .api }
/**
 * Generic source implementation using Flink's new Source API
 * Provides low-level control over data ingestion and split management
 * @param <T> Output data type
 */
public final class HiveSource<T> implements Source<T, HiveSourceSplit, HiveSplitEnumeratorState>,
        ResultTypeQueryable<T> {

    /**
     * Creates a split enumerator that discovers and assigns splits
     * @param enumContext Enumerator context
     * @return Split enumerator instance
     */
    public SplitEnumerator<HiveSourceSplit, HiveSplitEnumeratorState> createEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext);

    /**
     * Restores split enumerator from checkpoint state
     * @param enumContext Enumerator context
     * @param checkpoint Checkpoint state to restore from
     * @return Restored split enumerator
     */
    public SplitEnumerator<HiveSourceSplit, HiveSplitEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext,
            HiveSplitEnumeratorState checkpoint);

    /**
     * Creates a source reader for processing splits
     * @param readerContext Reader context
     * @return Source reader instance
     */
    public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext);

    /**
     * Returns the boundedness of this source
     * @return Boundedness.BOUNDED for batch, CONTINUOUS_UNBOUNDED for streaming
     */
    public Boundedness getBoundedness();

    /**
     * Returns the produced data type
     * @return TypeInformation for the output type
     */
    public TypeInformation<T> getProducedType();
}
```
**Usage Example:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build Hive source (hiveConf is an existing HiveConf instance)
HiveSource<RowData> hiveSource = new HiveSourceBuilder()
        .setTableIdentifier(ObjectIdentifier.of("catalog", "database", "table"))
        .setHiveConfiguration(hiveConf)
        .setProjectFields(new int[]{0, 1, 2}) // project specific columns
        .setLimit(10000)                      // limit number of records
        .build();

// Add to execution graph
env.fromSource(hiveSource, WatermarkStrategy.noWatermarks(), "Hive Source")
        .map(row -> processRow(row))
        .print();

env.execute("Hive DataStream Job");
```
### Hive Source Builder

Builder pattern implementation for constructing HiveSource instances with fluent API for configuration.

```java { .api }
/**
 * Builder for creating HiveSource instances with fluent configuration API
 */
public class HiveSourceBuilder {

    /**
     * Sets the table identifier for the source
     * @param tableIdentifier Table identifier (catalog.database.table)
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setTableIdentifier(ObjectIdentifier tableIdentifier);

    /**
     * Sets Hive configuration for the source
     * @param hiveConf Hive configuration object
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setHiveConfiguration(HiveConf hiveConf);

    /**
     * Sets projected field indices to minimize data transfer
     * @param projectFields Array of field indices to project
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setProjectFields(int[] projectFields);

    /**
     * Sets maximum number of records to read
     * @param limit Maximum record count
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setLimit(long limit);

    /**
     * Sets specific partitions to read from
     * @param partitions List of partition specifications
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

    /**
     * Builds the configured HiveSource instance
     * @return Configured HiveSource<RowData> instance
     */
    public HiveSource<RowData> build();
}
```
### Hive Lookup Table Source

Specialized table source for dimension table lookups in streaming joins, providing caching and optimized access patterns.

```java { .api }
/**
 * Lookup table source for dimension table joins with caching support
 * Extends HiveTableSource with lookup join capabilities
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

    /**
     * Creates lookup runtime provider for join operations
     * @param lookupContext Context containing lookup configuration
     * @return LookupRuntimeProvider for execution
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext);

    /**
     * Creates a copy of this lookup table source
     * @return Deep copy of the lookup table source
     */
    public DynamicTableSource copy();
}
```

**Usage Example:**

```sql
-- Configure lookup join with caching
CREATE TABLE dim_table (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  category STRING
) WITH (
  'connector' = 'hive',
  'lookup.join.cache.ttl' = '1 hour'
);

-- Use in temporal join
SELECT
  orders.order_id,
  orders.amount,
  dim.name,
  dim.category
FROM orders_stream AS orders
JOIN dim_table FOR SYSTEM_TIME AS OF orders.proc_time AS dim
ON orders.product_id = dim.id;
```
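The `lookup.join.cache.ttl` option bounds how long cached dimension rows are reused before being reloaded from Hive. The following plain-Java sketch illustrates only the TTL expiry decision; the class and method names are illustrative and are not the connector's internal implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative TTL cache: entries older than ttlMillis are treated as expired,
// mimicking how a lookup source refreshes dimension rows after the configured TTL.
public class TtlLookupCache {
    private static class Entry {
        final String value;
        final long loadedAt;
        Entry(String value, long loadedAt) { this.value = value; this.loadedAt = loadedAt; }
    }

    private final Map<Long, Entry> cache = new HashMap<>();
    private final long ttlMillis;

    public TtlLookupCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

    /** Returns the cached value, or null if absent or expired (the caller would then reload from Hive). */
    public String lookup(long key, long nowMillis) {
        Entry e = cache.get(key);
        if (e == null || nowMillis - e.loadedAt >= ttlMillis) {
            cache.remove(key);
            return null; // cache miss: trigger a reload from the Hive table
        }
        return e.value;
    }

    public void put(long key, String value, long nowMillis) {
        cache.put(key, new Entry(value, nowMillis));
    }

    public static void main(String[] args) {
        TtlLookupCache cache = new TtlLookupCache(3_600_000L); // 1 hour TTL
        cache.put(42L, "widget", 0L);
        System.out.println(cache.lookup(42L, 1_000L));     // fresh entry: widget
        System.out.println(cache.lookup(42L, 3_600_001L)); // expired entry: null
    }
}
```

Note that the real connector reloads table data in bulk when the TTL elapses rather than fetching individual keys; the sketch models only the expiry check.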
## Performance Optimization Features

### Partition Pruning

- Automatic partition elimination based on WHERE clause predicates
- Supports complex partition expressions and date range filtering
- Reduces I/O by scanning only relevant partitions
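Conceptually, pruning is a filter over the table's partition specs applied before any data files are opened; a minimal plain-Java sketch of the idea (names are illustrative, not the planner's actual code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative partition pruning: keep only partition specs that satisfy the
// pushed-down predicate, so file scanning is limited to matching partitions.
public class PartitionPruner {
    public static List<Map<String, String>> prune(
            List<Map<String, String>> allPartitions,
            Predicate<Map<String, String>> predicate) {
        List<Map<String, String>> remaining = new ArrayList<>();
        for (Map<String, String> spec : allPartitions) {
            if (predicate.test(spec)) {
                remaining.add(spec);
            }
        }
        return remaining; // analogous to what applyPartitions(...) receives
    }

    public static void main(String[] args) {
        List<Map<String, String>> partitions = List.of(
                Map.of("partition_date", "2023-01-15", "region", "us-west"),
                Map.of("partition_date", "2023-02-01", "region", "us-west"),
                Map.of("partition_date", "2023-01-20", "region", "eu-east"));

        // WHERE partition_date BETWEEN '2023-01-01' AND '2023-01-31' AND region = 'us-west'
        List<Map<String, String>> remaining = prune(partitions, spec ->
                spec.get("partition_date").compareTo("2023-01-01") >= 0
                        && spec.get("partition_date").compareTo("2023-01-31") <= 0
                        && spec.get("region").equals("us-west"));

        System.out.println(remaining.size()); // 1 matching partition
    }
}
```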
### Projection Pushdown

- Column pruning to minimize data transfer
- Supports nested field projection for complex data types
- Integrates with columnar storage formats (Parquet, ORC)
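Projection pushdown means only the requested field indices are materialized per row; a minimal sketch of that index-based narrowing (illustrative plain Java, not the connector's reader code):

```java
import java.util.Arrays;

// Illustrative projection pushdown: only the requested field indices survive,
// mirroring how applyProjection(...) narrows the columns read from storage.
public class Projector {
    public static Object[] project(Object[] row, int[] projectedFields) {
        Object[] out = new Object[projectedFields.length];
        for (int i = 0; i < projectedFields.length; i++) {
            out[i] = row[projectedFields[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {1L, "alice", 99.5, "us-west"}; // id, name, amount, region
        // SELECT id, amount  ->  projected field indices {0, 2}
        System.out.println(Arrays.toString(project(row, new int[]{0, 2}))); // [1, 99.5]
    }
}
```

With columnar formats such as Parquet and ORC, this narrowing happens in the reader itself, so unprojected columns are never decoded from disk.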
### Limit Pushdown

- Pushes LIMIT operations to reduce data scanning
- Optimizes TOP-N queries and sampling operations
- Combines with other pushdowns for maximum efficiency
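The effect of limit pushdown can be sketched as a reader that stops as soon as the pushed limit is reached, rather than scanning everything and discarding rows downstream (illustrative plain Java, not the connector's implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative limit pushdown: the scan stops producing rows once the pushed
// limit (see applyLimit(long)) is reached; remaining input is never read.
public class LimitedScan {
    public static List<Integer> scanWithLimit(Iterable<Integer> source, long limit) {
        List<Integer> out = new ArrayList<>();
        for (Integer record : source) {
            if (out.size() >= limit) {
                break; // early exit: the rest of the data is not scanned
            }
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(10, 20, 30, 40, 50);
        System.out.println(scanWithLimit(data, 3)); // [10, 20, 30]
    }
}
```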
### Statistics Integration

- Provides table and column statistics for cost-based optimization
- Supports Hive metastore statistics and file-level metadata
- Enables intelligent join ordering and execution planning

### Vectorized Processing

- Native vectorized readers for Parquet and ORC formats
- Batch processing for improved CPU efficiency
- Configurable fallback to MapReduce readers when needed