or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-operations.md configuration.md hive-functions.md index.md source-api.md table-sinks.md table-sources.md

table-sinks.md docs/

0

# Table Sinks

1

2

Writing data to Hive tables with support for partitioning, multiple file formats, streaming ingestion with compaction, and seamless integration with Hive metastore for metadata management.

3

4

## Capabilities

5

6

### HiveTableSink

7

8

Primary table sink for writing data to Hive tables with comprehensive partitioning and format support.

9

10

```java { .api }

11

/**

12

* Table sink for writing data to Hive tables

13

* Supports partitioning, overwrite modes, and various file formats

14

*/

15

public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

16

/**

17

* Creates HiveTableSink for writing to Hive tables

18

* @param conf - Flink configuration

19

* @param jobConf - Hadoop job configuration

20

* @param identifier - Table identifier

21

* @param catalogTable - Catalog table metadata

22

* @param configuredParallelism - Configured sink parallelism (can be null)

23

*/

24

public HiveTableSink(ReadableConfig conf, JobConf jobConf, ObjectIdentifier identifier, CatalogTable catalogTable, Integer configuredParallelism);

25

26

/**

27

* Get the sink runtime provider for writing data

28

* @param context - Context for sink operation

29

* @return SinkRuntimeProvider for data stream sink creation

30

*/

31

public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

32

33

/**

34

* Get the changelog mode supported by this sink

35

* @return ChangelogMode indicating supported change types

36

*/

37

public ChangelogMode getChangelogMode();

38

39

/**

40

* Create a copy of this sink

41

* @return New HiveTableSink instance

42

*/

43

public DynamicTableSink copy();

44

45

/**

46

* Get string summary of this table sink

47

* @return Human-readable description

48

*/

49

public String asSummaryString();

50

}

51

```

52

53

### Partitioning Support

54

55

Interface for configuring partitioned writes to Hive tables.

56

57

```java { .api }

58

/**

59

* Apply static partition specification

60

* Sets fixed partition values for all written records

61

* @param partition - Map of partition key to value

62

* @return New HiveTableSink with static partitioning applied

63

*/

64

public DynamicTableSink applyStaticPartition(Map<String, String> partition);

65

66

/**

67

* Check if sink requires partition grouping

68

* @param supportsGrouping - Whether grouping is supported by the runtime

69

* @return true if partition grouping is required

70

*/

71

public boolean requiresPartitionGrouping(boolean supportsGrouping);

72

```

73

74

### Overwrite Support

75

76

Interface for configuring overwrite behavior when writing to existing data.

77

78

```java { .api }

79

/**

80

* Apply overwrite mode configuration

81

* Controls whether existing data should be overwritten

82

* @param overwrite - true to enable overwrite mode

83

* @return New DynamicTableSink with overwrite configuration

84

*/

85

public DynamicTableSink applyOverwrite(boolean overwrite);

86

```

87

88

### Writer Factory Classes

89

90

Factory classes for creating writers for different file formats and configurations.

91

92

```java { .api }

93

/**

94

* Factory for creating Hive bulk writers

95

* Handles format-specific writer creation and configuration

96

*/

97

public class HiveBulkWriterFactory implements BulkWriter.Factory<RowData> {

98

/**

99

* Create HiveBulkWriterFactory for specific format

100

* @param jobConf - Hadoop job configuration

101

* @param tableSchema - Schema of the table

102

* @param hiveShim - Hive version compatibility shim

103

* @param isCompressed - Whether output should be compressed

104

*/

105

public HiveBulkWriterFactory(JobConf jobConf, TableSchema tableSchema, HiveShim hiveShim, boolean isCompressed);

106

107

/**

108

* Create bulk writer for given file

109

* @param out - Output stream to write to

110

* @return BulkWriter instance for writing records

111

* @throws IOException if writer creation fails

112

*/

113

public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException;

114

}

115

116

/**

117

* Factory for creating Hive output formats

118

* Provides MapReduce-compatible output format creation

119

*/

120

public class HiveOutputFormatFactory {

121

/**

122

* Create output format for Hive table

123

* @param jobConf - Hadoop job configuration

124

* @param catalogTable - Catalog table metadata

125

* @param storageDescriptor - Hive table storage descriptor

126

* @param partitionSpec - Partition specification (can be empty)

127

* @param isOverwrite - Whether to overwrite existing data

128

* @return OutputFormat instance for writing

129

*/

130

public static OutputFormat<NullWritable, RowData> createOutputFormat(

131

JobConf jobConf,

132

CatalogTable catalogTable,

133

StorageDescriptor storageDescriptor,

134

Map<String, String> partitionSpec,

135

boolean isOverwrite

136

);

137

}

138

139

/**

140

* Factory for creating generic Hive writers

141

* Abstracts writer creation across different formats

142

*/

143

public class HiveWriterFactory implements WriterFactory<RowData> {

144

/**

145

* Create HiveWriterFactory with configuration

146

* @param jobConf - Hadoop job configuration

147

* @param catalogTable - Catalog table metadata

148

* @param isOverwrite - Whether to overwrite existing data

149

* @param staticPartSpec - Static partition specification

150

*/

151

public HiveWriterFactory(JobConf jobConf, CatalogTable catalogTable, boolean isOverwrite, LinkedHashMap<String, String> staticPartSpec);

152

153

/**

154

* Create writer for specific partition

155

* @param context - Writer context with partition info

156

* @return Writer instance for the partition

157

* @throws IOException if writer creation fails

158

*/

159

public Writer<RowData> createWriter(WriterInitContext context) throws IOException;

160

}

161

```

162

163

### Streaming Sink Configuration

164

165

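Configuration for streaming sinks with file rolling and compaction. The factory method below builds a `StreamingFileSink` from a base path, writer factory, bucket assigner, rolling policy, and output file configuration.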

166

167

```java { .api }

168

/**

169

* Configure streaming file sink with rolling policies

170

* @param basePath - Base path for writing files

171

* @param writerFactory - Factory for creating bulk writers

172

* @param bucketAssigner - Function to assign records to buckets/partitions

173

* @param rollingPolicy - Policy for when to roll files

174

* @param outputFileConfig - Configuration for output file naming

175

* @return Configured StreamingFileSink

176

*/

177

public static <T> StreamingFileSink<T> createStreamingSink(

178

Path basePath,

179

BulkWriter.Factory<T> writerFactory,

180

BucketAssigner<T, String> bucketAssigner,

181

RollingPolicy<T, String> rollingPolicy,

182

OutputFileConfig outputFileConfig

183

);

184

```

185

186

### Partition and Bucket Assignment

187

188

Classes for managing data distribution across partitions and files.

189

190

```java { .api }

191

/**

192

* Assigns records to partition buckets based on partition keys

193

*/

194

public class HiveRowDataPartitionComputer implements PartitionComputer<RowData> {

195

/**

196

* Create partition computer for Hive table

197

* @param hiveShim - Hive version compatibility shim

198

* @param defaultPartName - Default name for null partition values

199

* @param fieldNames - Names of all table fields

200

* @param fieldTypes - Types of all table fields

201

* @param partitionColumns - Names of partition columns

202

*/

203

public HiveRowDataPartitionComputer(HiveShim hiveShim, String defaultPartName, String[] fieldNames, DataType[] fieldTypes, String[] partitionColumns);

204

205

/**

206

* Generate partition path for given record

207

* @param in - Input record

208

* @return Partition path string

209

*/

210

public String generatePartValues(RowData in);

211

}

212

213

/**

214

* Legacy partition computer for Row objects

215

*/

216

public class HiveRowPartitionComputer implements PartitionComputer<Row> {

217

public HiveRowPartitionComputer(HiveShim hiveShim, String defaultPartName, String[] fieldNames, TypeInformation<?>[] fieldTypes, String[] partitionColumns);

218

public String generatePartValues(Row in);

219

}

220

```

221

222

### File and Checkpoint Management

223

224

Classes for managing file lifecycle and streaming checkpoints.

225

226

```java { .api }

227

/**

228

* Configuration for output file naming

229

*/

230

public class OutputFileConfig {

231

/**

232

* Create output file configuration

233

* @param partPrefix - Prefix for part files

234

* @param partSuffix - Suffix for part files

235

*/

236

public OutputFileConfig(String partPrefix, String partSuffix);

237

238

public String getPartPrefix();

239

public String getPartSuffix();

240

}

241

242

/**

243

* Rolling policy based on checkpoints

244

* Files are rolled when checkpoints occur

245

*/

246

public class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

247

/**

248

* Check if file should be rolled

249

* @param partFileState - Current state of the part file

250

* @param element - Current element being processed

251

* @param processingTime - Current processing time

252

* @return true if file should be rolled

253

*/

254

public boolean shouldRollOnEvent(PartFileInfo partFileState, IN element, long processingTime);

255

256

/**

257

* Check if file should be rolled on processing time

258

* @param partFileState - Current state of the part file

259

* @param processingTime - Current processing time

260

* @return true if file should be rolled

261

*/

262

public boolean shouldRollOnProcessingTime(PartFileInfo partFileState, long processingTime);

263

}

264

```

265

266

**Usage Examples:**

267

268

```java

269

import org.apache.flink.table.api.TableEnvironment;

270

import org.apache.flink.table.catalog.hive.HiveCatalog;

271

272

// Set up table environment with Hive catalog

273

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

274

HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");

275

tableEnv.registerCatalog("hive", hiveCatalog);

276

tableEnv.useCatalog("hive");

277

278

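// Insert data into partitioned Hive table
// NOTE: the SQL below qualifies tables with catalog "hive_catalog", but the catalog above is registered as "hive"; the names must match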

279

tableEnv.executeSql(

280

"INSERT INTO hive_catalog.sales.orders " +

281

"PARTITION (year='2023', month='12') " +

282

"SELECT order_id, customer_id, order_total, order_date " +

283

"FROM hive_catalog.staging.raw_orders " +

284

"WHERE YEAR(order_date) = 2023 AND MONTH(order_date) = 12"

285

);

286

287

// Overwrite existing partition

288

tableEnv.executeSql(

289

"INSERT OVERWRITE hive_catalog.sales.daily_summary " +

290

"PARTITION (date_key='2023-12-01') " +

291

"SELECT customer_id, SUM(order_total) as total_sales " +

292

"FROM hive_catalog.sales.orders " +

293

"WHERE DATE(order_date) = '2023-12-01' " +

294

"GROUP BY customer_id"

295

);

296

```

297

298

```java

299

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

300

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

301

302

// Set up streaming environment for continuous writing

303

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

304

env.enableCheckpointing(60000); // Enable checkpointing for file rolling

305

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

306

307

// Register Hive catalog

308

HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");

309

tableEnv.registerCatalog("hive", hiveCatalog);

310

tableEnv.useCatalog("hive");

311

312

// Create source table from Kafka

313

tableEnv.executeSql(

314

"CREATE TABLE kafka_orders (" +

315

" order_id BIGINT," +

316

" customer_id BIGINT," +

317

" order_total DECIMAL(10,2)," +

318

" order_time TIMESTAMP(3)," +

319

" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +

320

") WITH (" +

321

" 'connector' = 'kafka'," +

322

" 'topic' = 'orders'," +

323

" 'properties.bootstrap.servers' = 'localhost:9092'," +

324

" 'format' = 'json'" +

325

")"

326

);

327

328

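// Stream data to partitioned Hive table
// NOTE: the SQL below qualifies the table with catalog "hive_catalog", but the catalog above is registered as "hive"; the names must match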

329

tableEnv.executeSql(

330

"INSERT INTO hive_catalog.sales.streaming_orders " +

331

"SELECT " +

332

" order_id," +

333

" customer_id," +

334

" order_total," +

335

" order_time," +

336

" DATE_FORMAT(order_time, 'yyyy-MM-dd') as partition_date " +

337

"FROM kafka_orders"

338

);

339

340

env.execute("Hive Streaming Sink Example");

341

```

342

343

## Types

344

345

```java { .api }

346

public interface PartitionComputer<T> {

347

/**

348

* Generate partition values for a record

349

* @param record - Input record to compute partition for

350

* @return Partition path string

351

*/

352

String generatePartValues(T record);

353

}

354

355

public interface BucketAssigner<IN, BucketID> {

356

/**

357

* Assign record to a bucket

358

* @param element - Input element

359

* @param context - Processing context

360

* @return Bucket identifier

361

*/

362

BucketID getBucketId(IN element, Context context);

363

}

364

365

public interface WriterFactory<IN> {

366

/**

367

* Create writer for given context

368

* @param context - Writer initialization context

369

* @return Writer instance

370

* @throws IOException if creation fails

371

*/

372

Writer<IN> createWriter(WriterInitContext context) throws IOException;

373

}

374

375

public interface Writer<IN> {

376

/**

377

* Write element to output

378

* @param element - Element to write

379

* @param context - Processing context

380

* @throws IOException if write fails
* @throws InterruptedException if the write is interrupted

381

*/

382

void write(IN element, Context context) throws IOException, InterruptedException;

383

}

384

385

public class ObjectIdentifier {

386

public ObjectIdentifier(String catalogName, String databaseName, String objectName);

387

public String getCatalogName();

388

public String getDatabaseName();

389

public String getObjectName();

390

}

391

392

public interface SinkRuntimeProvider extends DynamicTableSink.RuntimeProvider {

393

// Marker interface for sink providers

394

}

395

```