CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-core

Apache Flink streaming core library providing fundamental building blocks for scalable stream data processing.

Pending
Overview
Eval results
Files

docs/sources-and-sinks.md

Sources and Sinks

Sources and sinks are the entry and exit points for data in Flink streaming applications. Sources read data from external systems, while sinks write processed data to external systems.

Source Functions

SourceFunction<T>

Base interface for all source implementations.

/**
 * Base interface for all stream sources: {@code run} emits records through a
 * {@link SourceContext}, and {@code cancel} asks a running source to stop.
 */
public interface SourceFunction<T> extends Function, Serializable {
    // Produces elements by calling ctx.collect(...). The examples in this document loop
    // until a volatile "isRunning" flag is cleared by cancel().
    void run(SourceContext<T> ctx) throws Exception;
    // Requests that run() stop emitting and return.
    void cancel();
    
    /**
     * Handle passed to run() for emitting records.
     * NOTE(review): upstream Flink's SourceContext also declares timestamp/watermark
     * emission methods in most versions; this listing appears abbreviated — verify
     * against the targeted release.
     */
    interface SourceContext<T> {
        // Emits one element downstream.
        void collect(T element);
        // Lock the stateful example in this document holds while emitting and updating
        // checkpointed state, so a snapshot never observes a half-applied update.
        Object getCheckpointLock();
    }
}

ParallelSourceFunction<T>

Marker interface for sources that can run in parallel: the runtime executes one instance of the source per parallel subtask.

/**
 * Marker for sources that may be executed by several parallel instances.
 * It declares no methods of its own.
 */
public interface ParallelSourceFunction<T> extends SourceFunction<T> {
    // Inherits all methods from SourceFunction
}

RichSourceFunction<T>

Rich source function with access to runtime context and lifecycle methods.

/**
 * Abstract source with access to the {@link RuntimeContext} and the open()/close()
 * lifecycle hooks of AbstractRichFunction.
 * Signature listing only: the inherited lifecycle methods are shown without bodies.
 */
public abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T> {
    // Lifecycle methods from AbstractRichFunction
    // open(): one-time setup hook; the examples in this document acquire resources here.
    public void open(Configuration parameters) throws Exception;
    // close(): counterpart of open() for releasing resources.
    public void close() throws Exception;
    // Task information such as the subtask index (used by StatefulCounterSource below).
    public RuntimeContext getRuntimeContext();
    
    // Must implement SourceFunction methods
    public abstract void run(SourceContext<T> ctx) throws Exception;
    public abstract void cancel();
}

RichParallelSourceFunction<T>

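Rich parallel source function that combines parallel execution (ParallelSourceFunction) with runtime-context access and lifecycle methods (RichSourceFunction).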

/**
 * Abstract parallel source with runtime-context access and lifecycle hooks.
 * NOTE(review): upstream Flink appears to declare this as extending AbstractRichFunction
 * and implementing ParallelSourceFunction directly (not via RichSourceFunction). The
 * available methods are the same; verify the hierarchy for the targeted version.
 */
public abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T> {
    // Inherits all methods from RichSourceFunction and ParallelSourceFunction
}

Built-in Source Implementations

FromIteratorFunction<T>

Creates a non-parallel source that emits the elements of a Java Iterator.

/**
 * Non-parallel source (plain SourceFunction) that emits the elements of an iterator.
 * NOTE(review): SourceFunction extends Serializable, so the iterator passed here
 * presumably must be Serializable as well — confirm for the targeted version.
 */
public class FromIteratorFunction<T> implements SourceFunction<T> {
    public FromIteratorFunction(Iterator<T> iterator);
    
    // Emits the iterator's elements through ctx.
    @Override
    public void run(SourceContext<T> ctx) throws Exception;
    
    // Stops emission.
    @Override
    public void cancel();
}

FromSplittableIteratorFunction<T>

Creates a parallel source from a splittable iterator.

/**
 * Parallel source whose input is a SplittableIterator that is partitioned across the
 * parallel instances.
 * NOTE(review): choosing a split requires the instance's subtask index, which normally
 * comes from the runtime context; upstream Flink appears to declare this class as
 * extending RichParallelSourceFunction — verify.
 */
public class FromSplittableIteratorFunction<T> implements ParallelSourceFunction<T> {
    public FromSplittableIteratorFunction(SplittableIterator<T> iterator);
    
    @Override
    public void run(SourceContext<T> ctx) throws Exception;
    
    @Override
    public void cancel();
}

SocketTextStreamFunction

Reads text data from a TCP socket.

/**
 * Source that reads text from a TCP server and emits it as String records.
 *
 * @param hostname      host to connect to
 * @param port          port to connect to
 * @param delimiter     separator between records in the incoming text
 * @param maxNumRetries connection retry limit — TODO confirm the exact semantics
 *                      (e.g. meaning of 0 or negative values) against upstream Javadoc
 */
public class SocketTextStreamFunction extends RichSourceFunction<String> {
    public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries);
    
    @Override
    public void run(SourceContext<String> ctx) throws Exception;
    
    @Override
    public void cancel();
}

FileMonitoringFunction<T>

Monitors a file system path for new files and processes them.

/**
 * Source that watches a path and reads files from it with the given FileInputFormat.
 *
 * @param path      file system path to watch
 * @param format    input format used to read each file
 * @param watchType whether to scan once or keep re-scanning (see FileProcessingMode)
 * @param interval  delay between scans; presumably milliseconds — confirm
 *
 * NOTE(review): the upstream legacy FileMonitoringFunction has a different signature. In
 * newer Flink the documented entry point is env.readFile(format, path, mode, interval),
 * as used in the examples below — verify this listing against the targeted version.
 */
public class FileMonitoringFunction<T> extends RichSourceFunction<T> {
    public FileMonitoringFunction(String path, FileInputFormat<T> format, FileProcessingMode watchType, long interval);
    
    @Override
    public void run(SourceContext<T> ctx) throws Exception;
    
    @Override
    public void cancel();
}

Sink Functions

SinkFunction<T>

Base interface for all sink implementations.

/**
 * Base interface for all sinks. Because it is Serializable, non-serializable resources
 * (connections, streams) should be created lazily, e.g. in a rich function's open().
 */
public interface SinkFunction<T> extends Function, Serializable {
    // Handles one incoming element (the DatabaseSink example inserts one row per call).
    void invoke(T value) throws Exception;
}

RichSinkFunction<T>

Rich sink function with access to runtime context and lifecycle methods.

/**
 * Abstract sink with access to the {@link RuntimeContext} and open()/close() lifecycle hooks.
 * Signature listing only: the inherited lifecycle methods are shown without bodies.
 */
public abstract class RichSinkFunction<T> extends AbstractRichFunction implements SinkFunction<T> {
    // Lifecycle methods from AbstractRichFunction
    // open(): acquire resources (e.g. the JDBC connection in DatabaseSink below).
    public void open(Configuration parameters) throws Exception;
    // close(): release whatever open() acquired.
    public void close() throws Exception;
    public RuntimeContext getRuntimeContext();
    
    // Must implement SinkFunction method
    public abstract void invoke(T value) throws Exception;
}

Built-in Sink Implementations

PrintSinkFunction<T>

Prints elements to stdout or stderr.

/**
 * Sink that prints each record to standard output or, when {@code stdErr} is true,
 * to standard error. {@code sinkIdentifier} labels the printed output so that several
 * print sinks can be told apart.
 */
public class PrintSinkFunction<T> extends RichSinkFunction<T> {
    public PrintSinkFunction();
    public PrintSinkFunction(boolean stdErr);
    public PrintSinkFunction(String sinkIdentifier);
    public PrintSinkFunction(String sinkIdentifier, boolean stdErr);
    
    @Override
    public void invoke(T record) throws Exception;
}

FileSinkFunction<T>

Writes elements to files using a specified WriteFormat.

/**
 * Sink that writes records under {@code path} using the given WriteFormat.
 * NOTE(review): no class of this name could be matched in upstream Flink; the
 * (String path, WriteFormat) constructor resembles upstream WriteSinkFunction — verify.
 */
public class FileSinkFunction<T> extends RichSinkFunction<T> {
    public FileSinkFunction(String path, WriteFormat<T> format);
    
    @Override
    public void invoke(T value) throws Exception;
}

WriteSinkFunction<T>

Writes elements using an OutputFormat.

/**
 * Sink that delegates writing to a batch-API OutputFormat.
 * NOTE(review): in upstream Flink the OutputFormat-based sink appears as
 * OutputFormatSinkFunction, while WriteSinkFunction takes (String path, WriteFormat) —
 * verify this listing against the targeted version.
 */
public class WriteSinkFunction<T> extends RichSinkFunction<T> {
    public WriteSinkFunction(OutputFormat<T> format);
    
    @Override
    public void invoke(T value) throws Exception;
}

SocketClientSink<T>

Serializes elements with a SerializationSchema and writes them to a TCP socket connection.

/**
 * Sink that connects as a client to {@code hostName:port} and sends each record as the
 * bytes produced by {@code schema}.
 * {@code maxRetry} bounds reconnection/resend attempts — TODO confirm exact semantics
 * and the default used by the three-argument constructor.
 */
public class SocketClientSink<T> extends RichSinkFunction<T> {
    public SocketClientSink(String hostName, int port, SerializationSchema<T> schema);
    public SocketClientSink(String hostName, int port, SerializationSchema<T> schema, int maxRetry);
    
    @Override
    public void invoke(T value) throws Exception;
}

Write Formats

Write formats define how data is serialized when writing to files.

WriteFormat<T>

Base interface for write formats.

/**
 * Contract for formats used by file sinks: open once per task, write records, then close.
 * NOTE(review): upstream Flink's WriteFormat appears to be an abstract class with a single
 * write(String path, ArrayList<IN> inputList) method rather than this interface — verify.
 */
public interface WriteFormat<T> extends Serializable {
    // Called before writing; receives this task's index and the total task count.
    void open(int taskNumber, int numTasks) throws IOException;
    // Serializes and writes one record.
    void writeRecord(T record) throws IOException;
    // Flushes and releases resources.
    void close() throws IOException;
}

WriteFormatAsText<T>

Writes elements as text, using each element's toString() representation.

/**
 * WriteFormat that writes each record's toString() representation as text.
 * The no-argument constructor uses a default charset — TODO confirm which one.
 */
public class WriteFormatAsText<T> implements WriteFormat<T> {
    public WriteFormatAsText();
    public WriteFormatAsText(String charset);
    
    @Override
    public void open(int taskNumber, int numTasks) throws IOException;
    
    @Override
    public void writeRecord(T record) throws IOException;
    
    @Override
    public void close() throws IOException;
}

WriteFormatAsCsv<T>

Writes Tuple elements in CSV format.

/**
 * WriteFormat that writes Tuple records as delimiter-separated fields, one record per line.
 * Defaults used by the shorter constructors (delimiter, charset) are not shown here —
 * TODO confirm against the implementation.
 */
public class WriteFormatAsCsv<T extends Tuple> implements WriteFormat<T> {
    public WriteFormatAsCsv();
    public WriteFormatAsCsv(String fieldDelimiter);
    public WriteFormatAsCsv(String fieldDelimiter, String charset);
    
    @Override
    public void open(int taskNumber, int numTasks) throws IOException;
    
    @Override
    public void writeRecord(T record) throws IOException;
    
    @Override
    public void close() throws IOException;
}

Usage Examples

Custom Source Function

/**
 * Emits the numbers {@code start..end} (both inclusive), one per second, until the range
 * is exhausted or the source is cancelled. An empty range ({@code start > end}) emits nothing.
 */
public class NumberSequenceSource implements SourceFunction<Long> {
    private final long start;
    private final long end;
    // Cleared by cancel(), which may be invoked from a different thread than run();
    // volatile makes the update visible to the emitting loop.
    private volatile boolean isRunning = true;
    
    public NumberSequenceSource(long start, long end) {
        this.start = start;
        this.end = end;
    }
    
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        if (start > end) {
            return; // empty range
        }
        long next = start;
        while (isRunning) {
            ctx.collect(next);
            if (next == end) {
                // Stop before incrementing: "i++; i <= end" never terminates when
                // end == Long.MAX_VALUE because the counter wraps around.
                break;
            }
            next++;
            Thread.sleep(1000); // Emit one number per second
        }
    }
    
    @Override
    public void cancel() {
        isRunning = false;
    }
}

// Usage: stream the numbers 1..100 from the custom source
NumberSequenceSource sequenceSource = new NumberSequenceSource(1, 100);
DataStream<Long> numbers = env.addSource(sequenceSource);

Rich Source with State

/**
 * Emits "Count: N" roughly once per second and checkpoints the counter, so that after a
 * restore counting resumes from the last snapshotted value.
 *
 * <p>NOTE: {@code Checkpointed} is Flink's legacy state interface; newer Flink releases
 * deprecate it in favour of {@code CheckpointedFunction}.
 */
public class StatefulCounterSource extends RichSourceFunction<String> implements Checkpointed<Long> {
    // Cleared by cancel(); volatile so the run() loop observes it.
    private volatile boolean isRunning = true;
    // Primitive to avoid re-boxing on every increment; run() mutates it only while
    // holding the checkpoint lock.
    private long count = 0L;
    
    @Override
    public void open(Configuration parameters) {
        // Initialize resources
        System.out.println("Source opened with subtask index: " + 
            getRuntimeContext().getIndexOfThisSubtask());
    }
    
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // Emit and advance atomically with respect to checkpoints, so the snapshotted
            // count always matches what has been emitted.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("Count: " + count);
                count++;
            }
            Thread.sleep(1000);
        }
    }
    
    @Override
    public void cancel() {
        isRunning = false;
    }
    
    /** Returns the current counter as this source's checkpointed state. */
    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return count;
    }
    
    /** Restores the counter from a previously snapshotted value. */
    @Override
    public void restoreState(Long state) {
        this.count = state;
    }
}

Custom Sink Function

/**
 * Writes each (word, count) pair as a row of the {@code counts} table over JDBC.
 *
 * <p>The JDBC handles are created in {@link #open} and released in {@link #close}. They are
 * {@code transient} because SinkFunction is Serializable and the handles are only
 * meaningful after open() has run.
 */
public class DatabaseSink extends RichSinkFunction<Tuple2<String, Integer>> {
    private static final String INSERT_SQL = "INSERT INTO counts VALUES (?, ?)";

    private final String jdbcUrl;
    private final String user;
    // NOTE: credentials are serialized together with the sink; use a secrets mechanism
    // rather than literals in real deployments.
    private final String password;

    private transient Connection connection;
    private transient PreparedStatement statement;

    /** Example configuration (local MySQL "test" database); do not hard-code real credentials. */
    public DatabaseSink() {
        this("jdbc:mysql://localhost/test", "user", "pass");
    }

    /**
     * @param jdbcUrl  JDBC URL of the target database
     * @param user     database user
     * @param password database password
     */
    public DatabaseSink(String jdbcUrl, String user, String password) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize database connection
        connection = DriverManager.getConnection(jdbcUrl, user, password);
        statement = connection.prepareStatement(INSERT_SQL);
    }
    
    /** Inserts one row; parameter binding keeps values out of the SQL text. */
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        statement.setString(1, value.f0);
        statement.setInt(2, value.f1);
        statement.executeUpdate();
    }
    
    @Override
    public void close() throws Exception {
        // Close the statement first, but always close the connection even if that throws;
        // previously a failing statement.close() leaked the connection.
        try {
            if (statement != null) {
                statement.close();
            }
        } finally {
            statement = null;
            if (connection != null) {
                try {
                    connection.close();
                } finally {
                    connection = null;
                }
            }
        }
    }
}

// Usage
wordCounts.addSink(new DatabaseSink());

File Sources and Sinks

// Text file source
DataStream<String> textStream = env.readTextFile("/path/to/input.txt");

// Socket source
DataStream<String> socketStream = env.socketTextStream("localhost", 9999, "\n");

// File monitoring source: keep re-scanning the path (interval presumably in ms — confirm)
FileInputFormat<String> format = new TextInputFormat(new Path("/path/to/monitor"));
DataStream<String> monitoredStream = env.readFile(format, "/path/to/monitor", 
    FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);

// Text file sink
processedStream.writeAsText("/path/to/output.txt");

// CSV sink (assumes processedStream is a DataStream<String>).
// A lambda returning a generic Tuple3 loses its field types to erasure, so Flink's type
// extraction cannot infer them; returns(...) supplies the type explicitly
// (TypeHint is org.apache.flink.api.common.typeinfo.TypeHint).
DataStream<Tuple3<String, Integer, Double>> tuples = processedStream
    .map(value -> new Tuple3<>(value, value.length(), Math.random()))
    .returns(new TypeHint<Tuple3<String, Integer, Double>>() {});
tuples.writeAsCsv("/path/to/output.csv");

// Custom file sink with WriteFormat
processedStream.addSink(new FileSinkFunction<>("/path/to/output", 
    new WriteFormatAsText<String>()));

Built-in Sinks

// Print every record to standard output
processedStream.print();

// Print to standard error, labelling output with the "MyStream" identifier
processedStream.addSink(new PrintSinkFunction<>("MyStream", true));

// Socket sink: encode each String with SimpleStringSchema and send it to localhost:9999
processedStream.addSink(
    new SocketClientSink<>("localhost", 9999, new SimpleStringSchema()));

Types

// Serialization interfaces
/** Converts an element to the bytes written by byte-oriented sinks such as SocketClientSink. */
public interface SerializationSchema<T> extends Serializable {
    byte[] serialize(T element);
}

/** Converts raw bytes from a byte-oriented source back into typed elements. */
public interface DeserializationSchema<T> extends Serializable {
    // Decodes one message.
    T deserialize(byte[] message) throws IOException;
    // Decides whether nextElement marks the end of the stream.
    boolean isEndOfStream(T nextElement);
    // Type information for the produced elements, used by Flink's type system.
    TypeInformation<T> getProducedType();
}

// Splittable iterator for parallel sources
/**
 * Iterator that can be partitioned so each parallel source instance consumes one split.
 * NOTE(review): upstream Flink (flink-core) appears to define this as an abstract class
 * whose split() returns Iterator<T>[] and which also offers getSplit(num, numPartitions) —
 * verify this listing.
 */
public interface SplittableIterator<T> extends Iterator<T>, Serializable {
    // Partitions the remaining elements into numPartitions iterators.
    SplittableIterator<T>[] split(int numPartitions);
    // Upper bound on how many splits this iterator can produce.
    int getMaximumNumberOfSplits();
}

// Watermark for event time processing
/**
 * Marker carrying an event-time timestamp. None of the examples on this page emit
 * watermarks.
 */
public class Watermark implements Serializable {
    public Watermark(long timestamp);
    public long getTimestamp();
}

// File processing modes
/** How a file-monitoring source treats the watched path. */
public enum FileProcessingMode {
    // Read the path's current content a single time.
    PROCESS_ONCE,
    // Periodically re-scan the path for new data (the readFile example uses this mode).
    PROCESS_CONTINUOUSLY
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-core

docs

checkpointing-state.md

datastream-operations.md

execution-environment.md

index.md

sources-and-sinks.md

stream-operators.md

windowing.md

tile.json