# Table Sinks

Writes data to Hive tables, with support for partitioning, multiple file formats, streaming ingestion with compaction, and integration with the Hive metastore for metadata management.

## Capabilities

### HiveTableSink

The primary table sink for writing data to Hive tables, with comprehensive partitioning and format support.

```java { .api }
/**
 * Table sink for writing data to Hive tables.
 * Supports partitioning, overwrite modes, and various file formats.
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    /**
     * Creates a HiveTableSink for writing to Hive tables
     * @param conf - Flink configuration
     * @param jobConf - Hadoop job configuration
     * @param identifier - Table identifier
     * @param catalogTable - Catalog table metadata
     * @param configuredParallelism - Configured sink parallelism (can be null)
     */
    public HiveTableSink(ReadableConfig conf, JobConf jobConf, ObjectIdentifier identifier, CatalogTable catalogTable, Integer configuredParallelism);

    /**
     * Get the sink runtime provider for writing data
     * @param context - Context for the sink operation
     * @return SinkRuntimeProvider for data stream sink creation
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /**
     * Get the changelog mode supported by this sink
     * @return ChangelogMode indicating supported change types
     */
    public ChangelogMode getChangelogMode();

    /**
     * Create a copy of this sink
     * @return New HiveTableSink instance
     */
    public DynamicTableSink copy();

    /**
     * Get a string summary of this table sink
     * @return Human-readable description
     */
    public String asSummaryString();
}
```

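As a rough illustration of how these pieces fit together, the sketch below constructs a `HiveTableSink` from catalog metadata. This is a minimal sketch, not a definitive recipe: the catalog, database, and table names are hypothetical, the `CatalogTable` is assumed to have been resolved from a registered Hive catalog, and `HiveTableSink`/`ObjectIdentifier` follow the signatures listed on this page.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.hadoop.mapred.JobConf;

public class HiveSinkConstructionSketch {

    /** Builds a sink for hive.sales.orders; `catalogTable` is assumed to be
     *  the catalog entry for that table. */
    static HiveTableSink buildSink(CatalogTable catalogTable) {
        Configuration flinkConf = new Configuration(); // a ReadableConfig implementation
        JobConf jobConf = new JobConf();               // would carry Hive/Hadoop settings in practice
        ObjectIdentifier identifier = new ObjectIdentifier("hive", "sales", "orders");
        // Passing null lets the planner choose the sink parallelism.
        return new HiveTableSink(flinkConf, jobConf, identifier, catalogTable, null);
    }
}
```
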
### Partitioning Support

Interface for configuring partitioned writes to Hive tables.

```java { .api }
/**
 * Apply static partition specification
 * Sets fixed partition values for all written records
 * @param partition - Map of partition key to value
 * @return New HiveTableSink with static partitioning applied
 */
public DynamicTableSink applyStaticPartition(Map<String, String> partition);

/**
 * Check if sink requires partition grouping
 * @param supportsGrouping - Whether grouping is supported by the runtime
 * @return true if partition grouping is required
 */
public boolean requiresPartitionGrouping(boolean supportsGrouping);
```

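A minimal sketch of pinning static partition values on a sink, using the signatures above. The `sink` parameter is assumed to be a `HiveTableSink` built as in the earlier construction example, and the `year`/`month` partition layout mirrors the usage examples further down.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StaticPartitionSketch {

    static DynamicTableSink pinPartition(HiveTableSink sink) {
        // Fix year/month for every record written through this sink; any
        // partition columns not listed here remain dynamic.
        Map<String, String> staticPartition = new LinkedHashMap<>();
        staticPartition.put("year", "2023");
        staticPartition.put("month", "12");
        return sink.applyStaticPartition(staticPartition);
    }
}
```
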
### Overwrite Support

Interface for configuring overwrite behavior when writing to existing data.

```java { .api }
/**
 * Apply overwrite mode configuration
 * Controls whether existing data should be overwritten
 * @param overwrite - true to enable overwrite mode
 * @return New DynamicTableSink with overwrite configuration
 */
public DynamicTableSink applyOverwrite(boolean overwrite);
```

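Overwrite mode corresponds to `INSERT OVERWRITE` semantics. A brief sketch, again assuming a `HiveTableSink` obtained as in the construction example above:

```java
public class OverwriteSketch {

    static DynamicTableSink overwriting(HiveTableSink sink) {
        // true: replace existing data in the target table/partitions
        // instead of appending to it.
        return sink.applyOverwrite(true);
    }
}
```
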
### Writer Factory Classes

Factory classes for creating writers for different file formats and configurations.

```java { .api }
/**
 * Factory for creating Hive bulk writers
 * Handles format-specific writer creation and configuration
 */
public class HiveBulkWriterFactory implements BulkWriter.Factory<RowData> {
    /**
     * Create HiveBulkWriterFactory for specific format
     * @param jobConf - Hadoop job configuration
     * @param tableSchema - Schema of the table
     * @param hiveShim - Hive version compatibility shim
     * @param isCompressed - Whether output should be compressed
     */
    public HiveBulkWriterFactory(JobConf jobConf, TableSchema tableSchema, HiveShim hiveShim, boolean isCompressed);

    /**
     * Create bulk writer for given file
     * @param out - Output stream to write to
     * @return BulkWriter instance for writing records
     * @throws IOException if writer creation fails
     */
    public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException;
}

/**
 * Factory for creating Hive output formats
 * Provides MapReduce-compatible output format creation
 */
public class HiveOutputFormatFactory {
    /**
     * Create output format for Hive table
     * @param jobConf - Hadoop job configuration
     * @param catalogTable - Catalog table metadata
     * @param storageDescriptor - Hive table storage descriptor
     * @param partitionSpec - Partition specification (can be empty)
     * @param isOverwrite - Whether to overwrite existing data
     * @return OutputFormat instance for writing
     */
    public static OutputFormat<NullWritable, RowData> createOutputFormat(
            JobConf jobConf,
            CatalogTable catalogTable,
            StorageDescriptor storageDescriptor,
            Map<String, String> partitionSpec,
            boolean isOverwrite);
}

/**
 * Factory for creating generic Hive writers
 * Abstracts writer creation across different formats
 */
public class HiveWriterFactory implements WriterFactory<RowData> {
    /**
     * Create HiveWriterFactory with configuration
     * @param jobConf - Hadoop job configuration
     * @param catalogTable - Catalog table metadata
     * @param isOverwrite - Whether to overwrite existing data
     * @param staticPartSpec - Static partition specification
     */
    public HiveWriterFactory(JobConf jobConf, CatalogTable catalogTable, boolean isOverwrite, LinkedHashMap<String, String> staticPartSpec);

    /**
     * Create writer for specific partition
     * @param context - Writer context with partition info
     * @return Writer instance for the partition
     * @throws IOException if writer creation fails
     */
    public Writer<RowData> createWriter(WriterInitContext context) throws IOException;
}
```

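A sketch of constructing a `HiveWriterFactory` against the constructor listed above. The `jobConf` and `catalogTable` are assumed to be supplied by the caller, and the static partition values mirror the earlier partitioning example.

```java
import java.util.LinkedHashMap;

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.hadoop.mapred.JobConf;

public class WriterFactorySketch {

    static HiveWriterFactory newWriterFactory(JobConf jobConf, CatalogTable catalogTable) {
        // Insertion order matters for partition paths, hence the LinkedHashMap.
        LinkedHashMap<String, String> staticPartSpec = new LinkedHashMap<>();
        staticPartSpec.put("year", "2023");
        staticPartSpec.put("month", "12");
        // isOverwrite=false: append to the target partition rather than replacing it.
        return new HiveWriterFactory(jobConf, catalogTable, false, staticPartSpec);
    }
}
```
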
### Streaming Sink Configuration

Configuration entry point for streaming sinks with file rolling and compaction.

```java { .api }
/**
 * Configure streaming file sink with rolling policies
 * @param basePath - Base path for writing files
 * @param writerFactory - Factory for creating bulk writers
 * @param bucketAssigner - Function to assign records to buckets/partitions
 * @param rollingPolicy - Policy for when to roll files
 * @param outputFileConfig - Configuration for output file naming
 * @return Configured StreamingFileSink
 */
public static <T> StreamingFileSink<T> createStreamingSink(
        Path basePath,
        BulkWriter.Factory<T> writerFactory,
        BucketAssigner<T, String> bucketAssigner,
        RollingPolicy<T, String> rollingPolicy,
        OutputFileConfig outputFileConfig);
```

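The sketch below wires these parameters together. `HiveSinkUtils` is a hypothetical host class for the `createStreamingSink` helper above, the warehouse path is made up, and the writer factory is assumed to be a `HiveBulkWriterFactory` from the previous section; the bucket assigner and rolling policy shown are standard Flink filesystem-connector classes.

```java
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;

public class StreamingSinkSketch {

    static StreamingFileSink<RowData> build(BulkWriter.Factory<RowData> writerFactory) {
        return HiveSinkUtils.createStreamingSink(
                new Path("hdfs:///warehouse/sales.db/streaming_orders"), // hypothetical table location
                writerFactory,
                new DateTimeBucketAssigner<>("yyyy-MM-dd"), // one bucket (directory) per day
                OnCheckpointRollingPolicy.build(),          // roll part files on every checkpoint
                new OutputFileConfig("part-", ".orc"));     // part file naming
    }
}
```
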
### Partition and Bucket Assignment

Classes for managing data distribution across partitions and files.

```java { .api }
/**
 * Assigns records to partition buckets based on partition keys
 */
public class HiveRowDataPartitionComputer implements PartitionComputer<RowData> {
    /**
     * Create partition computer for Hive table
     * @param hiveShim - Hive version compatibility shim
     * @param defaultPartName - Default name for null partition values
     * @param fieldNames - Names of all table fields
     * @param fieldTypes - Types of all table fields
     * @param partitionColumns - Names of partition columns
     */
    public HiveRowDataPartitionComputer(HiveShim hiveShim, String defaultPartName, String[] fieldNames, DataType[] fieldTypes, String[] partitionColumns);

    /**
     * Generate partition path for given record
     * @param in - Input record
     * @return Partition path string
     */
    public String generatePartValues(RowData in);
}

/**
 * Legacy partition computer for Row objects
 */
public class HiveRowPartitionComputer implements PartitionComputer<Row> {
    public HiveRowPartitionComputer(HiveShim hiveShim, String defaultPartName, String[] fieldNames, TypeInformation<?>[] fieldTypes, String[] partitionColumns);
    public String generatePartValues(Row in);
}
```

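A sketch of computing a partition path with `HiveRowDataPartitionComputer`, per the listing above. The field layout mirrors the `streaming_orders` example later on this page, the `hiveShim` is assumed to be provided by the caller (loaded for the target Hive version), and `__HIVE_DEFAULT_PARTITION__` is Hive's conventional name for null partition values.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class PartitionComputerSketch {

    static String partitionPathFor(HiveShim hiveShim, RowData row) {
        String[] fieldNames = {"order_id", "customer_id", "order_total", "partition_date"};
        DataType[] fieldTypes = {
                DataTypes.BIGINT(), DataTypes.BIGINT(),
                DataTypes.DECIMAL(10, 2), DataTypes.STRING()};
        HiveRowDataPartitionComputer computer = new HiveRowDataPartitionComputer(
                hiveShim,
                "__HIVE_DEFAULT_PARTITION__",     // used when a partition value is null
                fieldNames,
                fieldTypes,
                new String[] {"partition_date"}); // partition columns
        return computer.generatePartValues(row);  // e.g. "partition_date=2023-12-01"
    }
}
```
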
### File and Checkpoint Management

Classes for managing file lifecycle and streaming checkpoints.

```java { .api }
/**
 * Configuration for output file naming
 */
public class OutputFileConfig {
    /**
     * Create output file configuration
     * @param partPrefix - Prefix for part files
     * @param partSuffix - Suffix for part files
     */
    public OutputFileConfig(String partPrefix, String partSuffix);

    public String getPartPrefix();
    public String getPartSuffix();
}

/**
 * Rolling policy based on checkpoints
 * Files are rolled when checkpoints occur
 */
public class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    /**
     * Check if file should be rolled
     * @param partFileState - Current state of the part file
     * @param element - Current element being processed
     * @param processingTime - Current processing time
     * @return true if file should be rolled
     */
    public boolean shouldRollOnEvent(PartFileInfo partFileState, IN element, long processingTime);

    /**
     * Check if file should be rolled on processing time
     * @param partFileState - Current state of the part file
     * @param processingTime - Current processing time
     * @return true if file should be rolled
     */
    public boolean shouldRollOnProcessingTime(PartFileInfo partFileState, long processingTime);
}
```

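For illustration, a short sketch of configuring file naming. With a prefix of `orders-` and a suffix of `.orc`, part files typically end up named along the lines of `orders-<subtask>-<counter>.orc`; the exact pattern depends on the runtime.

```java
public class FileNamingSketch {

    static OutputFileConfig orcFileNaming() {
        // Prefix and suffix applied to every part file written by the sink.
        return new OutputFileConfig("orders-", ".orc");
    }
}
```
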
**Usage Examples:**

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Set up a table environment with a Hive catalog
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Insert data into a partitioned Hive table
tableEnv.executeSql(
    "INSERT INTO hive.sales.orders " +
    "PARTITION (year='2023', month='12') " +
    "SELECT order_id, customer_id, order_total, order_date " +
    "FROM hive.staging.raw_orders " +
    "WHERE YEAR(order_date) = 2023 AND MONTH(order_date) = 12"
);

// Overwrite an existing partition
tableEnv.executeSql(
    "INSERT OVERWRITE hive.sales.daily_summary " +
    "PARTITION (date_key='2023-12-01') " +
    "SELECT customer_id, SUM(order_total) AS total_sales " +
    "FROM hive.sales.orders " +
    "WHERE CAST(order_date AS DATE) = DATE '2023-12-01' " +
    "GROUP BY customer_id"
);
```

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Set up a streaming environment for continuous writing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // Enable checkpointing for file rolling
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Create a source table from Kafka
tableEnv.executeSql(
    "CREATE TABLE kafka_orders (" +
    "  order_id BIGINT," +
    "  customer_id BIGINT," +
    "  order_total DECIMAL(10,2)," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Stream data to a partitioned Hive table; executeSql submits the INSERT
// job directly, so no separate env.execute() call is needed
tableEnv.executeSql(
    "INSERT INTO hive.sales.streaming_orders " +
    "SELECT " +
    "  order_id," +
    "  customer_id," +
    "  order_total," +
    "  order_time," +
    "  DATE_FORMAT(order_time, 'yyyy-MM-dd') AS partition_date " +
    "FROM kafka_orders"
);
```

## Types

```java { .api }
public interface PartitionComputer<T> {
    /**
     * Generate partition values for a record
     * @param record - Input record to compute partition for
     * @return Partition path string
     */
    String generatePartValues(T record);
}

public interface BucketAssigner<IN, BucketID> {
    /**
     * Assign record to a bucket
     * @param element - Input element
     * @param context - Processing context
     * @return Bucket identifier
     */
    BucketID getBucketId(IN element, Context context);
}

public interface WriterFactory<IN> {
    /**
     * Create writer for given context
     * @param context - Writer initialization context
     * @return Writer instance
     * @throws IOException if creation fails
     */
    Writer<IN> createWriter(WriterInitContext context) throws IOException;
}

public interface Writer<IN> {
    /**
     * Write element to output
     * @param element - Element to write
     * @param context - Processing context
     * @throws IOException if write fails
     */
    void write(IN element, Context context) throws IOException, InterruptedException;
}

public class ObjectIdentifier {
    public ObjectIdentifier(String catalogName, String databaseName, String objectName);
    public String getCatalogName();
    public String getDatabaseName();
    public String getObjectName();
}

public interface SinkRuntimeProvider extends DynamicTableSink.RuntimeProvider {
    // Marker interface for sink providers
}
```

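As a closing illustration, a minimal custom `PartitionComputer` against the interface above. The single partition column, the field index, and the Hive-style `column=value` path format are assumptions made for this sketch.

```java
import org.apache.flink.types.Row;

/**
 * Sketch: partitions Row records by one string column at a fixed index.
 */
public class DatePartitionComputer implements PartitionComputer<Row> {

    private final int dateFieldIndex;

    public DatePartitionComputer(int dateFieldIndex) {
        this.dateFieldIndex = dateFieldIndex;
    }

    @Override
    public String generatePartValues(Row record) {
        Object value = record.getField(dateFieldIndex);
        // Hive-style partition path: <column>=<value>, with Hive's default
        // partition name substituted for nulls.
        return "partition_date=" + (value == null ? "__HIVE_DEFAULT_PARTITION__" : value);
    }
}
```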