
tessl/maven-org-apache-flink--flink-connector-filesystem-2-10

Flink filesystem connector providing fault-tolerant rolling file sinks for streaming data to HDFS and Hadoop-compatible filesystems


File Writers

Writers handle the actual file I/O operations for different data formats. The filesystem connector provides several built-in writer implementations and allows custom writers through the Writer interface.

Writer Interface

The base interface that all writers must implement.

public interface Writer<T> extends Serializable

Core Methods

void open(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path) throws IOException

Initializes the writer for a newly opened bucket file.

Parameters:

  • fs - The Hadoop FileSystem containing the file
  • path - The Path of the newly opened file

void write(T element) throws IOException

Writes one element to the bucket file.

Parameters:

  • element - The element to write

long flush() throws IOException

Flushes internally held data and returns the offset for file truncation during recovery.

Returns: The offset to which the file must be truncated during recovery
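The truncation semantics can be illustrated without Flink or Hadoop on the classpath. The following sketch (plain `java.io`; the class name and record strings are illustrative, not part of the connector) shows how a recovery step uses the offset returned by flush(): data written after the last flush is discarded by truncating the file back to that offset.

```java
import java.io.FileOutputStream;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class TruncateDemo {
    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("bucket", ".txt");
        long flushedOffset;

        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            // Two records are written and flushed; everything up to this offset is durable
            out.write("record-1\nrecord-2\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
            flushedOffset = out.getChannel().position(); // what flush() would report

            // A third record is written, but the job fails before the next flush
            out.write("record-3".getBytes(StandardCharsets.UTF_8));
        }

        // On recovery, the sink truncates the file back to the last flushed offset
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            raf.setLength(flushedOffset);
        }

        System.out.println(Files.readString(file)); // only record-1 and record-2 survive
    }
}
```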

long getPos() throws IOException

Retrieves the current position (size) of the output file.

Returns: Current file position in bytes

void close() throws IOException

Closes the writer and associated resources. Safe to call multiple times.

Writer<T> duplicate()

Creates a duplicate of this writer for parallel sink instances.

Returns: A new Writer instance
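To make the call sequence concrete, here is a minimal sketch of the lifecycle the sink drives for each bucket file. The Hadoop FileSystem and Path parameters are replaced with a plain java.nio Path stand-in so the snippet runs without Flink or Hadoop on the classpath; SimpleWriter and SimpleStringWriter are illustrative names, not part of the connector.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Simplified mirror of the connector's Writer<T> contract (Hadoop types omitted)
interface SimpleWriter<T> extends Serializable {
    void open(Path path) throws IOException;
    void write(T element) throws IOException;
    long flush() throws IOException;
    long getPos() throws IOException;
    void close() throws IOException;
    SimpleWriter<T> duplicate();
}

class SimpleStringWriter implements SimpleWriter<String> {
    private transient FileOutputStream out;

    public void open(Path path) throws IOException {
        out = new FileOutputStream(path.toFile());
    }
    public void write(String element) throws IOException {
        out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
    }
    public long flush() throws IOException {
        out.flush();
        return getPos();
    }
    public long getPos() throws IOException {
        return out.getChannel().position();
    }
    public void close() throws IOException {
        if (out != null) { out.close(); out = null; } // safe to call multiple times
    }
    public SimpleStringWriter duplicate() {
        return new SimpleStringWriter();
    }
}

public class LifecycleDemo {
    public static void main(String[] args) throws Exception {
        Path bucketFile = Files.createTempFile("bucket", ".txt");
        SimpleWriter<String> writer = new SimpleStringWriter();

        writer.open(bucketFile);      // once per newly opened bucket file
        writer.write("a");
        writer.write("b");
        long offset = writer.flush(); // checkpoint: file is valid up to this offset
        writer.close();               // bucket rolls or job stops
        writer.close();               // calling close again is a no-op

        System.out.println(offset);   // 4 bytes: "a\nb\n"
    }
}
```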

StringWriter

Writes elements as strings with newline separation using configurable character encoding.

Constructors

public StringWriter()

Creates a StringWriter using UTF-8 encoding.

public StringWriter(String charsetName)

Creates a StringWriter with the specified character encoding.

Parameters:

  • charsetName - Character set name (e.g., "UTF-8", "ISO-8859-1")

Usage Example

import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// UTF-8 encoding (default)
StringWriter<String> writer = new StringWriter<>();

// Custom encoding
StringWriter<String> latin1Writer = new StringWriter<>("ISO-8859-1");

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(writer);
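The charset matters for the on-disk byte layout: for each element, StringWriter encodes the element's string form plus a trailing newline with the configured charset. This standalone sketch (plain JDK, no Flink dependency; the sample string is illustrative) shows how the same record produces different bytes under the two encodings from the example above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    public static void main(String[] args) {
        String record = "café"; // contains a non-ASCII character

        // What a newline-terminated record encodes to under each charset
        byte[] utf8   = (record + "\n").getBytes(StandardCharsets.UTF_8);
        byte[] latin1 = (record + "\n").getBytes(Charset.forName("ISO-8859-1"));

        System.out.println(utf8.length);   // 6: 'é' takes two bytes in UTF-8
        System.out.println(latin1.length); // 5: 'é' is a single byte in Latin-1
    }
}
```

Readers of the files must use the same charset the writer was configured with, or non-ASCII characters will be garbled.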

SequenceFileWriter

Writes Hadoop SequenceFiles for Tuple2 elements containing Hadoop Writable types.

Type Parameters

  • K extends Writable - Key type (must implement Hadoop Writable)
  • V extends Writable - Value type (must implement Hadoop Writable)

Constructors

public SequenceFileWriter()

Creates a SequenceFileWriter without compression.

public SequenceFileWriter(String compressionCodecName)

Creates a SequenceFileWriter with the specified compression codec.

Parameters:

  • compressionCodecName - Name of the compression codec (e.g., "org.apache.hadoop.io.compress.GzipCodec")

public SequenceFileWriter(String compressionCodecName, org.apache.hadoop.io.SequenceFile.CompressionType compressionType)

Creates a SequenceFileWriter with full compression control.

Parameters:

  • compressionCodecName - Name of the compression codec
  • compressionType - Type of compression (NONE, RECORD, BLOCK)

Usage Example

import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// No compression
SequenceFileWriter<LongWritable, Text> writer = new SequenceFileWriter<>();

// With Gzip compression
SequenceFileWriter<LongWritable, Text> gzipWriter = new SequenceFileWriter<>(
    "org.apache.hadoop.io.compress.GzipCodec"
);

// Full compression control
SequenceFileWriter<LongWritable, Text> blockGzipWriter = new SequenceFileWriter<>(
    "org.apache.hadoop.io.compress.GzipCodec",
    SequenceFile.CompressionType.BLOCK
);

BucketingSink<Tuple2<LongWritable, Text>> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(writer);

AvroKeyValueSinkWriter

Writes Avro key-value records for Tuple2 elements using specified Avro schemas.

Type Parameters

  • K - Key type
  • V - Value type

Constructors

public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema)

Creates an AvroKeyValueSinkWriter with the specified schemas.

Parameters:

  • keySchema - Avro schema for keys
  • valueSchema - Avro schema for values

public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory)

Creates an AvroKeyValueSinkWriter with compression.

Parameters:

  • keySchema - Avro schema for keys
  • valueSchema - Avro schema for values
  • codecFactory - Avro compression codec factory

Additional Constructors

public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory, int syncInterval)

Parameters:

  • syncInterval - Sync interval for Avro files

public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory, int syncInterval, Map<String, String> metadata)

Parameters:

  • metadata - Metadata map for Avro files

Usage Example

import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.flink.api.java.tuple.Tuple2;

// Define Avro schemas
Schema keySchema = Schema.create(Schema.Type.LONG);
Schema valueSchema = Schema.create(Schema.Type.STRING);

// Basic writer
AvroKeyValueSinkWriter<Long, String> writer = new AvroKeyValueSinkWriter<>(keySchema, valueSchema);

// With compression
AvroKeyValueSinkWriter<Long, String> snappyWriter = new AvroKeyValueSinkWriter<>(
    keySchema, valueSchema, CodecFactory.snappyCodec()
);

BucketingSink<Tuple2<Long, String>> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(writer);

StreamWriterBase

Abstract base class providing common functionality for writer implementations.

public abstract class StreamWriterBase<T> implements Writer<T>

This class manages the underlying output stream (available to subclasses via getStream()) and provides default implementations of flush(), getPos(), and close(). Extend it to create custom writers that only need to supply the per-element write logic.

Custom Writer Implementation

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class CustomWriter<T> extends StreamWriterBase<T> {

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        // The base class opens the output stream; do not call fs.create(path) again
        super.open(fs, path);
    }

    @Override
    public void write(T element) throws IOException {
        // Custom write logic, using the stream managed by StreamWriterBase
        String data = processElement(element);
        getStream().write(data.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public Writer<T> duplicate() {
        return new CustomWriter<>();
    }

    private String processElement(T element) {
        // Custom processing logic
        return element.toString() + "\n";
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-filesystem-2-10
