
# Configuration


The Hive connector provides a comprehensive configuration system for tuning connector behavior, optimizing performance, and toggling features. These options control various aspects of data reading, writing, and metadata operations.

## Capabilities

### HiveOptions

Main configuration class containing all Hive connector-specific options.

```java { .api }
/**
 * Configuration options for Hive connector behavior
 */
public class HiveOptions {

    /** Fallback to MapReduce reader when vectorized reader fails */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;

    /** Automatically infer source parallelism based on file splits */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;

    /** Maximum parallelism when inferring source parallelism */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;

    /** Fallback to MapReduce writer when vectorized writer fails */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
}
```
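To make the shape of these options concrete, here is a minimal, self-contained model of a typed option: a key paired with a typed default value. This is illustrative only; Flink's real `ConfigOption`/`ConfigOptions` classes are richer, and the `Option` record and `get` helper below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigOptionDemo {
    /** Minimal model of a typed option: a key plus a typed default value. */
    record Option<T>(String key, T defaultValue) {}

    static final Option<Boolean> INFER_PARALLELISM =
            new Option<>("table.exec.hive.infer-source-parallelism", true);
    static final Option<Integer> INFER_PARALLELISM_MAX =
            new Option<>("table.exec.hive.infer-source-parallelism.max", 1000);

    /** Look up an option, falling back to its typed default when unset. */
    @SuppressWarnings("unchecked")
    static <T> T get(Map<String, Object> conf, Option<T> opt) {
        return (T) conf.getOrDefault(opt.key(), opt.defaultValue());
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("table.exec.hive.infer-source-parallelism.max", 500);

        System.out.println(get(conf, INFER_PARALLELISM));      // unset -> default: true
        System.out.println(get(conf, INFER_PARALLELISM_MAX));  // overridden: 500
    }
}
```

The typed default is what makes options like `TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX` safe to read even when the user has not configured them.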

## Configuration Categories

### Source Configuration

Options for controlling how data is read from Hive tables.

```java { .api }
/** Enable fallback to MapReduce reader for compatibility */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER =
        ConfigOptions.key("table.exec.hive.fallback-mapred-reader")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to fallback to MapReduce reader when vectorized reader fails");

/** Automatically infer parallelism from input splits */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
        ConfigOptions.key("table.exec.hive.infer-source-parallelism")
                .booleanType()
                .defaultValue(true)
                .withDescription("Whether to infer source parallelism based on number of file splits");

/** Maximum inferred parallelism limit */
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
        ConfigOptions.key("table.exec.hive.infer-source-parallelism.max")
                .intType()
                .defaultValue(1000)
                .withDescription("Maximum parallelism that can be inferred for Hive sources");
```

**Usage Examples:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Configure table environment
Configuration config = new Configuration();

// Enable MapReduce reader fallback for compatibility
config.setBoolean("table.exec.hive.fallback-mapred-reader", true);

// Configure parallelism inference
config.setBoolean("table.exec.hive.infer-source-parallelism", true);
config.setInteger("table.exec.hive.infer-source-parallelism.max", 500);

// Apply configuration to table environment
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().withConfiguration(config).build());
```

### Sink Configuration

Options for controlling how data is written to Hive tables.

```java { .api }
/** Enable fallback to MapReduce writer for compatibility */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
        ConfigOptions.key("table.exec.hive.fallback-mapred-writer")
                .booleanType()
                .defaultValue(true)
                .withDescription("Whether to fallback to MapReduce writer when vectorized writer fails");
```

**Usage Examples:**

```java
// Enable MapReduce writer fallback
config.setBoolean("table.exec.hive.fallback-mapred-writer", true);

// Write to Hive table with fallback enabled
tableEnv.executeSql("INSERT INTO hive_table SELECT * FROM source_table");
```

### Streaming Configuration

Options specific to streaming mode operations.

```java
// Configure streaming-specific options
config.setString("table.exec.source.idle-timeout", "10s");
config.setString("partition.discovery.interval-millis", "60000"); // 1 minute
config.setString("source.monitor-interval", "30s");
```

### Performance Tuning

Configuration options for optimizing performance based on workload characteristics.

```java
// Optimize for large batch workloads
config.setBoolean("table.exec.hive.infer-source-parallelism", true);
config.setInteger("table.exec.hive.infer-source-parallelism.max", 2000);

// Optimize for small files
config.setBoolean("table.exec.hive.fallback-mapred-reader", false); // Use vectorized reader

// Default operator parallelism
config.setString("table.exec.resource.default-parallelism", "4");
```
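The interaction between the two inference options can be sketched as a simple clamp: the inferred value tracks the number of file splits, bounded above by `table.exec.hive.infer-source-parallelism.max` and below by 1. This is a simplified model of the assumed behavior, not the connector's actual implementation:

```java
public class InferParallelism {
    /** Simplified sketch: inferred parallelism = split count clamped to [1, max]. */
    static int infer(int numSplits, int maxParallelism) {
        return Math.max(1, Math.min(numSplits, maxParallelism));
    }

    public static void main(String[] args) {
        System.out.println(infer(250, 1000));  // 250: below the cap, split count wins
        System.out.println(infer(5000, 1000)); // 1000: capped by the max option
        System.out.println(infer(0, 1000));    // 1: at least one parallel task
    }
}
```

This is why raising the `.max` option only matters for tables with more splits than the current cap; small tables are unaffected.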

## Global Flink Configuration

### Table API Configuration

Configure table-level behavior that affects Hive integration:

```java
Configuration config = new Configuration();

// Source configuration
config.setString("table.exec.source.idle-timeout", "0"); // No timeout for batch
config.setBoolean("table.exec.source.parallelism-inference.enabled", true);

// Sink configuration
config.setString("table.exec.sink.not-null-enforcer", "DROP"); // Drop rows instead of failing on NOT NULL violations
config.setString("table.exec.sink.upsert-materialize", "none");

// General table configuration
config.setString("table.sql-dialect", "hive"); // Use Hive SQL dialect
```

### Execution Configuration

Configure Flink execution parameters that impact Hive connector performance:

```java
// Checkpointing for streaming jobs
config.setString("execution.checkpointing.interval", "300s");
config.setString("state.backend", "rocksdb");

// Memory configuration
config.setString("taskmanager.memory.process.size", "4gb");
config.setString("jobmanager.memory.process.size", "1gb");

// Parallelism configuration
config.setInteger("parallelism.default", 4);
```

## Configuration Examples

### Batch Processing Configuration

Optimal configuration for large batch processing workloads:

```java
Configuration batchConfig = new Configuration();

// Disable streaming features
batchConfig.setString("table.exec.source.idle-timeout", "0");

// Optimize parallelism
batchConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);
batchConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 1000);

// Use vectorized readers for performance
batchConfig.setBoolean("table.exec.hive.fallback-mapred-reader", false);

// Default operator parallelism
batchConfig.setString("table.exec.resource.default-parallelism", "8");
```

### Streaming Configuration

Configuration for continuous streaming from Hive tables:

```java
Configuration streamConfig = new Configuration();

// Enable streaming features
streamConfig.setString("table.exec.source.idle-timeout", "30s");

// Partition monitoring
streamConfig.setString("partition.discovery.interval-millis", "60000");

// Moderate parallelism for streaming
streamConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);
streamConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 200);

// Checkpointing
streamConfig.setString("execution.checkpointing.interval", "300s");
```

### High-Throughput Configuration

Configuration for maximum throughput scenarios:

```java
Configuration highThroughputConfig = new Configuration();

// Maximize parallelism
highThroughputConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);
highThroughputConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 2000);

// Use vectorized operations
highThroughputConfig.setBoolean("table.exec.hive.fallback-mapred-reader", false);
highThroughputConfig.setBoolean("table.exec.hive.fallback-mapred-writer", false);

// Memory optimization
highThroughputConfig.setString("taskmanager.memory.process.size", "8gb");
highThroughputConfig.setString("taskmanager.memory.managed.fraction", "0.6");
```

### Compatibility Configuration

Configuration for maximum compatibility with various Hive setups:

```java
Configuration compatConfig = new Configuration();

// Enable fallbacks for compatibility
compatConfig.setBoolean("table.exec.hive.fallback-mapred-reader", true);
compatConfig.setBoolean("table.exec.hive.fallback-mapred-writer", true);

// Conservative parallelism
compatConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 100);

// Use Hive SQL dialect
compatConfig.setString("table.sql-dialect", "hive");
```

## Dynamic Configuration

### Runtime Configuration Changes

Some configuration options can be changed at runtime; changes take effect for queries submitted afterwards:

```java
// Change configuration on the TableEnvironment; applies to subsequent queries
tableEnv.getConfig().getConfiguration()
        .setBoolean("table.exec.hive.fallback-mapred-reader", true);

// Apply configuration via SQL SET statements
tableEnv.executeSql("SET 'table.exec.hive.infer-source-parallelism.max' = '500'");
```

### Per-Table Configuration

Configure options for specific Hive tables:

```java
// Create table with specific options
tableEnv.executeSql(
        "CREATE TABLE hive_table (...) " +
        "WITH (" +
        "  'connector' = 'hive', " +
        "  'table.exec.hive.fallback-mapred-reader' = 'true'" +
        ")");
```