Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-fs@2.1.0
# Apache Flink Hadoop FileSystem
Apache Flink Hadoop FileSystem (flink-hadoop-fs) provides seamless integration between Apache Flink's file system abstraction and Hadoop's file system implementations. It enables Flink applications to access HDFS, S3, Azure Blob Storage, Google Cloud Storage, and other Hadoop-compatible file systems with full support for fault-tolerant streaming, exactly-once processing guarantees, and high-performance I/O operations.
## Package Information

- **Package Name**: flink-hadoop-fs
- **Package Type**: maven
- **GroupId**: org.apache.flink
- **ArtifactId**: flink-hadoop-fs
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
```

## Basic Usage

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;

import java.net.URI;
import java.nio.charset.StandardCharsets;

// Create and configure the factory
HadoopFsFactory factory = new HadoopFsFactory();
Configuration config = new Configuration();
factory.configure(config);

// Create a file system for HDFS
URI hdfsUri = URI.create("hdfs://namenode:9000/");
FileSystem fs = factory.create(hdfsUri);

// Basic file operations
Path filePath = new Path("hdfs://namenode:9000/data/input.txt");
boolean exists = fs.exists(filePath);
FileStatus status = fs.getFileStatus(filePath);

// Read from a file
FSDataInputStream inputStream = fs.open(filePath);
// ... read data
inputStream.close();

// Write to a file. Flink's FSDataOutputStream is a plain OutputStream,
// so write raw bytes rather than writeUTF.
FSDataOutputStream outputStream = fs.create(
        new Path("hdfs://namenode:9000/data/output.txt"),
        FileSystem.WriteMode.OVERWRITE);
outputStream.write("Hello, Hadoop!".getBytes(StandardCharsets.UTF_8));
outputStream.close();
```

## Architecture

Apache Flink Hadoop FileSystem is built around several key components:

- **Factory Pattern**: `HadoopFsFactory` creates appropriate file system instances for different schemes (HDFS, S3, etc.)
- **FileSystem Wrapper**: `HadoopFileSystem` wraps Hadoop's FileSystem implementations with Flink's interface
- **Recoverable Writers**: Fault-tolerant writers that support exactly-once processing guarantees through checkpoint/recovery
- **Optimized Streams**: High-performance I/O streams with ByteBuffer support and connection limiting
- **Configuration Integration**: Seamless bridging between Flink and Hadoop configurations
- **Security Support**: Kerberos authentication and delegation token management

## Capabilities

### FileSystem Factory

Factory for creating Hadoop-based file systems that automatically detects and instantiates the appropriate implementation based on URI schemes.

```java { .api }
public class HadoopFsFactory implements FileSystemFactory {
    public String getScheme();
    public void configure(Configuration config);
    public FileSystem create(URI fsUri) throws IOException;
}
```

[FileSystem Factory](./filesystem-factory.md)

### Core FileSystem Operations

Comprehensive file system operations including reading, writing, directory management, and metadata access with support for all Hadoop-compatible file systems.

```java { .api }
public class HadoopFileSystem extends FileSystem {
    public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem);
    public FileStatus getFileStatus(Path f) throws IOException;
    public HadoopDataInputStream open(Path f) throws IOException;
    public HadoopDataOutputStream create(Path f, WriteMode overwrite) throws IOException;
    public boolean delete(Path f, boolean recursive) throws IOException;
    public FileStatus[] listStatus(Path f) throws IOException;
    public RecoverableWriter createRecoverableWriter() throws IOException;
}
```

[Core FileSystem Operations](./filesystem-operations.md)

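As a hedged sketch of the wrapper in use — assuming an already-configured Hadoop `Configuration` on the classpath and a reachable cluster; the `hdfsConf` name and the paths are illustrative, not part of the API:

```java
// Wrap an existing Hadoop FileSystem in Flink's abstraction
org.apache.hadoop.conf.Configuration hdfsConf = new org.apache.hadoop.conf.Configuration();
org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(hdfsConf);
HadoopFileSystem fs = new HadoopFileSystem(hadoopFs);

// List a directory and print basic metadata
for (FileStatus status : fs.listStatus(new Path("/data"))) {
    System.out.printf("%s\t%d bytes%n", status.getPath(), status.getLen());
}

// Recursively delete a temporary directory
fs.delete(new Path("/tmp/scratch"), true);
```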
### High-Performance I/O Streams

Optimized input and output streams with ByteBuffer support, connection limiting, and advanced positioning capabilities for efficient data processing.

```java { .api }
public class HadoopDataInputStream extends FSDataInputStream implements ByteBufferReadable {
    public void seek(long seekPos) throws IOException;
    public int read(ByteBuffer byteBuffer) throws IOException;
    public int read(long position, ByteBuffer byteBuffer) throws IOException;
}

public class HadoopDataOutputStream extends FSDataOutputStream {
    public long getPos() throws IOException;
    public void sync() throws IOException;
}
```

[I/O Streams](./io-streams.md)

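For illustration, a hedged sketch of ByteBuffer-based reads using the signatures above (assumes `fs` is a `HadoopFileSystem` and the file exists; the path is a placeholder):

```java
import java.nio.ByteBuffer;

HadoopDataInputStream in = fs.open(new Path("hdfs://namenode:9000/data/input.bin"));
ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);

// Read into the buffer at the current stream position
int bytesRead = in.read(buffer);

// Positional read: fill the buffer starting at byte offset 1024
buffer.clear();
int readAtOffset = in.read(1024L, buffer);

in.close();
```

Reading directly into a `ByteBuffer` avoids the extra copy that byte-array reads incur, which is why the stream implements `ByteBufferReadable`.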
### Fault-Tolerant Writing

Recoverable writers that provide exactly-once processing guarantees through persistent state management and checkpoint/recovery mechanisms.

```java { .api }
public class HadoopRecoverableWriter implements RecoverableWriter {
    public RecoverableFsDataOutputStream open(Path filePath) throws IOException;
    public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;
    public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
}

public class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
    public Path targetFile();
    public Path tempFile();
    public long offset();
}
```

[Fault-Tolerant Writing](./recoverable-writers.md)

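The write-persist-commit cycle can be sketched as follows. This is an illustrative outline, assuming `fs` is a file system that supports recoverable writers (HDFS does), not a complete sink implementation; the paths are placeholders:

```java
import java.nio.charset.StandardCharsets;

RecoverableWriter writer = fs.createRecoverableWriter();

// Writes go to a temp file managed by the writer, not the target path
RecoverableFsDataOutputStream out =
        writer.open(new Path("hdfs://namenode:9000/results/part-0"));
out.write("record-1\n".getBytes(StandardCharsets.UTF_8));

// At a checkpoint: persist a resumable handle, then keep writing
RecoverableWriter.ResumeRecoverable resumable = out.persist();

// On success: atomically publish the temp file as the target file
RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
committer.commit();

// On failure, a later attempt resumes from the persisted handle:
// RecoverableFsDataOutputStream restored = writer.recover(resumable);
```

Because data only becomes visible at `commit()`, a restart after failure can resume or discard in-progress temp files without readers ever observing partial output.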
### Hadoop Integration Utilities

Utility functions for configuration management, security handling, and version compatibility checks when working with Hadoop ecosystems.

```java { .api }
public class HadoopUtils {
    public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration);
    public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi);
    public static boolean areKerberosCredentialsValid(UserGroupInformation ugi, boolean useTicketCache);
}
```

[Hadoop Integration Utilities](./hadoop-utilities.md)

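For example, bridging a Flink configuration into a Hadoop one (a sketch: the Flink configuration here is empty, so the result falls back to whatever `HADOOP_CONF_DIR` and the classpath provide):

```java
import org.apache.hadoop.security.UserGroupInformation;

org.apache.flink.configuration.Configuration flinkConfig =
        new org.apache.flink.configuration.Configuration();

// Merge Flink settings with core-site.xml/hdfs-site.xml discovered on the classpath
org.apache.hadoop.conf.Configuration hadoopConf =
        HadoopUtils.getHadoopConfiguration(flinkConfig);

// Security checks against the current user
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
if (HadoopUtils.isKerberosSecurityEnabled(ugi)) {
    boolean valid = HadoopUtils.areKerberosCredentialsValid(ugi, true);
}
```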
## Supported File Systems

This package supports all Hadoop-compatible file systems through automatic scheme detection:

- **HDFS**: Hadoop Distributed File System (hdfs://)
- **Amazon S3**: S3A and S3N implementations (s3a://, s3n://, s3://)
- **Azure Storage**: Blob Storage and Data Lake (wasb://, wasbs://, abfs://, abfss://)
- **Google Cloud Storage**: GCS connector (gs://)
- **Local File System**: Local file access (file://)
- **Other Hadoop FS**: Any Hadoop FileSystem implementation

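Scheme detection itself is plain URI parsing; here is a minimal pure-JDK sketch of the idea (treating scheme-less paths as local files mirrors common Hadoop behavior, but is an assumption of this sketch):

```java
import java.net.URI;

public class SchemeDetection {
    // Return the URI scheme used to select a file system implementation;
    // scheme-less paths default to the local file system (assumption).
    static String schemeOf(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme == null ? "file" : scheme;
    }

    public static void main(String[] args) {
        System.out.println(schemeOf("hdfs://namenode:9000/data")); // hdfs
        System.out.println(schemeOf("s3a://bucket/key"));          // s3a
        System.out.println(schemeOf("/tmp/local.txt"));            // file
    }
}
```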
## Error Handling

The package throws standard Java IOExceptions for file system operations, with specific exceptions for:

- `UnsupportedFileSystemSchemeException`: When the URI scheme is not supported by Hadoop
- `UnknownHostException`: When the file system authority cannot be resolved
- `IOException`: General I/O errors during file operations
- `FlinkRuntimeException`: Configuration and version compatibility issues

All exceptions include detailed error messages to aid in troubleshooting configuration and connectivity issues.

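A hedged sketch of the handling pattern, assuming `UnsupportedFileSystemSchemeException` (from `flink-core`) is on the classpath; the bogus scheme is deliberate:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;

import java.io.IOException;
import java.net.URI;

try {
    HadoopFsFactory factory = new HadoopFsFactory();
    factory.configure(new Configuration());
    FileSystem fs = factory.create(URI.create("bogus://host/path"));
} catch (UnsupportedFileSystemSchemeException e) {
    // No Hadoop implementation registered for this scheme
    System.err.println("Unsupported scheme: " + e.getMessage());
} catch (IOException e) {
    // Connectivity or configuration problems
    System.err.println("I/O failure: " + e.getMessage());
}
```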
## Types

```java { .api }
// Core file system interface
public abstract class FileSystem {
    public enum WriteMode { NO_OVERWRITE, OVERWRITE }
}

// File metadata
public interface FileStatus {
    long getLen();
    long getBlockSize();
    long getAccessTime();
    long getModificationTime();
    short getReplication();
    Path getPath();
    boolean isDir();
}

// Block location information
public interface BlockLocation extends Comparable<BlockLocation> {
    String[] getHosts() throws IOException;
    long getLength();
    long getOffset();
}

// Path representation
public class Path {
    public Path(String path);
    public Path(URI uri);
    public URI toUri();
}
```