Common utilities and interfaces for file sink functionality in Apache Flink stream processing applications
—
Configuration classes provide options for customizing file naming patterns, writer properties, and behavior of the file sink components.
Configuration class for customizing part file naming patterns.
/**
* Part file name configuration
* Allows defining a prefix and suffix to the part file name
*/
public class OutputFileConfig implements Serializable {
/**
* Initializes the OutputFileConfig with the given prefix and suffix
* @param partPrefix the prefix of the part file name
* @param partSuffix the suffix of the part file name
*/
public OutputFileConfig(String partPrefix, String partSuffix);
/**
* The prefix for the part name
* @return the part file prefix
*/
public String getPartPrefix();
/**
* The suffix for the part name
* @return the part file suffix
*/
public String getPartSuffix();
/**
* Creates a builder to create the part file configuration
* @return new OutputFileConfigBuilder instance
*/
public static OutputFileConfigBuilder builder();
}

Builder for creating OutputFileConfig instances with a fluent API.
/**
* Builder to create the part file configuration
*/
public static class OutputFileConfigBuilder {
/** Default part prefix: "part" */
private static final String DEFAULT_PART_PREFIX = "part";
/** Default part suffix: "" (empty string) */
private static final String DEFAULT_PART_SUFFIX = "";
/**
* Sets the prefix for part files
* @param prefix the desired prefix
* @return this builder instance
*/
public OutputFileConfigBuilder withPartPrefix(String prefix);
/**
* Sets the suffix for part files
* @param suffix the desired suffix
* @return this builder instance
*/
public OutputFileConfigBuilder withPartSuffix(String suffix);
/**
* Creates the OutputFileConfig instance
* @return configured OutputFileConfig
*/
public OutputFileConfig build();
}

Usage Examples:
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
// Default configuration: prefix="part", suffix=""
OutputFileConfig defaultConfig = OutputFileConfig.builder().build();
// Generates files like: part-0, part-1, part-2
// Custom prefix and suffix
OutputFileConfig customConfig = OutputFileConfig.builder()
.withPartPrefix("data")
.withPartSuffix(".txt")
.build();
// Generates files like: data-0.txt, data-1.txt, data-2.txt
// JSON files with timestamp prefix
OutputFileConfig jsonConfig = OutputFileConfig.builder()
.withPartPrefix("events-" + System.currentTimeMillis())
.withPartSuffix(".json")
.build();
// Generates files like: events-1640995200000-0.json
// Log files with descriptive naming
OutputFileConfig logConfig = OutputFileConfig.builder()
.withPartPrefix("application-logs")
.withPartSuffix(".log")
.build();
// Generates files like: application-logs-0.log

Configuration class describing the properties and capabilities of a BucketWriter.
/**
* Class describing the property of the BucketWriter
*/
public class WriterProperties {
/**
* Creates WriterProperties with serializers and resume support flag
* @param inProgressFileRecoverableSerializer serializer for in-progress file recoverables
* @param pendingFileRecoverableSerializer serializer for pending file recoverables
* @param supportsResume whether the BucketWriter supports appending data to restored files
*/
public WriterProperties(
SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
boolean supportsResume);
/**
* @return Whether the BucketWriter supports appending data to the restored in-progress file
*/
public boolean supportsResume();
/**
* @return the serializer for the PendingFileRecoverable
*/
public SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable>
getPendingFileRecoverableSerializer();
/**
* @return the serializer for the InProgressFileRecoverable
*/
public SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable>
getInProgressFileRecoverableSerializer();
}

Usage Example:
import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;
import org.apache.flink.core.io.SimpleVersionedSerializer;
// Example of creating WriterProperties for a custom BucketWriter
public class MyBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
private final WriterProperties properties;
public MyBucketWriter() {
// Define serializers for recovery state
SimpleVersionedSerializer<InProgressFileRecoverable> inProgressSerializer = // ... implementation
SimpleVersionedSerializer<PendingFileRecoverable> pendingSerializer = // ... implementation
// Configure properties
this.properties = new WriterProperties(
inProgressSerializer,
pendingSerializer,
true // This writer supports resume operations
);
}
@Override
public WriterProperties getProperties() {
return properties;
}
// ... other BucketWriter methods
}

// Time-based naming
OutputFileConfig timeBasedConfig = OutputFileConfig.builder()
.withPartPrefix("data-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss")))
.withPartSuffix(".avro")
.build();
// Environment-specific naming
String environment = System.getProperty("app.environment", "dev");
OutputFileConfig envConfig = OutputFileConfig.builder()
.withPartPrefix(environment + "-events")
.withPartSuffix(".parquet")
.build();
// Content-type specific naming
OutputFileConfig csvConfig = OutputFileConfig.builder()
.withPartPrefix("export")
.withPartSuffix(".csv")
    .build();

// High-performance writer with resume support
WriterProperties highPerfProperties = new WriterProperties(
customInProgressSerializer,
customPendingSerializer,
true // Supports resume for fault tolerance
);
// Simple writer without resume capability
WriterProperties simpleProperties = new WriterProperties(
basicInProgressSerializer,
basicPendingSerializer,
false // No resume support - simpler but less fault tolerant
);

These configuration classes are typically used when setting up file sinks:
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
// Complete file sink configuration
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
.withRollingPolicy(DefaultRollingPolicy.<String, String>builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.build())
.withOutputFileConfig(OutputFileConfig.builder()
.withPartPrefix("events")
.withPartSuffix(".txt")
.build())
    .build();

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-file-sink-common