Search (Ctrl+K)
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-core

Apache Flink streaming core library providing fundamental building blocks for scalable stream data processing.

Pending
Overview
Eval results
Files

docs/execution-environment.md

Execution Environment

The execution environment provides the entry point for creating and configuring Flink streaming applications. It manages job execution, parallelism settings, and environment-specific configurations.

StreamExecutionEnvironment

The main entry point for all streaming applications.

public abstract class StreamExecutionEnvironment {
    // Environment creation
    public static StreamExecutionEnvironment getExecutionEnvironment();
    public static LocalStreamEnvironment createLocalEnvironment();
    public static LocalStreamEnvironment createLocalEnvironment(int parallelism);
    public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles);
    public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles);
    public static void setDefaultLocalParallelism(int parallelism);
    
    // Job execution
    public JobExecutionResult execute() throws Exception;
    public JobExecutionResult execute(String jobName) throws Exception;
    
    // Configuration
    public StreamExecutionEnvironment setParallelism(int parallelism);
    public int getParallelism();
    public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis);
    public long getBufferTimeout();
    public StreamExecutionEnvironment disableOperatorChaining();
    public void setNumberOfExecutionRetries(int numberOfExecutionRetries);
    public int getNumberOfExecutionRetries();
    public void setStateHandleProvider(StateHandleProvider<?> provider);
    public StateHandleProvider<?> getStateHandleProvider();
    
    // Checkpointing
    public StreamExecutionEnvironment enableCheckpointing(long interval);
    public StreamExecutionEnvironment enableCheckpointing();
    
    // Source creation - Element and Collection sources
    public DataStreamSource<Long> generateSequence(long from, long to);
    public DataStreamSource<Long> generateParallelSequence(long from, long to);
    public <OUT> DataStreamSource<OUT> fromElements(OUT... data);
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data);
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo);
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type);
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo);
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type);
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo);
    
    // Source creation - Generic sources
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function);
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName);
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat);
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo);
    
    // Built-in sources - File sources
    public DataStreamSource<String> readTextFile(String filePath);
    public DataStreamSource<String> readTextFile(String filePath, String charsetName);
    public DataStreamSource<StringValue> readTextFileWithValue(String filePath);
    public DataStreamSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath);
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval);
    public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, Class<OUT> typeClass);
    public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, String delimiter, Class<OUT> typeClass);
    public DataStream<String> readFileStream(String filePath, long intervalMillis, WatchType watchType);
    
    // Built-in sources - Network sources
    public DataStreamSource<String> socketTextStream(String hostname, int port);
    public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter);
    public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry);
    
    // Type system and configuration
    public ExecutionConfig getConfig();
    public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer);
    public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);
    public void registerType(Class<?> type);
    public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer);
    public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);
    
    // Utility methods
    public <F> F clean(F f);
    public StreamGraph getStreamGraph();
    public String getExecutionPlan();
}

LocalStreamEnvironment

Environment for local execution, primarily used for development and testing.

public class LocalStreamEnvironment extends StreamExecutionEnvironment {
    public LocalStreamEnvironment();
    public LocalStreamEnvironment(Configuration configuration);
    
    @Override
    public JobExecutionResult execute() throws Exception;
    
    @Override
    public JobExecutionResult execute(String jobName) throws Exception;
}

RemoteStreamEnvironment

Environment for executing streaming jobs on a remote Flink cluster.

public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
    public RemoteStreamEnvironment(String host, int port, String... jarFiles);
    public RemoteStreamEnvironment(String host, int port, int parallelism, String... jarFiles);
    
    @Override
    public JobExecutionResult execute() throws Exception;
    
    @Override
    public JobExecutionResult execute(String jobName) throws Exception;
}

Usage Examples

Basic Environment Setup

// Get default environment (local or remote based on context)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Set parallelism
env.setParallelism(4);

// Set network buffer timeout
env.setBufferTimeout(100);

// Disable operator chaining for debugging
env.disableOperatorChaining();

Local Environment

// Create local environment with default parallelism
LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

// Create local environment with specific parallelism
LocalStreamEnvironment localEnv2 = StreamExecutionEnvironment.createLocalEnvironment(2);

Remote Environment

// Connect to remote cluster
RemoteStreamEnvironment remoteEnv = StreamExecutionEnvironment
    .createRemoteEnvironment("localhost", 6123, "/path/to/job.jar");

// With specific parallelism
RemoteStreamEnvironment remoteEnv2 = StreamExecutionEnvironment
    .createRemoteEnvironment("localhost", 6123, 4, "/path/to/job.jar");

Creating Sources

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From elements
DataStreamSource<String> stream1 = env.fromElements("hello", "world");

// From collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStreamSource<Integer> stream2 = env.fromCollection(numbers);

// From file
DataStreamSource<String> stream3 = env.readTextFile("/path/to/input.txt");

// From socket
DataStreamSource<String> stream4 = env.socketTextStream("localhost", 9999);

// Custom source
DataStreamSource<String> stream5 = env.addSource(new MyCustomSource());

Job Execution

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create your data stream pipeline
DataStream<String> processed = env
    .socketTextStream("localhost", 9999)
    .map(String::toUpperCase)
    .filter(s -> s.length() > 5);

processed.print();

// Execute the job
JobExecutionResult result = env.execute("My Streaming Job");

// Access execution results
System.out.println("Job execution time: " + result.getNetRuntime());

Types

public class JobExecutionResult {
    public long getNetRuntime();
    public long getNetRuntime(TimeUnit desiredUnit);
    public Map<String, Object> getAllAccumulatorResults();
    public <T> T getAccumulatorResult(String name);
}

public interface StateHandleProvider<T extends StateHandle> extends Serializable {
    T createStateHandle(Serializable state) throws Exception;
}

public enum FileProcessingMode {
    PROCESS_ONCE,
    PROCESS_CONTINUOUSLY
}

public enum WatchType {
    PROCESS_ONCE,
    REPROCESS_WITH_APPENDED
}

public class StreamGraph {
    // Internal representation of the streaming dataflow graph
}

public class StringValue implements Value {
    // Flink's StringValue type for efficient string handling
    public StringValue();
    public StringValue(String value);
    public String getValue();
    public void setValue(String value);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-core

docs

checkpointing-state.md

datastream-operations.md

execution-environment.md

index.md

sources-and-sinks.md

stream-operators.md

windowing.md

tile.json