or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-operations.md · configuration.md · hive-functions.md · index.md · source-api.md · table-sinks.md · table-sources.md

docs/source-api.md

0

# Source API

1

2

New Source API implementation for Hive tables providing enhanced control over split enumeration and reading with support for continuous partition monitoring, parallel reading, and efficient split management.

3

4

## Capabilities

5

6

### HiveSource

7

8

Main Source API implementation for reading Hive tables that extends AbstractFileSource and uses the new Flink Source interface.

9

10

```java { .api }

11

/**

12

* A unified data source that reads a hive table. HiveSource works on HiveSourceSplit and

13

* uses BulkFormat to read the data. A built-in BulkFormat is provided to return records in

14

* type of RowData. It's also possible to implement a custom BulkFormat to return data in

15

* different types. Use HiveSourceBuilder to build HiveSource instances.

16

*

17

* @param <T> the type of record returned by this source

18

*/

19

@PublicEvolving

20

public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {

21

/**

22

* Package-private constructor used by HiveSourceBuilder

23

* @param inputPaths - Array of input paths (typically contains single dummy path)

24

* @param fileEnumerator - Provider for file enumeration

25

* @param splitAssigner - Provider for split assignment

26

* @param readerFormat - BulkFormat for reading records

27

* @param continuousEnumerationSettings - Settings for continuous monitoring (null for batch)

28

* @param jobConf - Hadoop JobConf with Hive configurations

29

* @param tablePath - ObjectPath identifying the Hive table

30

* @param partitionKeys - List of partition key names

31

* @param fetcher - Continuous partition fetcher for streaming mode (can be null)

32

* @param fetcherContext - Context for continuous partition fetching (can be null)

33

*/

34

HiveSource(

35

Path[] inputPaths,

36

FileEnumerator.Provider fileEnumerator,

37

FileSplitAssigner.Provider splitAssigner,

38

BulkFormat<T, HiveSourceSplit> readerFormat,

39

@Nullable ContinuousEnumerationSettings continuousEnumerationSettings,

40

JobConf jobConf,

41

ObjectPath tablePath,

42

List<String> partitionKeys,

43

@Nullable ContinuousPartitionFetcher<Partition, ?> fetcher,

44

@Nullable HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext);

45

46

/**

47

* Get serializer for HiveSourceSplit objects

48

* @return Serializer instance for splits

49

*/

50

@Override

51

public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();

52

53

/**

54

* Get serializer for enumerator checkpoints

55

* @return Serializer for checkpoint state

56

*/

57

@Override

58

public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>>

59

getEnumeratorCheckpointSerializer();

60

61

/**

62

* Create split enumerator for discovering and assigning splits

63

* @param enumContext - Context for enumerator creation

64

* @return SplitEnumerator instance for managing splits

65

*/

66

@Override

67

public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>>

68

createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);

69

70

/**

71

* Create split enumerator from checkpoint state

72

* @param enumContext - Context for enumerator creation

73

* @param checkpoint - Checkpoint state to restore from

74

* @return SplitEnumerator instance restored from checkpoint

75

*/

76

@Override

77

public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>>

78

restoreEnumerator(

79

SplitEnumeratorContext<HiveSourceSplit> enumContext,

80

PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);

81

}

82

```

83

84

### HiveSourceBuilder

85

86

Builder pattern for creating configured HiveSource instances with validation and setup.

87

88

```java { .api }

89

/**

90

* Builder to build HiveSource instances.

91

*/

92

@PublicEvolving

93

public class HiveSourceBuilder {

94

/**

95

* Creates a builder to read a hive table using metastore information

96

* @param jobConf - Holds hive and hadoop configurations

97

* @param flinkConf - Holds flink configurations

98

* @param hiveVersion - The version of hive in use, if null will be auto-detected

99

* @param dbName - The name of the database the table belongs to

100

* @param tableName - The name of the table

101

* @param tableOptions - Additional options needed to read the table, which take precedence over table properties stored in metastore

102

*/

103

public HiveSourceBuilder(

104

@Nonnull JobConf jobConf,

105

@Nonnull ReadableConfig flinkConf,

106

@Nullable String hiveVersion,

107

@Nonnull String dbName,

108

@Nonnull String tableName,

109

@Nonnull Map<String, String> tableOptions);

110

111

/**

112

* Creates a builder to read a hive table using catalog table information

113

* @param jobConf - Holds hive and hadoop configurations

114

* @param flinkConf - Holds flink configurations

115

* @param tablePath - Path of the table to be read

116

* @param hiveVersion - The version of hive in use, if null will be auto-detected

117

* @param catalogTable - The table to be read

118

*/

119

public HiveSourceBuilder(

120

@Nonnull JobConf jobConf,

121

@Nonnull ReadableConfig flinkConf,

122

@Nonnull ObjectPath tablePath,

123

@Nullable String hiveVersion,

124

@Nonnull CatalogTable catalogTable);

125

126

/**

127

* Sets the partitions to read in batch mode. By default, batch source reads all partitions in a hive table.

128

* @param partitions - List of specific partitions to read

129

* @return Builder instance for chaining

130

*/

131

public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

132

133

/**

134

* Sets the maximum number of records this source should return

135

* @param limit - Maximum number of records to read

136

* @return Builder instance for chaining

137

*/

138

public HiveSourceBuilder setLimit(Long limit);

139

140

/**

141

* Sets the indices of projected fields

142

* @param projectedFields - Indices of the fields to project, starting from 0

143

* @return Builder instance for chaining

144

*/

145

public HiveSourceBuilder setProjectedFields(int[] projectedFields);

146

147

/**

148

* Builds HiveSource with default built-in BulkFormat that returns records in type of RowData

149

* @return HiveSource configured for RowData output

150

*/

151

public HiveSource<RowData> buildWithDefaultBulkFormat();

152

153

/**

154

* Builds HiveSource with custom BulkFormat

155

* @param bulkFormat - Custom BulkFormat for reading records

156

* @return HiveSource configured with the provided BulkFormat

157

*/

158

public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);

159

}

160

```

161

162

### Split Management

163

164

Classes for managing source splits and their lifecycle.

165

166

```java { .api }

167

/**

168

* A sub-class of FileSourceSplit that contains extra information needed to read a hive table.

169

*/

170

@PublicEvolving

171

public class HiveSourceSplit extends FileSourceSplit {

172

/**

173

* Create HiveSourceSplit from Hadoop FileSplit

174

* @param fileSplit - Hadoop FileSplit containing file and offset information

175

* @param hiveTablePartition - Hive table partition metadata

176

* @param readerPosition - Current reader position for checkpointing (can be null)

177

* @throws IOException if split creation fails

178

*/

179

public HiveSourceSplit(

180

FileSplit fileSplit,

181

HiveTablePartition hiveTablePartition,

182

@Nullable CheckpointedPosition readerPosition) throws IOException;

183

184

/**

185

* Create HiveSourceSplit with explicit parameters

186

* @param id - Unique identifier for this split

187

* @param filePath - Path to the file this split reads

188

* @param offset - Start position in the file

189

* @param length - Length of data to read

190

* @param hostnames - Preferred hosts for reading this split

191

* @param readerPosition - Current reader position for checkpointing (can be null)

192

* @param hiveTablePartition - Hive table partition metadata

193

*/

194

public HiveSourceSplit(

195

String id,

196

Path filePath,

197

long offset,

198

long length,

199

String[] hostnames,

200

@Nullable CheckpointedPosition readerPosition,

201

HiveTablePartition hiveTablePartition);

202

203

/**

204

* Get Hive table partition metadata for this split

205

* @return HiveTablePartition containing partition information

206

*/

207

public HiveTablePartition getHiveTablePartition();

208

209

/**

210

* Convert this split to MapReduce FileSplit format

211

* @return FileSplit compatible with Hadoop MapReduce API

212

*/

213

public FileSplit toMapRedSplit();

214

215

/**

216

* Update split with new checkpointed position

217

* @param position - New checkpointed position (can be null)

218

* @return New HiveSourceSplit with updated position

219

*/

220

@Override

221

public FileSourceSplit updateWithCheckpointedPosition(@Nullable CheckpointedPosition position);

222

}

223

224

/**

225

* Serializer for HiveSourceSplit objects

226

*/

227

public class HiveSourceSplitSerializer implements SimpleVersionedSerializer<HiveSourceSplit> {

228

public static final HiveSourceSplitSerializer INSTANCE = new HiveSourceSplitSerializer();

229

230

/**

231

* Get serializer version

232

* @return Version number for compatibility

233

*/

234

@Override

235

public int getVersion();

236

237

/**

238

* Serialize split to bytes

239

* @param split - Split to serialize

240

* @return Serialized bytes

241

* @throws IOException if serialization fails

242

*/

243

@Override

244

public byte[] serialize(HiveSourceSplit split) throws IOException;

245

246

/**

247

* Deserialize split from bytes

248

* @param version - Serializer version

249

* @param serialized - Serialized bytes

250

* @return Deserialized split

251

* @throws IOException if deserialization fails

252

*/

253

@Override

254

public HiveSourceSplit deserialize(int version, byte[] serialized) throws IOException;

255

}

256

```

257

258

**Usage Examples:**

259

260

```java

261

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

262

import org.apache.flink.connectors.hive.HiveSource;

263

import org.apache.flink.connectors.hive.HiveSourceBuilder;

264

import org.apache.flink.configuration.Configuration;

265

import org.apache.hadoop.mapred.JobConf;

266

267

// Create streaming environment

268

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

269

270

// Configure Hive source with Source API for streaming

271

JobConf jobConf = new JobConf();

272

// Configure jobConf with Hive settings...

273

274

Configuration flinkConf = new Configuration();

275

// Set streaming source properties

276

flinkConf.setString("streaming-source.enable", "true");

277

flinkConf.setString("streaming-source.monitor-interval", "1 min");

278

279

Map<String, String> tableOptions = new HashMap<>();

280

tableOptions.put("streaming-source.enable", "true");

281

tableOptions.put("streaming-source.monitor-interval", "1 min");

282

283

HiveSource<RowData> hiveSource = new HiveSourceBuilder(

284

jobConf,

285

flinkConf,

286

null, // auto-detect Hive version

287

"default",

288

"streaming_events",

289

tableOptions)

290

.setProjectedFields(new int[]{0, 1, 3}) // Project specific columns

291

.buildWithDefaultBulkFormat();

292

293

// Create data stream from Hive source

294

DataStreamSource<RowData> stream = env.fromSource(

295

hiveSource,

296

WatermarkStrategy.noWatermarks(),

297

"hive-source"

298

);

299

300

// Process the stream

301

stream

302

.map(new ProcessRowDataFunction())

303

.print();

304

305

env.execute("Hive Source API Example");

306

```

307

308

```java

309

// Batch reading with Source API

310

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

311

312

// Configure specific partitions to read

313

List<HiveTablePartition> partitions = Arrays.asList(

314

// Create partitions using HivePartitionUtils or other methods

315

);

316

317

HiveSource<RowData> batchSource = new HiveSourceBuilder(

318

jobConf,

319

flinkConf,

320

null, // auto-detect Hive version

321

"sales",

322

"orders",

323

new HashMap<>())

324

.setPartitions(partitions)

325

.setLimit(10000L) // Limit to 10k records

326

.buildWithDefaultBulkFormat();

327

328

DataStreamSource<RowData> batchStream = env.fromSource(

329

batchSource,

330

WatermarkStrategy.noWatermarks(),

331

"hive-batch-source"

332

);

333

334

batchStream.print();

335

env.execute("Hive Batch Source Example");

336

```

337

338

## Types

339

340

```java { .api }

341

public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>

342

implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {

343

/**

344

* Get boundedness of this source

345

* @return Boundedness.BOUNDED for batch mode, CONTINUOUS_UNBOUNDED for streaming

346

*/

347

@Override

348

public Boundedness getBoundedness();

349

350

/**

351

* Create source reader

352

* @param readerContext - Reader context provided by Flink runtime

353

* @return SourceReader instance for reading splits

354

*/

355

@Override

356

public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

357

358

/**

359

* Create split enumerator

360

* @param enumContext - Enumerator context provided by Flink runtime

361

* @return SplitEnumerator instance for managing splits

362

* @throws Exception if creation fails

363

*/

364

@Override

365

public abstract SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(

366

SplitEnumeratorContext<SplitT> enumContext);

367

368

/**

369

* Restore split enumerator from checkpoint

370

* @param enumContext - Enumerator context provided by Flink runtime

371

* @param checkpoint - Checkpoint state to restore from

372

* @return SplitEnumerator instance restored from checkpoint

373

*/

374

@Override

375

public abstract SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> restoreEnumerator(

376

SplitEnumeratorContext<SplitT> enumContext,

377

PendingSplitsCheckpoint<SplitT> checkpoint);

378

379

/**

380

* Get split serializer

381

* @return Serializer for split objects

382

*/

383

@Override

384

public abstract SimpleVersionedSerializer<SplitT> getSplitSerializer();

385

386

/**

387

* Get checkpoint serializer

388

* @return Serializer for checkpoint objects

389

*/

390

@Override

391

public SimpleVersionedSerializer<PendingSplitsCheckpoint<SplitT>>

392

getEnumeratorCheckpointSerializer();

393

394

/**

395

* Get type information for produced records

396

* @return TypeInformation for the output type

397

*/

398

@Override

399

public TypeInformation<T> getProducedType();

400

}

401

402

public class FileSourceSplit implements SourceSplit {

403

/**

404

* Get split identifier

405

* @return Unique split ID string

406

*/

407

@Override

408

public String splitId();

409

410

/**

411

* Get file path for this split

412

* @return Path to the file

413

*/

414

public Path path();

415

416

/**

417

* Get start position in file

418

* @return Start byte position

419

*/

420

public long offset();

421

422

/**

423

* Get length of data to read

424

* @return Length in bytes

425

*/

426

public long length();

427

428

/**

429

* Get preferred hosts for locality

430

* @return Array of host names

431

*/

432

public String[] hostnames();

433

434

/**

435

* Get current reader position for checkpointing

436

* @return Optional containing checkpointed position

437

*/

438

public Optional<CheckpointedPosition> getReaderPosition();

439

}

440

441

public enum Boundedness {

442

BOUNDED,

443

CONTINUOUS_UNBOUNDED

444

}

445

```