or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-integration.mdconfiguration.mdfunction-module.mdindex.mdpartition-management.mdtable-source-sink.md

table-source-sink.mddocs/

0

# Table Source and Sink Operations

1

2

Core functionality for reading from and writing to Hive tables using Flink's Table API and DataStream API. Supports both batch and streaming modes, with partition pruning and projection/limit push-down to reduce the amount of data read.

3

4

## Capabilities

5

6

### HiveSource

7

8

Unified data source for reading Hive tables in both bounded (batch) and unbounded (streaming) modes.

9

10

```java { .api }

11

/**

12

* Unified data source for reading Hive tables (bounded/unbounded)

13

* Implements Flink's Source interface for integration with DataStream API

14

*/

15

@PublicEvolving

16

public class HiveSource<T> implements Source<T, HiveSourceSplit, HivePendingSplitsCheckpoint> {

17

public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();

18

public SplitEnumerator<HiveSourceSplit, HivePendingSplitsCheckpoint> createEnumerator(

19

SplitEnumeratorContext<HiveSourceSplit> enumContext) throws Exception;

20

public SplitEnumerator<HiveSourceSplit, HivePendingSplitsCheckpoint> restoreEnumerator(

21

SplitEnumeratorContext<HiveSourceSplit> enumContext,

22

HivePendingSplitsCheckpoint checkpoint) throws Exception;

23

public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext) throws Exception;

24

}

25

```

26

27

### HiveSourceBuilder

28

29

Builder pattern implementation for constructing HiveSource instances with flexible configuration options.

30

31

```java { .api }

32

/**

33

* Builder for constructing HiveSource instances with configuration options

34

* Provides fluent API for setting partitions, projections, and limits

35

*/

36

@PublicEvolving

37

public class HiveSourceBuilder {

38

/**

39

* Create a new HiveSourceBuilder

40

* @param jobConf Hadoop JobConf with Hive configuration

41

* @param flinkConf Flink configuration options

42

* @param hiveVersion Hive version for compatibility (null for auto-detection)

43

* @param dbName Database name

44

* @param tableName Table name

45

* @param tableOptions Additional table options (take precedence over metastore properties)

46

*/

47

public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf,

48

String hiveVersion, String dbName,

49

String tableName, Map<String, String> tableOptions);

50

51

/**

52

* Build HiveSource with default RowData bulk format

53

* @return HiveSource configured for RowData processing

54

*/

55

public HiveSource<RowData> buildWithDefaultBulkFormat();

56

57

/**

58

* Build HiveSource with custom bulk format

59

* @param bulkFormat Custom bulk format for reading data

60

* @return HiveSource configured with custom format

61

*/

62

public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);

63

64

/**

65

* Set specific partitions to read (for batch mode)

66

* @param partitions List of partitions to read

67

* @return Builder instance for chaining

68

*/

69

public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

70

71

/**

72

* Set maximum number of records to read

73

* @param limit Maximum record count

74

* @return Builder instance for chaining

75

*/

76

public HiveSourceBuilder setLimit(Long limit);

77

78

/**

79

* Set field projection for reading subset of columns

80

* @param projectedFields Array of field indices to project

81

* @return Builder instance for chaining

82

*/

83

public HiveSourceBuilder setProjectedFields(int[] projectedFields);

84

}

85

```

86

87

**Usage Examples:**

88

89

```java

90

import org.apache.flink.connectors.hive.HiveSourceBuilder;

91

import org.apache.flink.connectors.hive.HiveTablePartition;

92

import org.apache.hadoop.mapred.JobConf;

93

94

// Basic source creation

95

JobConf jobConf = new JobConf();

96

jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

97

98

HiveSource<RowData> source = new HiveSourceBuilder(

99

jobConf,

100

new Configuration(),

101

"2.3.9", // hiveVersion

102

"default",

103

"user_events",

104

Collections.emptyMap()

105

).buildWithDefaultBulkFormat();

106

107

// Source with specific partitions and projection

108

List<HiveTablePartition> partitions = Arrays.asList(

109

HiveTablePartition.ofPartition(storageDesc, Map.of("year", "2024", "month", "01"), tableParams),

110

HiveTablePartition.ofPartition(storageDesc, Map.of("year", "2024", "month", "02"), tableParams)

111

);

112

113

HiveSource<RowData> filteredSource = new HiveSourceBuilder(

114

jobConf,

115

new Configuration(),

116

"2.3.9", // hiveVersion

117

"analytics",

118

"sales_data",

119

Collections.emptyMap()

120

)

121

.setPartitions(partitions)

122

.setProjectedFields(new int[]{0, 2, 5}) // Select specific columns

123

.setLimit(100000L) // Limit records

124

.buildWithDefaultBulkFormat();

125

126

// Use in DataStream

127

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

128

DataStream<RowData> stream = env.fromSource(

129

source,

130

WatermarkStrategy.noWatermarks(),

131

"hive-source"

132

);

133

```

134

135

### HiveTableSource

136

137

Dynamic table source for the Table API and SQL. It supports partition, projection, and limit push-down, statistics reporting, and dynamic filtering.

138

139

```java { .api }

140

/**

141

* Dynamic table source for Hive integration with Table API

142

* Supports partition pushdown, projection pushdown, limit pushdown, and statistics

143

*/

144

public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,

145

SupportsProjectionPushDown, SupportsLimitPushDown,

146

SupportsStatisticReport, SupportsDynamicFiltering {

147

148

/**

149

* Get scan runtime provider for table scanning

150

* @param scanContext Scan context with runtime information

151

* @return Provider for scan runtime

152

*/

153

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

154

155

/**

156

* Apply partition pruning to reduce data scanning

157

* @param remainingPartitions Partitions remaining after pruning

158

* @return Result with updated source

159

*/

160

public Result applyPartitions(List<Map<String, String>> remainingPartitions);

161

162

/**

163

* Apply field projection to read only required columns

164

* @param projectedFields Array of projected field indices

165

* @param producedDataType Data type after projection

166

* @return Result with updated source

167

*/

168

public Result applyProjection(int[][] projectedFields, DataType producedDataType);

169

170

/**

171

* Apply limit pushdown for result size optimization

172

* @param limit Maximum number of records to read

173

* @return Result with updated source

174

*/

175

public Result applyLimit(long limit);

176

177

/**

178

* Report table statistics for query optimization

179

* @return Table statistics including row count and column statistics

180

*/

181

public ChangelogMode getChangelogMode();

182

183

/**

184

* Get table statistics for cost-based optimization

185

* @return TableStats with row count and column statistics

186

*/

187

public TableStats reportStatistics();

188

}

189

```

190

191

### HiveLookupTableSource

192

193

Specialized table source for lookup join operations with caching support.

194

195

```java { .api }

196

/**

197

* Lookup table source extending HiveTableSource for join operations

198

* Provides temporal table lookup functionality with optional caching

199

*/

200

public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

201

202

/**

203

* Get lookup runtime provider for join operations

204

* @param context Lookup context with join information

205

* @return Provider for lookup runtime

206

*/

207

public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

208

}

209

```

210

211

### HiveTableSink

212

213

Dynamic table sink for writing data to Hive tables with partitioning and overwrite support.

214

215

```java { .api }

216

/**

217

* Dynamic table sink for writing to Hive tables

218

* Supports static partitioning, overwrite mode, and partition commit policies

219

*/

220

public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

221

222

/**

223

* Get sink runtime provider for data writing

224

* @param context Sink context with runtime information

225

* @return Provider for sink runtime

226

*/

227

public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

228

229

/**

230

* Apply static partition specification for writing

231

* @param partition Static partition key-value pairs

232

* @return Result with updated sink

233

*/

234

public Result applyStaticPartition(Map<String, String> partition);

235

236

/**

237

* Enable or disable overwrite mode for existing data

238

* @param overwrite Whether to overwrite existing data

239

* @return Result with updated sink configuration

240

*/

241

public Result applyOverwrite(boolean overwrite);

242

243

/**

244

* Configure partition grouping behavior for writing

245

* @param requiresGrouping Whether partition grouping is required

246

* @return Result with updated sink configuration

247

*/

248

public Result requiresPartitionGrouping(boolean requiresGrouping);

249

250

/**

251

* Create copy of sink for plan optimization

252

* @return Copy of the sink

253

*/

254

public DynamicTableSink copy();

255

256

/**

257

* Get human-readable summary of sink configuration

258

* @return Summary string

259

*/

260

public String asSummaryString();

261

}

262

```

263

264

**Usage Examples:**

265

266

```java

267

// Table API source usage

268

TableEnvironment tableEnv = TableEnvironment.create(settings);

269

270

// Register Hive catalog

271

HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/etc/hive/conf");

272

tableEnv.registerCatalog("hive", hiveCatalog);

273

tableEnv.useCatalog("hive");

274

275

// Query with automatic source optimization

276

Table result = tableEnv.sqlQuery("""

277

SELECT user_id, event_type, COUNT(*) as event_count

278

FROM user_events

279

WHERE event_date >= '2024-01-01'

280

AND event_type IN ('login', 'purchase')

281

GROUP BY user_id, event_type

282

LIMIT 1000

283

""");

284

285

// Write to Hive table with partitioning

286

Table processedData = tableEnv.fromValues(

287

DataTypes.ROW(

288

DataTypes.FIELD("user_id", DataTypes.STRING()),

289

DataTypes.FIELD("revenue", DataTypes.DECIMAL(10, 2)),

290

DataTypes.FIELD("month", DataTypes.STRING())

291

),

292

Row.of("user123", new BigDecimal("99.99"), "2024-01"),

293

Row.of("user456", new BigDecimal("149.99"), "2024-01")

294

);

295

296

// Insert with automatic sink configuration

297

processedData.executeInsert("analytics.monthly_revenue");

298

```

299

300

### Integration with Flink's Connector Framework

301

302

The Hive connector plugs into Flink's unified connector framework. The Table API discovers its factory automatically, and DataStream programs can build a `HiveSource` directly:

303

304

```java

305

// Factory registration happens automatically via SPI

306

// HiveDynamicTableFactory is discovered and used by Flink

307

308

// Manual source creation for DataStream API

309

HiveSource<RowData> source = new HiveSourceBuilder(jobConf, config, "2.3.9", "db", "table", Map.of())

310

.setPartitions(specificPartitions)

311

.setProjectedFields(new int[]{0, 1, 3})

312

.buildWithDefaultBulkFormat();

313

314

// Use with Flink's Source API

315

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

316

DataStream<RowData> hiveStream = env.fromSource(

317

source,

318

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),

319

"hive-source"

320

);

321

```

322

323

## Advanced Features

324

325

### Streaming Mode Support

326

327

The connector supports continuous monitoring of Hive tables for streaming scenarios:

328

329

```java

330

// Enable streaming mode via configuration

331

Configuration config = new Configuration();

332

config.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);

333

config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));

334

335

HiveSource<RowData> streamingSource = new HiveSourceBuilder(

336

jobConf, config, "2.3.9", "db", "streaming_table", Map.of()

337

).buildWithDefaultBulkFormat();

338

```

339

340

### Partition Commit Policies

341

342

Configure how partitions are committed after writing:

343

344

```java

345

// Set partition commit policy

346

Configuration sinkConfig = new Configuration();

347

sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");

348

349

// Commit policy affects when partitions become visible

350

```

351

352

### Performance Optimization

353

354

```java

355

// Enable parallelism inference for optimal performance

356

config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

357

358

// Configure reader fallback for compatibility

359

config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);

360

config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

361

```