tessl/maven-org-apache-flink--flink-hadoop-fs

Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems

I/O Streams

The Hadoop FileSystem package provides high-performance I/O streams optimized for Flink's data processing requirements. These streams support ByteBuffer operations, advanced positioning, connection limiting, and efficient data transfer for both batch and streaming workloads.

Capabilities

HadoopDataInputStream

High-performance input stream with ByteBuffer support and advanced positioning capabilities.

/**
 * Concrete implementation of FSDataInputStream for Hadoop's input streams.
 * Supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
 * Implements ByteBufferReadable for zero-copy operations.
 */
public class HadoopDataInputStream extends FSDataInputStream implements ByteBufferReadable {
    /**
     * Minimum bytes to skip forward before seeking (performance optimization).
     */
    public static final int MIN_SKIP_BYTES = 1024 * 1024;
    
    /**
     * Creates a HadoopDataInputStream wrapping a Hadoop FSDataInputStream.
     * @param fsDataInputStream the Hadoop input stream to wrap
     */
    public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream);
    
    /**
     * Gets the wrapped Hadoop input stream.
     * @return the underlying Hadoop FSDataInputStream
     */
    public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream();
}
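
The wrap-and-expose pattern above can be sketched with plain `java.io` types (a minimal sketch: `WrappingStream` and `getWrappedStream` are illustrative names standing in for `HadoopDataInputStream` and `getHadoopInputStream`, not part of the Flink API):

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.InputStream;

// Minimal sketch of the wrapping pattern: the wrapper holds the underlying
// stream, delegates I/O to it, and exposes it for callers that need the
// underlying stream's own API (as getHadoopInputStream() does for Hadoop).
public class WrappingStream extends FilterInputStream {
    public WrappingStream(InputStream wrapped) {
        super(wrapped); // FilterInputStream stores it in the protected field 'in'
    }

    // Analogous to getHadoopInputStream(): hand back the wrapped stream.
    public InputStream getWrappedStream() {
        return in;
    }
}
```

Because the wrapper delegates every read to the wrapped stream, callers can mix generic I/O through the wrapper with direct access to the underlying stream when needed.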

Positioning Operations

Advanced positioning and seeking capabilities for random access I/O.

/**
 * Seeks to the specified position in the stream.
 * @param seekPos position to seek to
 * @throws IOException if seek operation fails
 */
public void seek(long seekPos) throws IOException;

/**
 * Forces a seek operation, bypassing optimization heuristics.
 * @param seekPos position to seek to
 * @throws IOException if seek operation fails
 */
public void forceSeek(long seekPos) throws IOException;

/**
 * Gets the current position in the stream.
 * @return current byte position
 * @throws IOException if operation fails
 */
public long getPos() throws IOException;

/**
 * Skips exactly the specified number of bytes.
 * @param bytes number of bytes to skip
 * @throws IOException if skip operation fails or reaches EOF
 */
public void skipFully(long bytes) throws IOException;

/**
 * Skips up to the specified number of bytes.
 * @param n maximum number of bytes to skip
 * @return actual number of bytes skipped
 * @throws IOException if operation fails
 */
public long skip(long n) throws IOException;

Usage Examples:

import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;

// Open file and seek to specific position
HadoopDataInputStream inputStream = fs.open(new Path("hdfs://namenode:9000/data/large_file.dat"));

// Seek to 1MB position
inputStream.seek(1024 * 1024);
int byteAtPosition = inputStream.read();

// Get current position
long currentPos = inputStream.getPos();
System.out.println("Current position: " + currentPos);

// Skip forward 100 bytes
inputStream.skipFully(100);

// Force seek (bypasses optimization)
inputStream.forceSeek(2048 * 1024); // 2MB position

inputStream.close();

ByteBuffer Operations

Zero-copy operations using ByteBuffer for high-performance data transfer.

/**
 * Reads data into a ByteBuffer from current position.
 * @param byteBuffer buffer to read data into
 * @return number of bytes read, or -1 if end of stream
 * @throws IOException if read operation fails
 */
public int read(ByteBuffer byteBuffer) throws IOException;

/**
 * Reads data into a ByteBuffer from specified position without changing stream position.
 * @param position absolute position to read from
 * @param byteBuffer buffer to read data into
 * @return number of bytes read, or -1 if end of stream
 * @throws IOException if read operation fails
 */
public int read(long position, ByteBuffer byteBuffer) throws IOException;

Usage Examples:

import java.nio.ByteBuffer;

HadoopDataInputStream inputStream = fs.open(new Path("hdfs://namenode:9000/data/binary_data.bin"));

// Allocate ByteBuffer for zero-copy operations
ByteBuffer buffer = ByteBuffer.allocateDirect(8192); // 8KB direct buffer

// Read data into ByteBuffer
int bytesRead = inputStream.read(buffer);
while (bytesRead != -1) {
    buffer.flip(); // Prepare for reading
    
    // Process data from buffer
    while (buffer.hasRemaining()) {
        byte b = buffer.get();
        // Process byte
    }
    
    buffer.clear(); // Prepare for next read
    bytesRead = inputStream.read(buffer);
}

// Positional read without changing stream position
ByteBuffer posBuffer = ByteBuffer.allocate(1024);
long savedPosition = inputStream.getPos();
int posRead = inputStream.read(1024 * 1024, posBuffer); // Read from 1MB position
// Stream position remains at savedPosition

inputStream.close();

Standard I/O Operations

Traditional byte array and single byte reading operations.

/**
 * Reads a single byte.
 * @return byte value (0-255) or -1 if end of stream
 * @throws IOException if read operation fails
 */
public int read() throws IOException;

/**
 * Reads data into a byte array.
 * @param buffer byte array to read into
 * @param offset starting offset in the buffer
 * @param length maximum number of bytes to read
 * @return number of bytes read, or -1 if end of stream
 * @throws IOException if read operation fails
 */
public int read(byte[] buffer, int offset, int length) throws IOException;

/**
 * Returns the number of bytes available for reading without blocking.
 * @return estimated number of available bytes
 * @throws IOException if operation fails
 */
public int available() throws IOException;

/**
 * Closes the input stream and releases resources.
 * @throws IOException if close operation fails
 */
public void close() throws IOException;
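
The `read(byte[], int, int)` contract above (return the byte count, or -1 at end of stream) is the standard `InputStream` contract, so the usual read loop applies. A self-contained sketch, using an in-memory stream in place of an HDFS file for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadLoopExample {
    // Drains a stream using the read(byte[], int, int) contract shared by
    // HadoopDataInputStream and every java.io.InputStream.
    static int countBytes(InputStream in) throws IOException {
        byte[] buffer = new byte[4096];
        int total = 0;
        int n;
        while ((n = in.read(buffer, 0, buffer.length)) != -1) { // -1 signals EOF
            total += n; // n is the number of bytes actually read this call
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("Hello, Hadoop!".getBytes());
        System.out.println(countBytes(in)); // 14
    }
}
```

Note that a single `read` call may return fewer bytes than requested; the loop, not any one call, is what guarantees the stream is fully consumed.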

HadoopDataOutputStream

High-performance output stream with positioning and synchronization capabilities.

/**
 * Concrete implementation of FSDataOutputStream for Hadoop's output streams.
 * Supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
 */
public class HadoopDataOutputStream extends FSDataOutputStream {
    /**
     * Creates a HadoopDataOutputStream wrapping a Hadoop FSDataOutputStream.
     * @param fdos the Hadoop output stream to wrap
     */
    public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos);
    
    /**
     * Gets the wrapped Hadoop output stream.
     * @return the underlying Hadoop FSDataOutputStream
     */
    public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream();
}

Output Stream Operations

Writing and positioning operations for output streams.

/**
 * Writes a single byte.
 * @param b byte value to write
 * @throws IOException if write operation fails
 */
public void write(int b) throws IOException;

/**
 * Writes data from a byte array.
 * @param b byte array containing data
 * @param off starting offset in the array
 * @param len number of bytes to write
 * @throws IOException if write operation fails
 */
public void write(byte[] b, int off, int len) throws IOException;

/**
 * Gets the current position in the output stream.
 * @return current byte position
 * @throws IOException if operation fails
 */
public long getPos() throws IOException;

/**
 * Flushes any buffered data to the underlying stream.
 * @throws IOException if flush operation fails
 */
public void flush() throws IOException;

/**
 * Synchronizes data to stable storage (fsync equivalent).
 * @throws IOException if sync operation fails
 */
public void sync() throws IOException;

/**
 * Closes the output stream and releases resources.
 * @throws IOException if close operation fails
 */
public void close() throws IOException;

Usage Examples:

import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;

// Create output stream
HadoopDataOutputStream outputStream = fs.create(
    new Path("hdfs://namenode:9000/data/output.dat"), 
    WriteMode.OVERWRITE
);

// Write single bytes
outputStream.write(65); // Write 'A'
outputStream.write(66); // Write 'B'

// Write byte arrays
byte[] data = "Hello, Hadoop!".getBytes();
outputStream.write(data, 0, data.length);

// Get current position
long position = outputStream.getPos();
System.out.println("Written " + position + " bytes");

// Flush to ensure data is written
outputStream.flush();

// Sync to stable storage (important for durability)
outputStream.sync();

// Close stream
outputStream.close();

Performance Optimizations

The streams include several performance optimizations:

// Smart seeking - a forward move of at most MIN_SKIP_BYTES (1 MiB) is
// performed by skipping, which avoids discarding read buffers; backward or
// longer moves fall back to an actual seek
HadoopDataInputStream inputStream = fs.open(filePath);
inputStream.seek(100); // short forward move: implemented via skip
inputStream.seek(2 * 1024 * 1024); // long forward move: implemented via seek

// Direct ByteBuffer operations avoid memory copies
ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
int read = inputStream.read(directBuffer);

// Connection limiting prevents resource exhaustion
// (configured at factory level)
Configuration config = new Configuration();
config.setInteger("fs.hdfs.limit.input", 50);  // Max 50 input streams
config.setInteger("fs.hdfs.limit.output", 30); // Max 30 output streams
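
The seek-vs-skip decision described above can be sketched as a pure function (an assumption-level simplification of the documented heuristic, not the actual Flink source):

```java
public class SeekStrategy {
    // Threshold below which a forward move is cheaper as a skip than a seek.
    static final int MIN_SKIP_BYTES = 1024 * 1024;

    // Returns which operation the heuristic would choose for a move from
    // currentPos to seekPos.
    static String choose(long currentPos, long seekPos) {
        long delta = seekPos - currentPos;
        if (delta == 0) {
            return "noop"; // already at the target position
        }
        if (delta > 0 && delta <= MIN_SKIP_BYTES) {
            return "skip"; // short forward move: read past the gap
        }
        return "seek"; // backward or long forward move: reposition for real
    }

    public static void main(String[] args) {
        System.out.println(choose(0, 100));             // skip
        System.out.println(choose(0, 2 * 1024 * 1024)); // seek
        System.out.println(choose(500, 100));           // seek (backward)
    }
}
```

Skipping is preferred for short forward moves because a real seek may invalidate buffered data and, on some file systems, re-open connections.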

Error Handling and Resource Management

Proper error handling and resource cleanup patterns:

HadoopDataInputStream inputStream = null;
try {
    inputStream = fs.open(filePath);
    
    // Read operations
    ByteBuffer buffer = ByteBuffer.allocate(8192);
    int bytesRead = inputStream.read(buffer);
    
    // Process data...
    
} catch (IOException e) {
    System.err.println("I/O error: " + e.getMessage());
} finally {
    if (inputStream != null) {
        try {
            inputStream.close();
        } catch (IOException e) {
            System.err.println("Error closing stream: " + e.getMessage());
        }
    }
}

// Try-with-resources pattern (recommended)
try (HadoopDataOutputStream outputStream = fs.create(outputPath, WriteMode.OVERWRITE)) {
    outputStream.write("Data".getBytes());
    outputStream.sync();
    // Stream automatically closed
} catch (IOException e) {
    System.err.println("Write error: " + e.getMessage());
}

Types

// Base stream classes (org.apache.flink.core.fs)
public abstract class FSDataInputStream extends InputStream {
    public abstract void seek(long seekPos) throws IOException;
    public abstract long getPos() throws IOException;
}

public abstract class FSDataOutputStream extends OutputStream {
    public abstract long getPos() throws IOException;
    public abstract void flush() throws IOException;
    public abstract void sync() throws IOException;
    public abstract void close() throws IOException;
}

// ByteBuffer operations interface
public interface ByteBufferReadable {
    int read(ByteBuffer byteBuffer) throws IOException;
}

// Write modes
public enum WriteMode {
    NO_OVERWRITE,
    OVERWRITE
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-fs
