Apache Flink Java API - Core Java APIs for Apache Flink's batch and stream processing framework
—
Helper utilities for common operations, parameter handling, and DataSet manipulation. These utilities simplify common tasks and provide additional functionality for Flink batch programs.
Utility for handling program parameters from various sources including command line arguments, properties files, and system properties.
/**
* Utility class for handling program parameters
*/
/**
 * Utility class for handling program parameters sourced from command line
 * arguments, properties files, maps, or JVM system properties.
 *
 * <p>Implements Serializable, so an instance can be registered as global
 * job parameters and shipped to the cluster (see the usage examples).
 *
 * <p>NOTE(review): the usage examples call {@code mergeWith(ParameterTool)},
 * which is not declared in this spec — confirm against the Flink API.
 */
public class ParameterTool implements Serializable {
/**
 * Create ParameterTool from command line arguments
 * NOTE(review): presumably expects "--key value" / "-key value" pairs — confirm.
 * @param args command line arguments in key-value format
 * @return ParameterTool instance
 */
public static ParameterTool fromArgs(String[] args);
/**
 * Create ParameterTool from properties file
 * @param path path to the properties file
 * @return ParameterTool instance
 * @throws IOException if file cannot be read
 */
public static ParameterTool fromPropertiesFile(String path) throws IOException;
/**
 * Create ParameterTool from properties file with ClassLoader
 * @param file properties file
 * @param classLoader class loader to use
 * @return ParameterTool instance
 * @throws IOException if file cannot be read
 */
public static ParameterTool fromPropertiesFile(File file, ClassLoader classLoader) throws IOException;
/**
 * Create ParameterTool from Map
 * @param map map containing key-value pairs
 * @return ParameterTool instance
 */
public static ParameterTool fromMap(Map<String, String> map);
/**
 * Create ParameterTool from system properties (i.e. JVM -D options)
 * @return ParameterTool instance with system properties
 */
public static ParameterTool fromSystemProperties();
/**
 * Get parameter value as String
 * @param key parameter key
 * @return parameter value or null if not found
 */
public String get(String key);
/**
 * Get parameter value with default
 * @param key parameter key
 * @param defaultValue default value if key not found
 * @return parameter value or default value
 */
public String get(String key, String defaultValue);
/**
 * Get parameter value as integer
 * NOTE(review): behavior when the key is absent is unspecified here —
 * presumably throws a RuntimeException; the same applies to the other
 * no-default typed getters below. Confirm against the Flink API.
 * @param key parameter key
 * @return parameter value as integer
 * @throws NumberFormatException if value is not a valid integer
 */
public int getInt(String key);
/**
 * Get parameter value as integer with default
 * @param key parameter key
 * @param defaultValue default value if key not found
 * @return parameter value as integer or default value
 */
public int getInt(String key, int defaultValue);
/**
 * Get parameter value as long
 * @param key parameter key
 * @return parameter value as long
 */
public long getLong(String key);
/**
 * Get parameter value as long with default
 * @param key parameter key
 * @param defaultValue default value if key not found
 * @return parameter value as long or default value
 */
public long getLong(String key, long defaultValue);
/**
 * Get parameter value as double
 * @param key parameter key
 * @return parameter value as double
 */
public double getDouble(String key);
/**
 * Get parameter value as double with default
 * @param key parameter key
 * @param defaultValue default value if key not found
 * @return parameter value as double or default value
 */
public double getDouble(String key, double defaultValue);
/**
 * Get parameter value as boolean
 * @param key parameter key
 * @return parameter value as boolean
 */
public boolean getBoolean(String key);
/**
 * Get parameter value as boolean with default
 * @param key parameter key
 * @param defaultValue default value if key not found
 * @return parameter value as boolean or default value
 */
public boolean getBoolean(String key, boolean defaultValue);
/**
 * Check if parameter key exists
 * @param key parameter key
 * @return true if key exists, false otherwise
 */
public boolean has(String key);
/**
 * Convert to Flink Configuration object
 * @return Configuration object with all parameters
 */
public Configuration getConfiguration();
/**
 * Convert to Properties object
 * @return Properties object with all parameters
 */
public Properties getProperties();
}

Usage Examples:
public static void main(String[] args) throws Exception {
// Parse command line arguments
ParameterTool params = ParameterTool.fromArgs(args);
// Get parameters with defaults
String inputPath = params.get("input", "/default/input/path");
String outputPath = params.get("output", "/default/output/path");
int parallelism = params.getInt("parallelism", 1);
boolean verbose = params.getBoolean("verbose", false);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Use configuration from parameters
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(parallelism);
// Create program using parameters
DataSet<String> input = env.readTextFile(inputPath);
// ... process data ...
result.writeAsText(outputPath);
env.execute("Parameterized Job");
}
// Usage from properties file
ParameterTool fileParams = ParameterTool.fromPropertiesFile("/path/to/config.properties");
String dbHost = fileParams.get("database.host", "localhost");
int dbPort = fileParams.getInt("database.port", 5432);
// Combine multiple parameter sources
ParameterTool systemParams = ParameterTool.fromSystemProperties();
ParameterTool combinedParams = fileParams.mergeWith(systemParams);

Enhanced parameter tool supporting multiple values per key.
/**
* Parameter tool supporting multiple values for the same key
*/
/**
 * Parameter tool supporting multiple values for the same key
 * (e.g. repeated --host arguments on the command line).
 *
 * <p>Implements Serializable so instances can be shipped with the job.
 */
public class MultipleParameterTool implements Serializable {
/**
 * Create from command line arguments allowing multiple values
 * NOTE(review): presumably the same "--key value" format as ParameterTool,
 * with repeated keys accumulated instead of overwritten — confirm.
 * @param args command line arguments
 * @return MultipleParameterTool instance
 */
public static MultipleParameterTool fromArgs(String[] args);
/**
 * Get all values for a key
 * @param key parameter key
 * @return list of all values for the key
 */
public List<String> getMultiple(String key);
/**
 * Get first value for a key
 * @param key parameter key
 * @return first value or null if not found
 */
public String get(String key);
/**
 * Convert to regular ParameterTool (keeps only first value per key)
 * @return ParameterTool instance
 */
public ParameterTool getParameterTool();
}

Utility functions for common DataSet operations and manipulations.
/**
* Utility class for DataSet operations
*/
/**
 * Utility class for DataSet operations: indexing, sampling, and
 * per-partition element counting.
 */
public class DataSetUtils {
/**
 * Zip DataSet elements with sequential index
 * NOTE(review): presumably requires an extra pass to count elements per
 * partition before assigning consecutive indexes — confirm.
 * @param input input DataSet
 * @return DataSet of Tuple2 with index and original element
 */
public static <T> DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input);
/**
 * Zip DataSet elements with unique ID
 * (IDs are unique but not necessarily sequential; see usage examples)
 * @param input input DataSet
 * @return DataSet of Tuple2 with unique ID and original element
 */
public static <T> DataSet<Tuple2<Long, T>> zipWithUniqueId(DataSet<T> input);
/**
 * Sample elements from DataSet
 * NOTE(review): with replacement, fractions greater than 1.0 may be
 * permitted (expected copies per element) — confirm against Flink docs.
 * @param input input DataSet
 * @param withReplacement whether to sample with replacement
 * @param fraction fraction of elements to sample (0.0 to 1.0)
 * @return DataSet with sampled elements
 */
public static <T> DataSet<T> sample(DataSet<T> input, boolean withReplacement, double fraction);
/**
 * Sample elements from DataSet with random seed
 * @param input input DataSet
 * @param withReplacement whether to sample with replacement
 * @param fraction fraction of elements to sample
 * @param seed random seed for reproducible sampling
 * @return DataSet with sampled elements
 */
public static <T> DataSet<T> sample(DataSet<T> input, boolean withReplacement, double fraction, long seed);
/**
 * Sample fixed number of elements from DataSet
 * @param input input DataSet
 * @param withReplacement whether to sample with replacement
 * @param numSamples number of samples to take
 * @return DataSet with sampled elements
 */
public static <T> DataSet<T> sampleWithSize(DataSet<T> input, boolean withReplacement, int numSamples);
/**
 * Sample fixed number of elements with random seed
 * @param input input DataSet
 * @param withReplacement whether to sample with replacement
 * @param numSamples number of samples to take
 * @param seed random seed for reproducible sampling
 * @return DataSet with sampled elements
 */
public static <T> DataSet<T> sampleWithSize(DataSet<T> input, boolean withReplacement, int numSamples, long seed);
/**
 * Count elements in each partition
 * @param input input DataSet
 * @return DataSet with partition ID and element count pairs
 */
public static <T> DataSet<Tuple2<Integer, Long>> countElementsPerPartition(DataSet<T> input);
}

Usage Examples:
// Add sequential index to elements
DataSet<String> words = env.fromElements("hello", "world", "flink");
DataSet<Tuple2<Long, String>> indexed = DataSetUtils.zipWithIndex(words);
// Result: (0, "hello"), (1, "world"), (2, "flink")
// Add unique IDs (not necessarily sequential)
DataSet<Tuple2<Long, String>> withIds = DataSetUtils.zipWithUniqueId(words);
// Sample 50% of elements
DataSet<String> largeDataSet = env.readTextFile("/path/to/large/file.txt");
DataSet<String> sample = DataSetUtils.sample(largeDataSet, false, 0.5);
// Sample fixed number of elements
DataSet<String> fixedSample = DataSetUtils.sampleWithSize(largeDataSet, false, 1000);
// Sample with seed for reproducible results
DataSet<String> reproducibleSample = DataSetUtils.sample(largeDataSet, false, 0.1, 12345L);
// Count elements per partition
DataSet<Tuple2<Integer, Long>> partitionCounts = DataSetUtils.countElementsPerPartition(largeDataSet);

Utilities for computing summary statistics on DataSets.
/**
* Interface for column summary statistics
*/
/**
 * Interface for column summary statistics
 * (base contract shared by the numeric, string, and boolean summaries below)
 */
public interface ColumnSummary {
/**
 * Get total count of elements
 * @return total element count
 */
long getTotalCount();
/**
 * Get count of null elements
 * @return null element count
 */
long getNullCount();
/**
 * Get count of non-null elements
 * NOTE(review): presumably equal to getTotalCount() - getNullCount() — confirm.
 * @return non-null element count
 */
long getNonNullCount();
}
/**
* Summary statistics for numeric columns
* @param <T> numeric type (Integer, Long, Double, etc.)
*/
/**
 * Summary statistics for numeric columns
 * NOTE(review): aggregate getters return boxed Double — presumably null (or
 * NaN) when there are no non-null values; confirm against the Flink API.
 * @param <T> numeric type (Integer, Long, Double, etc.)
 */
public interface NumericColumnSummary<T> extends ColumnSummary {
/**
 * Get minimum value
 * @return minimum value
 */
T getMin();
/**
 * Get maximum value
 * @return maximum value
 */
T getMax();
/**
 * Get sum of all values
 * @return sum
 */
Double getSum();
/**
 * Get mean (average) value
 * @return mean value
 */
Double getMean();
/**
 * Get variance
 * @return variance
 */
Double getVariance();
/**
 * Get standard deviation
 * @return standard deviation
 */
Double getStandardDeviation();
}
/**
* Summary statistics for string columns
*/
/**
 * Summary statistics for string columns
 * NOTE(review): getters return boxed types — presumably null when there are
 * no non-null values; confirm.
 */
public interface StringColumnSummary extends ColumnSummary {
/**
 * Get minimum string length
 * @return minimum length
 */
Integer getMinLength();
/**
 * Get maximum string length
 * @return maximum length
 */
Integer getMaxLength();
/**
 * Get mean string length
 * @return mean length
 */
Double getMeanLength();
/**
 * Check if all values are numeric
 * @return true if all non-null values are numeric
 */
Boolean getIsNumeric();
}
/**
* Summary statistics for boolean columns
*/
/**
 * Summary statistics for boolean columns
 * NOTE(review): presumably trueCount + falseCount + nullCount == totalCount — confirm.
 */
public interface BooleanColumnSummary extends ColumnSummary {
/**
 * Get count of true values
 * @return true count
 */
Long getTrueCount();
/**
 * Get count of false values
 * @return false count
 */
Long getFalseCount();
}

Annotations for optimizing user-defined functions by specifying field forwarding patterns.
/**
* Container class for function annotations
*/
/**
 * Container class for function annotations
 *
 * <p>NOTE(review): per Flink semantics these annotations are optimizer
 * hints; declaring fields as forwarded when the function actually modifies
 * them can produce incorrect results — confirm and document at call sites.
 */
public class FunctionAnnotation {
/**
 * Annotation to specify forwarded fields
 * (fields copied unmodified from input to output, e.g. "0" or "f0->f1")
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ForwardedFields {
String[] value();
}
/**
 * Annotation for forwarded fields from first input (for two-input functions)
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ForwardedFieldsFirst {
String[] value();
}
/**
 * Annotation for forwarded fields from second input (for two-input functions)
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ForwardedFieldsSecond {
String[] value();
}
/**
 * Annotation to specify non-forwarded fields
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface NonForwardedFields {
String[] value();
}
/**
 * Annotation to specify which fields are read by the function
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadFields {
String[] value();
}
/**
 * Annotation to specify which fields are not read by the function
 * NOTE(review): "SkippedFields" does not match a known annotation in Flink's
 * FunctionAnnotation (which has ReadFieldsFirst/ReadFieldsSecond variants
 * instead) — verify this name against the targeted Flink version.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface SkippedFields {
String[] value();
}
}

Usage Examples:
// Function that forwards first field unchanged
@FunctionAnnotation.ForwardedFields("0")
public static class AddConstantMap implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
return new Tuple2<>(value.f0, value.f1 + 10); // field 0 is forwarded, field 1 is modified
}
}
// Join function that forwards fields from both inputs
@FunctionAnnotation.ForwardedFieldsFirst("0")
@FunctionAnnotation.ForwardedFieldsSecond("1")
public static class CombineJoin implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Tuple3<String, Integer, Double>> {
@Override
public Tuple3<String, Integer, Double> join(Tuple2<String, Integer> first, Tuple2<String, Double> second) {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}
// Function that only reads certain fields
@FunctionAnnotation.ReadFields("1;2") // only reads fields 1 and 2
public static class PartialReader implements MapFunction<Tuple4<String, Integer, Double, Boolean>, String> {
@Override
public String map(Tuple4<String, Integer, Double, Boolean> value) {
return "Value: " + value.f1 + ", " + value.f2; // only uses f1 and f2
}
}

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.summarize.*;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Properties;
import java.util.List;
import java.util.Map;
import java.io.IOException;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-java