Apache Flink streaming core library providing fundamental building blocks for scalable stream data processing.
The execution environment provides the entry point for creating and configuring Flink streaming applications. It manages job execution, parallelism settings, and environment-specific configurations.
The main entry point for all streaming applications.
/**
 * Entry point for building and executing Flink streaming jobs.
 * Static factories create local or remote environments; several configuration
 * setters return a StreamExecutionEnvironment, enabling call chaining.
 */
public abstract class StreamExecutionEnvironment {
// Environment creation — static factories. getExecutionEnvironment() returns a
// local or remote environment; presumably chosen from the invocation context
// (TODO confirm against Flink docs).
public static StreamExecutionEnvironment getExecutionEnvironment();
public static LocalStreamEnvironment createLocalEnvironment();
public static LocalStreamEnvironment createLocalEnvironment(int parallelism);
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles);
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles);
public static void setDefaultLocalParallelism(int parallelism);
// Job execution — declared to throw Exception; returns the job's result.
public JobExecutionResult execute() throws Exception;
public JobExecutionResult execute(String jobName) throws Exception;
// Configuration
public StreamExecutionEnvironment setParallelism(int parallelism);
public int getParallelism();
// Buffer timeout is in milliseconds (see the timeoutMillis parameter name).
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis);
public long getBufferTimeout();
public StreamExecutionEnvironment disableOperatorChaining();
public void setNumberOfExecutionRetries(int numberOfExecutionRetries);
public int getNumberOfExecutionRetries();
public void setStateHandleProvider(StateHandleProvider<?> provider);
public StateHandleProvider<?> getStateHandleProvider();
// Checkpointing — the interval unit is not stated here; presumably
// milliseconds (TODO confirm against Flink docs).
public StreamExecutionEnvironment enableCheckpointing(long interval);
public StreamExecutionEnvironment enableCheckpointing();
// Source creation - Element and Collection sources
public DataStreamSource<Long> generateSequence(long from, long to);
public DataStreamSource<Long> generateParallelSequence(long from, long to);
public <OUT> DataStreamSource<OUT> fromElements(OUT... data);
// fromCollection overloads: pass a Class or TypeInformation when the element
// type cannot be inferred from the collection/iterator alone.
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data);
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo);
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type);
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo);
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type);
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo);
// Source creation - Generic sources (user-defined SourceFunction or InputFormat)
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function);
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName);
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat);
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo);
// Built-in sources - File sources
public DataStreamSource<String> readTextFile(String filePath);
public DataStreamSource<String> readTextFile(String filePath, String charsetName);
public DataStreamSource<StringValue> readTextFileWithValue(String filePath);
public DataStreamSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath);
// FileProcessingMode selects one-shot vs. continuous processing; interval unit
// is not stated here (presumably milliseconds — TODO confirm).
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval);
public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, Class<OUT> typeClass);
public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, String delimiter, Class<OUT> typeClass);
// intervalMillis is the polling interval in milliseconds; WatchType controls
// how appended data is handled.
public DataStream<String> readFileStream(String filePath, long intervalMillis, WatchType watchType);
// Built-in sources - Network sources
public DataStreamSource<String> socketTextStream(String hostname, int port);
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter);
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry);
// Type system and configuration
public ExecutionConfig getConfig();
public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer);
public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);
public void registerType(Class<?> type);
public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer);
public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);
// Utility methods
public <F> F clean(F f);
public StreamGraph getStreamGraph();
public String getExecutionPlan();
}

Environment for local execution, primarily used for development and testing.
// Runs jobs locally; instances are normally obtained via the
// createLocalEnvironment() factories on StreamExecutionEnvironment.
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
public LocalStreamEnvironment();
public LocalStreamEnvironment(Configuration configuration);
@Override
public JobExecutionResult execute() throws Exception;
@Override
public JobExecutionResult execute(String jobName) throws Exception;
}

Environment for executing streaming jobs on a remote Flink cluster.
// Submits jobs to a remote cluster at host:port; jarFiles are the job
// artifacts shipped to the cluster. Normally obtained via the
// createRemoteEnvironment() factories on StreamExecutionEnvironment.
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
public RemoteStreamEnvironment(String host, int port, String... jarFiles);
public RemoteStreamEnvironment(String host, int port, int parallelism, String... jarFiles);
@Override
public JobExecutionResult execute() throws Exception;
@Override
public JobExecutionResult execute(String jobName) throws Exception;
}

// Get default environment (local or remote based on context)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the default parallelism for the job
env.setParallelism(4);
// Set the network buffer flush timeout, in milliseconds
env.setBufferTimeout(100);
// Disable operator chaining for debugging
env.disableOperatorChaining();

// Create local environment with default parallelism
LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
// Create local environment with specific parallelism
LocalStreamEnvironment localEnv2 = StreamExecutionEnvironment.createLocalEnvironment(2);

// Connect to remote cluster
RemoteStreamEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment("localhost", 6123, "/path/to/job.jar");
// With specific parallelism
RemoteStreamEnvironment remoteEnv2 = StreamExecutionEnvironment
.createRemoteEnvironment("localhost", 6123, 4, "/path/to/job.jar");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// From in-line elements (element type inferred from the arguments)
DataStreamSource<String> stream1 = env.fromElements("hello", "world");
// From a Java collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStreamSource<Integer> stream2 = env.fromCollection(numbers);
// From a text file, one String element per line
DataStreamSource<String> stream3 = env.readTextFile("/path/to/input.txt");
// From a socket text stream
DataStreamSource<String> stream4 = env.socketTextStream("localhost", 9999);
// From a user-defined SourceFunction
DataStreamSource<String> stream5 = env.addSource(new MyCustomSource());

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create your data stream pipeline
DataStream<String> processed = env
.socketTextStream("localhost", 9999)
.map(String::toUpperCase)
.filter(s -> s.length() > 5);
processed.print();
// Execute the job; execute() is declared to throw Exception on failure
JobExecutionResult result = env.execute("My Streaming Job");
// Access execution results
System.out.println("Job execution time: " + result.getNetRuntime());

public class JobExecutionResult {
// Net job runtime; the TimeUnit overload converts to the desired unit. The
// unit of the no-arg form is not stated here — presumably milliseconds
// (TODO confirm against Flink docs).
public long getNetRuntime();
public long getNetRuntime(TimeUnit desiredUnit);
// Accumulator results collected from the finished job, keyed by name.
public Map<String, Object> getAllAccumulatorResults();
public <T> T getAccumulatorResult(String name);
}
/**
 * Factory for state handles; installed on an environment via
 * StreamExecutionEnvironment.setStateHandleProvider(...).
 */
public interface StateHandleProvider<T extends StateHandle> extends Serializable {
T createStateHandle(Serializable state) throws Exception;
}
// File processing behavior passed to readFile(FileInputFormat, String,
// FileProcessingMode, long): read once, or keep processing continuously.
public enum FileProcessingMode {
PROCESS_ONCE,
PROCESS_CONTINUOUSLY
}
// Watch behavior passed to readFileStream(String, long, WatchType): process
// the file once, or reprocess when data is appended.
public enum WatchType {
PROCESS_ONCE,
REPROCESS_WITH_APPENDED
}
// Returned by StreamExecutionEnvironment.getStreamGraph().
public class StreamGraph {
// Internal representation of the streaming dataflow graph
}
// Mutable string wrapper produced by the readTextFileWithValue(...) sources.
public class StringValue implements Value {
// Flink's StringValue type for efficient string handling
public StringValue();
public StringValue(String value);
public String getValue();
public void setValue(String value);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-core