# Apache Flink Hadoop FileSystem

Apache Flink Hadoop FileSystem (flink-hadoop-fs) provides seamless integration between Apache Flink's file system abstraction and Hadoop's file system implementations. It enables Flink applications to access HDFS, S3, Azure Blob Storage, Google Cloud Storage, and other Hadoop-compatible file systems with full support for fault-tolerant streaming, exactly-once processing guarantees, and high-performance I/O operations.

## Package Information

- **Package Name**: flink-hadoop-fs
- **Package Type**: maven
- **GroupId**: org.apache.flink
- **ArtifactId**: flink-hadoop-fs
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
```

## Basic Usage

```java
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.Configuration;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Create and configure the factory
HadoopFsFactory factory = new HadoopFsFactory();
Configuration config = new Configuration();
factory.configure(config);

// Create a file system for HDFS
URI hdfsUri = URI.create("hdfs://namenode:9000/");
FileSystem fs = factory.create(hdfsUri);

// Basic file operations
Path filePath = new Path("hdfs://namenode:9000/data/input.txt");
boolean exists = fs.exists(filePath);
FileStatus status = fs.getFileStatus(filePath);

// Read from a file
FSDataInputStream inputStream = fs.open(filePath);
// ... read data
inputStream.close();

// Write to a file (Flink's FSDataOutputStream extends OutputStream,
// so raw bytes are written rather than writeUTF)
FSDataOutputStream outputStream =
        fs.create(new Path("hdfs://namenode:9000/data/output.txt"), FileSystem.WriteMode.OVERWRITE);
outputStream.write("Hello, Hadoop!".getBytes(StandardCharsets.UTF_8));
outputStream.close();
```

## Architecture

Apache Flink Hadoop FileSystem is built around several key components:

- **Factory Pattern**: `HadoopFsFactory` creates appropriate file system instances for different schemes (HDFS, S3, etc.)
- **FileSystem Wrapper**: `HadoopFileSystem` wraps Hadoop's FileSystem implementations with Flink's interface
- **Recoverable Writers**: Fault-tolerant writers that support exactly-once processing guarantees through checkpoint/recovery
- **Optimized Streams**: High-performance I/O streams with ByteBuffer support and connection limiting
- **Configuration Integration**: Seamless bridging between Flink and Hadoop configurations
- **Security Support**: Kerberos authentication and delegation token management

## Capabilities

### FileSystem Factory

Factory for creating Hadoop-based file systems that automatically detects and instantiates the appropriate implementation based on URI schemes.

```java { .api }
public class HadoopFsFactory implements FileSystemFactory {
    public String getScheme();
    public void configure(Configuration config);
    public FileSystem create(URI fsUri) throws IOException;
}
```

[FileSystem Factory](./filesystem-factory.md)

### Core FileSystem Operations

Comprehensive file system operations including reading, writing, directory management, and metadata access, with support for all Hadoop-compatible file systems.

```java { .api }
public class HadoopFileSystem extends FileSystem {
    public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem);
    public FileStatus getFileStatus(Path f) throws IOException;
    public HadoopDataInputStream open(Path f) throws IOException;
    public HadoopDataOutputStream create(Path f, WriteMode overwrite) throws IOException;
    public boolean delete(Path f, boolean recursive) throws IOException;
    public FileStatus[] listStatus(Path f) throws IOException;
    public RecoverableWriter createRecoverableWriter() throws IOException;
}
```

[Core FileSystem Operations](./filesystem-operations.md)
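As an illustration of how these operations compose, the sketch below walks a directory tree and sums file sizes using only `listStatus` and `FileStatus`. The class and method names (`DirSize`, `totalBytes`) are hypothetical, and the `FileSystem` instance is assumed to be obtained as in Basic Usage; this is not code from the package itself.

```java
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import java.io.IOException;

public final class DirSize {
    // Recursively sums file lengths under a path, descending into
    // subdirectories via the listStatus operation shown above.
    static long totalBytes(FileSystem fs, Path dir) throws IOException {
        long sum = 0;
        for (FileStatus status : fs.listStatus(dir)) {
            sum += status.isDir() ? totalBytes(fs, status.getPath()) : status.getLen();
        }
        return sum;
    }
}
```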

### High-Performance I/O Streams

Optimized input and output streams with ByteBuffer support, connection limiting, and advanced positioning capabilities for efficient data processing.

```java { .api }
public class HadoopDataInputStream extends FSDataInputStream implements ByteBufferReadable {
    public void seek(long seekPos) throws IOException;
    public int read(ByteBuffer byteBuffer) throws IOException;
    public int read(long position, ByteBuffer byteBuffer) throws IOException;
}

public class HadoopDataOutputStream extends FSDataOutputStream {
    public long getPos() throws IOException;
    public void sync() throws IOException;
}
```

[I/O Streams](./io-streams.md)
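A typical use of the ByteBuffer read path is draining a stream through a single reusable buffer, avoiding a byte-array copy per chunk. The helper below (`BufferRead`/`drain` are hypothetical names, and the buffer size is an arbitrary choice) is a sketch of that pattern, not package code:

```java
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public final class BufferRead {
    // Reads the whole stream through one reusable direct buffer using the
    // read(ByteBuffer) method shown above; returns the total bytes read.
    static long drain(HadoopDataInputStream in) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocateDirect(64 * 1024);
        long total = 0;
        int n;
        while ((n = in.read(buffer)) > 0) { // -1 signals end of stream
            total += n;
            buffer.clear(); // reset position/limit so the buffer can be reused
        }
        return total;
    }
}
```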

### Fault-Tolerant Writing

Recoverable writers that provide exactly-once processing guarantees through persistent state management and checkpoint/recovery mechanisms.

```java { .api }
public class HadoopRecoverableWriter implements RecoverableWriter {
    public RecoverableFsDataOutputStream open(Path filePath) throws IOException;
    public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;
    public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
}

public class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
    public Path targetFile();
    public Path tempFile();
    public long offset();
}
```

[Fault-Tolerant Writing](./recoverable-writers.md)
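The exactly-once flow typically runs: open a recoverable stream, write, `persist()` at each checkpoint to obtain a resumable handle, then `closeForCommit()` and `commit()` to publish the file atomically. The sketch below assumes a `FileSystem` whose `createRecoverableWriter()` is supported (e.g. HDFS); `persist`, `closeForCommit`, and `commit` come from Flink's `RecoverableFsDataOutputStream` and `Committer` interfaces, while the class and method names are hypothetical:

```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public final class ExactlyOnceWrite {
    static void writeAtomically(FileSystem fs, Path target) throws IOException {
        RecoverableWriter writer = fs.createRecoverableWriter();
        RecoverableFsDataOutputStream out = writer.open(target);
        out.write("checkpointed data".getBytes(StandardCharsets.UTF_8));

        // At a checkpoint: persist a handle that could later resume the
        // in-progress file (it would be stored in checkpoint state)
        RecoverableWriter.ResumeRecoverable resumable = out.persist();

        // On successful completion: publish the file atomically
        RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
        committer.commit();
    }
}
```

On failure, the persisted handle is passed to `recover(...)` (or `recoverForCommit(...)`) instead, which is what gives the exactly-once guarantee described above.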

### Hadoop Integration Utilities

Utility functions for configuration management, security handling, and version compatibility checks when working with Hadoop ecosystems.

```java { .api }
public class HadoopUtils {
    public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration);
    public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi);
    public static boolean areKerberosCredentialsValid(UserGroupInformation ugi, boolean useTicketCache);
}
```

[Hadoop Integration Utilities](./hadoop-utilities.md)

## Supported File Systems

This package supports all Hadoop-compatible file systems through automatic scheme detection:

- **HDFS**: Hadoop Distributed File System (`hdfs://`)
- **Amazon S3**: S3A and S3N implementations (`s3a://`, `s3n://`, `s3://`)
- **Azure Storage**: Blob Storage and Data Lake (`wasb://`, `wasbs://`, `abfs://`, `abfss://`)
- **Google Cloud Storage**: GCS connector (`gs://`)
- **Local File System**: Local file access (`file://`)
- **Other Hadoop FS**: Any Hadoop FileSystem implementation
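The scheme the factory dispatches on is plain `java.net.URI` parsing. The snippet below (illustrative only; `schemeOf` is a hypothetical helper, and the fallback to `file` for scheme-less paths is an assumption about local-path handling, not package code) shows how URIs map to schemes:

```java
import java.net.URI;

public class SchemeDetection {
    // Returns the URI scheme used for dispatch, defaulting to "file"
    // for scheme-less paths (illustrative behavior, not package code).
    static String schemeOf(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme == null ? "file" : scheme;
    }

    public static void main(String[] args) {
        System.out.println(schemeOf("hdfs://namenode:9000/data")); // hdfs
        System.out.println(schemeOf("s3a://bucket/key"));          // s3a
        System.out.println(schemeOf("/tmp/local.txt"));            // file
    }
}
```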

## Error Handling

The package throws standard Java IOExceptions for file system operations, with specific exceptions for:

- `UnsupportedFileSystemSchemeException`: When the URI scheme is not supported by Hadoop
- `UnknownHostException`: When the file system authority cannot be resolved
- `IOException`: General I/O errors during file operations
- `FlinkRuntimeException`: Configuration and version compatibility issues

All exceptions include detailed error messages to aid in troubleshooting configuration and connectivity issues.
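Because `UnknownHostException` (and, in this package, `UnsupportedFileSystemSchemeException`) are subtypes of `IOException`, the specific catch clauses must come before the general one. A minimal sketch using only JDK types (`CatchOrder`/`classify` are hypothetical names):

```java
import java.io.IOException;
import java.net.UnknownHostException;

public class CatchOrder {
    // Classifies a failure, checking the more specific subtype first;
    // swapping the catch order would not compile (unreachable clause).
    static String classify(IOException e) {
        try {
            throw e;
        } catch (UnknownHostException uhe) {
            return "authority could not be resolved: " + uhe.getMessage();
        } catch (IOException ioe) {
            return "general I/O failure: " + ioe.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(new UnknownHostException("namenode")));
        System.out.println(classify(new IOException("disk full")));
    }
}
```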

## Types

```java { .api }
// Core file system interface
public abstract class FileSystem {
    public enum WriteMode { NO_OVERWRITE, OVERWRITE }
}

// File metadata
public interface FileStatus {
    long getLen();
    long getBlockSize();
    long getAccessTime();
    long getModificationTime();
    short getReplication();
    Path getPath();
    boolean isDir();
}

// Block location information
public interface BlockLocation extends Comparable<BlockLocation> {
    String[] getHosts() throws IOException;
    long getLength();
    long getOffset();
}

// Path representation
public class Path {
    public Path(String path);
    public Path(URI uri);
    public URI toUri();
}
```