# Configuration

A comprehensive configuration system for tuning Hive connector behavior, optimizing performance, and toggling features. The options control data reading, data writing, and metadata operations.
3
4
## Capabilities
5
6
### HiveOptions
7
8
Main configuration class containing all Hive connector-specific options.
9
10
```java { .api }
11
/**
12
* Configuration options for Hive connector behavior
13
*/
14
public class HiveOptions {
15
16
/** Fallback to MapReduce reader when vectorized reader fails */
17
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
18
19
/** Automatically infer source parallelism based on file splits */
20
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
21
22
/** Maximum parallelism when inferring source parallelism */
23
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
24
25
/** Fallback to MapReduce writer when vectorized writer fails */
26
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
27
}
28
```
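The `ConfigOption` pattern above pairs a string key with a typed default that applies when the key is unset. A minimal stand-alone sketch of that lookup behavior (the `MiniOption` and `getBoolean` names are invented for illustration; Flink's real classes live in `org.apache.flink.configuration`):

```java
import java.util.HashMap;
import java.util.Map;

public class MiniOptionDemo {

    /** Illustrative stand-in for a ConfigOption: a key plus a typed default. */
    static final class MiniOption<T> {
        final String key;
        final T defaultValue;

        MiniOption(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** Resolve an option against a raw key/value map, falling back to the default. */
    static boolean getBoolean(Map<String, String> conf, MiniOption<Boolean> option) {
        String raw = conf.get(option.key);
        return raw == null ? option.defaultValue : Boolean.parseBoolean(raw);
    }

    public static void main(String[] args) {
        MiniOption<Boolean> fallbackReader =
                new MiniOption<>("table.exec.hive.fallback-mapred-reader", false);

        Map<String, String> conf = new HashMap<>();
        // Unset key -> default value
        System.out.println(getBoolean(conf, fallbackReader)); // false
        // Explicitly set -> the configured value wins
        conf.put("table.exec.hive.fallback-mapred-reader", "true");
        System.out.println(getBoolean(conf, fallbackReader)); // true
    }
}
```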
## Configuration Categories

### Source Configuration

Options for controlling how data is read from Hive tables.

```java { .api }
/** Use the Hadoop MapReduce record reader instead of the Flink native vectorized reader */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER =
        ConfigOptions.key("table.exec.hive.fallback-mapred-reader")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "If true, read files with the Hadoop mapred record reader "
                                + "instead of the Flink native vectorized reader");

/** Automatically infer parallelism from input splits */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
        ConfigOptions.key("table.exec.hive.infer-source-parallelism")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Whether to infer source parallelism from the number of file splits");

/** Maximum inferred parallelism limit */
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
        ConfigOptions.key("table.exec.hive.infer-source-parallelism.max")
                .intType()
                .defaultValue(1000)
                .withDescription("Maximum parallelism that can be inferred for Hive sources");
```
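Conceptually, parallelism inference clamps the split count to the configured maximum. A simplified sketch of that rule (illustrative only, not Flink's actual implementation):

```java
public class ParallelismInference {

    /**
     * Simplified model of source-parallelism inference: when enabled,
     * use the number of file splits, capped at the configured maximum
     * and floored at 1; when disabled, keep the default parallelism.
     */
    static int inferParallelism(
            boolean inferEnabled, int numSplits, int maxInferred, int defaultParallelism) {
        if (!inferEnabled) {
            return defaultParallelism;
        }
        return Math.max(1, Math.min(numSplits, maxInferred));
    }

    public static void main(String[] args) {
        // 5000 splits with a cap of 1000 -> 1000
        System.out.println(inferParallelism(true, 5000, 1000, 4));
        // 12 splits with a cap of 1000 -> 12
        System.out.println(inferParallelism(true, 12, 1000, 4));
        // Inference disabled -> default parallelism 4
        System.out.println(inferParallelism(false, 5000, 1000, 4));
    }
}
```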
**Usage Examples:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Configure table environment
Configuration config = new Configuration();

// Route reads through the MapReduce record reader for compatibility
config.setBoolean("table.exec.hive.fallback-mapred-reader", true);

// Configure parallelism inference
config.setBoolean("table.exec.hive.infer-source-parallelism", true);
config.setInteger("table.exec.hive.infer-source-parallelism.max", 500);

// Apply configuration to table environment
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().withConfiguration(config).build());
```
### Sink Configuration

Options for controlling how data is written to Hive tables.

```java { .api }
/** Use the Hadoop MapReduce record writer instead of the Flink native writer */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
        ConfigOptions.key("table.exec.hive.fallback-mapred-writer")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "If true, write files with the Hadoop mapred record writer "
                                + "instead of the Flink native writer");
```
**Usage Examples:**

```java
// Enable the MapReduce writer path
config.setBoolean("table.exec.hive.fallback-mapred-writer", true);

// Write to a Hive table with the setting in effect
tableEnv.executeSql("INSERT INTO hive_table SELECT * FROM source_table");
```
### Streaming Configuration

Options specific to streaming mode. Continuous reading from Hive is controlled by table options (set in the table's properties or inline via dynamic table option hints), not by global configuration keys:

```java
// Enable continuous partition monitoring on a Hive source table via a hint
tableEnv.executeSql(
        "SELECT * FROM hive_table " +
        "/*+ OPTIONS('streaming-source.enable'='true', " +
        "'streaming-source.monitor-interval'='1 min') */");

// Idle-source handling remains a global configuration option
config.setString("table.exec.source.idle-timeout", "10s");
```
### Performance Tuning

Configuration options for optimizing performance based on workload characteristics.

```java
// Optimize for large batch workloads
config.setBoolean("table.exec.hive.infer-source-parallelism", true);
config.setInteger("table.exec.hive.infer-source-parallelism.max", 2000);

// Prefer the Flink native vectorized reader (the default) for read performance
config.setBoolean("table.exec.hive.fallback-mapred-reader", false);

// Bound resource usage with a default operator parallelism
config.setString("table.exec.resource.default-parallelism", "4");
```
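The tuning choices above can be bundled per workload. A hypothetical helper (`profileSettings` is not a Flink API; it just builds key/value pairs matching the options documented in this section, to be copied into a `Configuration`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HiveTuningProfiles {

    /** Hypothetical helper: Hive connector settings for a named workload profile. */
    static Map<String, String> profileSettings(String profile) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("table.exec.hive.infer-source-parallelism", "true");
        switch (profile) {
            case "large-batch":
                // High parallelism cap, native vectorized reader
                settings.put("table.exec.hive.infer-source-parallelism.max", "2000");
                settings.put("table.exec.hive.fallback-mapred-reader", "false");
                break;
            case "compatibility":
                // Conservative parallelism, MapReduce reader path
                settings.put("table.exec.hive.infer-source-parallelism.max", "100");
                settings.put("table.exec.hive.fallback-mapred-reader", "true");
                break;
            default:
                // Fall back to the connector's documented default cap
                settings.put("table.exec.hive.infer-source-parallelism.max", "1000");
        }
        return settings;
    }

    public static void main(String[] args) {
        System.out.println(profileSettings("large-batch"));
    }
}
```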
## Global Flink Configuration

### Table API Configuration

Configure table-level behavior that affects Hive integration:

```java
Configuration config = new Configuration();

// Source configuration
config.setString("table.exec.source.idle-timeout", "0"); // 0 disables idle detection (typical for batch)
config.setBoolean("table.exec.hive.infer-source-parallelism", true);

// Sink configuration
config.setString("table.exec.sink.not-null-enforcer", "DROP"); // or "ERROR" (the default)
config.setString("table.exec.sink.upsert-materialize", "NONE");

// General table configuration
config.setString("table.sql-dialect", "hive"); // Use the Hive SQL dialect
```
### Execution Configuration

Configure Flink execution parameters that impact Hive connector performance:

```java
// Checkpointing for streaming jobs
config.setString("execution.checkpointing.interval", "300s");
config.setString("state.backend", "rocksdb");

// Memory configuration
config.setString("taskmanager.memory.process.size", "4gb");
config.setString("jobmanager.memory.process.size", "1gb");

// Parallelism configuration
config.setInteger("parallelism.default", 4);
```
## Configuration Examples

### Batch Processing Configuration

A configuration suited to large batch processing workloads:

```java
Configuration batchConfig = new Configuration();

// Disable idle-source detection (not needed for batch)
batchConfig.setString("table.exec.source.idle-timeout", "0");

// Optimize parallelism
batchConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);
batchConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 1000);

// Use the Flink native vectorized reader for performance
batchConfig.setBoolean("table.exec.hive.fallback-mapred-reader", false);

// Default operator parallelism
batchConfig.setString("table.exec.resource.default-parallelism", "8");
```
### Streaming Configuration

Configuration for continuous streaming from Hive tables:

```java
Configuration streamConfig = new Configuration();

// Detect idle source subtasks
streamConfig.setString("table.exec.source.idle-timeout", "30s");

// Moderate parallelism for streaming
streamConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);
streamConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 200);

// Checkpointing
streamConfig.setString("execution.checkpointing.interval", "300s");
```

Partition monitoring itself is enabled per table ('streaming-source.enable', 'streaming-source.monitor-interval'), not through a global configuration key.
### High-Throughput Configuration

Configuration for maximum-throughput scenarios:

```java
Configuration highThroughputConfig = new Configuration();

// Maximize parallelism
highThroughputConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);
highThroughputConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 2000);

// Use the Flink native (vectorized) reader and writer paths
highThroughputConfig.setBoolean("table.exec.hive.fallback-mapred-reader", false);
highThroughputConfig.setBoolean("table.exec.hive.fallback-mapred-writer", false);

// Memory configuration
highThroughputConfig.setString("taskmanager.memory.process.size", "8gb");
highThroughputConfig.setString("taskmanager.memory.managed.fraction", "0.6");
```
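As a rough mental model, managed memory scales with the Flink memory budget times the managed fraction. A deliberately simplified illustration (Flink's real memory model first subtracts JVM overhead, metaspace, framework, and network memory from the process size, so actual numbers will be lower):

```java
public class ManagedMemoryEstimate {

    /** Simplified: managed memory ~= flinkMemoryMb * managedFraction. */
    static long estimateManagedMemoryMb(long flinkMemoryMb, double managedFraction) {
        return Math.round(flinkMemoryMb * managedFraction);
    }

    public static void main(String[] args) {
        // ~8 GiB of Flink memory with a 0.6 managed fraction
        System.out.println(estimateManagedMemoryMb(8192, 0.6)); // 4915
    }
}
```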
### Compatibility Configuration

Configuration for maximum compatibility with varied Hive setups:

```java
Configuration compatConfig = new Configuration();

// Use the Hadoop MapReduce reader/writer paths for compatibility
compatConfig.setBoolean("table.exec.hive.fallback-mapred-reader", true);
compatConfig.setBoolean("table.exec.hive.fallback-mapred-writer", true);

// Conservative parallelism
compatConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 100);

// Use the Hive SQL dialect
compatConfig.setString("table.sql-dialect", "hive");
```
## Dynamic Configuration

### Runtime Configuration Changes

Some options can be changed after the table environment is created; the new values apply to subsequently submitted queries:

```java
// Change configuration on the table environment
tableEnv.getConfig().getConfiguration()
        .setBoolean("table.exec.hive.fallback-mapred-reader", true);

// Or set it via a SQL SET statement for the session
tableEnv.executeSql("SET 'table.exec.hive.infer-source-parallelism.max' = '500'");
```
### Per-Table Configuration

Hive tables are registered through a HiveCatalog rather than a `WITH ('connector' = ...)` clause, so per-table tuning is expressed with table options, either stored in the Hive table's properties or supplied inline with dynamic table option hints:

```java
// Override table options for a single query with a hint
tableEnv.executeSql(
    "SELECT * FROM hive_table " +
    "/*+ OPTIONS('streaming-source.enable'='true', " +
    "'streaming-source.monitor-interval'='1 min') */"
);
```