Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities
—
Sources and sinks are the entry and exit points for data in Apache Flink streaming applications. Sources ingest data from external systems, while sinks output processed results to external systems.
Pre-defined sources for common data ingestion patterns.
// Element-based sources
<T> DataStreamSource<T> fromElements(T... data);
<T> DataStreamSource<T> fromCollection(Collection<T> data);
<T> DataStreamSource<T> fromCollection(Collection<T> data, TypeInformation<T> typeInfo);
// File-based sources
DataStreamSource<String> readTextFile(String filePath);
DataStreamSource<String> readTextFile(String filePath, String charsetName);
DataStreamSource<String> readFile(FileInputFormat<String> inputFormat, String filePath);
// Network sources
DataStreamSource<String> socketTextStream(String hostname, int port);
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter);
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry);
// Sequence sources
DataStreamSource<Long> generateSequence(long from, long to);
DataStreamSource<Long> fromSequence(long from, long to);
// Custom sources
<T> DataStreamSource<T> addSource(SourceFunction<T> function);
<T> DataStreamSource<T> addSource(SourceFunction<T> function, String sourceName);
<T> DataStreamSource<T> addSource(SourceFunction<T> function, TypeInformation<T> typeInfo);
Usage Examples:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// From elements
DataStream<String> words = env.fromElements("hello", "world", "flink");
// From collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStream<Integer> numberStream = env.fromCollection(numbers);
// From file
DataStream<String> fileStream = env.readTextFile("/path/to/input.txt");
// Socket stream
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
// Sequence
DataStream<Long> sequence = env.generateSequence(1, 1000000);
// Custom source
DataStream<Event> eventStream = env.addSource(new CustomEventSource());
Implement custom sources using the SourceFunction interface.
/**
 * Interface for user-defined stream sources.
 * run() produces the stream; cancel() is invoked from a different thread to
 * request shutdown, so implementations typically poll a volatile flag
 * (see the NumberSource example below).
 */
interface SourceFunction<T> extends Function {
/**
 * Main work method: emits elements until the stream ends or the source is
 * cancelled. For checkpoint consistency, emit while holding
 * ctx.getCheckpointLock(), as the examples in this file do.
 * @param ctx - source context for emitting elements and watermarks
 * @throws Exception to fail the source and trigger job recovery
 */
void run(SourceContext<T> ctx) throws Exception;
/**
 * Requests the source to stop; implementations typically set a volatile
 * flag that run() checks on each iteration.
 */
void cancel();
/**
 * Source context through which run() emits elements, timestamps and
 * watermarks.
 */
interface SourceContext<T> {
// Emit an element without an attached event-time timestamp.
void collect(T element);
// Emit an element together with its event-time timestamp (epoch millis).
void collectWithTimestamp(T element, long timestamp);
// Advance event time for downstream operators.
void emitWatermark(Watermark mark);
// Signal that this source will temporarily emit no elements.
void markAsTemporarilyIdle();
// Lock to synchronize on while emitting, so checkpoints see consistent state.
Object getCheckpointLock();
// Release any resources held by this context.
void close();
}
}
/**
 * Base class for sources that need the rich-function lifecycle:
 * open()/close() hooks for setup/teardown and getRuntimeContext() for
 * access to runtime information.
 */
abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T> {
// Inherits open(), close(), getRuntimeContext()
}
/**
* Rich parallel source function for parallel sources
*/
abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T> {
// Can run in parallel with multiple instances
}
Usage Examples:
// Simple custom source
/**
 * Simple finite source that emits the integers 0..999, one every 100 ms,
 * until it finishes or is cancelled.
 */
class NumberSource implements SourceFunction<Integer> {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;
    private int next = 0;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (running && next < 1000) {
            // Emit under the checkpoint lock so checkpoints see a consistent counter.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next);
                next++;
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
// Rich source with state
/**
 * Example stateful source: checkpoints its read offset so it can resume from
 * the last committed position after a failure.
 *
 * Fixes over the previous version:
 * - snapshotState() referenced an undeclared field (currentOffset); the
 *   offset is now a field shared by run() and snapshotState().
 * - snapshotState() is only invoked by Flink when the function implements
 *   CheckpointedFunction, which is now declared; state is restored in
 *   initializeState(), the hook that receives the operator state store.
 * - State is operator (non-keyed) state: getRuntimeContext().getListState()
 *   is keyed state and is not usable in a source, which runs before any
 *   keyBy().
 * - Emission, watermark and offset update happen under the checkpoint lock
 *   so a checkpoint never captures a half-updated offset.
 */
class StatefulSource extends RichSourceFunction<Event> implements CheckpointedFunction {

    private transient ListState<Long> offsetState;
    private volatile boolean running = true;
    // Offset of the next event to fetch; persisted via offsetState on checkpoints.
    private long currentOffset = 0;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("offset", Long.class);
        offsetState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            for (Long o : offsetState.get()) {
                currentOffset = o;
            }
        }
    }

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            Event event = fetchNextEvent(currentOffset);
            // Emit and advance the offset atomically with respect to checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(event, event.getTimestamp());
                // Watermark trails event time by 5s to tolerate out-of-order events.
                ctx.emitWatermark(new Watermark(event.getTimestamp() - 5000));
                currentOffset++;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsetState.clear();
        offsetState.add(currentOffset);
    }

    @Override
    public void cancel() {
        running = false;
    }
}
Pre-defined sinks for common output patterns.
// Console output
DataStreamSink<T> print();
DataStreamSink<T> print(String sinkIdentifier);
DataStreamSink<T> printToErr();
DataStreamSink<T> printToErr(String sinkIdentifier);
// File output
DataStreamSink<T> writeAsText(String path);
DataStreamSink<T> writeAsText(String path, WriteMode writeMode);
DataStreamSink<T> writeAsCsv(String path);
DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode);
DataStreamSink<T> writeAsCsv(String path, String rowDelimiter, String fieldDelimiter);
// Socket output
DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema);
// Custom sinks
DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
DataStreamSink<T> addSink(SinkFunction<T> sinkFunction, String name);
Usage Examples:
DataStream<String> result = processedStream;
// Print to console
result.print();
result.print("MyOutput");
// Write to file
result.writeAsText("/path/to/output.txt");
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE);
// Write CSV
tupleStream.writeAsCsv("/path/to/output.csv", "\n", ",");
// Socket output
result.writeToSocket("localhost", 9999, new SimpleStringSchema());
// Custom sink
result.addSink(new CustomSink());
Implement custom sinks using the SinkFunction interface.
/**
 * Interface for sink functions that consume stream elements.
 */
interface SinkFunction<IN> extends Function {
/**
 * Process one element. The default implementation delegates to the
 * context-free invoke(IN), so implementors may override either overload.
 * @param value - input element
 * @param context - sink context exposing time and timestamp information
 * @throws Exception to fail the sink and trigger job recovery
 */
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
/**
 * Simple invoke method (deprecated in favor of invoke with context)
 * @param value - input element
 * @throws Exception to fail the sink and trigger job recovery
 */
default void invoke(IN value) throws Exception {}
/**
 * Sink context giving access to time attributes of the current record.
 */
interface Context {
// Current processing time, in epoch millis.
long currentProcessingTime();
// Current event-time watermark.
long currentWatermark();
// Timestamp of the current element, or null if it has none (hence boxed Long).
Long timestamp();
}
}
/**
* Rich sink function with lifecycle methods
*/
abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
// Inherits open(), close(), getRuntimeContext()
}
Usage Examples:
// Simple custom sink
/**
 * Example JDBC sink that lazily opens a connection on first use and inserts
 * one row per event.
 *
 * Fix: the PreparedStatement is now closed via try-with-resources — the
 * previous version leaked one statement per invocation.
 */
class DatabaseSink implements SinkFunction<Event> {

    // transient: JDBC connections are not serializable; opened lazily per task.
    private transient Connection connection;

    @Override
    public void invoke(Event event, Context context) throws Exception {
        if (connection == null) {
            connection = DriverManager.getConnection("jdbc:...");
        }
        try (PreparedStatement stmt = connection.prepareStatement(
                "INSERT INTO events (id, value, timestamp) VALUES (?, ?, ?)"
        )) {
            stmt.setString(1, event.getId());
            stmt.setString(2, event.getValue());
            stmt.setLong(3, event.getTimestamp());
            stmt.executeUpdate();
        }
    }
}
// Rich sink with connection pooling
/**
 * Example sink that shares a connection pool across invocations, borrowing
 * one connection per record.
 */
class PooledDatabaseSink extends RichSinkFunction<Event> {

    private transient ConnectionPool pool;

    /** Create the pool once per task instance. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.pool = new ConnectionPool();
    }

    /** Borrow a pooled connection; try-with-resources returns it to the pool. */
    @Override
    public void invoke(Event event, Context context) throws Exception {
        try (Connection conn = pool.getConnection()) {
            // Insert logic
        }
    }

    /** Release pooled connections before task shutdown. */
    @Override
    public void close() throws Exception {
        if (pool != null) {
            pool.close();
        }
        super.close();
    }
}
Advanced file sink for streaming applications with exactly-once guarantees.
/**
* File sink for streaming applications
*/
class StreamingFileSink<IN> implements SinkFunction<IN> {
/**
* Create row format builder for text-based files
*/
static <IN> StreamingFileSink.RowFormatBuilder<IN, String> forRowFormat(
Path basePath,
Encoder<IN> encoder
);
/**
* Create bulk format builder for columnar formats (Parquet, ORC)
*/
static <IN> StreamingFileSink.BulkFormatBuilder<IN, IN> forBulkFormat(
Path basePath,
BulkWriter.Factory<IN> writerFactory
);
}
Usage Examples:
// Row format (text files)
StreamingFileSink<String> textSink = StreamingFileSink
.forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
.build();
dataStream.addSink(textSink);
// Bulk format (Parquet)
StreamingFileSink<Event> parquetSink = StreamingFileSink
.forBulkFormat(
new Path("/path/to/output"),
ParquetAvroWriters.forReflectRecord(Event.class)
)
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
.build();
eventStream.addSink(parquetSink);
// Base source function
interface SourceFunction<T> extends Function {
void run(SourceContext<T> ctx) throws Exception;
void cancel();
interface SourceContext<T> {
void collect(T element);
void collectWithTimestamp(T element, long timestamp);
void emitWatermark(Watermark mark);
void markAsTemporarilyIdle();
Object getCheckpointLock();
void close();
}
}
// Parallel source function
interface ParallelSourceFunction<T> extends SourceFunction<T> {
// Marker interface for sources that can run in parallel
}
// Rich source functions
abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T>;
abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T>;
// Checkpointed source function
interface CheckpointedFunction {
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
}
// Base sink function
interface SinkFunction<IN> extends Function {
default void invoke(IN value, Context context) throws Exception;
default void invoke(IN value) throws Exception;
interface Context {
long currentProcessingTime();
long currentWatermark();
Long timestamp();
}
}
// Rich sink function
abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN>;
// Two-phase commit sink function for exactly-once semantics
abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
abstract TXN beginTransaction() throws Exception;
abstract void preCommit(TXN transaction) throws Exception;
abstract void commit(TXN transaction);
abstract void abort(TXN transaction);
}
// Write modes
enum WriteMode {
NO_OVERWRITE, // Fail if file exists
OVERWRITE // Overwrite existing files
}
// Watermark
class Watermark implements Serializable {
public Watermark(long timestamp);
long getTimestamp();
}
// Encoders for StreamingFileSink
interface Encoder<IN> extends Serializable {
void encode(IN element, OutputStream stream) throws IOException;
}
class SimpleStringEncoder<IN> implements Encoder<IN> {
public SimpleStringEncoder();
public SimpleStringEncoder(String charset);
}
// Bucket assigners
interface BucketAssigner<IN, BucketID> extends Serializable {
BucketID getBucketId(IN element, Context context);
interface Context {
long currentProcessingTime();
long currentWatermark();
Long timestamp();
}
}
class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
public DateTimeBucketAssigner(String formatString);
public DateTimeBucketAssigner(String formatString, ZoneId zoneId);
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11