Apache Flink streaming core library providing fundamental building blocks for scalable stream data processing.
—
Sources and sinks are the entry and exit points for data in Flink streaming applications. Sources read data from external systems, while sinks write processed data to external systems.
Base interface for all source implementations.
// Base contract for user-defined sources: run() produces elements until
// cancel() is invoked (asynchronously, from a different thread).
public interface SourceFunction<T> extends Function, Serializable {
// Main work loop; emits elements through ctx until cancelled or the source is exhausted.
void run(SourceContext<T> ctx) throws Exception;
// Called from another thread to stop run(); implementations typically flip a volatile flag.
void cancel();
// Handle through which run() emits elements and coordinates with checkpointing.
interface SourceContext<T> {
// Emits one element downstream.
void collect(T element);
// Lock to hold while emitting when state updates must be atomic w.r.t. checkpoints.
Object getCheckpointLock();
}
}
Source function that can run in parallel across multiple instances.
public interface ParallelSourceFunction<T> extends SourceFunction<T> {
// Inherits all methods from SourceFunction
}
Rich source function with access to runtime context and lifecycle methods.
public abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T> {
// Lifecycle methods from AbstractRichFunction
public void open(Configuration parameters) throws Exception;
public void close() throws Exception;
public RuntimeContext getRuntimeContext();
// Must implement SourceFunction methods
public abstract void run(SourceContext<T> ctx) throws Exception;
public abstract void cancel();
}
Rich parallel source function combining both capabilities.
public abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T> {
// Inherits all methods from RichSourceFunction and ParallelSourceFunction
}
Creates a source from a Java Iterator.
public class FromIteratorFunction<T> implements SourceFunction<T> {
public FromIteratorFunction(Iterator<T> iterator);
@Override
public void run(SourceContext<T> ctx) throws Exception;
@Override
public void cancel();
}
Creates a parallel source from a splittable iterator.
public class FromSplittableIteratorFunction<T> implements ParallelSourceFunction<T> {
public FromSplittableIteratorFunction(SplittableIterator<T> iterator);
@Override
public void run(SourceContext<T> ctx) throws Exception;
@Override
public void cancel();
}
Reads text data from a TCP socket.
public class SocketTextStreamFunction extends RichSourceFunction<String> {
public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries);
@Override
public void run(SourceContext<String> ctx) throws Exception;
@Override
public void cancel();
}
Monitors a file system path for new files and processes them.
public class FileMonitoringFunction<T> extends RichSourceFunction<T> {
public FileMonitoringFunction(String path, FileInputFormat<T> format, FileProcessingMode watchType, long interval);
@Override
public void run(SourceContext<T> ctx) throws Exception;
@Override
public void cancel();
}
Base interface for all sink implementations.
public interface SinkFunction<T> extends Function, Serializable {
void invoke(T value) throws Exception;
}
Rich sink function with access to runtime context and lifecycle methods.
public abstract class RichSinkFunction<T> extends AbstractRichFunction implements SinkFunction<T> {
// Lifecycle methods from AbstractRichFunction
public void open(Configuration parameters) throws Exception;
public void close() throws Exception;
public RuntimeContext getRuntimeContext();
// Must implement SinkFunction method
public abstract void invoke(T value) throws Exception;
}
Prints elements to stdout or stderr.
public class PrintSinkFunction<T> extends RichSinkFunction<T> {
public PrintSinkFunction();
public PrintSinkFunction(boolean stdErr);
public PrintSinkFunction(String sinkIdentifier);
public PrintSinkFunction(String sinkIdentifier, boolean stdErr);
@Override
public void invoke(T record) throws Exception;
}
Writes elements to files using a specified WriteFormat.
public class FileSinkFunction<T> extends RichSinkFunction<T> {
public FileSinkFunction(String path, WriteFormat<T> format);
@Override
public void invoke(T value) throws Exception;
}
Writes elements using an OutputFormat.
public class WriteSinkFunction<T> extends RichSinkFunction<T> {
public WriteSinkFunction(OutputFormat<T> format);
@Override
public void invoke(T value) throws Exception;
}
Writes elements to a socket connection.
public class SocketClientSink<T> extends RichSinkFunction<T> {
public SocketClientSink(String hostName, int port, SerializationSchema<T> schema);
public SocketClientSink(String hostName, int port, SerializationSchema<T> schema, int maxRetry);
@Override
public void invoke(T value) throws Exception;
}
Write formats define how data is serialized when writing to files.
Base interface for write formats.
public interface WriteFormat<T> extends Serializable {
void open(int taskNumber, int numTasks) throws IOException;
void writeRecord(T record) throws IOException;
void close() throws IOException;
}
Writes elements as text with toString().
public class WriteFormatAsText<T> implements WriteFormat<T> {
public WriteFormatAsText();
public WriteFormatAsText(String charset);
@Override
public void open(int taskNumber, int numTasks) throws IOException;
@Override
public void writeRecord(T record) throws IOException;
@Override
public void close() throws IOException;
}
Writes Tuple elements as CSV format.
public class WriteFormatAsCsv<T extends Tuple> implements WriteFormat<T> {
public WriteFormatAsCsv();
public WriteFormatAsCsv(String fieldDelimiter);
public WriteFormatAsCsv(String fieldDelimiter, String charset);
@Override
public void open(int taskNumber, int numTasks) throws IOException;
@Override
public void writeRecord(T record) throws IOException;
@Override
public void close() throws IOException;
}
// Example: emits the numbers [start, end], one element per second.
public class NumberSequenceSource implements SourceFunction<Long> {
    // Inclusive bounds of the sequence; immutable after construction.
    private final long start;
    private final long end;
    // volatile so the flag flipped by cancel() (another thread) is seen by run().
    private volatile boolean isRunning = true;

    /**
     * @param start first number emitted (inclusive)
     * @param end   last number emitted (inclusive)
     */
    public NumberSequenceSource(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        for (long i = start; i <= end && isRunning; i++) {
            // Emit under the checkpoint lock so checkpoints observe a
            // consistent view of emitted elements, matching the pattern used
            // by checkpointed sources (and Flink's source contract).
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
            Thread.sleep(1000); // Emit one number per second
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
// Usage
// Example: checkpointed source that emits an ever-increasing counter string.
DataStream<Long> numbers = env.addSource(new NumberSequenceSource(1, 100));public class StatefulCounterSource extends RichSourceFunction<String> implements Checkpointed<Long> {
// Flag flipped by cancel(); volatile for cross-thread visibility.
private volatile boolean isRunning = true;
// Checkpointed state: number of elements emitted so far.
private Long count = 0L;
@Override
public void open(Configuration parameters) {
// Initialize resources
System.out.println("Source opened with subtask index: " +
getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
// Emit and update state atomically w.r.t. checkpoints by holding the checkpoint lock.
synchronized (ctx.getCheckpointLock()) {
ctx.collect("Count: " + count);
count++;
}
// Throttle to one element per second (outside the lock so checkpoints are not blocked).
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
// Returns the state to persist in a checkpoint.
@Override
public Long snapshotState(long checkpointId, long checkpointTimestamp) {
return count;
}
// Reinstalls persisted state after recovery from a failure.
@Override
public void restoreState(Long state) {
this.count = state;
}
}
// Example: writes (word, count) pairs to a relational database via JDBC.
public class DatabaseSink extends RichSinkFunction<Tuple2<String, Integer>> {
    private Connection connection;
    private PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize database connection, once per parallel sink instance.
        // NOTE(review): credentials are hard-coded for the example; load them
        // from configuration in production code.
        connection = DriverManager.getConnection("jdbc:mysql://localhost/test", "user", "pass");
        statement = connection.prepareStatement("INSERT INTO counts VALUES (?, ?)");
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        // Parameterized statement prevents SQL injection and is reused per record.
        statement.setString(1, value.f0);
        statement.setInt(2, value.f1);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        // Close both resources even if closing the statement throws; the
        // previous version leaked the connection in that case.
        try {
            if (statement != null) {
                statement.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
// Usage
wordCounts.addSink(new DatabaseSink());
// Text file source
DataStream<String> textStream = env.readTextFile("/path/to/input.txt");
// Socket source
DataStream<String> socketStream = env.socketTextStream("localhost", 9999, "\n");
// File monitoring source
FileInputFormat<String> format = new TextInputFormat(new Path("/path/to/monitor"));
DataStream<String> monitoredStream = env.readFile(format, "/path/to/monitor",
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);
// Text file sink
processedStream.writeAsText("/path/to/output.txt");
// CSV sink
DataStream<Tuple3<String, Integer, Double>> tuples = processedStream
.map(value -> new Tuple3<>(value, value.length(), Math.random()));
tuples.writeAsCsv("/path/to/output.csv");
// Custom file sink with WriteFormat
processedStream.addSink(new FileSinkFunction<>("/path/to/output",
new WriteFormatAsText<String>()));
// Print sink
processedStream.print();
// Print to stderr with identifier
processedStream.addSink(new PrintSinkFunction<>("MyStream", true));
// Socket sink
SerializationSchema<String> schema = new SimpleStringSchema();
processedStream.addSink(new SocketClientSink<>("localhost", 9999, schema));
// Serialization interfaces
// Serializes an element into bytes for external systems (e.g. sockets).
public interface SerializationSchema<T> extends Serializable {
byte[] serialize(T element);
}
// Deserializes raw bytes back into typed elements.
public interface DeserializationSchema<T> extends Serializable {
T deserialize(byte[] message) throws IOException;
// Lets a source stop consuming when a sentinel element is observed.
boolean isEndOfStream(T nextElement);
// Type information Flink needs to handle the produced elements.
TypeInformation<T> getProducedType();
}
// Splittable iterator for parallel sources
public interface SplittableIterator<T> extends Iterator<T>, Serializable {
// Partitions the remaining elements into at most numPartitions iterators.
SplittableIterator<T>[] split(int numPartitions);
int getMaximumNumberOfSplits();
}
// Watermark for event time processing
public class Watermark implements Serializable {
public Watermark(long timestamp);
public long getTimestamp();
}
// File processing modes
public enum FileProcessingMode {
// Read the current contents of the path once, then stop.
PROCESS_ONCE,
// Keep monitoring the path and process new files as they appear.
PROCESS_CONTINUOUSLY
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-core