tessl/maven-org-apache-flink--flink-file-sink-common

Common utilities and interfaces for file sink functionality in Apache Flink stream processing applications

Configuration

The configuration classes in this module control how part files are named (OutputFileConfig) and describe what a BucketWriter supports during recovery (WriterProperties).

Capabilities

OutputFileConfig

Configuration class for customizing part file naming patterns.

/**
 * Part file name configuration
 * Allows defining a prefix and suffix to the part file name
 */
public class OutputFileConfig implements Serializable {
    /**
     * Initiates the OutputFileConfig with values passed as parameters
     * @param partPrefix the beginning of part file name
     * @param partSuffix the ending of part file name
     */
    public OutputFileConfig(String partPrefix, String partSuffix);
    
    /** 
     * The prefix for the part name
     * @return the part file prefix
     */
    public String getPartPrefix();
    
    /** 
     * The suffix for the part name  
     * @return the part file suffix
     */
    public String getPartSuffix();
    
    /**
     * Creates a builder to create the part file configuration
     * @return new OutputFileConfigBuilder instance
     */
    public static OutputFileConfigBuilder builder();
}

OutputFileConfigBuilder

Builder for creating OutputFileConfig instances with a fluent API.

/**
 * Builder to create the part file configuration
 */
public static class OutputFileConfigBuilder {
    /** Default part prefix: "part" */
    private static final String DEFAULT_PART_PREFIX = "part";
    
    /** Default part suffix: "" (empty string) */
    private static final String DEFAULT_PART_SUFFIX = "";
    
    /**
     * Sets the prefix for part files
     * @param prefix the desired prefix
     * @return this builder instance
     */
    public OutputFileConfigBuilder withPartPrefix(String prefix);
    
    /**
     * Sets the suffix for part files
     * @param suffix the desired suffix
     * @return this builder instance
     */
    public OutputFileConfigBuilder withPartSuffix(String suffix);
    
    /**
     * Creates the OutputFileConfig instance
     * @return configured OutputFileConfig
     */
    public OutputFileConfig build();
}
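The builder contract (unset fields fall back to "part" and "", and null values are rejected) can be illustrated with a stdlib-only stand-in. SketchOutputFileConfig is a hypothetical name; it mirrors the behaviour documented above and is not Flink's source.

```java
import java.util.Objects;

// Hypothetical stand-in for OutputFileConfig (not a Flink class): same defaults,
// same null rejection, no Flink dependency.
final class SketchOutputFileConfig {
    private final String partPrefix;
    private final String partSuffix;

    SketchOutputFileConfig(String partPrefix, String partSuffix) {
        // Mirrors the documented Preconditions null checks
        this.partPrefix = Objects.requireNonNull(partPrefix, "partPrefix");
        this.partSuffix = Objects.requireNonNull(partSuffix, "partSuffix");
    }

    String getPartPrefix() { return partPrefix; }

    String getPartSuffix() { return partSuffix; }

    static Builder builder() { return new Builder(); }

    static final class Builder {
        // Fields that are never set keep the documented defaults
        private String partPrefix = "part";
        private String partSuffix = "";

        Builder withPartPrefix(String prefix) {
            this.partPrefix = prefix;
            return this;
        }

        Builder withPartSuffix(String suffix) {
            this.partSuffix = suffix;
            return this;
        }

        SketchOutputFileConfig build() {
            // Validation happens here, not in the with* setters
            return new SketchOutputFileConfig(partPrefix, partSuffix);
        }
    }
}
```

Because the check sits in the constructor, a null passed to withPartPrefix is only reported when build() is called.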

Usage Examples:

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// Default configuration: prefix="part", suffix=""
OutputFileConfig defaultConfig = OutputFileConfig.builder().build();
// With StreamingFileSink, part files are named <prefix>-<subtaskIndex>-<partCounter><suffix>,
// e.g. part-0-0, part-0-1, part-1-0

// Custom prefix and suffix
OutputFileConfig customConfig = OutputFileConfig.builder()
    .withPartPrefix("data")
    .withPartSuffix(".txt")
    .build();
// Generates files like: data-0-0.txt, data-0-1.txt

// JSON files with a timestamp in the prefix
// (the timestamp is evaluated once, when the config is built, so all files share it)
OutputFileConfig jsonConfig = OutputFileConfig.builder()
    .withPartPrefix("events-" + System.currentTimeMillis())
    .withPartSuffix(".json")
    .build();
// Generates files like: events-1640995200000-0-0.json

// Log files with descriptive naming
OutputFileConfig logConfig = OutputFileConfig.builder()
    .withPartPrefix("application-logs")
    .withPartSuffix(".log")
    .build();
// Generates files like: application-logs-0-0.log
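The prefix and suffix only wrap a middle section that the sink generates. Assuming StreamingFileSink's naming scheme (prefix, subtask index and a per-bucket part counter joined by '-', then the suffix; FileSink puts a unique ID where the subtask index goes), a file name can be reproduced as below. PartFileNames is a hypothetical helper, not a Flink API.

```java
// Hypothetical helper (not a Flink API) reproducing the StreamingFileSink
// naming scheme: <prefix>-<subtaskIndex>-<partCounter><suffix>
final class PartFileNames {
    private PartFileNames() {}

    static String partFileName(String prefix, int subtaskIndex, long partCounter, String suffix) {
        return prefix + '-' + subtaskIndex + '-' + partCounter + suffix;
    }

    public static void main(String[] args) {
        // Subtask 0 rolling three part files with prefix "data" and suffix ".txt"
        for (long counter = 0; counter < 3; counter++) {
            System.out.println(partFileName("data", 0, counter, ".txt"));
        }
    }
}
```

Running main prints data-0-0.txt, data-0-1.txt and data-0-2.txt, one per line.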

WriterProperties

Configuration class describing the properties and capabilities of a BucketWriter.

/**
 * Class describing the property of the BucketWriter
 */
public class WriterProperties {
    /**
     * Creates WriterProperties with serializers and resume support flag
     * @param inProgressFileRecoverableSerializer serializer for in-progress file recoverables
     * @param pendingFileRecoverableSerializer serializer for pending file recoverables  
     * @param supportsResume whether the BucketWriter supports appending data to restored files
     */
    public WriterProperties(
        SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
        SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
        boolean supportsResume);
    
    /**
     * @return Whether the BucketWriter supports appending data to the restored in-progress file
     */
    public boolean supportsResume();
    
    /**
     * @return the serializer for the PendingFileRecoverable
     */
    public SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> 
        getPendingFileRecoverableSerializer();
    
    /**
     * @return the serializer for the InProgressFileRecoverable
     */
    public SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> 
        getInProgressFileRecoverableSerializer();
}

Usage Example:

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;

// Example of creating WriterProperties for a custom BucketWriter.
// Declared abstract so the remaining BucketWriter methods (openNewInProgressFile,
// resumeInProgressFileFrom, recoverPendingFile, ...) can be left to a subclass.
public abstract class MyBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
    private final WriterProperties properties;

    protected MyBucketWriter(
            // Serializers for the recovery state stored in checkpoints
            SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressSerializer,
            SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingSerializer) {
        this.properties = new WriterProperties(
            inProgressSerializer,
            pendingSerializer,
            true  // this writer can append to a restored in-progress file
        );
    }

    @Override
    public WriterProperties getProperties() {
        return properties;
    }
}
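WriterProperties only carries the serializers; their job is to turn recoverables into versioned bytes for checkpoints and back. The sketch below copies the SimpleVersionedSerializer contract (getVersion, serialize, deserialize(int, byte[])) into a local interface so it compiles without Flink. VersionedSerializer, PendingFile and PendingFileSerializer are hypothetical names, and a real PendingFileRecoverable typically holds more than a path.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Local copy of the SimpleVersionedSerializer contract (hypothetical name)
interface VersionedSerializer<E> {
    int getVersion();

    byte[] serialize(E obj) throws IOException;

    E deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical recoverable: only the path of the file to commit
final class PendingFile {
    final String path;

    PendingFile(String path) {
        this.path = path;
    }
}

final class PendingFileSerializer implements VersionedSerializer<PendingFile> {
    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(PendingFile file) {
        // Encode the path as UTF-8 bytes
        return file.path.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public PendingFile deserialize(int version, byte[] serialized) throws IOException {
        // Reject state written in a format version this serializer does not know
        if (version != VERSION) {
            throw new IOException("Unrecognized version: " + version);
        }
        return new PendingFile(new String(serialized, StandardCharsets.UTF_8));
    }
}
```

The version number is stored alongside the bytes, which lets a later serializer read state written by an older one.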

Configuration Patterns

File Naming Strategies

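import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Time-based naming: the timestamp is computed once, when the config is built,
// so every part file written by the job carries the same value
OutputFileConfig timeBasedConfig = OutputFileConfig.builder()
    .withPartPrefix("data-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss")))
    .withPartSuffix(".avro")
    .build();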

// Environment-specific naming
String environment = System.getProperty("app.environment", "dev");
OutputFileConfig envConfig = OutputFileConfig.builder()
    .withPartPrefix(environment + "-events")
    .withPartSuffix(".parquet")
    .build();

// Content-type specific naming
OutputFileConfig csvConfig = OutputFileConfig.builder()
    .withPartPrefix("export")
    .withPartSuffix(".csv")
    .build();
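Because OutputFileConfig stores plain strings, a time-based prefix is fixed at build time; separating output by time is the bucket assigner's job. The sketch isolates the prefix computation with a fixed clock value so the result is predictable. TimeBasedPrefix is a hypothetical helper.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: computes the time-based prefix once from a given time
final class TimeBasedPrefix {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");

    private TimeBasedPrefix() {}

    static String prefixAt(LocalDateTime time) {
        return "data-" + time.format(FORMAT);
    }

    public static void main(String[] args) {
        String prefix = prefixAt(LocalDateTime.of(2022, 1, 1, 12, 30, 0));
        System.out.println(prefix); // prints data-20220101-123000
    }
}
```

Every part file produced with this prefix would start with data-20220101-123000, however long the job runs.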

Writer Properties Configuration

// Writer that can append to a restored in-progress file
// (customInProgressSerializer and customPendingSerializer are existing serializer instances)
WriterProperties resumableProperties = new WriterProperties(
    customInProgressSerializer,
    customPendingSerializer,
    true  // after recovery, writing continues in the restored in-progress file
);

// Writer without resume support
WriterProperties nonResumableProperties = new WriterProperties(
    basicInProgressSerializer,
    basicPendingSerializer,
    false  // after recovery, the restored in-progress file is committed and a new part file is started
);
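The effect of the supportsResume flag on recovery can be modelled in a few lines. This is a simplified sketch, assuming the sink resumes the restored in-progress file when the flag is true and otherwise commits it as a finished part file and opens a new one; RestoreSketch and its lists of records are hypothetical stand-ins for real files.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink code) of restoring an in-progress file
final class RestoreSketch {
    // Committed part files, each represented as its list of records
    final List<List<String>> committed = new ArrayList<>();
    // Records of the currently open in-progress file
    List<String> inProgress = new ArrayList<>();

    void restore(List<String> checkpointedInProgress, boolean supportsResume) {
        if (supportsResume) {
            // Keep appending to the restored file
            inProgress = new ArrayList<>(checkpointedInProgress);
        } else {
            // Commit the checkpointed data as-is and start a fresh file
            committed.add(new ArrayList<>(checkpointedInProgress));
            inProgress = new ArrayList<>();
        }
    }

    void write(String record) {
        inProgress.add(record);
    }
}
```

Both paths keep the checkpointed data; the difference is only whether records written after recovery share a file with it.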

Integration with File Sink

These configuration classes are typically used when setting up file sinks:

import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Row-format sink: hourly buckets, part files rolled every 15 minutes
// or after 5 minutes without new records
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(Duration.ofMinutes(15))
        .withInactivityInterval(Duration.ofMinutes(5))
        .build())
    .withOutputFileConfig(OutputFileConfig.builder()
        .withPartPrefix("events")
        .withPartSuffix(".txt")
        .build())
    .build();
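With this setup a file path combines three parts: the base path, the bucket ID that DateTimeBucketAssigner formats from the current processing time (in the system default time zone unless one is passed), and the part file name. The sketch predicts such a path, assuming the StreamingFileSink naming scheme; OutputPathSketch is a hypothetical helper, and it takes an explicit zone so the result does not depend on the machine.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not a Flink API):
// <basePath>/<bucketId>/<prefix>-<subtaskIndex>-<partCounter><suffix>
final class OutputPathSketch {
    // Same pattern as the DateTimeBucketAssigner in the example
    private static final DateTimeFormatter BUCKET_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd/HH");

    private OutputPathSketch() {}

    static String partFilePath(String basePath, Instant processingTime, ZoneId zone,
                               String prefix, int subtaskIndex, long partCounter, String suffix) {
        // The bucket ID becomes a (possibly nested) directory under the base path
        String bucketId = BUCKET_FORMAT.withZone(zone).format(processingTime);
        return basePath + "/" + bucketId + "/" + prefix + '-' + subtaskIndex + '-' + partCounter + suffix;
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2022-01-01T13:45:00Z");
        // prints /output/2022-01-01/13/events-0-0.txt
        System.out.println(partFilePath("/output", t, ZoneId.of("UTC"), "events", 0, 0, ".txt"));
    }
}
```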

Error Handling

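  • A prefix or suffix containing characters the target file system rejects can cause I/O errors; a path separator such as "/" may place part files in unexpected directories
  • Null values for prefix or suffix are rejected by Preconditions.checkNotNull in the OutputFileConfig constructor, so a null passed to the builder fails with a NullPointerException at build()
  • Declaring supportsResume = true for a writer that cannot actually resume a restored in-progress file causes failures on recovery
  • An exception thrown by a serializer while restoring state fails the restore, and the job is restarted according to its restart strategy
  • With StreamingFileSink, part file names contain only the prefix, subtask index, part counter and suffix, so two sinks writing to the same directory with the same configuration can produce colliding names; give each sink a distinct prefix or output path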
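Some of these problems can be caught before the sink is built. The sketch below is a hypothetical pre-flight check (NamingChecks is not part of Flink): it rejects nulls and path separators, and it rejects prefixes starting with '.' or '_', which many Hadoop-style readers treat as hidden files. A leading '.' is allowed in the suffix, since it does not start the file name.

```java
import java.util.Objects;

// Hypothetical pre-flight checks (not part of Flink) for OutputFileConfig values
final class NamingChecks {
    private NamingChecks() {}

    static void validatePrefix(String prefix) {
        checkNoSeparator("partPrefix", prefix);
        // The prefix starts the file name; '.' or '_' would mark it as hidden for many readers
        if (prefix.startsWith(".") || prefix.startsWith("_")) {
            throw new IllegalArgumentException("partPrefix would produce hidden files: " + prefix);
        }
    }

    static void validateSuffix(String suffix) {
        // A leading '.' is fine here, e.g. ".json"
        checkNoSeparator("partSuffix", suffix);
    }

    private static void checkNoSeparator(String name, String value) {
        // OutputFileConfig rejects null too; checking early gives a clearer message
        Objects.requireNonNull(value, name + " must not be null");
        // A separator would likely move part files into a sub-directory of the bucket
        if (value.indexOf('/') >= 0 || value.indexOf('\\') >= 0) {
            throw new IllegalArgumentException(name + " must not contain a path separator: " + value);
        }
    }
}
```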

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-file-sink-common
