Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems
—
The Hadoop FileSystem package provides high-performance I/O streams optimized for Flink's data processing requirements. These streams support ByteBuffer operations, advanced positioning, connection limiting, and efficient data transfer for both batch and streaming workloads.
High-performance input stream with ByteBuffer support and advanced positioning capabilities.
/**
* Concrete implementation of FSDataInputStream for Hadoop's input streams.
* Supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
* Implements ByteBufferReadable for zero-copy operations.
*/
public class HadoopDataInputStream extends FSDataInputStream implements ByteBufferReadable {
/**
* Minimum bytes to skip forward before seeking (performance optimization).
*/
public static final int MIN_SKIP_BYTES = 1024 * 1024;
/**
* Creates a HadoopDataInputStream wrapping a Hadoop FSDataInputStream.
* @param fsDataInputStream the Hadoop input stream to wrap
*/
public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream);
/**
* Gets the wrapped Hadoop input stream.
* @return the underlying Hadoop FSDataInputStream
*/
public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream();
}

Advanced positioning and seeking capabilities for random access I/O.
/**
* Seeks to the specified position in the stream.
* @param seekPos position to seek to
* @throws IOException if seek operation fails
*/
public void seek(long seekPos) throws IOException;
/**
* Forces a seek operation, bypassing optimization heuristics.
* @param seekPos position to seek to
* @throws IOException if seek operation fails
*/
public void forceSeek(long seekPos) throws IOException;
/**
* Gets the current position in the stream.
* @return current byte position
* @throws IOException if operation fails
*/
public long getPos() throws IOException;
/**
* Skips exactly the specified number of bytes.
* @param bytes number of bytes to skip
* @throws IOException if skip operation fails or reaches EOF
*/
public void skipFully(long bytes) throws IOException;
/**
* Skips up to the specified number of bytes.
* @param n maximum number of bytes to skip
* @return actual number of bytes skipped
* @throws IOException if operation fails
*/
public long skip(long n) throws IOException;

Usage Examples:
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
// Open file and seek to specific position
HadoopDataInputStream inputStream = fs.open(new Path("hdfs://namenode:9000/data/large_file.dat"));
// Seek to 1MB position
inputStream.seek(1024 * 1024);
int byteAtPosition = inputStream.read();
// Get current position
long currentPos = inputStream.getPos();
System.out.println("Current position: " + currentPos);
// Skip forward 100 bytes
inputStream.skipFully(100);
// Force seek (bypasses optimization)
inputStream.forceSeek(2048 * 1024); // 2MB position
inputStream.close();

Zero-copy operations using ByteBuffer for high-performance data transfer.
/**
* Reads data into a ByteBuffer from current position.
* @param byteBuffer buffer to read data into
* @return number of bytes read, or -1 if end of stream
* @throws IOException if read operation fails
*/
public int read(ByteBuffer byteBuffer) throws IOException;
/**
* Reads data into a ByteBuffer from specified position without changing stream position.
* @param position absolute position to read from
* @param byteBuffer buffer to read data into
* @return number of bytes read, or -1 if end of stream
* @throws IOException if read operation fails
*/
public int read(long position, ByteBuffer byteBuffer) throws IOException;

Usage Examples:
import java.nio.ByteBuffer;
HadoopDataInputStream inputStream = fs.open(new Path("hdfs://namenode:9000/data/binary_data.bin"));
// Allocate ByteBuffer for zero-copy operations
ByteBuffer buffer = ByteBuffer.allocateDirect(8192); // 8KB direct buffer
// Read data into ByteBuffer
int bytesRead = inputStream.read(buffer);
while (bytesRead != -1) {
buffer.flip(); // Prepare for reading
// Process data from buffer
while (buffer.hasRemaining()) {
byte b = buffer.get();
// Process byte
}
buffer.clear(); // Prepare for next read
bytesRead = inputStream.read(buffer);
}
// Positional read without changing stream position
ByteBuffer posBuffer = ByteBuffer.allocate(1024);
long savedPosition = inputStream.getPos();
int posRead = inputStream.read(1024 * 1024, posBuffer); // Read from 1MB position
// Stream position remains at savedPosition
inputStream.close();

Traditional byte array and single byte reading operations.
/**
* Reads a single byte.
* @return byte value (0-255) or -1 if end of stream
* @throws IOException if read operation fails
*/
public int read() throws IOException;
/**
* Reads data into a byte array.
* @param buffer byte array to read into
* @param offset starting offset in the buffer
* @param length maximum number of bytes to read
* @return number of bytes read, or -1 if end of stream
* @throws IOException if read operation fails
*/
public int read(byte[] buffer, int offset, int length) throws IOException;
/**
* Returns the number of bytes available for reading without blocking.
* @return estimated number of available bytes
* @throws IOException if operation fails
*/
public int available() throws IOException;
/**
* Closes the input stream and releases resources.
* @throws IOException if close operation fails
*/
public void close() throws IOException;

High-performance output stream with positioning and synchronization capabilities.
/**
* Concrete implementation of FSDataOutputStream for Hadoop's output streams.
* Supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
*/
public class HadoopDataOutputStream extends FSDataOutputStream {
/**
* Creates a HadoopDataOutputStream wrapping a Hadoop FSDataOutputStream.
* @param fdos the Hadoop output stream to wrap
*/
public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos);
/**
* Gets the wrapped Hadoop output stream.
* @return the underlying Hadoop FSDataOutputStream
*/
public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream();
}

Writing and positioning operations for output streams.
/**
* Writes a single byte.
* @param b byte value to write
* @throws IOException if write operation fails
*/
public void write(int b) throws IOException;
/**
* Writes data from a byte array.
* @param b byte array containing data
* @param off starting offset in the array
* @param len number of bytes to write
* @throws IOException if write operation fails
*/
public void write(byte[] b, int off, int len) throws IOException;
/**
* Gets the current position in the output stream.
* @return current byte position
* @throws IOException if operation fails
*/
public long getPos() throws IOException;
/**
* Flushes any buffered data to the underlying stream.
* @throws IOException if flush operation fails
*/
public void flush() throws IOException;
/**
* Synchronizes data to stable storage (fsync equivalent).
* @throws IOException if sync operation fails
*/
public void sync() throws IOException;
/**
* Closes the output stream and releases resources.
* @throws IOException if close operation fails
*/
public void close() throws IOException;

Usage Examples:
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
// Create output stream
HadoopDataOutputStream outputStream = fs.create(
new Path("hdfs://namenode:9000/data/output.dat"),
WriteMode.OVERWRITE
);
// Write single bytes
outputStream.write(65); // Write 'A'
outputStream.write(66); // Write 'B'
// Write byte arrays
byte[] data = "Hello, Hadoop!".getBytes();
outputStream.write(data, 0, data.length);
// Get current position
long position = outputStream.getPos();
System.out.println("Written " + position + " bytes");
// Flush to ensure data is written
outputStream.flush();
// Sync to stable storage (important for durability)
outputStream.sync();
// Close stream
outputStream.close();

The streams include several performance optimizations:
// Smart seeking - only performs actual seek if distance is significant
HadoopDataInputStream inputStream = fs.open(filePath);
inputStream.seek(100); // Small skip, may use skip() instead of seek()
inputStream.seek(2 * 1024 * 1024); // Large skip, will use seek()
// Direct ByteBuffer operations avoid memory copies
ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
int read = inputStream.read(directBuffer);
// Connection limiting prevents resource exhaustion
// (configured at factory level)
Configuration config = new Configuration();
config.setInteger("fs.hdfs.limit.input", 50); // Max 50 input streams
config.setInteger("fs.hdfs.limit.output", 30); // Max 30 output streams

Proper error handling and resource cleanup patterns:
HadoopDataInputStream inputStream = null;
try {
inputStream = fs.open(filePath);
// Read operations
ByteBuffer buffer = ByteBuffer.allocate(8192);
int bytesRead = inputStream.read(buffer);
// Process data...
} catch (IOException e) {
System.err.println("I/O error: " + e.getMessage());
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
System.err.println("Error closing stream: " + e.getMessage());
}
}
}
// Try-with-resources pattern (recommended)
try (HadoopDataOutputStream outputStream = fs.create(outputPath, WriteMode.OVERWRITE)) {
outputStream.write("Data".getBytes());
outputStream.sync();
// Stream automatically closed
} catch (IOException e) {
System.err.println("Write error: " + e.getMessage());
}

// Base stream interfaces
public abstract class FSDataInputStream extends DataInputStream {
public abstract void seek(long seekPos) throws IOException;
public abstract long getPos() throws IOException;
}
public abstract class FSDataOutputStream extends DataOutputStream {
public abstract long getPos() throws IOException;
public abstract void sync() throws IOException;
}
// ByteBuffer operations interface
public interface ByteBufferReadable {
int read(ByteBuffer byteBuffer) throws IOException;
}
// Write modes
public enum WriteMode {
NO_OVERWRITE,
OVERWRITE
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-fs