Common utilities and interfaces for file sink functionality in Apache Flink stream processing applications
---
Bucket assignment determines how streaming data elements are organized into logical buckets (directories) in the file system. The package provides flexible bucket assignment strategies and supports custom implementations.
Core interface for implementing bucket assignment logic.
/**
* Interface for determining bucket assignment of streaming elements
* @param <IN> The type of input elements
* @param <BucketID> The type of bucket identifier (must have correct hashCode() and equals())
*/
public interface BucketAssigner<IN, BucketID> extends Serializable {
/**
* Returns the identifier of the bucket the provided element should be put into
* @param element The current element being processed
* @param context The context used by the current bucket assigner
* @return Bucket identifier for the element
*/
BucketID getBucketId(IN element, BucketAssigner.Context context);
/**
* Returns a serializer capable of serializing/deserializing bucket IDs
* @return SimpleVersionedSerializer for bucket IDs
*/
SimpleVersionedSerializer<BucketID> getSerializer();
}

Provides contextual information for bucket assignment decisions.
/**
* Context that BucketAssigner can use for getting additional data about input records
* Context is only valid for the duration of a getBucketId() call
*/
public interface Context {
/** Returns the current processing time */
long currentProcessingTime();
/** Returns the current event-time watermark */
long currentWatermark();
/**
* Returns the timestamp of the current input record
* @return timestamp in milliseconds or null if element has no assigned timestamp
*/
Long timestamp();
}

Built-in bucket assigner that creates buckets based on system time using date/time patterns.
/**
* BucketAssigner that assigns to buckets based on current system time
* Creates directories of the form: /{basePath}/{dateTimePath}/
* Uses DateTimeFormatter with configurable format string and timezone
*/
public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
/** Creates DateTimeBucketAssigner with default format "yyyy-MM-dd--HH" */
public DateTimeBucketAssigner();
/**
* Creates DateTimeBucketAssigner with custom date/time format string
* @param formatString Format string for DateTimeFormatter to determine bucket path
*/
public DateTimeBucketAssigner(String formatString);
/**
* Creates DateTimeBucketAssigner with default format using specified timezone
* @param zoneId The timezone for DateTimeFormatter
*/
public DateTimeBucketAssigner(ZoneId zoneId);
/**
* Creates DateTimeBucketAssigner with custom format and timezone
* @param formatString Format string for DateTimeFormatter
* @param zoneId The timezone for DateTimeFormatter
*/
public DateTimeBucketAssigner(String formatString, ZoneId zoneId);
@Override
public String getBucketId(IN element, BucketAssigner.Context context);
@Override
public SimpleVersionedSerializer<String> getSerializer();
}

Usage Examples:
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import java.time.ZoneId;
// Default hourly bucketing: "yyyy-MM-dd--HH"
BucketAssigner<String, String> hourlyAssigner = new DateTimeBucketAssigner<>();
// Daily bucketing
BucketAssigner<String, String> dailyAssigner =
new DateTimeBucketAssigner<>("yyyy-MM-dd");
// Hourly bucketing with custom timezone
BucketAssigner<String, String> timezoneAssigner =
new DateTimeBucketAssigner<>("yyyy-MM-dd--HH", ZoneId.of("America/New_York"));
// Minute-level bucketing
BucketAssigner<String, String> minuteAssigner =
new DateTimeBucketAssigner<>("yyyy-MM-dd/HH/mm");
// Example bucket paths generated:
// "2023-12-31--14" (hourly)
// "2023-12-31" (daily)
// "2023-12-31/14/30" (minute-level)

Simple bucket assigner that writes all files to the base path without additional bucketing.
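The example bucket paths above come straight from date/time formatting of the current processing time. As an illustration only (this is plain java.time, not Flink's actual implementation), the formatting step can be sketched as:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketIdSketch {
    // Derives a bucket id from a timestamp, format string, and zone,
    // mirroring what a date/time-based assigner conceptually does.
    static String bucketId(Instant when, String format, ZoneId zone) {
        return DateTimeFormatter.ofPattern(format).withZone(zone).format(when);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2023-12-31T14:30:00Z");
        System.out.println(bucketId(t, "yyyy-MM-dd--HH", ZoneOffset.UTC));   // 2023-12-31--14
        System.out.println(bucketId(t, "yyyy-MM-dd", ZoneOffset.UTC));       // 2023-12-31
        System.out.println(bucketId(t, "yyyy-MM-dd/HH/mm", ZoneOffset.UTC)); // 2023-12-31/14/30
    }
}
```

Note that because the bucket id becomes a directory path, a pattern containing `/` (as in the minute-level example) produces nested directories.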
/**
* BucketAssigner that does not perform any bucketing of files
* All files are written to the base path
*/
public class BasePathBucketAssigner<T> implements BucketAssigner<T, String> {
@Override
public String getBucketId(T element, BucketAssigner.Context context);
@Override
public SimpleVersionedSerializer<String> getSerializer();
}

Usage Example:
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
// No bucketing - all files in base directory
BucketAssigner<MyEvent, String> noBucketing = new BasePathBucketAssigner<>();

Utility serializer for string-based bucket identifiers.
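Conceptually, a versioned serializer pairs a version number with the encoded bytes so that old state can be read back after upgrades. A simplified standalone analogue of serializing strings this way (an illustration only, not Flink's actual wire format):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class VersionedStringCodec {
    static final int VERSION = 1;

    // Serialize: plain UTF-8 bytes; the version is tracked out of band.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Deserialize: reject unknown versions, then decode UTF-8.
    static String deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unrecognized version: " + version);
        }
        return new String(serialized, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize("2023-12-31--14");
        System.out.println(deserialize(VERSION, bytes)); // round-trips the bucket id
    }
}
```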
/**
* SimpleVersionedSerializer implementation for Strings
*/
public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer<String> {
/** Singleton instance */
public static final SimpleVersionedStringSerializer INSTANCE;
@Override
public int getVersion();
@Override
public byte[] serialize(String value);
@Override
public String deserialize(int version, byte[] serialized) throws IOException;
}

You can implement custom bucket assignment logic by implementing the BucketAssigner interface:
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.core.io.SimpleVersionedSerializer;
public class CustomBucketAssigner implements BucketAssigner<MyEvent, String> {
@Override
public String getBucketId(MyEvent element, BucketAssigner.Context context) {
// Custom logic based on element properties
if (element.getPriority() == Priority.HIGH) {
return "high-priority/" + element.getCategory();
} else {
return "normal/" + element.getCategory();
}
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}

Keep these constraints in mind when implementing a custom assigner:
- getBucketId() should not return null
- Bucket IDs must have correct hashCode() and equals() methods
- toString() of the bucket ID becomes part of the file path

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-file-sink-common
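A custom assigner can also consult Context.timestamp() to bucket by event time instead of processing time; since timestamp() may return null for records without an assigned timestamp, the logic needs a fallback. A standalone sketch of just that decision (EventTimeBuckets is a hypothetical helper for illustration, not part of Flink; the surrounding BucketAssigner boilerplate follows the custom-implementation pattern shown above):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeBuckets {
    static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    // Buckets by the record's event timestamp when present,
    // otherwise falls back to the given processing time.
    static String bucketId(Long recordTimestamp, long processingTime) {
        long millis = (recordTimestamp != null) ? recordTimestamp : processingTime;
        return HOURLY.format(Instant.ofEpochMilli(millis));
    }

    public static void main(String[] args) {
        long eventTime = Instant.parse("2023-12-31T14:30:00Z").toEpochMilli();
        System.out.println(bucketId(eventTime, 0L));   // bucket from event time
        System.out.println(bucketId(null, eventTime)); // fallback to processing time
    }
}
```

Inside a real BucketAssigner, the two arguments would come from context.timestamp() and context.currentProcessingTime() respectively.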