Flink filesystem connector providing fault-tolerant rolling file sinks for streaming data to HDFS and Hadoop-compatible filesystems
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-filesystem-2-10@1.3.0

The Apache Flink filesystem connector provides fault-tolerant rolling file sinks for streaming data to HDFS and other Hadoop-compatible filesystems. It offers exactly-once processing guarantees through integration with Flink's checkpointing mechanism and supports multiple file formats and bucketing strategies.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.10</artifactId>
<version>1.3.3</version>
</dependency>

// Modern BucketingSink (recommended)
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
// Writers
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
// Legacy RollingSink (deprecated)
import org.apache.flink.streaming.connectors.fs.RollingSink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
// Create a sink to write strings to HDFS
BucketingSink<String> sink = new BucketingSink<>("/tmp/flink-output");
sink.setWriter(new StringWriter<String>());
sink.setBatchSize(1024 * 1024 * 400); // 400 MB
// Add to streaming job
DataStream<String> textStream = //... your data stream
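textStream.addSink(sink);

The connector provides two main sink implementations: BucketingSink, the modern and recommended sink, and RollingSink, the legacy sink that is now deprecated.
Key components include the writers (StringWriter, SequenceFileWriter, AvroKeyValueSinkWriter), the bucketers (DateTimeBucketer, BasePathBucketer), and the clock utilities (Clock, SystemClock).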
Configure and use BucketingSink and RollingSink for fault-tolerant file writing with various batching and bucketing options.
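To make the batching option more concrete, here is a minimal JDK-only sketch of size-based rolling: once the bytes written to the open part file reach a threshold, the next record goes to a new part file. It is a simplified model under stated assumptions, not the connector's implementation. `RollingModel`, `write`, the `part-N` naming, and the exact threshold comparison are all hypothetical choices made for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical model of size-based rolling; not part of the Flink API.
class RollingModel {
    private final long batchSize;         // roll threshold in bytes (assumed semantics)
    private long bytesInCurrentPart = 0;  // bytes written to the open part file
    private int partIndex = 0;            // index of the open part file

    RollingModel(long batchSize) {
        this.batchSize = batchSize;
    }

    // Returns the name of the part file the record would be written to.
    String write(String record) {
        // Roll before writing if the open part has already reached the threshold.
        if (bytesInCurrentPart >= batchSize) {
            partIndex++;
            bytesInCurrentPart = 0;
        }
        // Count the record's UTF-8 bytes plus one byte for a trailing newline.
        bytesInCurrentPart += record.getBytes(StandardCharsets.UTF_8).length + 1;
        return "part-" + partIndex;
    }

    public static void main(String[] args) {
        RollingModel sink = new RollingModel(10); // tiny threshold for demonstration
        for (String record : new String[] {"alpha", "beta", "gamma", "delta"}) {
            System.out.println(record + " -> " + sink.write(record));
        }
    }
}
```

In this model a part file can exceed the threshold by up to one record, because the size check happens before each write rather than after it.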
// BucketingSink - modern implementation
public class BucketingSink<T> extends RichSinkFunction<T>
public BucketingSink(String basePath)
public BucketingSink<T> setBatchSize(long batchSize)
public BucketingSink<T> setBucketer(Bucketer<T> bucketer)
public BucketingSink<T> setWriter(Writer<T> writer)

Different writer implementations for various file formats including text, Hadoop SequenceFiles, and Avro.
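As a rough illustration of what a text-format writer does, the following JDK-only sketch serializes each element with `toString()` and appends a newline. `LineWriterSketch` is a hypothetical stand-in and not the connector's `StringWriter`. The UTF-8 charset and the newline delimiter are assumptions made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a line-oriented text writer; not the Flink class.
class LineWriterSketch<T> {
    private final OutputStream out;
    private final Charset charset;

    LineWriterSketch(OutputStream out, Charset charset) {
        this.out = out;
        this.charset = charset;
    }

    // Writes the element's string form followed by a single newline byte.
    void write(T element) {
        try {
            out.write(element.toString().getBytes(charset));
            out.write('\n');
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        LineWriterSketch<Object> writer =
            new LineWriterSketch<>(buffer, StandardCharsets.UTF_8);
        writer.write("hello");
        writer.write(42);
        System.out.print(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

The sketch is generic over the element type because any object can be rendered through `toString()`. A binary format would need a type-specific serializer instead.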
// Writer interface and implementations
public interface Writer<T> extends Serializable
public class StringWriter<T> extends StreamWriterBase<T>
public class SequenceFileWriter<K extends Writable, V extends Writable> extends StreamWriterBase<Tuple2<K, V>>
public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>

Organize output files into buckets based on time, custom logic, or no bucketing at all.
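To illustrate the difference between time-based bucketing and no bucketing, this JDK-only sketch derives an output directory from a base path and a timestamp. `BucketPathSketch` and its methods are hypothetical. The `yyyy-MM-dd--HH` pattern and the UTC zone are assumptions chosen for the example, not settings confirmed by this document.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of bucket path selection; not the Flink classes.
class BucketPathSketch {
    // Assumed hourly pattern, formatted in UTC so the result is deterministic.
    private static final DateTimeFormatter HOURLY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    // Time-based bucketing: one subdirectory per hour under the base path.
    static String timeBucket(String basePath, long epochMillis) {
        return basePath + "/" + HOURLY.format(Instant.ofEpochMilli(epochMillis));
    }

    // No bucketing: every record goes directly under the base path.
    static String noBucket(String basePath) {
        return basePath;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(timeBucket("/tmp/flink-output", now));
        System.out.println(noBucket("/tmp/flink-output"));
    }
}
```

Custom bucketing logic would follow the same shape: a function from the base path (and possibly the element) to a directory.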
// Bucketing interface and implementations
public interface Bucketer<T> extends Serializable
public class DateTimeBucketer<T> implements Bucketer<T>
public class BasePathBucketer<T> implements Bucketer<T>

Supporting interfaces and classes including Clock implementations for time-based operations.
// Utility interfaces and implementations
public interface Clock
public class SystemClock implements Clock
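
A clock abstraction is mainly useful for making time-dependent logic testable. The sketch below shows that idea with hypothetical JDK-only types: `TimeSource`, `WallTimeSource`, `FixedTimeSource`, and `isInactive` are illustrative names, and the single `currentTimeMillis()` method is an assumption about what such an interface would expose.

```java
// Hypothetical clock abstraction; the names here are not the Flink classes.
interface TimeSource {
    long currentTimeMillis();
}

// Reads the wall clock, analogous in spirit to a system clock.
class WallTimeSource implements TimeSource {
    public long currentTimeMillis() {
        return System.currentTimeMillis();
    }
}

// Manually advanced clock for deterministic tests.
class FixedTimeSource implements TimeSource {
    private long now;

    FixedTimeSource(long now) {
        this.now = now;
    }

    void advance(long millis) {
        now += millis;
    }

    public long currentTimeMillis() {
        return now;
    }
}

class InactivityCheck {
    // True when more than thresholdMillis have passed since the last write.
    static boolean isInactive(TimeSource clock, long lastWriteMillis, long thresholdMillis) {
        return clock.currentTimeMillis() - lastWriteMillis > thresholdMillis;
    }

    public static void main(String[] args) {
        FixedTimeSource clock = new FixedTimeSource(1_000L);
        System.out.println(isInactive(clock, 1_000L, 500L)); // no time has passed yet
        clock.advance(501L);
        System.out.println(isInactive(clock, 1_000L, 500L)); // threshold exceeded
    }
}
```

Injecting the time source lets a test advance time explicitly instead of sleeping.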