# FileSystem Factory

`HadoopFsFactory` is a factory for creating Hadoop-based file systems in Flink applications. It uses Hadoop's lookup mechanism to find the `FileSystem` implementation for a URI's scheme, initializes it, and wraps it as a Flink `FileSystem`.

## Capabilities

### HadoopFsFactory Class

Factory class that implements Flink's `FileSystemFactory` interface to create Hadoop-compatible file systems.

```java { .api }
/**
 * A file system factory for Hadoop-based file systems.
 * Calls Hadoop's mechanism to find a file system implementation for a given file
 * system scheme and wraps it as a Flink file system.
 */
public class HadoopFsFactory implements FileSystemFactory {
    public HadoopFsFactory();

    /**
     * Returns the scheme handled by this factory.
     * @return "*" indicating it handles various schemes
     */
    public String getScheme();

    /**
     * Configures the factory with Flink configuration.
     * @param config Flink's configuration object
     */
    public void configure(Configuration config);

    /**
     * Creates a file system instance for the given URI.
     * @param fsUri URI of the file system to create
     * @return FileSystem instance wrapped as Flink FileSystem
     * @throws IOException if file system creation fails
     * @throws UnsupportedFileSystemSchemeException if scheme is not supported
     */
    public FileSystem create(URI fsUri) throws IOException;
}
```

**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import java.net.URI;

// Create and configure factory
HadoopFsFactory factory = new HadoopFsFactory();
Configuration config = new Configuration();
// Add Hadoop configuration properties to Flink config if needed.
// Assumption: recent Flink releases forward keys carrying the "flink.hadoop."
// prefix to the Hadoop configuration; verify this for your Flink version.
config.setString("flink.hadoop.fs.defaultFS", "hdfs://namenode:9000");
factory.configure(config);

// Create HDFS file system
URI hdfsUri = URI.create("hdfs://namenode:9000/");
FileSystem hdfsFs = factory.create(hdfsUri);

// Create S3 file system (needs the Hadoop S3A connector on the classpath)
URI s3Uri = URI.create("s3a://my-bucket/");
FileSystem s3Fs = factory.create(s3Uri);

// Create local file system
URI localUri = URI.create("file:///tmp/");
FileSystem localFs = factory.create(localUri);
```

### Factory Configuration

The factory derives its Hadoop configuration from the Flink configuration passed to `configure`, so Hadoop properties can be supplied through Flink's configuration system. The connection limit options (`fs.<scheme>.limit.*`) are Flink options and are read from the Flink configuration directly.

```java
import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();

// Set Hadoop configuration through Flink config.
// Assumption: recent Flink releases forward keys carrying the "flink.hadoop."
// prefix to the Hadoop configuration; verify this for your Flink version.
config.setString("flink.hadoop.fs.defaultFS", "hdfs://namenode:9000");
config.setString("flink.hadoop.fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
config.setString("flink.hadoop.fs.s3a.access.key", "your-access-key");
config.setString("flink.hadoop.fs.s3a.secret.key", "your-secret-key");

// Configure connection limits (Flink options, no prefix)
config.setInteger("fs.hdfs.limit.total", 100);
config.setInteger("fs.hdfs.limit.input", 50);
config.setInteger("fs.hdfs.limit.output", 50);
config.setLong("fs.hdfs.limit.timeout", 30000L);

factory.configure(config);
```

### Scheme Detection and Support

The factory reports `*` as its scheme and delegates the scheme lookup to Hadoop, so it can create a file system for any scheme that has a Hadoop implementation on the classpath:

```java
// Which of these schemes resolve depends on the Hadoop version
// and on the connectors available on the classpath.

// HDFS schemes
FileSystem hdfs1 = factory.create(URI.create("hdfs://namenode:9000/"));
FileSystem hdfs2 = factory.create(URI.create("hdfs://ha-cluster/"));

// S3 schemes
FileSystem s3a = factory.create(URI.create("s3a://bucket/"));
FileSystem s3n = factory.create(URI.create("s3n://bucket/"));
FileSystem s3 = factory.create(URI.create("s3://bucket/"));

// Azure schemes
FileSystem wasb = factory.create(URI.create("wasb://container@account.blob.core.windows.net/"));
FileSystem abfs = factory.create(URI.create("abfs://container@account.dfs.core.windows.net/"));

// Google Cloud Storage
FileSystem gcs = factory.create(URI.create("gs://bucket/"));

// Local file system
FileSystem local = factory.create(URI.create("file:///path/"));
```
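
The scheme that drives this lookup is the part of the URI before the first colon. The following sketch uses only `java.net.URI` to show what the scheme is for a few of the URIs above; `SchemeInspector` and `schemeOf` are hypothetical names chosen for illustration and are not part of Flink or Hadoop.

```java
import java.net.URI;
import java.util.Locale;

public class SchemeInspector {

    /** Returns the lower-cased scheme of a URI, or "(none)" if the URI has no scheme. */
    public static String schemeOf(String uri) {
        String scheme = URI.create(uri).getScheme();
        return scheme == null ? "(none)" : scheme.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        String[] uris = {
            "hdfs://namenode:9000/",
            "s3a://bucket/",
            "abfs://container@account.dfs.core.windows.net/",
            "file:///path/",
            "/relative/path" // no scheme at all
        };
        for (String uri : uris) {
            System.out.println(uri + " -> " + schemeOf(uri));
        }
    }
}
```

A URI without a scheme gives the lookup nothing to work with, which is why the examples in this section always use fully qualified URIs.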

### Connection Limiting

When `fs.<scheme>.limit.*` options are set for a URI's scheme, the factory caps the number of concurrently open streams on the created file system. This prevents resource exhaustion when accessing remote file systems:

```java
Configuration config = new Configuration();

// Maximum number of concurrently open streams (input and output) for HDFS
config.setInteger("fs.hdfs.limit.total", 100);

// Maximum number of concurrently open input streams
config.setInteger("fs.hdfs.limit.input", 50);

// Maximum number of concurrently open output streams
config.setInteger("fs.hdfs.limit.output", 30);

// Maximum time (milliseconds) a stream-open request waits for a free slot
config.setLong("fs.hdfs.limit.timeout", 30000L);

// Inactivity time (milliseconds) after which an idle stream is forcibly closed
config.setLong("fs.hdfs.limit.stream-timeout", 300000L);

factory.configure(config);

// Created file system will automatically use connection limiting
FileSystem limitedFs = factory.create(URI.create("hdfs://namenode:9000/"));
```
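
Conceptually, a total limit with a timeout behaves like a counting semaphore: opening a stream takes a slot, closing it returns the slot, and an open request that cannot get a slot within the timeout fails. The sketch below illustrates only that idea with `java.util.concurrent.Semaphore`. `StreamLimiter` is a hypothetical class written for this explanation; it is not Flink's implementation and it does not model the separate input/output limits or the inactivity timeout.

```java
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StreamLimiter {

    private final Semaphore slots;
    private final long timeoutMillis;

    public StreamLimiter(int maxOpenStreams, long timeoutMillis) {
        this.slots = new Semaphore(maxOpenStreams, true); // fair: first come, first served
        this.timeoutMillis = timeoutMillis;
    }

    /** Waits up to the timeout for a free slot; fails with an IOException otherwise. */
    public AutoCloseable open() throws IOException, InterruptedException {
        if (!slots.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IOException("Timed out waiting for a free stream slot");
        }
        // Closing the handle returns the slot; the flag guards against double close.
        AtomicBoolean closed = new AtomicBoolean(false);
        return () -> {
            if (closed.compareAndSet(false, true)) {
                slots.release();
            }
        };
    }

    public int freeSlots() {
        return slots.availablePermits();
    }

    public static void main(String[] args) throws Exception {
        StreamLimiter limiter = new StreamLimiter(2, 100L);
        AutoCloseable first = limiter.open();
        AutoCloseable second = limiter.open();
        try {
            limiter.open(); // a third stream exceeds the limit of two
        } catch (IOException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        first.close();
        second.close();
        System.out.println("Free slots: " + limiter.freeSlots());
    }
}
```

The practical consequence for application code is the same as in the sketch: streams that are never closed keep their slot, so later open requests block and eventually time out.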

### Error Handling

`create` reports configuration and connectivity problems as `IOException`s. More specific subclasses can be caught first:

```java
import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;

try {
    FileSystem fs = factory.create(URI.create("hdfs://invalid-host:9000/"));
} catch (UnsupportedFileSystemSchemeException e) {
    // Scheme not supported or Hadoop classes missing
    System.err.println("Unsupported scheme: " + e.getMessage());
} catch (UnknownHostException e) {
    // Authority cannot be resolved
    System.err.println("Cannot resolve host: " + e.getMessage());
} catch (IOException e) {
    // General I/O error during file system creation
    System.err.println("File system creation failed: " + e.getMessage());
}
```
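
Depending on where a failure occurs, a low-level exception such as `UnknownHostException` may arrive wrapped as the cause of a more general `IOException` rather than being thrown directly. The following sketch shows one way to search the cause chain for a specific type; `CauseFinder` and `findCause` are hypothetical helpers, and the wrapped exception in `main` is constructed by hand purely for demonstration.

```java
import java.io.IOException;
import java.net.UnknownHostException;

public class CauseFinder {

    /** Returns the first throwable of the given type in the cause chain, or null if none exists. */
    public static <T extends Throwable> T findCause(Throwable error, Class<T> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Simulated failure: a general IOException wrapping a host-resolution error.
        IOException wrapped =
                new IOException("File system creation failed", new UnknownHostException("invalid-host"));

        UnknownHostException root = findCause(wrapped, UnknownHostException.class);
        if (root != null) {
            System.err.println("Cannot resolve host: " + root.getMessage());
        } else {
            System.err.println("File system creation failed: " + wrapped.getMessage());
        }
    }
}
```

Calling such a helper from the general `catch (IOException e)` branch lets the handler distinguish an unresolvable host from other I/O failures without relying on the exact exception type thrown.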

### Authority Resolution

When a URI doesn't specify an authority, the factory falls back to the default file system URI (`fs.defaultFS`) from the Hadoop configuration, provided that its scheme matches the scheme of the requested URI:

```java
Configuration config = new Configuration();
// Assumption: recent Flink releases forward keys carrying the "flink.hadoop."
// prefix to the Hadoop configuration; verify this for your Flink version.
config.setString("flink.hadoop.fs.defaultFS", "hdfs://namenode:9000");
factory.configure(config);

// This URI has no authority
URI uriNoAuthority = URI.create("hdfs:///path/to/file");

// Factory will use fs.defaultFS to resolve authority
FileSystem fs = factory.create(uriNoAuthority);
// Results in: hdfs://namenode:9000/path/to/file
```
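
The fallback rule can be illustrated with `java.net.URI` alone. This is a minimal sketch of the rule as described above, not the factory's actual code: `AuthorityFallback` and `resolve` are hypothetical names, and throwing `IllegalArgumentException` when no authority can be found is a choice made for the sketch.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class AuthorityFallback {

    /**
     * Returns fsUri unchanged if it has an authority; otherwise borrows the
     * authority of defaultUri, but only when both URIs share the same scheme.
     */
    public static URI resolve(URI fsUri, URI defaultUri) throws URISyntaxException {
        if (fsUri.getAuthority() != null) {
            return fsUri; // nothing to resolve
        }
        boolean sameScheme =
                fsUri.getScheme() != null && fsUri.getScheme().equals(defaultUri.getScheme());
        if (!sameScheme || defaultUri.getAuthority() == null) {
            throw new IllegalArgumentException("No authority available for " + fsUri);
        }
        return new URI(fsUri.getScheme(), defaultUri.getAuthority(), fsUri.getPath(), null, null);
    }

    public static void main(String[] args) throws URISyntaxException {
        URI defaultFs = URI.create("hdfs://namenode:9000");
        System.out.println(resolve(URI.create("hdfs:///path/to/file"), defaultFs));
        System.out.println(resolve(URI.create("hdfs://other:8020/data"), defaultFs));
    }
}
```

Note that `java.net.URI` reports a `null` authority for `hdfs:///path/to/file`, which is what makes the empty-authority case detectable in the first place.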

## Types

```java { .api }
// Flink's FileSystemFactory interface
public interface FileSystemFactory {
    String getScheme();
    void configure(Configuration config);
    FileSystem create(URI fsUri) throws IOException;
}

// Flink's Configuration class
public class Configuration {
    public void setString(String key, String value);
    public void setInteger(String key, int value);
    public void setLong(String key, long value);
    public String getString(String key, String defaultValue);
}

// Exceptions thrown by factory
public class UnsupportedFileSystemSchemeException extends IOException {
    public UnsupportedFileSystemSchemeException(String message, Throwable cause);
}
```