CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-java-2-11

Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities

Pending
Overview
Eval results
Files

docs/sources-sinks.md

Sources and Sinks

Sources and sinks are the entry and exit points for data in Apache Flink streaming applications. Sources ingest data from external systems, while sinks output processed results to external systems.

Capabilities

Built-in Sources

Pre-defined sources for common data ingestion patterns.

// Element-based sources
<T> DataStreamSource<T> fromElements(T... data);
<T> DataStreamSource<T> fromCollection(Collection<T> data);
<T> DataStreamSource<T> fromCollection(Collection<T> data, TypeInformation<T> typeInfo);

// File-based sources
DataStreamSource<String> readTextFile(String filePath);
DataStreamSource<String> readTextFile(String filePath, String charsetName);
DataStreamSource<String> readFile(FileInputFormat<String> inputFormat, String filePath);

// Network sources
DataStreamSource<String> socketTextStream(String hostname, int port);
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter);
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry);

// Sequence sources
DataStreamSource<Long> generateSequence(long from, long to);
DataStreamSource<Long> fromSequence(long from, long to);

// Custom sources
<T> DataStreamSource<T> addSource(SourceFunction<T> function);
<T> DataStreamSource<T> addSource(SourceFunction<T> function, String sourceName);
<T> DataStreamSource<T> addSource(SourceFunction<T> function, TypeInformation<T> typeInfo);

Usage Examples:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From elements
DataStream<String> words = env.fromElements("hello", "world", "flink");

// From collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStream<Integer> numberStream = env.fromCollection(numbers);

// From file
DataStream<String> fileStream = env.readTextFile("/path/to/input.txt");

// Socket stream
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

// Sequence
DataStream<Long> sequence = env.generateSequence(1, 1000000);

// Custom source
DataStream<Event> eventStream = env.addSource(new CustomEventSource());

Custom Source Functions

Implement custom sources using SourceFunction interface.

/**
 * Interface for source functions.
 *
 * A source runs in its own thread: run() blocks until the stream ends or
 * until cancel() (called from a different thread) asks it to stop.
 */
interface SourceFunction<T> extends Function {
    /**
     * Main method to emit elements
     * @param ctx - source context for emitting elements
     */
    void run(SourceContext<T> ctx) throws Exception;
    
    /**
     * Cancel the source.
     * Invoked asynchronously; implementations typically clear a volatile
     * "running" flag that the run() loop checks.
     */
    void cancel();
    
    /**
     * Source context for emitting elements
     */
    interface SourceContext<T> {
        // Emit an element without a timestamp.
        void collect(T element);
        // Emit an element together with an explicit event-time timestamp.
        void collectWithTimestamp(T element, long timestamp);
        // Advance downstream event time.
        void emitWatermark(Watermark mark);
        // Signal that this source will temporarily emit no elements.
        void markAsTemporarilyIdle();
        // Lock to hold while emitting so checkpoints observe consistent state.
        Object getCheckpointLock();
        void close();
    }
}

/**
 * Rich source function with lifecycle methods
 */
abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T> {
    // Inherits open(), close(), getRuntimeContext()
}

/**
 * Rich parallel source function for parallel sources
 */
abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T> {
    // Can run in parallel with multiple instances
}

Usage Examples:

// Simple custom source: emits the integers 0..999, one every 100 ms,
// stopping early if the job cancels the source.
class NumberSource implements SourceFunction<Integer> {
    private static final int MAX_COUNT = 1000;
    private static final long EMIT_INTERVAL_MS = 100L;

    private volatile boolean running = true;
    private int next = 0;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        for (; running && next < MAX_COUNT; next++) {
            // Emit under the checkpoint lock so snapshots see a consistent counter.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next);
            }
            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// Rich source with checkpointed state.
//
// Fixes to the original example:
//  - snapshotState() referenced an undefined field `currentOffset`; the offset
//    is now a real field shared between run() and snapshotState().
//  - snapshotState()/initializeState() belong to CheckpointedFunction, which
//    RichSourceFunction does not declare, so the class must implement it
//    explicitly (the original @Override would not compile).
//  - A source is not keyed, so state must come from the operator state store
//    in initializeState(), not from getRuntimeContext().getListState() in open().
//  - Emission and the offset update now happen under the checkpoint lock so a
//    snapshot never captures an offset inconsistent with the emitted elements.
class StatefulSource extends RichSourceFunction<Event> implements CheckpointedFunction {
    private transient ListState<Long> offsetState;
    private volatile boolean running = true;
    // Current read position; written under the checkpoint lock in run(),
    // mirrored into offsetState on each checkpoint.
    private long currentOffset = 0L;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("offset", Long.class);
        // Operator (non-keyed) state: a source task has no keyed context.
        offsetState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            // Restore the last checkpointed offset, if any.
            for (Long o : offsetState.get()) {
                currentOffset = o;
            }
        }
    }

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            Event event = fetchNextEvent(currentOffset);
            // Hold the checkpoint lock so the emitted element and the offset
            // advance atomically with respect to snapshotState().
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(event, event.getTimestamp());
                // Watermark lags event time by 5 s to tolerate out-of-order events.
                ctx.emitWatermark(new Watermark(event.getTimestamp() - 5000));
                currentOffset++;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current offset.
        offsetState.clear();
        offsetState.add(currentOffset);
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Built-in Sinks

Pre-defined sinks for common output patterns.

// Console output
DataStreamSink<T> print();
DataStreamSink<T> print(String sinkIdentifier);
DataStreamSink<T> printToErr();
DataStreamSink<T> printToErr(String sinkIdentifier);

// File output
DataStreamSink<T> writeAsText(String path);
DataStreamSink<T> writeAsText(String path, WriteMode writeMode);
DataStreamSink<T> writeAsCsv(String path);
DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode);
DataStreamSink<T> writeAsCsv(String path, String rowDelimiter, String fieldDelimiter);

// Socket output
DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema);

// Custom sinks
DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
DataStreamSink<T> addSink(SinkFunction<T> sinkFunction, String name);

Usage Examples:

DataStream<String> result = processedStream;

// Print to console
result.print();
result.print("MyOutput");

// Write to file
result.writeAsText("/path/to/output.txt");
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE);

// Write CSV
tupleStream.writeAsCsv("/path/to/output.csv", "\n", ",");

// Socket output
result.writeToSocket("localhost", 9999, new SimpleStringSchema());

// Custom sink
result.addSink(new CustomSink());

Custom Sink Functions

Implement custom sinks using SinkFunction interface.

/**
 * Interface for sink functions.
 *
 * Both invoke overloads have default bodies, so implementations override
 * whichever variant they need (the two-argument form is preferred).
 */
interface SinkFunction<IN> extends Function {
    /**
     * Process each element.
     * Default implementation delegates to the legacy single-argument invoke.
     * @param value - input element
     * @param context - sink context (processing time, watermark, element timestamp)
     */
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }
    
    /**
     * Simple invoke method (deprecated in favor of invoke with context)
     * @param value - input element
     */
    default void invoke(IN value) throws Exception {}
    
    /**
     * Sink context interface
     */
    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        // Element timestamp; boxed Long because an element may have none.
        Long timestamp();
    }
}

/**
 * Rich sink function with lifecycle methods
 */
abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
    // Inherits open(), close(), getRuntimeContext()
}

Usage Examples:

// Simple custom sink writing events to a relational database via JDBC.
//
// Fix: the original created a new PreparedStatement per record and never
// closed it, leaking database cursors; try-with-resources closes it now.
//
// NOTE(review): plain SinkFunction has no lifecycle hooks, so the connection
// is opened lazily and never closed here — use RichSinkFunction (see the
// pooled example below) for production code.
class DatabaseSink implements SinkFunction<Event> {
    private transient Connection connection;

    @Override
    public void invoke(Event event, Context context) throws Exception {
        if (connection == null) {
            // Lazy init: the sink is serialized to the task, so the
            // connection cannot be created in the constructor.
            connection = DriverManager.getConnection("jdbc:...");
        }

        try (PreparedStatement stmt = connection.prepareStatement(
                "INSERT INTO events (id, value, timestamp) VALUES (?, ?, ?)")) {
            stmt.setString(1, event.getId());
            stmt.setString(2, event.getValue());
            stmt.setLong(3, event.getTimestamp());
            stmt.executeUpdate();
        }
    }
}

// Rich sink that borrows a pooled JDBC connection per record and returns it
// immediately; the pool lives for the lifetime of the parallel sink instance.
class PooledDatabaseSink extends RichSinkFunction<Event> {
    private transient ConnectionPool pool;

    /** Creates the pool once per parallel sink instance. */
    @Override
    public void open(Configuration config) throws Exception {
        super.open(config);
        pool = new ConnectionPool();
    }

    /** Borrows a connection, writes the event, and hands the connection back. */
    @Override
    public void invoke(Event event, Context ctx) throws Exception {
        try (Connection conn = pool.getConnection()) {
            // Insert logic
        }
    }

    /** Shuts the pool down when the task closes. */
    @Override
    public void close() throws Exception {
        if (pool != null) {
            pool.close();
        }
        super.close();
    }
}

Streaming File Sink

Advanced file sink for streaming applications with exactly-once guarantees.

/**
 * File sink for streaming applications.
 *
 * NOTE(review): shown here as a plain SinkFunction for brevity; in Flink the
 * concrete class also participates in checkpointing (it extends
 * RichSinkFunction and implements CheckpointedFunction/CheckpointListener)
 * to provide its exactly-once part-file handling — confirm against the
 * flink-streaming-java sources.
 */
class StreamingFileSink<IN> implements SinkFunction<IN> {
    /**
     * Create row format builder for text-based files
     */
    static <IN> StreamingFileSink.RowFormatBuilder<IN, String> forRowFormat(
        Path basePath, 
        Encoder<IN> encoder
    );
    
    /**
     * Create bulk format builder for columnar formats (Parquet, ORC)
     */
    static <IN> StreamingFileSink.BulkFormatBuilder<IN, IN> forBulkFormat(
        Path basePath,
        BulkWriter.Factory<IN> writerFactory
    );
}

Usage Examples:

// Row format (text files)
StreamingFileSink<String> textSink = StreamingFileSink
    .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
        .withMaxPartSize(1024 * 1024 * 1024)
        .build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
    .build();

dataStream.addSink(textSink);

// Bulk format (Parquet)
StreamingFileSink<Event> parquetSink = StreamingFileSink
    .forBulkFormat(
        new Path("/path/to/output"),
        ParquetAvroWriters.forReflectRecord(Event.class)
    )
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
    .build();

eventStream.addSink(parquetSink);

Types

Source Function Types

// Base source function
interface SourceFunction<T> extends Function {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
    
    interface SourceContext<T> {
        void collect(T element);
        void collectWithTimestamp(T element, long timestamp);
        void emitWatermark(Watermark mark);
        void markAsTemporarilyIdle();
        Object getCheckpointLock();
        void close();
    }
}

// Parallel source function
interface ParallelSourceFunction<T> extends SourceFunction<T> {
    // Marker interface for sources that can run in parallel
}

// Rich source functions
abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T>;
abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T>;

// Checkpointed source function
interface CheckpointedFunction {
    void snapshotState(FunctionSnapshotContext context) throws Exception;
    void initializeState(FunctionInitializationContext context) throws Exception;
}

Sink Function Types

// Base sink function
interface SinkFunction<IN> extends Function {
    default void invoke(IN value, Context context) throws Exception;
    default void invoke(IN value) throws Exception;
    
    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}

// Rich sink function
abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN>;

// Two-phase commit sink function for exactly-once semantics
abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN> 
    implements CheckpointedFunction, CheckpointListener {
    
    abstract TXN beginTransaction() throws Exception;
    abstract void preCommit(TXN transaction) throws Exception;
    abstract void commit(TXN transaction);
    abstract void abort(TXN transaction);
}

Utility Types

// Write modes
enum WriteMode {
    NO_OVERWRITE,  // Fail if file exists
    OVERWRITE      // Overwrite existing files
}

// Watermark
class Watermark implements Serializable {
    public Watermark(long timestamp);
    long getTimestamp();
}

// Encoders for StreamingFileSink
interface Encoder<IN> extends Serializable {
    void encode(IN element, OutputStream stream) throws IOException;
}

class SimpleStringEncoder<IN> implements Encoder<IN> {
    public SimpleStringEncoder();
    public SimpleStringEncoder(String charset);
}

// Bucket assigners
interface BucketAssigner<IN, BucketID> extends Serializable {
    BucketID getBucketId(IN element, Context context);
    
    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}

class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
    public DateTimeBucketAssigner(String formatString);
    public DateTimeBucketAssigner(String formatString, ZoneId zoneId);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11

docs

async-io.md

checkpointing.md

datastream-transformations.md

execution-environment.md

index.md

keyed-streams-state.md

process-functions.md

sources-sinks.md

time-watermarks.md

windowing.md

tile.json