Flink filesystem connector providing fault-tolerant rolling file sinks for streaming data to HDFS and Hadoop-compatible filesystems
Bucketers determine how streaming data is organized into directory structures within the base path. They control when new bucket directories are created and how elements are routed to appropriate buckets.
The modern bucketing interface used with BucketingSink.
public interface Bucketer<T> extends Serializable
Path getBucketPath(org.apache.flink.streaming.connectors.fs.Clock clock, org.apache.hadoop.fs.Path basePath, T element)
Returns the complete bucket path for the provided element.
Parameters:
- clock - Clock implementation for getting the current time
- basePath - Base directory containing all buckets
- element - Current element being processed
Returns: Complete Path where the element should be written, including basePath and subtask index
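To make the routing contract concrete, the following is a minimal JDK-only sketch, not the connector's code. `BucketerModel`, `hourly`, `byLength`, and `route` are hypothetical names; a `LongSupplier` stands in for the connector's `Clock`, and plain strings stand in for Hadoop `Path` objects. It shows one bucketer that ignores the element and uses only the clock, and one that ignores the clock and uses only the element.

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.function.LongSupplier;

class BucketRoutingModel {

    // Hypothetical stand-in for Bucketer<T>: String replaces Path, LongSupplier replaces Clock.
    interface BucketerModel<T> {
        String getBucketPath(LongSupplier clock, String basePath, T element);
    }

    // Time-based routing: the element is ignored, only the clock decides the bucket.
    static BucketerModel<String> hourly() {
        return (clock, basePath, element) -> {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd--HH", Locale.ROOT);
            format.setTimeZone(TimeZone.getTimeZone("UTC")); // pinned for reproducible output
            return basePath + "/" + format.format(new Date(clock.getAsLong()));
        };
    }

    // Element-based routing: the clock is ignored, only the element decides the bucket.
    static BucketerModel<String> byLength() {
        return (clock, basePath, element) -> basePath + "/length-" + element.length();
    }

    // Groups elements by the bucket path the bucketer assigns to each of them.
    static <T> Map<String, List<T>> route(BucketerModel<T> bucketer, LongSupplier clock,
                                          String basePath, List<T> elements) {
        Map<String, List<T>> buckets = new TreeMap<>();
        for (T element : elements) {
            String path = bucketer.getBucketPath(clock, basePath, element);
            buckets.computeIfAbsent(path, key -> new ArrayList<>()).add(element);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "bb", "cc", "dddd");
        LongSupplier fixedClock = () -> 0L; // epoch start, so the hourly bucket is deterministic
        System.out.println(route(hourly(), fixedClock, "/tmp/output", elements));
        System.out.println(route(byLength(), fixedClock, "/tmp/output", elements));
    }
}
```

Passing the clock as a parameter instead of reading the system time inside the bucketer is what makes the time-based variant testable with a fixed timestamp.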
Creates buckets based on date and time patterns, organizing files into time-based directory structures.
public DateTimeBucketer()
Creates a DateTimeBucketer with the default format "yyyy-MM-dd--HH" (hourly buckets).
public DateTimeBucketer(String formatString)
Creates a DateTimeBucketer with a custom date format pattern.
Parameters:
- formatString - Java SimpleDateFormat pattern string

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
// Hourly buckets (default): /basePath/2023-12-25--14/
DateTimeBucketer<String> hourly = new DateTimeBucketer<>();
// Daily buckets: /basePath/2023-12-25/
DateTimeBucketer<String> daily = new DateTimeBucketer<>("yyyy-MM-dd");
// Minute-level buckets: /basePath/2023/12/25/14/30/
DateTimeBucketer<String> minutely = new DateTimeBucketer<>("yyyy/MM/dd/HH/mm");
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(hourly);

| Pattern | Example Output | Description |
|---|---|---|
| yyyy-MM-dd--HH | 2023-12-25--14 | Default: hourly buckets |
| yyyy-MM-dd | 2023-12-25 | Daily buckets |
| yyyy/MM/dd/HH | 2023/12/25/14 | Hierarchical hourly |
| yyyy-MM-dd/HH/mm | 2023-12-25/14/30 | Minute-level buckets |
| yyyy/MM | 2023/12 | Monthly buckets |
| 'year='yyyy'/month='MM'/day='dd | year=2023/month=12/day=25 | Hive-style partitioning |
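Because the format string is a `SimpleDateFormat` pattern, the table rows can be checked with the JDK alone, without a running Flink job. The sketch below is illustrative only: `BucketPatternDemo`, `bucketName`, and `sampleMillis` are hypothetical names, and the time zone and locale are pinned to UTC and `Locale.ROOT` purely to make the output reproducible (the connector's own time zone handling is not modelled here).

```java
import java.text.SimpleDateFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

class BucketPatternDemo {

    // Renders the directory name a pattern would produce for the given timestamp.
    static String bucketName(String pattern, long epochMillis) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        format.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC, for determinism
        return format.format(new Date(epochMillis));
    }

    // Fixed sample instant matching the table: 2023-12-25 14:30:00 UTC.
    static long sampleMillis() {
        return ZonedDateTime.of(2023, 12, 25, 14, 30, 0, 0, ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        String[] patterns = {
            "yyyy-MM-dd--HH",
            "yyyy-MM-dd",
            "yyyy/MM/dd/HH",
            "yyyy-MM-dd/HH/mm",
            "yyyy/MM",
            "'year='yyyy'/month='MM'/day='dd"
        };
        long timestamp = sampleMillis();
        for (String pattern : patterns) {
            // A '/' in the pattern becomes a nested directory level under the base path.
            System.out.println(pattern + " -> " + bucketName(pattern, timestamp));
        }
    }
}
```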
Uses the base path as the bucket directory without creating subdirectories. All files are written directly to the base path.
public BasePathBucketer()
Creates a BasePathBucketer that writes all files to the base directory.

import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
// All files written directly to /tmp/output/
BasePathBucketer<String> bucketer = new BasePathBucketer<>();
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(bucketer);

The original bucketing interface used with RollingSink.
@Deprecated
public interface Bucketer extends Serializable
@Deprecated
boolean shouldStartNewBucket(org.apache.hadoop.fs.Path basePath, org.apache.hadoop.fs.Path currentBucketPath)
Determines if a new bucket should be started.
@Deprecated
org.apache.hadoop.fs.Path getNextBucketPath(org.apache.hadoop.fs.Path basePath)
Returns the path for the next bucket.
@Deprecated
public class DateTimeBucketer implements Bucketer
Legacy time-based bucketing for RollingSink.
@Deprecated
public class NonRollingBucketer implements Bucketer
Legacy single-bucket strategy for RollingSink.
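The two legacy methods work as a pair: the sink first asks whether the current bucket is stale, and only then asks for the next path. The following JDK-only sketch shows how a time-based implementation of that contract could behave. It is a model under stated assumptions, not the connector's source: `LegacyBucketerModel` is a hypothetical class, strings replace Hadoop `Path` objects, and a `LongSupplier` replaces the clock.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
import java.util.function.LongSupplier;

class LegacyBucketerModel {

    private final SimpleDateFormat format;
    private final LongSupplier clock;

    LegacyBucketerModel(String pattern, LongSupplier clock) {
        this.format = new SimpleDateFormat(pattern, Locale.ROOT);
        this.format.setTimeZone(TimeZone.getTimeZone("UTC")); // pinned for reproducible output
        this.clock = clock;
    }

    // Path of the bucket that matches the current time.
    String getNextBucketPath(String basePath) {
        return basePath + "/" + format.format(new Date(clock.getAsLong()));
    }

    // A new bucket is needed once the time-derived path no longer matches the current one.
    boolean shouldStartNewBucket(String basePath, String currentBucketPath) {
        return !getNextBucketPath(basePath).equals(currentBucketPath);
    }

    public static void main(String[] args) {
        long[] now = {0L}; // mutable fake clock, starts at the epoch
        LegacyBucketerModel bucketer = new LegacyBucketerModel("yyyy-MM-dd--HH", () -> now[0]);

        String current = bucketer.getNextBucketPath("/base");
        System.out.println(bucketer.shouldStartNewBucket("/base", current)); // same hour

        now[0] = 60L * 60L * 1000L; // advance the fake clock by one hour
        System.out.println(bucketer.shouldStartNewBucket("/base", current)); // hour has changed
        System.out.println(bucketer.getNextBucketPath("/base"));
    }
}
```

Note that neither method receives the element, so this legacy contract can only route by time or other external state. The `Bucketer<T>` interface removes that limitation by passing the element to `getBucketPath`.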
You can create custom bucketing strategies by implementing the Bucketer interface:
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.hadoop.fs.Path;
public class CustomBucketer<T> implements Bucketer<T> {

    @Override
    public Path getBucketPath(Clock clock, Path basePath, T element) {
        // Custom bucketing logic based on element properties
        String bucketName = determineBucketName(element);
        return new Path(basePath, bucketName);
    }

    private String determineBucketName(T element) {
        // Example: bucket by string length
        if (element instanceof String) {
            String str = (String) element;
            return "length-" + str.length();
        }
        return "default";
    }
}

// High-volume streams: Use larger time windows
DateTimeBucketer<String> hourly = new DateTimeBucketer<>("yyyy-MM-dd--HH");
// Medium-volume streams: Smaller windows acceptable (this pattern yields one bucket per minute)
DateTimeBucketer<String> perMinuteFlat = new DateTimeBucketer<>("yyyy-MM-dd--HH-mm");
// Low-volume streams: May use very granular bucketing
DateTimeBucketer<String> perMinute = new DateTimeBucketer<>("yyyy/MM/dd/HH/mm");

// Large buckets with smaller batch sizes for faster file rotation
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd")) // Daily buckets
    .setBatchSize(64 * 1024 * 1024); // 64 MB part files

// Create Hive-compatible partition structure
DateTimeBucketer<String> hiveBucketer = new DateTimeBucketer<>(
"'year='yyyy'/month='MM'/day='dd'/hour='HH"
);
// Results in: /basePath/year=2023/month=12/day=25/hour=14/
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(hiveBucketer);
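The single quotes in the Hive-style pattern matter: `SimpleDateFormat` treats every unquoted ASCII letter as a pattern letter, so literal text such as `year=` must be quoted. The sketch below uses only the JDK to show the difference. `HivePatternQuoting`, `render`, and `isValidPattern` are hypothetical helper names, and UTC with `Locale.ROOT` is assumed only for reproducibility.

```java
import java.text.SimpleDateFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

class HivePatternQuoting {

    // Formats the timestamp with the given pattern in UTC.
    static String render(String pattern, long epochMillis) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date(epochMillis));
    }

    // Returns false when SimpleDateFormat rejects the pattern at construction time.
    static boolean isValidPattern(String pattern) {
        try {
            new SimpleDateFormat(pattern, Locale.ROOT);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        long timestamp = ZonedDateTime.of(2023, 12, 25, 14, 0, 0, 0, ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();

        // Quoted literals pass through unchanged.
        System.out.println(render("'year='yyyy'/month='MM'/day='dd'/hour='HH", timestamp));

        // Unquoted "year=" is rejected because 'e' is not a pattern letter.
        System.out.println(isValidPattern("year=yyyy"));

        // Unquoted "day=" is accepted, but d, a, and y are all read as pattern letters,
        // so the result is not the intended "day=25".
        System.out.println(render("day=dd", timestamp));
    }
}
```

The last case is the more dangerous one in practice, since it produces a wrong directory name silently instead of failing when the bucketer is constructed.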