Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities
—
The StreamExecutionEnvironment is the main entry point for creating and configuring Apache Flink streaming applications. It provides methods to create data streams from various sources, configure runtime settings, and execute streaming jobs.
Create different types of execution environments based on your deployment needs.
/**
* Get the default execution environment, which is determined based on the context.
* In an IDE or as a regular program: creates a local environment
* In a cluster: creates the cluster environment
*/
static StreamExecutionEnvironment getExecutionEnvironment();
/**
* Create a local execution environment for testing and development
* @param parallelism - the parallelism for the local environment
*/
static StreamExecutionEnvironment createLocalEnvironment(int parallelism);
/**
* Create a local execution environment with default parallelism
*/
static StreamExecutionEnvironment createLocalEnvironment();
/**
* Create a remote execution environment for cluster deployment
* @param host - the host of the JobManager
* @param port - the port of the JobManager
* @param jarFiles - JAR files to be shipped to the cluster
*/
static StreamExecutionEnvironment createRemoteEnvironment(
String host,
int port,
String... jarFiles
);

Usage Examples:
// Get default environment (most common)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create local environment for testing
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(4);
// Create remote environment for cluster
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
"jobmanager-host", 6123, "my-job.jar"
);

Create data streams from various built-in sources.
/**
* Create a DataStream from the given elements
* @param data - the elements to create the stream from
*/
<T> DataStreamSource<T> fromElements(T... data);
/**
* Create a DataStream from a collection
* @param data - the collection to create the stream from
*/
<T> DataStreamSource<T> fromCollection(Collection<T> data);
/**
* Create a DataStream from a collection with type information
* @param data - the collection to create the stream from
* @param typeInfo - explicit type information
*/
<T> DataStreamSource<T> fromCollection(Collection<T> data, TypeInformation<T> typeInfo);
/**
* Add a custom source function to create a data stream
* @param function - the source function
*/
<T> DataStreamSource<T> addSource(SourceFunction<T> function);
/**
* Add a custom source function with type information
* @param function - the source function
* @param typeInfo - explicit type information
*/
<T> DataStreamSource<T> addSource(SourceFunction<T> function, TypeInformation<T> typeInfo);
/**
* Read text from a socket connection
* @param hostname - the hostname to connect to
* @param port - the port to connect to
*/
DataStreamSource<String> socketTextStream(String hostname, int port);
/**
* Read text from a socket with custom delimiter
* @param hostname - the hostname to connect to
* @param port - the port to connect to
* @param delimiter - the delimiter to split records
*/
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter);
/**
* Read the entire text file from the file system
* @param filePath - the path to the file
*/
DataStreamSource<String> readTextFile(String filePath);
/**
* Read text file with character set
* @param filePath - the path to the file
* @param charsetName - the character set name
*/
DataStreamSource<String> readTextFile(String filePath, String charsetName);
/**
* Generate a sequence of numbers (deprecated)
* @deprecated Use fromSequence() instead
* @param from - start of sequence (inclusive)
* @param to - end of sequence (inclusive)
*/
@Deprecated
DataStreamSource<Long> generateSequence(long from, long to);
/**
* Create a sequence of numbers from 'from' to 'to'
* @param from - start of sequence (inclusive)
* @param to - end of sequence (inclusive)
* @return DataStream of Long values
*/
DataStreamSource<Long> fromSequence(long from, long to);
/**
* Create a DataStream from a modern unified Source
* @param source - the Source to read from
* @param timestampsAndWatermarks - watermark strategy for event time
* @param sourceName - name of the source
* @return DataStream from the source
*/
<T> DataStreamSource<T> fromSource(
Source<T, ?, ?> source,
WatermarkStrategy<T> timestampsAndWatermarks,
String sourceName
);

Usage Examples:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// From elements
DataStream<String> elements = env.fromElements("hello", "world", "flink");
// From collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStream<Integer> fromList = env.fromCollection(numbers);
// Socket stream
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
// File stream
DataStream<String> fileStream = env.readTextFile("/path/to/input.txt");
// Sequence (use fromSequence; generateSequence is deprecated)
DataStream<Long> sequence = env.fromSequence(1, 1000);
// Custom source
DataStream<MyEvent> customStream = env.addSource(new MyCustomSource());

Execute streaming jobs and handle execution results.
/**
* Execute the streaming job
* @return JobExecutionResult containing execution information
*/
JobExecutionResult execute() throws Exception;
/**
* Execute the streaming job with a custom name
* @param jobName - the name of the job
* @return JobExecutionResult containing execution information
*/
JobExecutionResult execute(String jobName) throws Exception;
/**
* Execute the streaming job asynchronously
* @return JobClient for managing the running job
*/
JobClient executeAsync() throws Exception;
/**
* Execute the streaming job asynchronously with a custom name
* @param jobName - the name of the job
* @return JobClient for managing the running job
*/
JobClient executeAsync(String jobName) throws Exception;

Usage Examples:
// Execute with default name
JobExecutionResult result = env.execute();
// Execute with custom name
JobExecutionResult namedResult = env.execute("My Streaming Job");
// Access execution results
System.out.println("Job ID: " + result.getJobID());
System.out.println("Execution Time: " + result.getNetRuntime());

Configure runtime settings and execution parameters.
/**
* Set the parallelism for operations executed through this environment
* @param parallelism - the parallelism
*/
StreamExecutionEnvironment setParallelism(int parallelism);
/**
* Get the default parallelism
*/
int getParallelism();
/**
* Set the maximum parallelism
* @param maxParallelism - the maximum parallelism
*/
StreamExecutionEnvironment setMaxParallelism(int maxParallelism);
/**
* Get the maximum parallelism
*/
int getMaxParallelism();
/**
* Enable checkpointing for fault tolerance
* @param interval - checkpoint interval in milliseconds
*/
StreamExecutionEnvironment enableCheckpointing(long interval);
/**
* Enable checkpointing with mode
* @param interval - checkpoint interval in milliseconds
* @param mode - checkpointing mode (EXACTLY_ONCE or AT_LEAST_ONCE)
*/
StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode);
/**
* Get the checkpoint configuration
*/
CheckpointConfig getCheckpointConfig();
/**
* Set the time characteristic for the application (deprecated)
* @deprecated Time characteristics are deprecated. Use source-based watermark assignment instead.
* @param characteristic - the time characteristic (EventTime, ProcessingTime, IngestionTime)
*/
@Deprecated
StreamExecutionEnvironment setStreamTimeCharacteristic(TimeCharacteristic characteristic);
/**
* Set the runtime execution mode (batch or streaming)
* @param executionMode - BATCH for bounded data, STREAMING for unbounded data
* @return StreamExecutionEnvironment for method chaining
*/
StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode executionMode);
/**
* Get the time characteristic
*/
TimeCharacteristic getStreamTimeCharacteristic();
/**
* Set the buffer timeout for network buffers
* @param timeoutMillis - timeout in milliseconds
*/
StreamExecutionEnvironment setBufferTimeout(long timeoutMillis);
/**
* Get the buffer timeout
*/
long getBufferTimeout();

Usage Examples:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism
env.setParallelism(4);
env.setMaxParallelism(128);
// Enable checkpointing
env.enableCheckpointing(5000); // every 5 seconds
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Configure checkpointing
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(60000);
// Set time characteristic (deprecated; prefer WatermarkStrategy-based watermarking on sources)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Set buffer timeout
env.setBufferTimeout(100);

Access and modify execution configuration settings.
/**
* Get the execution configuration
*/
ExecutionConfig getConfig();

Usage Examples:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();
// Configure execution settings
config.setAutoWatermarkInterval(1000);
config.setLatencyTrackingInterval(2000);
config.enableObjectReuse();
config.setGlobalJobParameters(ParameterTool.fromArgs(args));

abstract class StreamExecutionEnvironment {
// Factory methods, source creation, execution, and configuration methods as above
}
class LocalStreamEnvironment extends StreamExecutionEnvironment {
// Local execution environment implementation
}
class RemoteStreamEnvironment extends StreamExecutionEnvironment {
// Remote cluster execution environment implementation
}
// Data source type
class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
// Represents a data source in the streaming topology
}

enum TimeCharacteristic {
ProcessingTime, // Processing time semantics
IngestionTime, // Ingestion time semantics
EventTime // Event time semantics
}
enum CheckpointingMode {
EXACTLY_ONCE, // Exactly-once processing guarantees
AT_LEAST_ONCE // At-least-once processing guarantees
}
class ExecutionConfig {
// Configuration for job execution
void setAutoWatermarkInterval(long interval);
void setLatencyTrackingInterval(long interval);
void enableObjectReuse();
void setGlobalJobParameters(GlobalJobParameters parameters);
}
class CheckpointConfig {
// Configuration for checkpointing
void setCheckpointingMode(CheckpointingMode mode);
void setMinPauseBetweenCheckpoints(long minPause);
void setCheckpointTimeout(long timeout);
void setMaxConcurrentCheckpoints(int maxConcurrent);
}

class JobExecutionResult {
// Result of job execution
JobID getJobID();
long getNetRuntime();
long getNetRuntime(TimeUnit desiredUnit);
Map<String, Object> getAllAccumulatorResults();
<T> T getAccumulatorResult(String accumulatorName);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11