or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

filesystem-factory.md, filesystem-operations.md, hadoop-utilities.md, index.md, io-streams.md, recoverable-writers.md

docs/io-streams.md

0

# I/O Streams

1

2

The Hadoop FileSystem package provides high-performance I/O streams optimized for Flink's data processing requirements. These streams support ByteBuffer operations, advanced positioning, connection limiting, and efficient data transfer for both batch and streaming workloads.

3

4

## Capabilities

5

6

### HadoopDataInputStream

7

8

High-performance input stream with ByteBuffer support and advanced positioning capabilities.

9

10

```java { .api }

11

/**

12

* Concrete implementation of FSDataInputStream for Hadoop's input streams.

13

* Supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).

14

* Implements ByteBufferReadable for zero-copy operations.

15

*/

16

public class HadoopDataInputStream extends FSDataInputStream implements ByteBufferReadable {

17

/**

18

* Minimum bytes to skip forward before seeking (performance optimization).

19

*/

20

public static final int MIN_SKIP_BYTES = 1024 * 1024;

21

22

/**

23

* Creates a HadoopDataInputStream wrapping a Hadoop FSDataInputStream.

24

* @param fsDataInputStream the Hadoop input stream to wrap

25

*/

26

public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream);

27

28

/**

29

* Gets the wrapped Hadoop input stream.

30

* @return the underlying Hadoop FSDataInputStream

31

*/

32

public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream();

33

}

34

```

35

36

### Positioning Operations

37

38

Advanced positioning and seeking capabilities for random access I/O.

39

40

```java { .api }

41

/**

42

* Seeks to the specified position in the stream.

43

* @param seekPos position to seek to

44

* @throws IOException if seek operation fails

45

*/

46

public void seek(long seekPos) throws IOException;

47

48

/**

49

* Forces a seek operation, bypassing optimization heuristics.

50

* @param seekPos position to seek to

51

* @throws IOException if seek operation fails

52

*/

53

public void forceSeek(long seekPos) throws IOException;

54

55

/**

56

* Gets the current position in the stream.

57

* @return current byte position

58

* @throws IOException if operation fails

59

*/

60

public long getPos() throws IOException;

61

62

/**

63

* Skips exactly the specified number of bytes.

64

* @param bytes number of bytes to skip

65

* @throws IOException if skip operation fails or reaches EOF

66

*/

67

public void skipFully(long bytes) throws IOException;

68

69

/**

70

* Skips up to the specified number of bytes.

71

* @param n maximum number of bytes to skip

72

* @return actual number of bytes skipped

73

* @throws IOException if operation fails

74

*/

75

public long skip(long n) throws IOException;

76

```

77

78

**Usage Examples:**

79

80

```java

81

import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;

82

83

// Open file and seek to specific position

84

HadoopDataInputStream inputStream = fs.open(new Path("hdfs://namenode:9000/data/large_file.dat"));

85

86

// Seek to 1MB position

87

inputStream.seek(1024 * 1024);

88

int byteAtPosition = inputStream.read();

89

90

// Get current position

91

long currentPos = inputStream.getPos();

92

System.out.println("Current position: " + currentPos);

93

94

// Skip forward 100 bytes

95

inputStream.skipFully(100);

96

97

// Force seek (bypasses optimization)

98

inputStream.forceSeek(2048 * 1024); // 2MB position

99

100

inputStream.close();

101

```

102

103

### ByteBuffer Operations

104

105

Zero-copy operations using ByteBuffer for high-performance data transfer.

106

107

```java { .api }

108

/**

109

* Reads data into a ByteBuffer from current position.

110

* @param byteBuffer buffer to read data into

111

* @return number of bytes read, or -1 if end of stream

112

* @throws IOException if read operation fails

113

*/

114

public int read(ByteBuffer byteBuffer) throws IOException;

115

116

/**

117

* Reads data into a ByteBuffer from specified position without changing stream position.

118

* @param position absolute position to read from

119

* @param byteBuffer buffer to read data into

120

* @return number of bytes read, or -1 if end of stream

121

* @throws IOException if read operation fails

122

*/

123

public int read(long position, ByteBuffer byteBuffer) throws IOException;

124

```

125

126

**Usage Examples:**

127

128

```java

129

import java.nio.ByteBuffer;

130

131

HadoopDataInputStream inputStream = fs.open(new Path("hdfs://namenode:9000/data/binary_data.bin"));

132

133

// Allocate ByteBuffer for zero-copy operations

134

ByteBuffer buffer = ByteBuffer.allocateDirect(8192); // 8KB direct buffer

135

136

// Read data into ByteBuffer

137

int bytesRead = inputStream.read(buffer);

138

while (bytesRead != -1) {

139

buffer.flip(); // Prepare for reading

140

141

// Process data from buffer

142

while (buffer.hasRemaining()) {

143

byte b = buffer.get();

144

// Process byte

145

}

146

147

buffer.clear(); // Prepare for next read

148

bytesRead = inputStream.read(buffer);

149

}

150

151

// Positional read without changing stream position

152

ByteBuffer posBuffer = ByteBuffer.allocate(1024);

153

long savedPosition = inputStream.getPos();

154

int posRead = inputStream.read(1024 * 1024, posBuffer); // Read from 1MB position

155

// Stream position remains at savedPosition

156

157

inputStream.close();

158

```

159

160

### Standard I/O Operations

161

162

Traditional byte array and single byte reading operations.

163

164

```java { .api }

165

/**

166

* Reads a single byte.

167

* @return byte value (0-255) or -1 if end of stream

168

* @throws IOException if read operation fails

169

*/

170

public int read() throws IOException;

171

172

/**

173

* Reads data into a byte array.

174

* @param buffer byte array to read into

175

* @param offset starting offset in the buffer

176

* @param length maximum number of bytes to read

177

* @return number of bytes read, or -1 if end of stream

178

* @throws IOException if read operation fails

179

*/

180

public int read(byte[] buffer, int offset, int length) throws IOException;

181

182

/**

183

* Returns the number of bytes available for reading without blocking.

184

* @return estimated number of available bytes

185

* @throws IOException if operation fails

186

*/

187

public int available() throws IOException;

188

189

/**

190

* Closes the input stream and releases resources.

191

* @throws IOException if close operation fails

192

*/

193

public void close() throws IOException;

194

```

195

196

### HadoopDataOutputStream

197

198

High-performance output stream with positioning and synchronization capabilities.

199

200

```java { .api }

201

/**

202

* Concrete implementation of FSDataOutputStream for Hadoop's output streams.

203

* Supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).

204

*/

205

public class HadoopDataOutputStream extends FSDataOutputStream {

206

/**

207

* Creates a HadoopDataOutputStream wrapping a Hadoop FSDataOutputStream.

208

* @param fdos the Hadoop output stream to wrap

209

*/

210

public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos);

211

212

/**

213

* Gets the wrapped Hadoop output stream.

214

* @return the underlying Hadoop FSDataOutputStream

215

*/

216

public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream();

217

}

218

```

219

220

### Output Stream Operations

221

222

Writing and positioning operations for output streams.

223

224

```java { .api }

225

/**

226

* Writes a single byte.

227

* @param b byte value to write

228

* @throws IOException if write operation fails

229

*/

230

public void write(int b) throws IOException;

231

232

/**

233

* Writes data from a byte array.

234

* @param b byte array containing data

235

* @param off starting offset in the array

236

* @param len number of bytes to write

237

* @throws IOException if write operation fails

238

*/

239

public void write(byte[] b, int off, int len) throws IOException;

240

241

/**

242

* Gets the current position in the output stream.

243

* @return current byte position

244

* @throws IOException if operation fails

245

*/

246

public long getPos() throws IOException;

247

248

/**

249

* Flushes any buffered data to the underlying stream.

250

* @throws IOException if flush operation fails

251

*/

252

public void flush() throws IOException;

253

254

/**

255

* Synchronizes data to stable storage (fsync equivalent).

256

* @throws IOException if sync operation fails

257

*/

258

public void sync() throws IOException;

259

260

/**

261

* Closes the output stream and releases resources.

262

* @throws IOException if close operation fails

263

*/

264

public void close() throws IOException;

265

```

266

267

**Usage Examples:**

268

269

```java

270

import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;

271

272

// Create output stream

273

HadoopDataOutputStream outputStream = fs.create(

274

new Path("hdfs://namenode:9000/data/output.dat"),

275

WriteMode.OVERWRITE

276

);

277

278

// Write single bytes

279

outputStream.write(65); // Write 'A'

280

outputStream.write(66); // Write 'B'

281

282

// Write byte arrays

283

byte[] data = "Hello, Hadoop!".getBytes();

284

outputStream.write(data, 0, data.length);

285

286

// Get current position

287

long position = outputStream.getPos();

288

System.out.println("Written " + position + " bytes");

289

290

// Flush to ensure data is written

291

outputStream.flush();

292

293

// Sync to stable storage (important for durability)

294

outputStream.sync();

295

296

// Close stream

297

outputStream.close();

298

```

299

300

### Performance Optimizations

301

302

The streams include several performance optimizations:

303

304

```java

305

// Smart seeking - only performs actual seek if distance is significant

306

HadoopDataInputStream inputStream = fs.open(filePath);

307

inputStream.seek(100); // Small skip, may use skip() instead of seek()

308

inputStream.seek(2 * 1024 * 1024); // Large skip, will use seek()

309

310

// Direct ByteBuffer operations avoid memory copies

311

ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);

312

int read = inputStream.read(directBuffer);

313

314

// Connection limiting prevents resource exhaustion

315

// (configured at factory level)

316

Configuration config = new Configuration();

317

config.setInteger("fs.hdfs.limit.input", 50); // Max 50 input streams

318

config.setInteger("fs.hdfs.limit.output", 30); // Max 30 output streams

319

```

320

321

### Error Handling and Resource Management

322

323

Proper error handling and resource cleanup patterns:

324

325

```java

326

HadoopDataInputStream inputStream = null;

327

try {

328

inputStream = fs.open(filePath);

329

330

// Read operations

331

ByteBuffer buffer = ByteBuffer.allocate(8192);

332

int bytesRead = inputStream.read(buffer);

333

334

// Process data...

335

336

} catch (IOException e) {

337

System.err.println("I/O error: " + e.getMessage());

338

} finally {

339

if (inputStream != null) {

340

try {

341

inputStream.close();

342

} catch (IOException e) {

343

System.err.println("Error closing stream: " + e.getMessage());

344

}

345

}

346

}

347

348

// Try-with-resources pattern (recommended)

349

try (HadoopDataOutputStream outputStream = fs.create(outputPath, WriteMode.OVERWRITE)) {

350

outputStream.write("Data".getBytes());

351

outputStream.sync();

352

// Stream automatically closed

353

} catch (IOException e) {

354

System.err.println("Write error: " + e.getMessage());

355

}

356

```

357

358

## Types

359

360

```java { .api }

361

// Base stream interfaces

362

public abstract class FSDataInputStream extends DataInputStream {

363

public abstract void seek(long seekPos) throws IOException;

364

public abstract long getPos() throws IOException;

365

}

366

367

public abstract class FSDataOutputStream extends DataOutputStream {

368

public abstract long getPos() throws IOException;

369

public abstract void sync() throws IOException;

370

}

371

372

// ByteBuffer operations interface

373

public interface ByteBufferReadable {

374

int read(ByteBuffer byteBuffer) throws IOException;

375

}

376

377

// Write modes

378

public enum WriteMode {

379

NO_OVERWRITE,

380

OVERWRITE

381

}

382

```