# Unified Data Source

Lower-level DataStream API integration providing fine-grained control over Hive data processing with custom formats, transformations, and advanced source configuration options.

## Capabilities

### HiveSource

Unified data source for reading Hive tables with custom data types and advanced configuration.
```java { .api }
/**
 * Unified data source for reading Hive tables with custom data types
 */
@PublicEvolving
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {

    /** Get split serializer for checkpoint operations */
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();

    /** Get enumerator checkpoint serializer */
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> getEnumeratorCheckpointSerializer();

    /** Create split enumerator for assigning splits to readers */
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext);

    /** Restore split enumerator from checkpoint */
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext,
            PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.table.data.RowData;

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build and use HiveSource
HiveSource<RowData> source = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 1, 2}) // Select specific columns
    .setLimit(10000L)                       // Limit number of records
    .buildWithDefaultBulkFormat();

// Create DataStream from source
DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "hive-source"
);

// Process the stream
stream.map(row -> {
    // Custom processing logic
    return processRow(row);
}).print();

env.execute("Hive Streaming Job");
```

### HiveSourceBuilder

Builder pattern for constructing HiveSource instances with various configuration options.

```java { .api }
/**
 * Builder for constructing HiveSource instances with configuration options
 */
@PublicEvolving
public class HiveSourceBuilder {

    /**
     * Set projected fields for column pruning
     * @param projectedFields Array of column indices to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);

    /**
     * Set limit for number of records to read
     * @param limit Maximum number of records to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setLimit(Long limit);

    /**
     * Set specific partitions to read from
     * @param partitions List of partitions to include
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

    /**
     * Build source with default bulk format
     * @return Configured HiveSource instance
     */
    public <T> HiveSource<T> buildWithDefaultBulkFormat();
}
```

**Usage Examples:**

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.connectors.hive.HiveTablePartition;

// Build source with column projection
HiveSource<RowData> projectedSource = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 2, 5}) // Read columns 0, 2, and 5
    .buildWithDefaultBulkFormat();

// Build source with partition filtering
// (getPartition is a user-defined helper that resolves a partition spec)
List<HiveTablePartition> partitions = Arrays.asList(
    getPartition("year=2023", "month=01"),
    getPartition("year=2023", "month=02")
);

HiveSource<RowData> partitionedSource = new HiveSourceBuilder()
    .setPartitions(partitions)
    .setLimit(5000L)
    .buildWithDefaultBulkFormat();

// Build source with all optimizations
HiveSource<RowData> optimizedSource = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 1, 3})
    .setPartitions(selectedPartitions)
    .setLimit(1000L)
    .buildWithDefaultBulkFormat();
```

### HiveTablePartition

Represents a Hive table partition with metadata and storage information.

```java { .api }
/**
 * Represents a Hive table partition with metadata
 */
@PublicEvolving
public class HiveTablePartition {

    /** Get storage descriptor for the partition */
    public StorageDescriptor getStorageDescriptor();

    /** Get partition specification as key-value pairs */
    public LinkedHashMap<String, String> getPartitionSpec();

    /** Get table properties */
    public Properties getTableProperties();

    /** Get partition location */
    public String getLocation();

    /** Check whether the partition data is stored as sub-directories */
    public boolean isStoredAsSubDirectories();
}
```
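
Since `getPartitionSpec()` returns a plain `LinkedHashMap`, selecting which partitions to pass to `setPartitions` can be done with ordinary map lookups. A minimal sketch of that matching logic; the `PartitionSpecFilter` class, its `specMatches` helper, and the literal specs are illustrative, not part of the connector API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionSpecFilter {

    /** True if the partition spec contains every key/value pair in the filter. */
    static boolean specMatches(Map<String, String> spec, Map<String, String> filter) {
        return filter.entrySet().stream()
                .allMatch(e -> e.getValue().equals(spec.get(e.getKey())));
    }

    public static void main(String[] args) {
        // Spec shaped like the result of HiveTablePartition.getPartitionSpec()
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        spec.put("year", "2023");
        spec.put("month", "01");

        System.out.println(specMatches(spec, Map.of("year", "2023")));  // true
        System.out.println(specMatches(spec, Map.of("month", "02")));   // false
    }
}
```

In a real job the same predicate would be applied to `partition.getPartitionSpec()` for each candidate partition before building the source.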

### HiveSourceSplit

Represents a split for reading from Hive sources, containing partition and file information.

```java { .api }
/**
 * Represents a split for reading from Hive sources
 */
@PublicEvolving
public class HiveSourceSplit implements SourceSplit {

    /** Get unique split identifier */
    public String splitId();

    /** Get associated Hive table partition */
    public HiveTablePartition getHiveTablePartition();

    /** Get file splits within this partition */
    public List<FileSplit> getFileSplits();

    /** Get reader schema for this split */
    public TableSchema getReaderSchema();
}
```
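
The split enumerator's job is essentially to hand splits like these to reader subtasks. The distribution policy can be sketched in plain Java; the `SplitAssignmentSketch` class and the string split IDs below are illustrative stand-ins for real `HiveSourceSplit` instances, not Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitAssignmentSketch {

    /** Distribute split IDs across reader subtasks round-robin, as an enumerator might. */
    static Map<Integer, List<String>> assignRoundRobin(List<String> splitIds, int parallelism) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < splitIds.size(); i++) {
            assignment.computeIfAbsent(i % parallelism, k -> new ArrayList<>())
                      .add(splitIds.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> a =
            assignRoundRobin(List.of("split-0", "split-1", "split-2"), 2);
        System.out.println(a.get(0)); // [split-0, split-2]
        System.out.println(a.get(1)); // [split-1]
    }
}
```

Flink's actual enumerator also tracks unassigned splits in `PendingSplitsCheckpoint` so that assignment can resume after a failure, which is why the serializer accessors on `HiveSource` exist.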

## Advanced Usage Patterns

### Streaming Mode Configuration

Configure continuous monitoring of Hive tables for new data:

```java
import org.apache.flink.configuration.Configuration;

// Enable streaming mode with partition monitoring
Configuration config = new Configuration();
config.setString("table.exec.source.idle-timeout", "10s");
config.setBoolean("table.exec.hive.infer-source-parallelism", true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(config);
```

### Custom Data Processing

Process Hive data with custom transformations:

```java
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<RowData> hiveStream = env.fromSource(hiveSource, watermarkStrategy, "hive");

// Custom processing with DataStream API
DataStream<CustomRecord> processed = hiveStream
    .map(new HiveRowDataMapper())
    .filter(record -> record.isValid())
    .keyBy(CustomRecord::getKey)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .aggregate(new CustomAggregator());
```

### Batch Processing Optimization

Optimize for large batch processing workloads:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;

// Configure for batch processing: omit setLimit for a full table scan
HiveSource<RowData> batchSource = new HiveSourceBuilder()
    .setProjectedFields(requiredColumns)
    .buildWithDefaultBulkFormat();

// HiveSource implements the unified Source API, so batch jobs use the
// StreamExecutionEnvironment in BATCH runtime mode (not the legacy DataSet API)
StreamExecutionEnvironment batchEnv = StreamExecutionEnvironment.getExecutionEnvironment();
batchEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
batchEnv.getConfig().enableObjectReuse(); // Avoid per-record copies in batch jobs
```

### Partition Discovery

Automatically discover and process new partitions:

```java
// Continuous partition discovery is driven by the Hive connector's
// streaming-source table options, e.g.:
//   'streaming-source.enable' = 'true'
//   'streaming-source.monitor-interval' = '1 min'

// The source then discovers and reads new partitions as they appear
HiveSource<RowData> discoverySource = new HiveSourceBuilder()
    .buildWithDefaultBulkFormat();
```