Flink filesystem connector providing fault-tolerant rolling file sinks for streaming data to HDFS and Hadoop-compatible filesystems
—
Writers handle the actual file I/O operations for different data formats. The filesystem connector provides several built-in writer implementations and allows custom writers through the Writer interface.
The base interface that all writers must implement.
public interface Writer&lt;T&gt; extends Serializable

void open(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path) throws IOException
Initializes the writer for a newly opened bucket file.
Parameters:
fs - The Hadoop FileSystem containing the file
path - The Path of the newly opened file

void write(T element) throws IOException
Writes one element to the bucket file.
Parameters:
element - The element to write

long flush() throws IOException
Flushes internally held data and returns the offset for file truncation during recovery.
Returns: The file offset that the file must be truncated to at recovery

long getPos() throws IOException
Retrieves the current position (size) of the output file.
Returns: Current file position in bytes

void close() throws IOException
Closes the writer and associated resources. Safe to call multiple times.

Writer&lt;T&gt; duplicate()
Creates a duplicate of this writer for parallel sink instances.
Returns: A new Writer instance
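The offset returned by flush() drives recovery: after a failure, the sink truncates in-progress files back to the last checkpointed offset so that partially written records are discarded. The truncation step can be sketched with plain java.io, independent of Flink and Hadoop (class, file, and offset names here are illustrative, not part of the connector API):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

public class TruncateRecoveryDemo {

    // Truncate a file back to the last known-good offset, as a
    // recovering sink would do with the offset returned by flush().
    static long recover(Path file, long validOffset) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            raf.setLength(validOffset);
            return raf.length();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("bucket", ".txt");
        // Simulate a checkpoint: the first 6 bytes ("line1\n") are known good,
        // then a crash leaves a partial record behind.
        Files.write(file, "line1\nline2-partial".getBytes("UTF-8"));
        long checkpointedOffset = 6;
        // Recovery truncates the partial record away.
        long newLength = recover(file, checkpointedOffset);
        System.out.println(newLength); // 6
        Files.delete(file);
    }
}
```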
StringWriter writes elements as strings with newline separation, using a configurable character encoding.
public StringWriter()
Creates a StringWriter using UTF-8 encoding.

public StringWriter(String charsetName)
Creates a StringWriter with the specified character encoding.
Parameters:
charsetName - Character set name (e.g., "UTF-8", "ISO-8859-1")

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// UTF-8 encoding (default)
StringWriter<String> writer = new StringWriter<>();

// Custom encoding
StringWriter<String> latin1Writer = new StringWriter<>("ISO-8859-1");

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(writer);
```

SequenceFileWriter writes Hadoop SequenceFiles for Tuple2 elements containing Hadoop Writable types.
Type parameters:
K extends Writable - Key type (must implement Hadoop Writable)
V extends Writable - Value type (must implement Hadoop Writable)

public SequenceFileWriter()
Creates a SequenceFileWriter without compression.

public SequenceFileWriter(String compressionCodecName)
Creates a SequenceFileWriter with the specified compression codec.
Parameters:
compressionCodecName - Name of the compression codec (e.g., "org.apache.hadoop.io.compress.GzipCodec")

public SequenceFileWriter(String compressionCodecName, org.apache.hadoop.io.SequenceFile.CompressionType compressionType)
Creates a SequenceFileWriter with full compression control.
Parameters:
compressionCodecName - Name of the compression codec
compressionType - Type of compression (NONE, RECORD, BLOCK)

```java
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// No compression
SequenceFileWriter<LongWritable, Text> writer = new SequenceFileWriter<>();

// With Gzip compression
SequenceFileWriter<LongWritable, Text> gzipWriter = new SequenceFileWriter<>(
    "org.apache.hadoop.io.compress.GzipCodec"
);

// Full compression control
SequenceFileWriter<LongWritable, Text> blockWriter = new SequenceFileWriter<>(
    "org.apache.hadoop.io.compress.GzipCodec",
    SequenceFile.CompressionType.BLOCK
);

BucketingSink<Tuple2<LongWritable, Text>> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(blockWriter);
```

AvroKeyValueSinkWriter writes Avro key-value records for Tuple2 elements using specified Avro schemas.
Type parameters:
K - Key type
V - Value type

public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema)
Creates an AvroKeyValueSinkWriter with the specified schemas.
Parameters:
keySchema - Avro schema for keys
valueSchema - Avro schema for values

public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory)
Creates an AvroKeyValueSinkWriter with compression.
Parameters:
keySchema - Avro schema for keys
valueSchema - Avro schema for values
codecFactory - Avro compression codec factory

public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory, int syncInterval)
Parameters:
syncInterval - Sync interval for Avro files

public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory, int syncInterval, Map<String, String> metadata)
Parameters:
metadata - Metadata map for Avro files

```java
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.flink.api.java.tuple.Tuple2;

// Define Avro schemas
Schema keySchema = Schema.create(Schema.Type.LONG);
Schema valueSchema = Schema.create(Schema.Type.STRING);

// Basic writer
AvroKeyValueSinkWriter<Long, String> writer = new AvroKeyValueSinkWriter<>(keySchema, valueSchema);

// With compression
AvroKeyValueSinkWriter<Long, String> snappyWriter = new AvroKeyValueSinkWriter<>(
    keySchema, valueSchema, CodecFactory.snappyCodec()
);

BucketingSink<Tuple2<Long, String>> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(snappyWriter);
```

StreamWriterBase is an abstract base class providing common functionality for writer implementations.
public abstract class StreamWriterBase&lt;T&gt; implements Writer&lt;T&gt;

This class provides default implementations for common writer operations and can be extended to create custom writers.
```java
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import java.io.IOException;

public class CustomWriter<T> extends StreamWriterBase<T> {

    private transient FSDataOutputStream outputStream;

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.outputStream = fs.create(path);
    }

    @Override
    public void write(T element) throws IOException {
        // Custom write logic
        String data = processElement(element);
        outputStream.writeBytes(data);
    }

    @Override
    public Writer<T> duplicate() {
        return new CustomWriter<>();
    }

    private String processElement(T element) {
        // Custom processing logic
        return element.toString() + "\n";
    }
}
```

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-filesystem-2-10