Docs: catalog-integration.md · configuration.md · function-module.md · index.md · partition-management.md · table-source-sink.md

This file: docs/function-module.md

# Function Module Integration

1

2

Hive built-in function support enabling use of Hive UDFs within Flink SQL queries. The function module provides seamless compatibility with Hive's extensive library of built-in functions and enables registration of custom user-defined functions.

3

4

## Capabilities

5

6

### HiveModule

7

8

Module implementation that provides access to Hive's built-in functions within Flink SQL.

9

10

```java { .api }

11

/**

12

* Module providing Hive built-in functions for Flink SQL

13

* Enables transparent use of Hive functions in Flink queries

14

*/

15

public class HiveModule implements Module {

16

17

/**

18

* Create HiveModule with default Hive version (detected from classpath)

19

*/

20

public HiveModule();

21

22

/**

23

* Create HiveModule with specific Hive version

24

* @param hiveVersion Hive version string (e.g., "2.3.9")

25

*/

26

public HiveModule(String hiveVersion);

27

28

/**

29

* Create HiveModule with specific Hive version and class loader

30

* @param hiveVersion Hive version string (e.g., "2.3.9")

31

* @param classLoader Class loader for function loading

32

*/

33

public HiveModule(String hiveVersion, ClassLoader classLoader);

34

35

/**

36

* Create HiveModule with specific Hive version, configuration, and class loader

37

* @param hiveVersion Hive version string (e.g., "2.3.9")

38

* @param config Readable configuration for module settings

39

* @param classLoader Class loader for function loading

40

*/

41

public HiveModule(String hiveVersion, ReadableConfig config, ClassLoader classLoader);

42

43

/**

44

* List all available Hive built-in functions

45

* @return Set of function names available through this module

46

*/

47

public Set<String> listFunctions();

48

49

/**

50

* Get function definition for a specific function name

51

* @param name Function name (case-insensitive)

52

* @return Optional containing function definition if available

53

*/

54

public Optional<FunctionDefinition> getFunctionDefinition(String name);

55

}

56

```

57

58

**Usage Examples:**

59

60

```java

61

import org.apache.flink.table.module.hive.HiveModule;

62

import org.apache.flink.table.api.TableEnvironment;

63

64

// Create table environment

65

TableEnvironment tableEnv = TableEnvironment.create(settings);

66

67

// Load Hive module with default version

68

HiveModule hiveModule = new HiveModule();

69

tableEnv.loadModule("hive", hiveModule);

70

71

// Or specify explicit Hive version

72

HiveModule specificModule = new HiveModule("2.3.9");

73

tableEnv.loadModule("hive", specificModule);

74

75

// List available functions

76

Set<String> functions = hiveModule.listFunctions();

77

System.out.println("Available Hive functions: " + functions.size());

78

79

// Check if specific function exists

80

Optional<FunctionDefinition> concatWs = hiveModule.getFunctionDefinition("concat_ws");

81

if (concatWs.isPresent()) {

82

System.out.println("concat_ws function is available");

83

}

84

85

// Use Hive functions in SQL queries

86

Table result = tableEnv.sqlQuery("""

87

SELECT

88

concat_ws('|', first_name, last_name) as full_name,

89

upper(email) as email_upper,

90

size(split(address, ' ')) as address_parts,

91

from_unixtime(created_timestamp) as created_date,

92

if(active = 1, 'ACTIVE', 'INACTIVE') as status

93

FROM users

94

""");

95

```

96

97

98

### Available Hive Functions

99

100

The HiveModule provides access to a comprehensive set of Hive built-in functions:

101

102

#### String Functions

103

104

```sql

105

-- String manipulation functions

106

SELECT

107

concat('Hello', ' ', 'World') as greeting,

108

concat_ws('|', col1, col2, col3) as pipe_separated,

109

upper(name) as name_upper,

110

lower(email) as email_lower,

111

length(description) as desc_length,

112

substr(text, 1, 10) as first_10_chars,

113

trim(padded_string) as trimmed,

114

regexp_replace(phone, '[^0-9]', '') as clean_phone,

115

split(csv_data, ',') as array_values,

116

reverse(string_col) as reversed

117

FROM my_table;

118

```

119

120

#### Date and Time Functions

121

122

```sql

123

-- Date/time manipulation functions

124

SELECT

125

from_unixtime(unix_timestamp) as formatted_date,

126

unix_timestamp('2024-01-01 12:00:00') as unix_ts,

127

year(date_col) as year_part,

128

month(date_col) as month_part,

129

day(date_col) as day_part,

130

date_add(date_col, 30) as thirty_days_later,

131

date_sub(current_date(), 7) as week_ago,

132

datediff(end_date, start_date) as days_between,

133

date_format(datetime_col, 'yyyy-MM-dd') as formatted

134

FROM events;

135

```

136

137

#### Mathematical Functions

138

139

```sql

140

-- Mathematical and statistical functions

141

SELECT

142

abs(negative_value) as absolute,

143

round(decimal_value, 2) as rounded,

144

ceil(float_value) as ceiling,

145

floor(float_value) as floored,

146

greatest(val1, val2, val3) as maximum,

147

least(val1, val2, val3) as minimum,

148

rand() as random_value,

149

pow(base, exponent) as power,

150

sqrt(number) as square_root,

151

sin(angle) as sine_value

152

FROM calculations;

153

```

154

155

#### Collection Functions

156

157

```sql

158

-- Array and map functions

159

SELECT

160

size(array_col) as array_length,

161

array_contains(tags, 'important') as has_important_tag,

162

sort_array(string_array) as sorted_array,

163

map_keys(properties) as property_keys,

164

map_values(properties) as property_values,

165

explode(array_col) as individual_elements

166

FROM structured_data;

167

```

168

169

#### Type Conversion Functions

170

171

```sql

172

-- Type casting and conversion functions

173

SELECT

174

cast(string_number as int) as integer_value,

175

cast(timestamp_col as date) as date_only,

176

string(numeric_col) as string_representation,

177

int(boolean_col) as boolean_as_int,

178

double(string_decimal) as decimal_value

179

FROM mixed_types;

180

```

181

182

#### Conditional Functions

183

184

```sql

185

-- Conditional and null-handling functions

186

SELECT

187

if(score > 80, 'PASS', 'FAIL') as result,

188

case

189

when grade >= 90 then 'A'

190

when grade >= 80 then 'B'

191

when grade >= 70 then 'C'

192

else 'F'

193

end as letter_grade,

194

coalesce(preferred_name, first_name, 'Unknown') as display_name,

195

nvl(optional_field, 'N/A') as with_default,

196

isnull(nullable_col) as is_null_check,

197

isnotnull(nullable_col) as is_not_null_check

198

FROM student_grades;

199

```

200

201

## Advanced Function Integration

202

203

### Custom UDF Registration

204

205

While HiveModule provides built-in functions, you can also register custom Hive UDFs:

206

207

```java

208

// Register custom UDF
// NOTE(review): createTemporaryFunction(String, Class) expects a Flink
// UserDefinedFunction subclass; a raw Hive UDF class (extending Hive's UDF)
// should instead be registered through the Hive catalog — confirm MyHiveUDF's base class
tableEnv.createTemporaryFunction("my_custom_udf", MyHiveUDF.class);

210

211

// Use in SQL queries

212

Table result = tableEnv.sqlQuery("""

213

SELECT

214

user_id,

215

my_custom_udf(input_data) as processed_data

216

FROM user_data

217

""");

218

```

219

220

### Function Resolution Order

221

222

Configure module loading order to control function resolution:

223

224

```java

225

// Load the Hive module (the core module is already loaded by default,
// so it must not be loaded again with loadModule)
tableEnv.loadModule("hive", new HiveModule("2.3.9"));

// Reorder the loaded modules so Hive functions take precedence over core
tableEnv.useModules("hive", "core");

228

229

// List loaded modules

230

String[] modules = tableEnv.listModules();

231

System.out.println("Loaded modules: " + Arrays.toString(modules));

232

233

// Use module order for function resolution

234

// Functions in earlier modules take precedence

235

```

236

237

### Function Catalog Integration

238

239

Combine with Hive catalog for comprehensive function access:

240

241

```java

242

// Set up both catalog and module

243

HiveCatalog catalog = new HiveCatalog("hive", "default", "/etc/hive/conf", null, "2.3.9");

244

HiveModule module = new HiveModule("2.3.9");

245

246

tableEnv.registerCatalog("hive", catalog);

247

tableEnv.loadModule("hive", module);

248

tableEnv.useCatalog("hive");

249

250

// Access both built-in and user-defined functions

251

Table result = tableEnv.sqlQuery("""

252

SELECT

253

-- Built-in Hive function from module

254

concat_ws('|', first_name, last_name) as full_name,

255

-- Custom UDF registered in catalog

256

my_database.my_custom_function(data) as processed,

257

-- Standard Flink function

258

CURRENT_TIMESTAMP as processing_time

259

FROM user_profiles

260

""");

261

```

262

263

### Performance Considerations

264

265

```java

266

// Configure function execution for performance

267

Configuration config = new Configuration();

268

269

// Enable object reuse for UDF performance
config.setBoolean("pipeline.object-reuse", true);

271

272

// Configure state backend for stateful UDFs

273

config.setString("state.backend", "rocksdb");

274

config.setString("state.checkpoints.dir", "hdfs://namenode:9000/checkpoints");

275

276

TableEnvironment tableEnv = TableEnvironment.create(

277

EnvironmentSettings.newInstance()

278

.withConfiguration(config)

279

.build()

280

);

281

```

282

283

### Complex Function Usage Examples

284

285

```java

286

// Complex data processing with Hive functions

287

Table processedData = tableEnv.sqlQuery("""

288

WITH parsed_logs AS (

289

SELECT

290

regexp_extract(log_line, '(\\d{4}-\\d{2}-\\d{2})', 1) as log_date,

291

regexp_extract(log_line, 'level=(\\w+)', 1) as log_level,

292

split(log_line, '\\|') as log_parts,

293

size(split(log_line, '\\|')) as part_count

294

FROM raw_logs

295

WHERE log_line IS NOT NULL

296

),

297

enhanced_logs AS (

298

SELECT

299

*,

300

from_unixtime(unix_timestamp(log_date, 'yyyy-MM-dd')) as parsed_date,

301

if(log_level IN ('ERROR', 'FATAL'), 1, 0) as is_error,

302

map('date', log_date, 'level', log_level) as log_metadata

303

FROM parsed_logs

304

WHERE part_count >= 3

305

)

306

SELECT

307

date_format(parsed_date, 'yyyy-MM') as month,

308

log_level,

309

count(*) as log_count,

310

sum(is_error) as error_count,

311

collect_list(log_metadata) as monthly_logs

312

FROM enhanced_logs

313

GROUP BY date_format(parsed_date, 'yyyy-MM'), log_level

314

ORDER BY month DESC, log_level

315

""");

316

317

// Execute and print results

318

processedData.execute().print();

319

```

320

321

## Integration Patterns

322

323

### SQL DDL Function Definition

324

325

```sql

326

-- Create temporary function from Java class

327

CREATE TEMPORARY FUNCTION my_hash AS 'com.company.udfs.HashFunction';

328

329

-- Create catalog function (persisted in Hive metastore)

330

CREATE FUNCTION my_catalog.analytics.custom_aggregator AS 'com.company.udfs.CustomAggregator'

331

USING JAR 'hdfs://namenode:9000/udf-jars/custom-functions.jar';

332

```

333

334

### Streaming Function Usage

335

336

```java

337

// Use Hive functions in streaming queries

338

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

339

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

340

341

// Load Hive module for streaming

342

tableEnv.loadModule("hive", new HiveModule("2.3.9"));

343

344

// Process streaming data with Hive functions

345

Table streamResult = tableEnv.sqlQuery("""

346

SELECT

347

window_start,

348

window_end,

349

concat_ws(':', user_id, session_id) as user_session,

350

count(*) as event_count,

351

collect_list(event_type) as event_types,

352

max(from_unixtime(event_timestamp)) as latest_event

353

FROM TABLE(

354

HOP(TABLE source_stream, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)

355

)

356

GROUP BY window_start, window_end, user_id, session_id

357

""");

358

```