# Source API

New Source API implementation for Hive tables providing enhanced control over split enumeration and reading with support for continuous partition monitoring, parallel reading, and efficient split management.

## Capabilities

### HiveSource

Main Source API implementation for reading Hive tables that extends AbstractFileSource and uses the new Flink Source interface.

```java { .api }
/**
 * A unified data source that reads a hive table. HiveSource works on HiveSourceSplit and
 * uses BulkFormat to read the data. A built-in BulkFormat is provided to return records in
 * type of RowData. It's also possible to implement a custom BulkFormat to return data in
 * different types. Use HiveSourceBuilder to build HiveSource instances.
 *
 * @param <T> the type of record returned by this source
 */
@PublicEvolving
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
    /**
     * Package-private constructor used by HiveSourceBuilder
     * @param inputPaths - Array of input paths (typically contains single dummy path)
     * @param fileEnumerator - Provider for file enumeration
     * @param splitAssigner - Provider for split assignment
     * @param readerFormat - BulkFormat for reading records
     * @param continuousEnumerationSettings - Settings for continuous monitoring (null for batch)
     * @param jobConf - Hadoop JobConf with Hive configurations
     * @param tablePath - ObjectPath identifying the Hive table
     * @param partitionKeys - List of partition key names
     * @param fetcher - Continuous partition fetcher for streaming mode (can be null)
     * @param fetcherContext - Context for continuous partition fetching (can be null)
     */
    HiveSource(
            Path[] inputPaths,
            FileEnumerator.Provider fileEnumerator,
            FileSplitAssigner.Provider splitAssigner,
            BulkFormat<T, HiveSourceSplit> readerFormat,
            @Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
            JobConf jobConf,
            ObjectPath tablePath,
            List<String> partitionKeys,
            @Nullable ContinuousPartitionFetcher<Partition, ?> fetcher,
            @Nullable HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext);

    /**
     * Get serializer for HiveSourceSplit objects
     * @return Serializer instance for splits
     */
    @Override
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();

    /**
     * Get serializer for enumerator checkpoints
     * @return Serializer for checkpoint state
     */
    @Override
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>>
            getEnumeratorCheckpointSerializer();

    /**
     * Create split enumerator for discovering and assigning splits
     * @param enumContext - Context for enumerator creation
     * @return SplitEnumerator instance for managing splits
     */
    @Override
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>>
            createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);

    /**
     * Create split enumerator from checkpoint state
     * @param enumContext - Context for enumerator creation
     * @param checkpoint - Checkpoint state to restore from
     * @return SplitEnumerator instance restored from checkpoint
     */
    @Override
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>>
            restoreEnumerator(
                    SplitEnumeratorContext<HiveSourceSplit> enumContext,
                    PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}
```

### HiveSourceBuilder

Builder pattern for creating configured HiveSource instances with validation and setup.

```java { .api }
/**
 * Builder to build HiveSource instances.
 */
@PublicEvolving
public class HiveSourceBuilder {
    /**
     * Creates a builder to read a hive table using metastore information
     * @param jobConf - Holds hive and hadoop configurations
     * @param flinkConf - Holds flink configurations
     * @param hiveVersion - The version of hive in use, if null will be auto-detected
     * @param dbName - The name of the database the table belongs to
     * @param tableName - The name of the table
     * @param tableOptions - Additional options needed to read the table, which take precedence over table properties stored in metastore
     */
    public HiveSourceBuilder(
            @Nonnull JobConf jobConf,
            @Nonnull ReadableConfig flinkConf,
            @Nullable String hiveVersion,
            @Nonnull String dbName,
            @Nonnull String tableName,
            @Nonnull Map<String, String> tableOptions);

    /**
     * Creates a builder to read a hive table using catalog table information
     * @param jobConf - Holds hive and hadoop configurations
     * @param flinkConf - Holds flink configurations
     * @param tablePath - Path of the table to be read
     * @param hiveVersion - The version of hive in use, if null will be auto-detected
     * @param catalogTable - The table to be read
     */
    public HiveSourceBuilder(
            @Nonnull JobConf jobConf,
            @Nonnull ReadableConfig flinkConf,
            @Nonnull ObjectPath tablePath,
            @Nullable String hiveVersion,
            @Nonnull CatalogTable catalogTable);

    /**
     * Sets the partitions to read in batch mode. By default, batch source reads all partitions in a hive table.
     * @param partitions - List of specific partitions to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

    /**
     * Sets the maximum number of records this source should return
     * @param limit - Maximum number of records to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setLimit(Long limit);

    /**
     * Sets the indices of projected fields
     * @param projectedFields - Indices of the fields to project, starting from 0
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);

    /**
     * Builds HiveSource with default built-in BulkFormat that returns records in type of RowData
     * @return HiveSource configured for RowData output
     */
    public HiveSource<RowData> buildWithDefaultBulkFormat();

    /**
     * Builds HiveSource with custom BulkFormat
     * @param bulkFormat - Custom BulkFormat for reading records
     * @return HiveSource configured with the provided BulkFormat
     */
    public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}
```

### Split Management

Classes for managing source splits and their lifecycle.

```java { .api }
/**
 * A sub-class of FileSourceSplit that contains extra information needed to read a hive table.
 */
@PublicEvolving
public class HiveSourceSplit extends FileSourceSplit {
    /**
     * Create HiveSourceSplit from Hadoop FileSplit
     * @param fileSplit - Hadoop FileSplit containing file and offset information
     * @param hiveTablePartition - Hive table partition metadata
     * @param readerPosition - Current reader position for checkpointing (can be null)
     * @throws IOException if split creation fails
     */
    public HiveSourceSplit(
            FileSplit fileSplit,
            HiveTablePartition hiveTablePartition,
            @Nullable CheckpointedPosition readerPosition) throws IOException;

    /**
     * Create HiveSourceSplit with explicit parameters
     * @param id - Unique identifier for this split
     * @param filePath - Path to the file this split reads
     * @param offset - Start position in the file
     * @param length - Length of data to read
     * @param hostnames - Preferred hosts for reading this split
     * @param readerPosition - Current reader position for checkpointing (can be null)
     * @param hiveTablePartition - Hive table partition metadata
     */
    public HiveSourceSplit(
            String id,
            Path filePath,
            long offset,
            long length,
            String[] hostnames,
            @Nullable CheckpointedPosition readerPosition,
            HiveTablePartition hiveTablePartition);

    /**
     * Get Hive table partition metadata for this split
     * @return HiveTablePartition containing partition information
     */
    public HiveTablePartition getHiveTablePartition();

    /**
     * Convert this split to MapReduce FileSplit format
     * @return FileSplit compatible with Hadoop MapReduce API
     */
    public FileSplit toMapRedSplit();

    /**
     * Update split with new checkpointed position
     * @param position - New checkpointed position (can be null)
     * @return New HiveSourceSplit with updated position
     */
    @Override
    public FileSourceSplit updateWithCheckpointedPosition(@Nullable CheckpointedPosition position);
}

/**
 * Serializer for HiveSourceSplit objects
 */
public class HiveSourceSplitSerializer implements SimpleVersionedSerializer<HiveSourceSplit> {
    public static final HiveSourceSplitSerializer INSTANCE = new HiveSourceSplitSerializer();

    /**
     * Get serializer version
     * @return Version number for compatibility
     */
    @Override
    public int getVersion();

    /**
     * Serialize split to bytes
     * @param split - Split to serialize
     * @return Serialized bytes
     * @throws IOException if serialization fails
     */
    @Override
    public byte[] serialize(HiveSourceSplit split) throws IOException;

    /**
     * Deserialize split from bytes
     * @param version - Serializer version
     * @param serialized - Serialized bytes
     * @return Deserialized split
     * @throws IOException if deserialization fails
     */
    @Override
    public HiveSourceSplit deserialize(int version, byte[] serialized) throws IOException;
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.mapred.JobConf;

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Hive source with Source API for streaming
JobConf jobConf = new JobConf();
// Configure jobConf with Hive settings...

Configuration flinkConf = new Configuration();
// Set streaming source properties
flinkConf.setString("streaming-source.enable", "true");
flinkConf.setString("streaming-source.monitor-interval", "1 min");

Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("streaming-source.enable", "true");
tableOptions.put("streaming-source.monitor-interval", "1 min");

HiveSource<RowData> hiveSource = new HiveSourceBuilder(
        jobConf,
        flinkConf,
        null, // auto-detect Hive version
        "default",
        "streaming_events",
        tableOptions)
    .setProjectedFields(new int[]{0, 1, 3}) // Project specific columns
    .buildWithDefaultBulkFormat();

// Create data stream from Hive source
DataStreamSource<RowData> stream = env.fromSource(
    hiveSource,
    WatermarkStrategy.noWatermarks(),
    "hive-source"
);

// Process the stream
stream
    .map(new ProcessRowDataFunction())
    .print();

env.execute("Hive Source API Example");
```

```java
// Batch reading with Source API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure specific partitions to read
List<HiveTablePartition> partitions = Arrays.asList(
    // Create partitions using HivePartitionUtils or other methods
);

HiveSource<RowData> batchSource = new HiveSourceBuilder(
        jobConf,
        flinkConf,
        null, // auto-detect Hive version
        "sales",
        "orders",
        new HashMap<>())
    .setPartitions(partitions)
    .setLimit(10000L) // Limit to 10k records
    .buildWithDefaultBulkFormat();

DataStreamSource<RowData> batchStream = env.fromSource(
    batchSource,
    WatermarkStrategy.noWatermarks(),
    "hive-batch-source"
);

batchStream.print();
env.execute("Hive Batch Source Example");
```

## Types

```java { .api }
public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
        implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {
    /**
     * Get boundedness of this source
     * @return Boundedness.BOUNDED for batch mode, CONTINUOUS_UNBOUNDED for streaming
     */
    @Override
    public Boundedness getBoundedness();

    /**
     * Create source reader
     * @param readerContext - Reader context provided by Flink runtime
     * @return SourceReader instance for reading splits
     */
    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

    /**
     * Create split enumerator
     * @param enumContext - Enumerator context provided by Flink runtime
     * @return SplitEnumerator instance for managing splits
     * @throws Exception if creation fails
     */
    @Override
    public abstract SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(
            SplitEnumeratorContext<SplitT> enumContext);

    /**
     * Restore split enumerator from checkpoint
     * @param enumContext - Enumerator context provided by Flink runtime
     * @param checkpoint - Checkpoint state to restore from
     * @return SplitEnumerator instance restored from checkpoint
     */
    @Override
    public abstract SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext,
            PendingSplitsCheckpoint<SplitT> checkpoint);

    /**
     * Get split serializer
     * @return Serializer for split objects
     */
    @Override
    public abstract SimpleVersionedSerializer<SplitT> getSplitSerializer();

    /**
     * Get checkpoint serializer
     * @return Serializer for checkpoint objects
     */
    @Override
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<SplitT>>
            getEnumeratorCheckpointSerializer();

    /**
     * Get type information for produced records
     * @return TypeInformation for the output type
     */
    @Override
    public TypeInformation<T> getProducedType();
}

public class FileSourceSplit implements SourceSplit {
    /**
     * Get split identifier
     * @return Unique split ID string
     */
    @Override
    public String splitId();

    /**
     * Get file path for this split
     * @return Path to the file
     */
    public Path path();

    /**
     * Get start position in file
     * @return Start byte position
     */
    public long offset();

    /**
     * Get length of data to read
     * @return Length in bytes
     */
    public long length();

    /**
     * Get preferred hosts for locality
     * @return Array of host names
     */
    public String[] hostnames();

    /**
     * Get current reader position for checkpointing
     * @return Optional containing checkpointed position
     */
    public Optional<CheckpointedPosition> getReaderPosition();
}

public enum Boundedness {
    BOUNDED,
    CONTINUOUS_UNBOUNDED
}
```