# Unified Data Source

Lower-level DataStream API integration providing fine-grained control over Hive data processing with custom formats, transformations, and advanced source configuration options.

## Capabilities

### HiveSource

Unified data source for reading Hive tables with custom data types and advanced configuration.

```java { .api }
/**
 * Unified data source for reading Hive tables with custom data types.
 */
@PublicEvolving
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {

    /** Get split serializer for checkpoint operations. */
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();

    /** Get enumerator checkpoint serializer. */
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> getEnumeratorCheckpointSerializer();

    /** Create split enumerator for assigning splits to readers. */
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);

    /** Restore split enumerator from checkpoint. */
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext, PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build and use HiveSource
HiveSource<RowData> source = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 1, 2}) // Select specific columns
    .setLimit(10000L) // Limit number of records
    .buildWithDefaultBulkFormat();

// Create DataStream from source
DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "hive-source"
);

// Process the stream
stream.map(row -> {
    // Custom processing logic
    return processRow(row);
}).print();

env.execute("Hive Streaming Job");
```

### HiveSourceBuilder

Builder pattern for constructing HiveSource instances with various configuration options.

```java { .api }
/**
 * Builder for constructing HiveSource instances with configuration options.
 */
@PublicEvolving
public class HiveSourceBuilder {

    /**
     * Set projected fields for column pruning.
     *
     * @param projectedFields array of column indices to read
     * @return builder instance for chaining
     */
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);

    /**
     * Set limit for the number of records to read.
     *
     * @param limit maximum number of records to read
     * @return builder instance for chaining
     */
    public HiveSourceBuilder setLimit(Long limit);

    /**
     * Set specific partitions to read from.
     *
     * @param partitions list of partitions to include
     * @return builder instance for chaining
     */
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

    /**
     * Build a source with the default bulk format.
     *
     * @return configured HiveSource instance
     */
    public <T> HiveSource<T> buildWithDefaultBulkFormat();
}
```

**Usage Examples:**

```java
import org.apache.flink.connectors.hive.HiveTablePartition;

// Build source with column projection
HiveSource<RowData> projectedSource = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 2, 5}) // Read columns 0, 2, and 5
    .buildWithDefaultBulkFormat();

// Build source with partition filtering
List<HiveTablePartition> partitions = Arrays.asList(
    getPartition("year=2023", "month=01"),
    getPartition("year=2023", "month=02")
);

HiveSource<RowData> partitionedSource = new HiveSourceBuilder()
    .setPartitions(partitions)
    .setLimit(5000L)
    .buildWithDefaultBulkFormat();

// Build source with all optimizations
HiveSource<RowData> optimizedSource = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 1, 3})
    .setPartitions(selectedPartitions)
    .setLimit(1000L)
    .buildWithDefaultBulkFormat();
```

### HiveTablePartition

Represents a Hive table partition with metadata and storage information.

```java { .api }
/**
 * Represents a Hive table partition with metadata.
 */
@PublicEvolving
public class HiveTablePartition {

    /** Get storage descriptor for the partition. */
    public StorageDescriptor getStorageDescriptor();

    /** Get partition specification as key-value pairs. */
    public LinkedHashMap<String, String> getPartitionSpec();

    /** Get table properties. */
    public Properties getTableProperties();

    /** Get partition location. */
    public String getLocation();

    /** Check whether partition data is stored as subdirectories. */
    public boolean isStoredAsSubDirectories();
}
```

### HiveSourceSplit

Represents a split for reading from Hive sources, containing partition and file information.

```java { .api }
/**
 * Represents a split for reading from Hive sources.
 */
@PublicEvolving
public class HiveSourceSplit implements SourceSplit {

    /** Get unique split identifier. */
    public String splitId();

    /** Get associated Hive table partition. */
    public HiveTablePartition getHiveTablePartition();

    /** Get file splits within this partition. */
    public List<FileSplit> getFileSplits();

    /** Get reader schema for this split. */
    public TableSchema getReaderSchema();
}
```

## Advanced Usage Patterns

### Streaming Mode Configuration

Configure continuous monitoring of Hive tables for new data:

```java
// Enable streaming mode with partition monitoring
Configuration config = new Configuration();
config.setString("table.exec.source.idle-timeout", "10s");
config.setBoolean("table.exec.hive.infer-source-parallelism", true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(config);
```

### Custom Data Processing

Process Hive data with custom transformations:

```java
DataStream<RowData> hiveStream = env.fromSource(hiveSource, watermarkStrategy, "hive");

// Custom processing with DataStream API
DataStream<CustomRecord> processed = hiveStream
    .map(new HiveRowDataMapper())
    .filter(record -> record.isValid())
    .keyBy(CustomRecord::getKey)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .aggregate(new CustomAggregator());
```

### Batch Processing Optimization

Optimize for large batch processing workloads:

```java
// Configure for batch processing
HiveSource<RowData> batchSource = new HiveSourceBuilder()
    .setProjectedFields(requiredColumns)
    .setLimit(null) // No limit for full table scan
    .buildWithDefaultBulkFormat();

// Use with batch execution environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
batchEnv.getConfig().enableObjectReuse(); // Optimize for batch
```

### Partition Discovery

Automatically discover and process new partitions:

```java
// Configure partition discovery interval
config.setString("partition.discovery.interval-millis", "60000"); // 1 minute

// Source will automatically discover new partitions
HiveSource<RowData> discoverySource = new HiveSourceBuilder()
    .buildWithDefaultBulkFormat();
```