Flink filesystem connector providing fault-tolerant rolling file sinks for streaming data to HDFS and Hadoop-compatible filesystems
—
The filesystem connector provides two main sink implementations for writing streaming data to Hadoop-compatible filesystems with fault tolerance and exactly-once semantics.
BucketingSink is the modern sink implementation. It can write to multiple buckets concurrently, providing better performance and more flexible bucketing strategies than the legacy RollingSink.
public BucketingSink(String basePath)
Creates a new BucketingSink with the specified base path where bucket directories will be created.
Parameters:
basePath - Base directory path where all buckets will be created

public BucketingSink<T> setBatchSize(long batchSize)
Sets the maximum size for part files before rolling to a new file (default: 384 MB).
Parameters:
batchSize - Maximum file size in bytes before rolling
Returns: The BucketingSink instance for method chaining
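Rolling is size-driven: once the bytes written to the current part file reach batchSize, the sink closes that file and starts a new one with an incremented part counter. A minimal plain-Java sketch of that counting (illustrative only, not the connector's actual code):

```java
public class RollOnSize {
    // Illustrative sketch: the sink rolls to a new part file each time
    // the bytes written reach batchSize, so the current part index is
    // the number of completed batches.
    static int partIndexAfter(long totalBytes, long batchSize) {
        return (int) (totalBytes / batchSize);
    }

    public static void main(String[] args) {
        long batchSize = 1024L * 1024 * 384; // the documented 384 MB default
        long written = 1024L * 1024 * 1000;  // ~1000 MB written so far
        System.out.println(partIndexAfter(written, batchSize)); // two files already rolled
    }
}
```

Lowering batchSize produces more, smaller files; for HDFS it is usually tuned toward the block size to avoid many small files.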
public BucketingSink<T> setBucketer(Bucketer<T> bucketer)
Sets the bucketing strategy for organizing files into directories.
Parameters:
bucketer - Bucketing strategy implementation
Returns: The BucketingSink instance for method chaining
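A Bucketer maps each record (or, for the time-based DateTimeBucketer used in the examples below, the current processing time) to a subdirectory under the base path. The plain-Java sketch below assumes DateTimeBucketer's documented behavior of formatting the timestamp with a SimpleDateFormat pattern; it is not the connector's implementation, and the UTC timezone is fixed here only to make the output deterministic:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class BucketPathDemo {
    // Sketch of the bucket-path derivation: basePath + "/" + formatted timestamp.
    static String bucketPath(String basePath, String pattern, long epochMillis) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC")); // pinned for a reproducible demo
        return basePath + "/" + sdf.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        // 2021-03-15T09:30:00Z with the "yyyy-MM-dd/HH" pattern:
        // records land in one directory per day, one subdirectory per hour.
        System.out.println(bucketPath("/tmp/output", "yyyy-MM-dd/HH", 1615800600000L));
        // → /tmp/output/2021-03-15/09
    }
}
```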
public BucketingSink<T> setWriter(Writer<T> writer)
Sets the writer implementation for handling file I/O.
Parameters:
writer - Writer implementation for the specific data format
Returns: The BucketingSink instance for method chaining
public BucketingSink<T> setPartPrefix(String partPrefix)
Sets the prefix for part file names (default: "part").
Parameters:
partPrefix - Prefix string for part files
Returns: The BucketingSink instance for method chaining
public BucketingSink<T> setInactiveBucketCheckInterval(long interval)
Sets the interval for checking inactive buckets (default: 60000 ms).
Parameters:
interval - Check interval in milliseconds
Returns: The BucketingSink instance for method chaining
public BucketingSink<T> setInactiveBucketThreshold(long threshold)
Sets the threshold for considering buckets inactive (default: 60000 ms).
Parameters:
threshold - Inactivity threshold in milliseconds
Returns: The BucketingSink instance for method chaining
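The two settings work together: on every check interval the sink scans its open buckets, and any bucket that has received no writes for longer than the threshold is closed so its part file can move to the pending state. A simplified plain-Java sketch of that scan (illustrative only, not the connector's code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class InactiveBucketCheck {
    // Sketch of the inactivity scan: given each open bucket's last-write
    // timestamp, return the buckets idle for longer than thresholdMs.
    static List<String> findInactive(Map<String, Long> lastWriteMs,
                                     long nowMs, long thresholdMs) {
        List<String> inactive = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastWriteMs.entrySet()) {
            if (nowMs - e.getValue() > thresholdMs) {
                inactive.add(e.getKey());
            }
        }
        return inactive;
    }

    public static void main(String[] args) {
        // With the 60000 ms defaults, a bucket last written 100 s ago is
        // closed on the next check, while one written 10 s ago stays open.
        Map<String, Long> lastWrite = Map.of(
                "2024-01-01/10", 0L,        // idle bucket
                "2024-01-01/11", 90_000L);  // recently written
        System.out.println(findInactive(lastWrite, 100_000L, 60_000L));
    }
}
```

Shortening the threshold makes data visible to downstream readers sooner for slow buckets, at the cost of more, smaller part files.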
public BucketingSink<T> setInProgressSuffix(String inProgressSuffix)
Sets suffix for files currently being written to.

public BucketingSink<T> setInProgressPrefix(String inProgressPrefix)
Sets prefix for files currently being written to.

public BucketingSink<T> setPendingSuffix(String pendingSuffix)
Sets suffix for files waiting for checkpoint confirmation.

public BucketingSink<T> setPendingPrefix(String pendingPrefix)
Sets prefix for files waiting for checkpoint confirmation.

public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix)
Sets suffix for valid-length tracking files.

public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix)
Sets prefix for valid-length tracking files.
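These prefixes and suffixes mark where a part file is in its lifecycle: decorated as in-progress while being written, as pending once rolled but not yet covered by a completed checkpoint, and renamed to the bare part name after the checkpoint succeeds. The "_" prefix and ".in-progress"/".pending" markers used in this sketch mirror the connector's defaults as an assumption for illustration:

```java
public class PartFileNames {
    // Sketch of the part-file lifecycle naming; the "_" prefix and the
    // ".in-progress"/".pending" suffixes are assumed default-style markers.
    static String decorate(String prefix, String name, String suffix) {
        return prefix + name + suffix;
    }

    public static void main(String[] args) {
        String part = "part-0-0"; // <partPrefix>-<subtaskIndex>-<rollingCounter>
        System.out.println(decorate("_", part, ".in-progress")); // while being written
        System.out.println(decorate("_", part, ".pending"));     // rolled, awaiting checkpoint
        System.out.println(part);                                // final name after checkpoint
    }
}
```

Downstream jobs should read only the undecorated final files; a leading "_" is convenient because Hadoop input formats ignore such files by default.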
public BucketingSink<T> setFSConfig(org.apache.flink.configuration.Configuration config)
Sets Flink configuration for the filesystem.

public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config)
Sets Hadoop configuration for the filesystem.

public BucketingSink<T> setAsyncTimeout(long timeout)
Sets timeout for asynchronous operations.
Parameters:
timeout - Timeout in milliseconds
Returns: The BucketingSink instance for method chaining
RollingSink is a legacy sink implementation that maintains a single active bucket at a time.
Note: RollingSink is deprecated. Use BucketingSink for new applications.
@Deprecated
public RollingSink(String basePath)
Creates a new RollingSink with the specified base path.
The RollingSink provides similar configuration methods to BucketingSink but with different bucketing behavior:
@Deprecated
public RollingSink<T> setBatchSize(long batchSize)
@Deprecated
public RollingSink<T> setBucketer(org.apache.flink.streaming.connectors.fs.Bucketer bucketer)
@Deprecated
public RollingSink<T> setWriter(Writer<T> writer)

Example: writing strings with a 64 MB batch size

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(new StringWriter<String>())
    .setBatchSize(1024 * 1024 * 64); // 64 MB files
dataStream.addSink(sink);

Example: bucketing files by date and hour

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.StringWriter;
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd/HH"))
.setWriter(new StringWriter<String>());
dataStream.addSink(sink);

Example: customizing file name prefixes and suffixes

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setPartPrefix("data")
.setInProgressSuffix(".tmp")
.setPendingSuffix(".pending")
.setWriter(new StringWriter<String>());
dataStream.addSink(sink);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-filesystem-2-10