0
# Utility Functions
1
2
Helper utilities for common operations, parameter handling, and DataSet manipulation. These utilities simplify common tasks and provide additional functionality for Flink batch programs.
3
4
## Capabilities
5
6
### Parameter Handling
7
8
Utility for handling program parameters from various sources including command line arguments, properties files, and system properties.
9
10
```java { .api }
11
/**
12
* Utility class for handling program parameters
13
*/
14
public class ParameterTool implements Serializable {
15
16
/**
17
* Create ParameterTool from command line arguments
18
* @param args command line arguments in key-value format
19
* @return ParameterTool instance
20
*/
21
public static ParameterTool fromArgs(String[] args);
22
23
/**
24
* Create ParameterTool from properties file
25
* @param path path to the properties file
26
* @return ParameterTool instance
27
* @throws IOException if file cannot be read
28
*/
29
public static ParameterTool fromPropertiesFile(String path) throws IOException;
30
31
/**
32
* Create ParameterTool from properties file with ClassLoader
33
* @param file properties file
34
* @param classLoader class loader to use
35
* @return ParameterTool instance
36
* @throws IOException if file cannot be read
37
*/
38
public static ParameterTool fromPropertiesFile(File file, ClassLoader classLoader) throws IOException;
39
40
/**
41
* Create ParameterTool from Map
42
* @param map map containing key-value pairs
43
* @return ParameterTool instance
44
*/
45
public static ParameterTool fromMap(Map<String, String> map);
46
47
/**
48
* Create ParameterTool from system properties
49
* @return ParameterTool instance with system properties
50
*/
51
public static ParameterTool fromSystemProperties();
52
53
/**
54
* Get parameter value as String
55
* @param key parameter key
56
* @return parameter value or null if not found
57
*/
58
public String get(String key);
59
60
/**
61
* Get parameter value with default
62
* @param key parameter key
63
* @param defaultValue default value if key not found
64
* @return parameter value or default value
65
*/
66
public String get(String key, String defaultValue);
67
68
/**
69
* Get parameter value as integer
70
* @param key parameter key
71
* @return parameter value as integer
72
* @throws NumberFormatException if value is not a valid integer
73
*/
74
public int getInt(String key);
75
76
/**
77
* Get parameter value as integer with default
78
* @param key parameter key
79
* @param defaultValue default value if key not found
80
* @return parameter value as integer or default value
81
*/
82
public int getInt(String key, int defaultValue);
83
84
/**
85
* Get parameter value as long
86
* @param key parameter key
87
* @return parameter value as long
88
*/
89
public long getLong(String key);
90
91
/**
92
* Get parameter value as long with default
93
* @param key parameter key
94
* @param defaultValue default value if key not found
95
* @return parameter value as long or default value
96
*/
97
public long getLong(String key, long defaultValue);
98
99
/**
100
* Get parameter value as double
101
* @param key parameter key
102
* @return parameter value as double
103
*/
104
public double getDouble(String key);
105
106
/**
107
* Get parameter value as double with default
108
* @param key parameter key
109
* @param defaultValue default value if key not found
110
* @return parameter value as double or default value
111
*/
112
public double getDouble(String key, double defaultValue);
113
114
/**
115
* Get parameter value as boolean
116
* @param key parameter key
117
* @return parameter value as boolean
118
*/
119
public boolean getBoolean(String key);
120
121
/**
122
* Get parameter value as boolean with default
123
* @param key parameter key
124
* @param defaultValue default value if key not found
125
* @return parameter value as boolean or default value
126
*/
127
public boolean getBoolean(String key, boolean defaultValue);
128
129
/**
130
* Check if parameter key exists
131
* @param key parameter key
132
* @return true if key exists, false otherwise
133
*/
134
public boolean has(String key);
135
136
/**
137
* Convert to Flink Configuration object
138
* @return Configuration object with all parameters
139
*/
140
public Configuration getConfiguration();
141
142
/**
143
* Convert to Properties object
144
* @return Properties object with all parameters
145
*/
146
public Properties getProperties();
147
}
148
```
149
150
**Usage Examples:**
151
152
```java
153
public static void main(String[] args) throws Exception {
154
// Parse command line arguments
155
ParameterTool params = ParameterTool.fromArgs(args);
156
157
// Get parameters with defaults
158
String inputPath = params.get("input", "/default/input/path");
159
String outputPath = params.get("output", "/default/output/path");
160
int parallelism = params.getInt("parallelism", 1);
161
boolean verbose = params.getBoolean("verbose", false);
162
163
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
164
165
// Use configuration from parameters
166
env.getConfig().setGlobalJobParameters(params);
167
env.setParallelism(parallelism);
168
169
// Create program using parameters
170
DataSet<String> input = env.readTextFile(inputPath);
171
// ... transform 'input' into a DataSet<String> named 'result' ...
172
result.writeAsText(outputPath);
173
174
env.execute("Parameterized Job");
175
}
176
177
// Usage from properties file
178
ParameterTool fileParams = ParameterTool.fromPropertiesFile("/path/to/config.properties");
179
String dbHost = fileParams.get("database.host", "localhost");
180
int dbPort = fileParams.getInt("database.port", 5432);
181
182
// Combine multiple parameter sources; on duplicate keys the values of the argument (systemParams) take precedence
183
ParameterTool systemParams = ParameterTool.fromSystemProperties();
184
ParameterTool combinedParams = fileParams.mergeWith(systemParams);
185
```
186
187
### Multiple Parameter Tool
188
189
Enhanced parameter tool supporting multiple values per key.
190
191
```java { .api }
192
/**
193
* Parameter tool supporting multiple values for the same key
194
*/
195
public class MultipleParameterTool implements Serializable {
196
197
/**
198
* Create from command line arguments allowing multiple values
199
* @param args command line arguments
200
* @return MultipleParameterTool instance
201
*/
202
public static MultipleParameterTool fromArgs(String[] args);
203
204
/**
205
* Get all values for a key
206
* @param key parameter key
207
* @return list of all values for the key
208
*/
209
public List<String> getMultiple(String key);
210
211
/**
212
* Get first value for a key
213
* @param key parameter key
214
* @return first value or null if not found
215
*/
216
public String get(String key);
217
218
/**
219
* Convert to regular ParameterTool (keeps only first value per key)
220
* @return ParameterTool instance
221
*/
222
public ParameterTool getParameterTool();
223
}
224
```
225
226
### DataSet Utilities
227
228
Utility functions for common DataSet operations and manipulations.
229
230
```java { .api }
231
/**
232
* Utility class for DataSet operations
233
*/
234
public class DataSetUtils {
235
236
/**
237
* Zip DataSet elements with sequential index
238
* @param input input DataSet
239
* @return DataSet of Tuple2 with index and original element
240
*/
241
public static <T> DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input);
242
243
/**
244
* Zip DataSet elements with unique ID
245
* @param input input DataSet
246
* @return DataSet of Tuple2 with unique ID and original element
247
*/
248
public static <T> DataSet<Tuple2<Long, T>> zipWithUniqueId(DataSet<T> input);
249
250
/**
251
* Sample elements from DataSet
252
* @param input input DataSet
253
* @param withReplacement whether to sample with replacement
254
* @param fraction probability of selecting each element: 0.0 to 1.0 without replacement, any non-negative value with replacement
255
* @return DataSet with sampled elements
256
*/
257
public static <T> DataSet<T> sample(DataSet<T> input, boolean withReplacement, double fraction);
258
259
/**
260
* Sample elements from DataSet with random seed
261
* @param input input DataSet
262
* @param withReplacement whether to sample with replacement
263
* @param fraction fraction of elements to sample
264
* @param seed random seed for reproducible sampling
265
* @return DataSet with sampled elements
266
*/
267
public static <T> DataSet<T> sample(DataSet<T> input, boolean withReplacement, double fraction, long seed);
268
269
/**
270
* Sample fixed number of elements from DataSet
271
* @param input input DataSet
272
* @param withReplacement whether to sample with replacement
273
* @param numSamples number of samples to take
274
* @return DataSet with sampled elements
275
*/
276
public static <T> DataSet<T> sampleWithSize(DataSet<T> input, boolean withReplacement, int numSamples);
277
278
/**
279
* Sample fixed number of elements with random seed
280
* @param input input DataSet
281
* @param withReplacement whether to sample with replacement
282
* @param numSamples number of samples to take
283
* @param seed random seed for reproducible sampling
284
* @return DataSet with sampled elements
285
*/
286
public static <T> DataSet<T> sampleWithSize(DataSet<T> input, boolean withReplacement, int numSamples, long seed);
287
288
/**
289
* Count elements in each partition
290
* @param input input DataSet
291
* @return DataSet with partition ID and element count pairs
292
*/
293
public static <T> DataSet<Tuple2<Integer, Long>> countElementsPerPartition(DataSet<T> input);
294
}
295
```
296
297
**Usage Examples:**
298
299
```java
300
// Add sequential index to elements
301
DataSet<String> words = env.fromElements("hello", "world", "flink");
302
DataSet<Tuple2<Long, String>> indexed = DataSetUtils.zipWithIndex(words);
303
// Result: (0, "hello"), (1, "world"), (2, "flink")
304
305
// Add unique IDs (not necessarily sequential)
306
DataSet<Tuple2<Long, String>> withIds = DataSetUtils.zipWithUniqueId(words);
307
308
// Sample roughly 50% of elements (each element is kept with probability 0.5)
308
DataSet<String> largeDataSet = env.readTextFile("/path/to/large/file.txt");
310
DataSet<String> sample = DataSetUtils.sample(largeDataSet, false, 0.5);
311
312
// Sample fixed number of elements
313
DataSet<String> fixedSample = DataSetUtils.sampleWithSize(largeDataSet, false, 1000);
314
315
// Sample with seed for reproducible results
316
DataSet<String> reproducibleSample = DataSetUtils.sample(largeDataSet, false, 0.1, 12345L);
317
318
// Count elements per partition
319
DataSet<Tuple2<Integer, Long>> partitionCounts = DataSetUtils.countElementsPerPartition(largeDataSet);
320
```
321
322
### Data Summarization
323
324
Utilities for computing summary statistics on DataSets.
325
326
```java { .api }
327
/**
328
* Interface for column summary statistics
329
*/
330
public interface ColumnSummary {
331
/**
332
* Get total count of elements
333
* @return total element count
334
*/
335
long getTotalCount();
336
337
/**
338
* Get count of null elements
339
* @return null element count
340
*/
341
long getNullCount();
342
343
/**
344
* Get count of non-null elements
345
* @return non-null element count
346
*/
347
long getNonNullCount();
348
}
349
350
/**
351
* Summary statistics for numeric columns
352
* @param <T> numeric type (Integer, Long, Double, etc.)
353
*/
354
public interface NumericColumnSummary<T> extends ColumnSummary {
355
/**
356
* Get minimum value
357
* @return minimum value
358
*/
359
T getMin();
360
361
/**
362
* Get maximum value
363
* @return maximum value
364
*/
365
T getMax();
366
367
/**
368
* Get sum of all values
369
* @return sum
370
*/
371
Double getSum();
372
373
/**
374
* Get mean (average) value
375
* @return mean value
376
*/
377
Double getMean();
378
379
/**
380
* Get variance
381
* @return variance
382
*/
383
Double getVariance();
384
385
/**
386
* Get standard deviation
387
* @return standard deviation
388
*/
389
Double getStandardDeviation();
390
}
391
392
/**
393
* Summary statistics for string columns
394
*/
395
public interface StringColumnSummary extends ColumnSummary {
396
/**
397
* Get minimum string length
398
* @return minimum length
399
*/
400
Integer getMinLength();
401
402
/**
403
* Get maximum string length
404
* @return maximum length
405
*/
406
Integer getMaxLength();
407
408
/**
409
* Get mean string length
410
* @return mean length
411
*/
412
Double getMeanLength();
413
414
/**
415
* Check if all values are numeric
416
* @return true if all non-null values are numeric
417
*/
418
Boolean getIsNumeric();
419
}
420
421
/**
422
* Summary statistics for boolean columns
423
*/
424
public interface BooleanColumnSummary extends ColumnSummary {
425
/**
426
* Get count of true values
427
* @return true count
428
*/
429
Long getTrueCount();
430
431
/**
432
* Get count of false values
433
* @return false count
434
*/
435
Long getFalseCount();
436
}
437
```
438
439
### Function Annotations
440
441
Annotations that help the optimizer reason about user-defined functions by declaring which input fields are forwarded unchanged to the output and which fields the function reads.
442
443
```java { .api }
444
/**
445
* Container class for function annotations
446
*/
447
public class FunctionAnnotation {
448
449
/**
450
* Annotation to specify forwarded fields
451
*/
452
@Target(ElementType.TYPE)
453
@Retention(RetentionPolicy.RUNTIME)
454
public @interface ForwardedFields {
455
String[] value();
456
}
457
458
/**
459
* Annotation for forwarded fields from first input (for two-input functions)
460
*/
461
@Target(ElementType.TYPE)
462
@Retention(RetentionPolicy.RUNTIME)
463
public @interface ForwardedFieldsFirst {
464
String[] value();
465
}
466
467
/**
468
* Annotation for forwarded fields from second input (for two-input functions)
469
*/
470
@Target(ElementType.TYPE)
471
@Retention(RetentionPolicy.RUNTIME)
472
public @interface ForwardedFieldsSecond {
473
String[] value();
474
}
475
476
/**
477
* Annotation to specify non-forwarded fields
478
*/
479
@Target(ElementType.TYPE)
480
@Retention(RetentionPolicy.RUNTIME)
481
public @interface NonForwardedFields {
482
String[] value();
483
}
484
485
/**
486
* Annotation to specify which fields are read by the function
487
*/
488
@Target(ElementType.TYPE)
489
@Retention(RetentionPolicy.RUNTIME)
490
public @interface ReadFields {
491
String[] value();
492
}
493
494
/**
495
* Annotation to specify which fields are not read by the function
496
*/
497
@Target(ElementType.TYPE)
498
@Retention(RetentionPolicy.RUNTIME)
499
public @interface SkippedFields {
500
String[] value();
501
}
502
}
503
```
504
505
**Usage Examples:**
506
507
```java
508
// Function that forwards first field unchanged
509
@FunctionAnnotation.ForwardedFields("0")
510
public static class AddConstantMap implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
511
@Override
512
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
513
return new Tuple2<>(value.f0, value.f1 + 10); // field 0 is forwarded, field 1 is modified
514
}
515
}
516
517
// Join function that forwards fields from both inputs
518
@FunctionAnnotation.ForwardedFieldsFirst("0")
519
@FunctionAnnotation.ForwardedFieldsSecond("1->2") // second.f1 is copied to output field 2 (output field 1 holds first.f1)
520
public static class CombineJoin implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Tuple3<String, Integer, Double>> {
521
@Override
522
public Tuple3<String, Integer, Double> join(Tuple2<String, Integer> first, Tuple2<String, Double> second) {
523
return new Tuple3<>(first.f0, first.f1, second.f1);
524
}
525
}
526
527
// Function that only reads certain fields
528
@FunctionAnnotation.ReadFields("1;2") // only reads fields 1 and 2
529
public static class PartialReader implements MapFunction<Tuple4<String, Integer, Double, Boolean>, String> {
530
@Override
531
public String map(Tuple4<String, Integer, Double, Boolean> value) {
532
return "Value: " + value.f1 + ", " + value.f2; // only uses f1 and f2
533
}
534
}
535
```
536
537
## Types
538
539
```java { .api }
540
import org.apache.flink.api.java.utils.ParameterTool;
541
import org.apache.flink.api.java.utils.MultipleParameterTool;
542
import org.apache.flink.api.java.utils.DataSetUtils;
543
import org.apache.flink.api.java.summarize.*;
544
import org.apache.flink.api.java.functions.FunctionAnnotation;
545
import org.apache.flink.configuration.Configuration;
546
import org.apache.flink.api.java.tuple.Tuple2;
547
import java.util.Properties;
548
import java.util.List;
549
import java.util.Map;
550
import java.io.IOException;
551
```