or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-operations.md · configuration.md · hive-functions.md · index.md · source-api.md · table-sinks.md · table-sources.md

docs/table-sources.md

0

# Table Sources

1

2

Reading data from Hive tables with support for both batch and streaming modes, partition pruning, projection pushdown, and lookup joins. Provides comprehensive integration with Flink's table ecosystem while leveraging Hive's storage capabilities.

3

4

## Capabilities

5

6

### HiveTableSource

7

8

Primary table source for reading Hive tables in streaming mode with continuous partition monitoring.

9

10

```java { .api }

11

/**

12

* Table source for reading data from Hive tables in streaming mode

13

* Supports continuous partition monitoring and various optimizations

14

*/

15

public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {

16

/**

17

* Creates HiveTableSource for streaming Hive table access

18

* @param jobConf - Hadoop job configuration

19

* @param conf - Flink configuration

20

* @param tablePath - Path to the Hive table

21

* @param catalogTable - Catalog table metadata

22

*/

23

public HiveTableSource(JobConf jobConf, ReadableConfig conf, ObjectPath tablePath, CatalogTable catalogTable);

24

25

/**

26

* Get the scan runtime provider for reading data

27

* @param scanContext - Context for scan operation

28

* @return ScanRuntimeProvider for data stream creation

29

*/

30

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

31

32

/**

33

* Get the changelog mode supported by this source

34

* @return ChangelogMode indicating supported change types

35

*/

36

public ChangelogMode getChangelogMode();

37

38

/**

39

* Copy this source with different configuration

40

* @return New HiveTableSource instance

41

*/

42

public DynamicTableSource copy();

43

44

/**

45

* Get string summary of this table source

46

* @return Human-readable description

47

*/

48

public String asSummaryString();

49

}

50

```

51

52

### HiveLookupTableSource

53

54

Table source with both scan and lookup capabilities for dimension table use cases.

55

56

```java { .api }

57

/**

58

* Table source with both scan and lookup capabilities for Hive tables

59

* Ideal for dimension tables used in joins

60

*/

61

public class HiveLookupTableSource implements LookupTableSource, ScanTableSource {

62

/**

63

* Creates HiveLookupTableSource for scan and lookup operations

64

* @param jobConf - Hadoop job configuration

65

* @param conf - Flink configuration

66

* @param tablePath - Path to the Hive table

67

* @param catalogTable - Catalog table metadata

68

*/

69

public HiveLookupTableSource(JobConf jobConf, ReadableConfig conf, ObjectPath tablePath, CatalogTable catalogTable);

70

71

/**

72

* Get the lookup runtime provider for join operations

73

* @param context - Context for lookup operation

74

* @return LookupRuntimeProvider for lookup function creation

75

*/

76

public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

77

78

/**

79

* Get the scan runtime provider for reading data

80

* @param scanContext - Context for scan operation

81

* @return ScanRuntimeProvider for data stream creation

82

*/

83

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

84

85

/**

86

* Get the changelog mode supported by this source

87

* @return ChangelogMode indicating supported change types

88

*/

89

public ChangelogMode getChangelogMode();

90

91

/**

92

* Copy this source with different configuration

93

* @return New HiveLookupTableSource instance

94

*/

95

public DynamicTableSource copy();

96

}

97

```

98

99

### Partition Pushdown Support

100

101

Interface for optimizing queries by pushing partition filters to the source. (NOTE: the `supportsNestedProjection()` method shown below belongs to the projection-pushdown interface, not partition pushdown — verify against `SupportsProjectionPushDown`.)

102

103

```java { .api }

104

/**

105

* Apply partition pushdown optimization

106

* Filters data at the source level based on partition specifications

107

* @param remainingPartitions - List of partition specs that remain after filtering

108

*/

109

public void applyPartitions(List<Map<String, String>> remainingPartitions);

110

111

/**

112

* Check if nested projection is supported

113

* @return true if nested field projection is supported

114

*/

115

public boolean supportsNestedProjection();

116

```

117

118

### Projection Pushdown Support

119

120

Interface for optimizing queries by pushing column projections to the source.

121

122

```java { .api }

123

/**

124

* Apply projection pushdown optimization

125

* Only reads specified columns from the underlying storage

126

* @param projectedFields - Array of field indices to project in nested format

127

*/

128

public void applyProjection(int[][] projectedFields);

129

```

130

131

### Limit Pushdown Support

132

133

Interface for optimizing queries by pushing LIMIT operations to the source.

134

135

```java { .api }

136

/**

137

* Apply limit pushdown optimization

138

* Limits the number of records read at the source level

139

* @param limit - Maximum number of records to read

140

*/

141

public void applyLimit(long limit);

142

```

143

144

### Reading Context and Configuration

145

146

Context classes for configuring Hive table reading behavior.

147

148

```java { .api }

149

/**

150

* Context for Hive partition operations

151

*/

152

public class HivePartitionContext {

153

public HivePartitionContext(List<HiveTablePartition> allPartitions, List<HiveTablePartition> remainingPartitions);

154

public List<HiveTablePartition> getAllPartitions();

155

public List<HiveTablePartition> getRemainingPartitions();

156

}

157

158

/**

159

* Context for continuous Hive partition monitoring

160

*/

161

public class HiveContinuousPartitionContext extends HivePartitionFetcherContextBase<HiveTablePartition> {

162

public HiveContinuousPartitionContext(ObjectPath tablePath, CatalogTable catalogTable, List<String> partitionKeys);

163

public List<HiveTablePartition> getPartitions(List<String> partitionValues);

164

}

165

```

166

167

### Input Format Classes

168

169

Low-level input format classes for reading Hive table data.

170

171

```java { .api }

172

/**

173

* Input format for reading Hive table data

174

* Handles various Hive file formats and SerDes

175

*/

176

public class HiveTableInputFormat extends RichInputFormat<RowData, HiveTableInputSplit> {

177

/**

178

* Open input format for reading

179

* @param split - Input split to read

180

* @throws IOException if open fails

181

*/

182

public void open(HiveTableInputSplit split) throws IOException;

183

184

/**

185

* Check whether the end of the input has been reached

186

* @return true if the end of input has been reached and no more records are available

187

* @throws IOException if check fails

188

*/

189

public boolean reachedEnd() throws IOException;

190

191

/**

192

* Read next record

193

* @param reuse - Reusable RowData object

194

* @return Next RowData record

195

* @throws IOException if read fails

196

*/

197

public RowData nextRecord(RowData reuse) throws IOException;

198

199

/**

200

* Close input format

201

* @throws IOException if close fails

202

*/

203

public void close() throws IOException;

204

}

205

206

/**

207

* Input split for Hive table reading

208

*/

209

public class HiveTableInputSplit implements InputSplit {

210

public HiveTableInputSplit(int splitNumber, Path path, long start, long length, String[] hosts);

211

public int getSplitNumber();

212

public String[] getHostnames();

213

}

214

```

215

216

### Split Reader Implementations

217

218

Specialized readers for different Hive file formats.

219

220

```java { .api }

221

/**

222

* Split reader interface for Hive formats

223

*/

224

public interface SplitReader<T> {

225

/**

226

* Read next record from split

227

* @return Next record or null if end reached

228

* @throws IOException if read fails

229

*/

230

T read() throws IOException;

231

232

/**

233

* Close the reader

234

* @throws IOException if close fails

235

*/

236

void close() throws IOException;

237

}

238

239

/**

240

* Split reader for ORC files with vectorization

241

*/

242

public class HiveVectorizedOrcSplitReader implements SplitReader<RowData> {

243

public HiveVectorizedOrcSplitReader(HiveShim hiveShim, JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split);

244

}

245

246

/**

247

* Split reader for Parquet files with vectorization

248

*/

249

public class HiveVectorizedParquetSplitReader implements SplitReader<RowData> {

250

public HiveVectorizedParquetSplitReader(HiveShim hiveShim, JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split);

251

}

252

253

/**

254

* Split reader for MapReduce-based formats

255

*/

256

public class HiveMapredSplitReader implements SplitReader<RowData> {

257

public HiveMapredSplitReader(JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split, HiveShim hiveShim);

258

}

259

```

260

261

**Usage Examples:**

262

263

```java

264

import org.apache.flink.table.api.TableEnvironment;

265

import org.apache.flink.table.catalog.hive.HiveCatalog;

266

import org.apache.flink.configuration.Configuration;

267

268

// Set up table environment with Hive catalog

269

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

270

HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");

271

tableEnv.registerCatalog("hive", hiveCatalog);

272

tableEnv.useCatalog("hive");

273

274

// Query Hive table with automatic source optimization

275

Table result = tableEnv.sqlQuery(

276

"SELECT customer_id, order_total " +

277

"FROM hive.sales.orders " +

278

"WHERE partition_date >= '2023-01-01' " +

279

"AND order_total > 100.0 " +

280

"LIMIT 1000"

281

);

282

283

// The HiveTableSource will automatically apply:

284

// - Partition pushdown (partition_date >= '2023-01-01')

285

// - Projection pushdown (only customer_id, order_total columns)

286

// - Limit pushdown (LIMIT 1000)

287

288

result.execute().print();

289

```

290

291

```java

292

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

293

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

294

295

// Set up streaming environment for continuous monitoring

296

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

297

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

298

299

// Register Hive catalog

300

HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");

301

tableEnv.registerCatalog("hive", hiveCatalog);

302

tableEnv.useCatalog("hive");

303

304

// Enable continuous partition monitoring

305

tableEnv.getConfig().getConfiguration().setBoolean("streaming-source.enable", true);

306

tableEnv.getConfig().getConfiguration().setString("streaming-source.partition.include", "all");

307

308

// Stream from Hive table with partition monitoring

309

Table stream = tableEnv.sqlQuery(

310

"SELECT event_time, user_id, action " +

311

"FROM hive.events.user_actions"

312

);

313

314

// Convert to DataStream for further processing

315

DataStream<Row> dataStream = tableEnv.toAppendStream(stream, Row.class);

316

dataStream.print();

317

318

env.execute("Hive Streaming Source Example");

319

```

320

321

## Types

322

323

```java { .api }

324

public class HiveTablePartition {

325

public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, String> partitionSpec);

326

public StorageDescriptor getStorageDescriptor();

327

public Map<String, String> getPartitionSpec();

328

public String getLocation();

329

}

330

331

public class HiveSourceSplit implements SourceSplit {

332

public String splitId();

333

public HiveTableInputSplit getHiveTableInputSplit();

334

}

335

336

public interface ScanContext {

337

DataTypeFactory getDataTypeFactory();

338

}

339

340

public interface LookupContext {

341

String[] getKeys();

342

DataTypeFactory getDataTypeFactory();

343

}

344

345

public interface ScanRuntimeProvider extends DynamicTableSource.RuntimeProvider {

346

// Marker interface for scan providers

347

}

348

349

public interface LookupRuntimeProvider extends DynamicTableSource.RuntimeProvider {

350

// Marker interface for lookup providers

351

}

352

```