# Table Sources and Sinks

Core table connector implementations providing read and write capabilities for Hive tables. These classes support both batch and streaming processing modes, with advanced features including partition management, pushdown optimizations, and streaming ingestion.

## Capabilities

### Hive Table Source

Primary table source implementation for reading data from Hive tables, supporting both batch and streaming modes with advanced pushdown optimizations.

```java { .api }
/**
 * Table source for reading Hive tables with comprehensive optimization support.
 * Supports: partition pushdown, projection pushdown, limit pushdown, statistics reporting.
 */
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown, SupportsStatisticReport {

    /**
     * Creates the table source scan runtime provider for execution.
     * @param scanContext Context containing runtime information
     * @return ScanRuntimeProvider for execution
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /**
     * Applies partition pruning to reduce data scanning.
     * @param remainingPartitions Partitions that remain after pruning
     */
    public void applyPartitions(List<Map<String, String>> remainingPartitions);

    /**
     * Applies column projection to minimize data transfer.
     * @param projectedFields Array of projected field index paths (supports nested fields)
     * @param producedDataType Data type produced after applying the projection
     */
    public void applyProjection(int[][] projectedFields, DataType producedDataType);

    /**
     * Applies limit pushdown for query optimization.
     * @param limit Maximum number of records to read
     */
    public void applyLimit(long limit);

    /**
     * Reports table statistics for cost-based optimization.
     * @return Table statistics including row count and column statistics
     */
    public TableStats reportStatistics();

    /**
     * Creates a copy of this table source for planning.
     * @return Deep copy of the table source
     */
    public DynamicTableSource copy();

    /**
     * Returns a string summary of the table source.
     * @return Human-readable description
     */
    public String asSummaryString();
}
```

**Usage Examples:**

```sql
-- Batch reading with partition pruning
SELECT id, name, amount
FROM hive_table
WHERE partition_date BETWEEN '2023-01-01' AND '2023-01-31'
  AND region = 'us-west';

-- Streaming source configuration
CREATE TABLE hive_stream_source (
    id BIGINT,
    event_data STRING,
    event_time TIMESTAMP(3),
    partition_hour STRING
) PARTITIONED BY (partition_hour)
WITH (
    'connector' = 'hive',
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '10 min',
    'streaming-source.consume-start-offset' = '2023-01-01 00:00:00'
);
```

### Hive Table Sink

Primary table sink implementation for writing data to Hive tables with comprehensive partitioning and commit policy support.

```java { .api }
/**
 * Table sink for writing data to Hive tables with partition support.
 * Supports: dynamic partitioning, overwrite modes, custom commit policies.
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

    /**
     * Creates the sink runtime provider for execution.
     * @param context Context containing runtime information
     * @return SinkRuntimeProvider for execution
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /**
     * Applies static partition specifications.
     * @param partitions Map of partition column to value
     */
    public void applyStaticPartition(Map<String, String> partitions);

    /**
     * Configures overwrite mode for the sink.
     * @param overwrite Whether to overwrite existing data
     */
    public void applyOverwrite(boolean overwrite);

    /**
     * Creates a copy of this table sink for planning.
     * @return Deep copy of the table sink
     */
    public DynamicTableSink copy();

    /**
     * Returns a string summary of the table sink.
     * @return Human-readable description
     */
    public String asSummaryString();
}
```

**Usage Examples:**

```sql
-- Writing to a partitioned table
INSERT INTO hive_partitioned_table
PARTITION (year='2023', month='01')
SELECT id, name, amount FROM source_table;

-- Dynamic partitioning with overwrite
INSERT OVERWRITE hive_table
SELECT id, name, amount, DATE_FORMAT(event_time, 'yyyy-MM-dd') AS partition_date
FROM streaming_source;

-- Sink configuration with commit policies
CREATE TABLE hive_sink (
    id BIGINT,
    data STRING,
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'table.exec.hive.sink.statistic-auto-gather.enable' = 'true'
);
```

### Hive Source (DataStream API)

Low-level source implementation using Flink's new Source API, providing fine-grained control over data ingestion and parallelism.

```java { .api }
/**
 * Generic source implementation using Flink's new Source API.
 * Provides low-level control over data ingestion and split management.
 * @param <T> Output data type
 */
public final class HiveSource<T> implements Source<T, HiveSourceSplit, HiveSplitEnumeratorState>,
        ResultTypeQueryable<T> {

    /**
     * Creates a split enumerator that discovers and assigns splits.
     * @param enumContext Enumerator context
     * @return Split enumerator instance
     */
    public SplitEnumerator<HiveSourceSplit, HiveSplitEnumeratorState> createEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext);

    /**
     * Restores a split enumerator from checkpoint state.
     * @param enumContext Enumerator context
     * @param checkpoint Checkpoint state to restore from
     * @return Restored split enumerator
     */
    public SplitEnumerator<HiveSourceSplit, HiveSplitEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext,
            HiveSplitEnumeratorState checkpoint);

    /**
     * Creates a source reader for processing splits.
     * @param readerContext Reader context
     * @return Source reader instance
     */
    public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext);

    /**
     * Returns the boundedness of this source.
     * @return Boundedness.BOUNDED for batch, CONTINUOUS_UNBOUNDED for streaming
     */
    public Boundedness getBoundedness();

    /**
     * Returns the produced data type.
     * @return TypeInformation for the output type
     */
    public TypeInformation<T> getProducedType();
}
```

**Usage Example:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build Hive source (assumes a HiveConf `hiveConf` is available in scope)
HiveSource<RowData> hiveSource = new HiveSourceBuilder()
    .setTableIdentifier(ObjectIdentifier.of("catalog", "database", "table"))
    .setHiveConfiguration(hiveConf)
    .setProjectFields(new int[]{0, 1, 2}) // project specific columns
    .setLimit(10000)                      // limit number of records
    .build();

// Add to execution graph
env.fromSource(hiveSource, WatermarkStrategy.noWatermarks(), "Hive Source")
    .map(row -> processRow(row))          // processRow: user-defined transformation
    .print();

env.execute("Hive DataStream Job");
```

### Hive Source Builder

Builder-pattern implementation for constructing HiveSource instances with a fluent configuration API.

```java { .api }
/**
 * Builder for creating HiveSource instances with a fluent configuration API.
 */
public class HiveSourceBuilder {

    /**
     * Sets the table identifier for the source.
     * @param tableIdentifier Table identifier (catalog.database.table)
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setTableIdentifier(ObjectIdentifier tableIdentifier);

    /**
     * Sets the Hive configuration for the source.
     * @param hiveConf Hive configuration object
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setHiveConfiguration(HiveConf hiveConf);

    /**
     * Sets projected field indices to minimize data transfer.
     * @param projectFields Array of field indices to project
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setProjectFields(int[] projectFields);

    /**
     * Sets the maximum number of records to read.
     * @param limit Maximum record count
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setLimit(long limit);

    /**
     * Sets specific partitions to read from.
     * @param partitions List of partition specifications
     * @return This builder instance for chaining
     */
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

    /**
     * Builds the configured HiveSource instance.
     * @return Configured HiveSource<RowData> instance
     */
    public HiveSource<RowData> build();
}
```
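
**Usage Example:**

A minimal sketch of a bounded read restricted to explicit partitions via `setPartitions`, following the API above. The `hiveConf` object and the partition list are assumed to be available; `resolvePartitions()` and the catalog/database/table names are hypothetical placeholders.

```java
import java.util.List;

import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;

// Partitions are typically resolved from the Hive metastore beforehand;
// resolvePartitions() stands in for that lookup here.
List<HiveTablePartition> partitions = resolvePartitions();

HiveSource<RowData> boundedSource = new HiveSourceBuilder()
    .setTableIdentifier(ObjectIdentifier.of("myhive", "sales_db", "orders"))
    .setHiveConfiguration(hiveConf)       // assumed HiveConf in scope
    .setPartitions(partitions)            // read only the listed partitions
    .setProjectFields(new int[]{0, 1})    // combine with projection pushdown
    .build();
```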

### Hive Lookup Table Source

Specialized table source for dimension table lookups in streaming joins, providing caching and optimized access patterns.

```java { .api }
/**
 * Lookup table source for dimension table joins with caching support.
 * Extends HiveTableSource with lookup join capabilities.
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

    /**
     * Creates the lookup runtime provider for join operations.
     * @param lookupContext Context containing lookup configuration
     * @return LookupRuntimeProvider for execution
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext);

    /**
     * Creates a copy of this lookup table source.
     * @return Deep copy of the lookup table source
     */
    public DynamicTableSource copy();
}
```

**Usage Example:**

```sql
-- Configure lookup join with caching
CREATE TABLE dim_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    category STRING
) WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '1 hour'
);

-- Use in a temporal join
SELECT
    orders.order_id,
    orders.amount,
    dim.name,
    dim.category
FROM orders_stream AS orders
JOIN dim_table FOR SYSTEM_TIME AS OF orders.proc_time AS dim
    ON orders.product_id = dim.id;
```

## Performance Optimization Features

### Partition Pruning

- Automatic partition elimination based on WHERE clause predicates
- Supports complex partition expressions and date-range filtering
- Reduces I/O by scanning only relevant partitions (see the sketch below)
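
To see pruning in action, a minimal sketch (assuming `hive_table`, partitioned by `partition_date`, is already registered through a `HiveCatalog`; catalog setup not shown). The explained plan reflects the partitions that remain after pruning:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// The planner pushes the partition predicate into the source via
// HiveTableSource#applyPartitions before execution.
System.out.println(tEnv.explainSql(
    "SELECT id, amount FROM hive_table WHERE partition_date = '2023-01-15'"));
```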

### Projection Pushdown

- Column pruning to minimize data transfer
- Supports nested field projection for complex data types (see the sketch below)
- Integrates with columnar storage formats (Parquet, ORC)
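
A short sketch of both top-level and nested projection, reusing the `tEnv` and `hive_table` from the partition pruning example; the ROW-typed column `customer` is an assumed addition to that table's schema:

```java
// Top-level projection: the planner hands only the index paths of
// `name` and `amount` to HiveTableSource#applyProjection.
tEnv.executeSql("SELECT name, amount FROM hive_table").print();

// Nested projection: only customer.name is read when the underlying
// format (e.g. Parquet, ORC) supports nested column pruning.
tEnv.executeSql("SELECT customer.name FROM hive_table").print();
```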

### Limit Pushdown

- Pushes LIMIT operations to reduce data scanning
- Optimizes TOP-N queries and sampling operations
- Combines with other pushdowns for maximum efficiency (see the combined example below)
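
A combined example, again reusing `tEnv` from above: the partition predicate, column list, and LIMIT are each pushed into the same scan, so at most 100 records of two columns are read from a single partition:

```java
// applyPartitions, applyProjection, and applyLimit all take effect
// on one scan when the planner can push each operation down.
tEnv.executeSql(
        "SELECT id, amount FROM hive_table "
            + "WHERE partition_date = '2023-01-15' "
            + "LIMIT 100")
    .print();
```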

### Statistics Integration

- Provides table and column statistics for cost-based optimization
- Supports Hive metastore statistics and file-level metadata
- Enables intelligent join ordering and execution planning (see the configuration sketch below)
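
Statistics reporting is consumed by the planner automatically; there is normally nothing to call by hand. The sketch below toggles the optimizer option that governs it, assuming the option name `table.optimizer.source.report-statistics-enabled` (verify against your Flink version), reusing `tEnv` from the pruning sketch:

```java
// Assumed option name; when enabled, the planner invokes
// HiveTableSource#reportStatistics to obtain row counts and column
// statistics for cost-based join ordering.
tEnv.getConfig().set("table.optimizer.source.report-statistics-enabled", "true");
```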

### Vectorized Processing

- Native vectorized readers for Parquet and ORC formats
- Batch processing for improved CPU efficiency
- Configurable fallback to MapReduce readers when needed (see the sketch below)
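
Vectorized reading is used by default for supported formats; the fallback is a configuration switch. A sketch assuming the connector option name `table.exec.hive.fallback-mapred-reader` (verify against your connector version), again reusing `tEnv`:

```java
// Fall back to Hadoop MapReduce record readers instead of the native
// vectorized ORC/Parquet readers, e.g. for data types the vectorized
// path does not support.
tEnv.getConfig().set("table.exec.hive.fallback-mapred-reader", "true");
```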