# I/O Streams

The Hadoop FileSystem package provides high-performance I/O streams optimized for Flink's data processing requirements. These streams support ByteBuffer operations, advanced positioning, connection limiting, and efficient data transfer for both batch and streaming workloads.

## Capabilities

### HadoopDataInputStream

High-performance input stream with ByteBuffer support and advanced positioning capabilities.

```java { .api }
/**
 * Concrete implementation of FSDataInputStream for Hadoop's input streams.
 * Supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
 * Implements ByteBufferReadable for zero-copy operations.
 */
public class HadoopDataInputStream extends FSDataInputStream implements ByteBufferReadable {

    /**
     * Minimum number of bytes a forward move may span before a real seek is
     * issued instead of skipping (performance optimization).
     */
    public static final int MIN_SKIP_BYTES = 1024 * 1024;

    /**
     * Creates a HadoopDataInputStream that wraps a Hadoop FSDataInputStream.
     *
     * @param fsDataInputStream the Hadoop input stream to wrap
     */
    public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream);

    /**
     * Gets the wrapped Hadoop input stream.
     *
     * @return the underlying Hadoop FSDataInputStream
     */
    public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream();
}
```

### Positioning Operations

Advanced positioning and seeking capabilities for random-access I/O.

```java { .api }
/**
 * Seeks to the specified position in the stream.
 *
 * @param seekPos position to seek to
 * @throws IOException if the seek operation fails
 */
public void seek(long seekPos) throws IOException;

/**
 * Forces a real seek operation, bypassing the skip-based optimization heuristics.
 *
 * @param seekPos position to seek to
 * @throws IOException if the seek operation fails
 */
public void forceSeek(long seekPos) throws IOException;

/**
 * Gets the current position in the stream.
 *
 * @return current byte position
 * @throws IOException if the operation fails
 */
public long getPos() throws IOException;

/**
 * Skips exactly the specified number of bytes.
 *
 * @param bytes number of bytes to skip
 * @throws IOException if the skip operation fails or end of stream is reached first
 */
public void skipFully(long bytes) throws IOException;

/**
 * Skips up to the specified number of bytes.
 *
 * @param n maximum number of bytes to skip
 * @return actual number of bytes skipped
 * @throws IOException if the operation fails
 */
public long skip(long n) throws IOException;
```

**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;

// Open a file and seek to a specific position
HadoopDataInputStream inputStream =
        fs.open(new Path("hdfs://namenode:9000/data/large_file.dat"));

// Seek to the 1 MB position
inputStream.seek(1024 * 1024);
int byteAtPosition = inputStream.read();

// Get the current position
long currentPos = inputStream.getPos();
System.out.println("Current position: " + currentPos);

// Skip forward exactly 100 bytes
inputStream.skipFully(100);

// Force a real seek (bypasses the skip optimization)
inputStream.forceSeek(2048 * 1024); // 2 MB position

inputStream.close();
```

102
103
### ByteBuffer Operations
104
105
Zero-copy operations using ByteBuffer for high-performance data transfer.
106
107
```java { .api }
108
/**
109
* Reads data into a ByteBuffer from current position.
110
* @param byteBuffer buffer to read data into
111
* @return number of bytes read, or -1 if end of stream
112
* @throws IOException if read operation fails
113
*/
114
public int read(ByteBuffer byteBuffer) throws IOException;
115
116
/**
117
* Reads data into a ByteBuffer from specified position without changing stream position.
118
* @param position absolute position to read from
119
* @param byteBuffer buffer to read data into
120
* @return number of bytes read, or -1 if end of stream
121
* @throws IOException if read operation fails
122
*/
123
public int read(long position, ByteBuffer byteBuffer) throws IOException;
124
```
**Usage Examples:**

```java
import java.nio.ByteBuffer;

HadoopDataInputStream inputStream =
        fs.open(new Path("hdfs://namenode:9000/data/binary_data.bin"));

// Allocate a direct ByteBuffer for zero-copy operations
ByteBuffer buffer = ByteBuffer.allocateDirect(8192); // 8 KB direct buffer

// Read data into the ByteBuffer
int bytesRead = inputStream.read(buffer);
while (bytesRead != -1) {
    buffer.flip(); // prepare for reading

    // Process data from the buffer
    while (buffer.hasRemaining()) {
        byte b = buffer.get();
        // process byte
    }

    buffer.clear(); // prepare for the next read
    bytesRead = inputStream.read(buffer);
}

// Positional read without changing the stream position
ByteBuffer posBuffer = ByteBuffer.allocate(1024);
long savedPosition = inputStream.getPos();
int posRead = inputStream.read(1024 * 1024, posBuffer); // read from the 1 MB position
// The stream position remains at savedPosition

inputStream.close();
```

### Standard I/O Operations

Traditional single-byte and byte-array reading operations.

```java { .api }
/**
 * Reads a single byte.
 *
 * @return byte value (0-255), or -1 at end of stream
 * @throws IOException if the read operation fails
 */
public int read() throws IOException;

/**
 * Reads data into a byte array.
 *
 * @param buffer byte array to read into
 * @param offset starting offset in the buffer
 * @param length maximum number of bytes to read
 * @return number of bytes read, or -1 at end of stream
 * @throws IOException if the read operation fails
 */
public int read(byte[] buffer, int offset, int length) throws IOException;

/**
 * Returns an estimate of the number of bytes that can be read without blocking.
 *
 * @return estimated number of available bytes
 * @throws IOException if the operation fails
 */
public int available() throws IOException;

/**
 * Closes the input stream and releases any resources.
 *
 * @throws IOException if the close operation fails
 */
public void close() throws IOException;
```

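The `read(byte[], int, int)` contract above is the standard `java.io` one, so the usual drain loop applies. A minimal sketch against a plain `InputStream` (the `ReadLoopSketch` and `readAll` names are illustrative, not part of the Flink API); the same loop works unchanged against a `HadoopDataInputStream`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadLoopSketch {

    /** Reads an entire stream into a byte array using the chunked read contract. */
    static byte[] readAll(InputStream in) throws IOException {
        byte[] chunk = new byte[4096];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int n;
        // read() returns the number of bytes read, or -1 at end of stream
        while ((n = in.read(chunk, 0, chunk.length)) != -1) {
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "Hello, Hadoop!".getBytes();
        byte[] copy = readAll(new ByteArrayInputStream(data));
        System.out.println(new String(copy)); // prints "Hello, Hadoop!"
    }
}
```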
### HadoopDataOutputStream

High-performance output stream with positioning and synchronization capabilities.

```java { .api }
/**
 * Concrete implementation of FSDataOutputStream for Hadoop's output streams.
 * Supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
 */
public class HadoopDataOutputStream extends FSDataOutputStream {

    /**
     * Creates a HadoopDataOutputStream that wraps a Hadoop FSDataOutputStream.
     *
     * @param fdos the Hadoop output stream to wrap
     */
    public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos);

    /**
     * Gets the wrapped Hadoop output stream.
     *
     * @return the underlying Hadoop FSDataOutputStream
     */
    public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream();
}
```

### Output Stream Operations

Writing and positioning operations for output streams.

```java { .api }
/**
 * Writes a single byte.
 *
 * @param b byte value to write
 * @throws IOException if the write operation fails
 */
public void write(int b) throws IOException;

/**
 * Writes data from a byte array.
 *
 * @param b byte array containing the data
 * @param off starting offset in the array
 * @param len number of bytes to write
 * @throws IOException if the write operation fails
 */
public void write(byte[] b, int off, int len) throws IOException;

/**
 * Gets the current position in the output stream.
 *
 * @return current byte position
 * @throws IOException if the operation fails
 */
public long getPos() throws IOException;

/**
 * Flushes any buffered data to the underlying stream.
 *
 * @throws IOException if the flush operation fails
 */
public void flush() throws IOException;

/**
 * Synchronizes data to stable storage (fsync equivalent).
 *
 * @throws IOException if the sync operation fails
 */
public void sync() throws IOException;

/**
 * Closes the output stream and releases any resources.
 *
 * @throws IOException if the close operation fails
 */
public void close() throws IOException;
```

**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;

// Create an output stream
HadoopDataOutputStream outputStream = fs.create(
        new Path("hdfs://namenode:9000/data/output.dat"),
        WriteMode.OVERWRITE);

// Write single bytes
outputStream.write(65); // writes 'A'
outputStream.write(66); // writes 'B'

// Write byte arrays
byte[] data = "Hello, Hadoop!".getBytes();
outputStream.write(data, 0, data.length);

// Get the current position
long position = outputStream.getPos();
System.out.println("Written " + position + " bytes");

// Flush buffered data to the underlying stream
outputStream.flush();

// Sync to stable storage (important for durability)
outputStream.sync();

// Close the stream
outputStream.close();
```

### Performance Optimizations

The streams include several performance optimizations:

```java
// Smart seeking - a real seek is only issued when the distance is significant
HadoopDataInputStream inputStream = fs.open(filePath);
inputStream.seek(100); // small forward move, may use skip() instead of seek()
inputStream.seek(2 * 1024 * 1024); // large jump, will use seek()

// Direct ByteBuffer operations avoid extra memory copies
ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
int read = inputStream.read(directBuffer);

// Connection limiting prevents resource exhaustion
// (configured at the factory level)
Configuration config = new Configuration();
config.setInteger("fs.hdfs.limit.input", 50); // at most 50 concurrent input streams
config.setInteger("fs.hdfs.limit.output", 30); // at most 30 concurrent output streams
```

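The skip-versus-seek decision above can be sketched as a simple threshold check against `MIN_SKIP_BYTES`: short forward moves are served by skipping already-buffered data, while backward moves and large jumps require a real seek. This is an illustrative reconstruction, not the actual Flink source, and `needsRealSeek` is a hypothetical name:

```java
public class SeekHeuristicSketch {

    // Mirrors HadoopDataInputStream.MIN_SKIP_BYTES: forward moves up to this
    // distance are assumed cheaper to skip() than to seek()
    static final int MIN_SKIP_BYTES = 1024 * 1024;

    /** Returns true when a real seek is needed: backward moves and large forward jumps. */
    static boolean needsRealSeek(long currentPos, long seekPos) {
        long delta = seekPos - currentPos;
        return delta < 0 || delta > MIN_SKIP_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(needsRealSeek(0, 100));             // prints false: small forward move, skip
        System.out.println(needsRealSeek(0, 2 * 1024 * 1024)); // prints true: large jump, seek
        System.out.println(needsRealSeek(500, 100));           // prints true: backward move, seek
    }
}
```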
320
321
### Error Handling and Resource Management
322
323
Proper error handling and resource cleanup patterns:
324
325
```java
326
HadoopDataInputStream inputStream = null;
327
try {
328
inputStream = fs.open(filePath);
329
330
// Read operations
331
ByteBuffer buffer = ByteBuffer.allocate(8192);
332
int bytesRead = inputStream.read(buffer);
333
334
// Process data...
335
336
} catch (IOException e) {
337
System.err.println("I/O error: " + e.getMessage());
338
} finally {
339
if (inputStream != null) {
340
try {
341
inputStream.close();
342
} catch (IOException e) {
343
System.err.println("Error closing stream: " + e.getMessage());
344
}
345
}
346
}
347
348
// Try-with-resources pattern (recommended)
349
try (HadoopDataOutputStream outputStream = fs.create(outputPath, WriteMode.OVERWRITE)) {
350
outputStream.write("Data".getBytes());
351
outputStream.sync();
352
// Stream automatically closed
353
} catch (IOException e) {
354
System.err.println("Write error: " + e.getMessage());
355
}
356
```
## Types

```java { .api }
// Base stream classes
public abstract class FSDataInputStream extends InputStream {
    public abstract void seek(long seekPos) throws IOException;
    public abstract long getPos() throws IOException;
}

public abstract class FSDataOutputStream extends OutputStream {
    public abstract long getPos() throws IOException;
    public abstract void sync() throws IOException;
}

// ByteBuffer operations interface
public interface ByteBufferReadable {
    int read(ByteBuffer byteBuffer) throws IOException;
}

// Write modes
public enum WriteMode {
    NO_OVERWRITE,
    OVERWRITE
}
```
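To show how the `ByteBufferReadable` contract above is consumed, here is a self-contained sketch with a toy in-memory implementation. The interface body matches the Types section, but `InMemoryReadable` and `drain` are illustrative names, not part of the Flink API:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

public class ByteBufferReadableSketch {

    // Local mirror of the ByteBufferReadable interface from the Types section
    interface ByteBufferReadable {
        int read(ByteBuffer byteBuffer) throws IOException;
    }

    /** Toy source that serves bytes from an in-memory array (for illustration only). */
    static class InMemoryReadable implements ByteBufferReadable {
        private final ByteBuffer data;

        InMemoryReadable(byte[] bytes) {
            this.data = ByteBuffer.wrap(bytes);
        }

        @Override
        public int read(ByteBuffer byteBuffer) {
            if (!data.hasRemaining()) {
                return -1; // end of stream
            }
            int n = Math.min(byteBuffer.remaining(), data.remaining());
            for (int i = 0; i < n; i++) {
                byteBuffer.put(data.get());
            }
            return n;
        }
    }

    /** Drains a source through the read(ByteBuffer) contract, returning the total byte count. */
    static int drain(ByteBufferReadable source, int bufferSize) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        int total = 0;
        int n;
        while ((n = source.read(buffer)) != -1) {
            total += n;
            buffer.clear(); // reuse the buffer for the next read
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        int total = drain(new InMemoryReadable(new byte[10000]), 4096);
        System.out.println(total); // prints 10000
    }
}
```

The same drain loop applies to a real `HadoopDataInputStream`, since it implements the interface directly.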