# FileSystem Factory

The HadoopFsFactory provides a factory-pattern implementation for creating Hadoop-based file systems in Flink applications. It automatically detects the appropriate Hadoop FileSystem implementation based on the URI scheme and handles the complex initialization process.

## Capabilities

### HadoopFsFactory Class

Factory class that implements Flink's FileSystemFactory interface to create Hadoop-compatible file systems.


```java { .api }
/**
 * A file system factory for Hadoop-based file systems.
 * Calls Hadoop's mechanism to find a file system implementation for a given file
 * system scheme and wraps it as a Flink file system.
 */
public class HadoopFsFactory implements FileSystemFactory {

    public HadoopFsFactory();

    /**
     * Returns the scheme handled by this factory.
     * @return "*" indicating it handles various schemes
     */
    public String getScheme();

    /**
     * Configures the factory with Flink configuration.
     * @param config Flink's configuration object
     */
    public void configure(Configuration config);

    /**
     * Creates a file system instance for the given URI.
     * @param fsUri URI of the file system to create
     * @return FileSystem instance wrapped as a Flink FileSystem
     * @throws IOException if file system creation fails
     * @throws UnsupportedFileSystemSchemeException if the scheme is not supported
     */
    public FileSystem create(URI fsUri) throws IOException;
}
```

**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

import java.net.URI;

// Create and configure the factory
HadoopFsFactory factory = new HadoopFsFactory();
Configuration config = new Configuration();

// Add Hadoop configuration properties to the Flink config if needed
config.setString("fs.defaultFS", "hdfs://namenode:9000");
factory.configure(config);

// Create an HDFS file system
URI hdfsUri = URI.create("hdfs://namenode:9000/");
FileSystem hdfsFs = factory.create(hdfsUri);

// Create an S3 file system
URI s3Uri = URI.create("s3a://my-bucket/");
FileSystem s3Fs = factory.create(s3Uri);

// Create a local file system
URI localUri = URI.create("file:///tmp/");
FileSystem localFs = factory.create(localUri);
```

### Factory Configuration

The factory integrates Flink and Hadoop configurations, allowing you to set Hadoop properties through Flink's configuration system.

```java
import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();

// Set Hadoop configuration through the Flink config
config.setString("fs.defaultFS", "hdfs://namenode:9000");
config.setString("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
config.setString("fs.s3a.access.key", "your-access-key");
config.setString("fs.s3a.secret.key", "your-secret-key");

// Configure connection limits
config.setInteger("fs.hdfs.limit.total", 100);
config.setInteger("fs.hdfs.limit.input", 50);
config.setInteger("fs.hdfs.limit.output", 50);
config.setLong("fs.hdfs.limit.timeout", 30000L);

factory.configure(config);
```

### Scheme Detection and Support

The factory automatically detects and supports various file system schemes:

```java
// HDFS schemes
FileSystem hdfs1 = factory.create(URI.create("hdfs://namenode:9000/"));
FileSystem hdfs2 = factory.create(URI.create("hdfs://ha-cluster/"));

// S3 schemes
FileSystem s3a = factory.create(URI.create("s3a://bucket/"));
FileSystem s3n = factory.create(URI.create("s3n://bucket/"));
FileSystem s3 = factory.create(URI.create("s3://bucket/"));

// Azure schemes
FileSystem wasb = factory.create(URI.create("wasb://container@account.blob.core.windows.net/"));
FileSystem abfs = factory.create(URI.create("abfs://container@account.dfs.core.windows.net/"));

// Google Cloud Storage
FileSystem gcs = factory.create(URI.create("gs://bucket/"));

// Local file system
FileSystem local = factory.create(URI.create("file:///path/"));
```
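The dispatch above comes down to reading the scheme component of the URI. A self-contained sketch using only `java.net.URI` (no Flink dependency; the class name `SchemeProbe` is hypothetical) shows the components the factory works from:

```java
import java.net.URI;

public class SchemeProbe {
    public static void main(String[] args) {
        // The factory dispatches on the scheme component of the URI;
        // java.net.URI parses it out directly.
        URI hdfs  = URI.create("hdfs://namenode:9000/");
        URI s3a   = URI.create("s3a://my-bucket/");
        URI local = URI.create("file:///tmp/");

        System.out.println(hdfs.getScheme());    // hdfs
        System.out.println(hdfs.getAuthority()); // namenode:9000
        System.out.println(s3a.getScheme());     // s3a
        System.out.println(local.getScheme());   // file
    }
}
```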

### Connection Limiting

The factory supports connection limiting to prevent resource exhaustion when accessing remote file systems:

```java
Configuration config = new Configuration();

// Set the total connection limit for HDFS
config.setInteger("fs.hdfs.limit.total", 100);

// Set the input stream connection limit
config.setInteger("fs.hdfs.limit.input", 50);

// Set the output stream connection limit
config.setInteger("fs.hdfs.limit.output", 30);

// Set the connection timeout (milliseconds)
config.setLong("fs.hdfs.limit.timeout", 30000L);

// Set the stream inactivity timeout (milliseconds)
config.setLong("fs.hdfs.limit.stream-timeout", 300000L);

factory.configure(config);

// The created file system automatically applies connection limiting
FileSystem limitedFs = factory.create(URI.create("hdfs://namenode:9000/"));
```
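The semantics of these limits can be modeled with a counting semaphore: opening a stream acquires a permit, waiting up to the configured timeout before failing. This is a simplified sketch of the behavior, not Flink's actual implementation:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ConnectionLimiterSketch {
    public static void main(String[] args) throws InterruptedException {
        // A simplified model: a semaphore caps concurrent streams, and
        // acquisition waits up to the configured timeout before failing.
        Semaphore totalLimit = new Semaphore(2); // like fs.hdfs.limit.total = 2
        long timeoutMs = 100;                    // like fs.hdfs.limit.timeout

        boolean first  = totalLimit.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        boolean second = totalLimit.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        boolean third  = totalLimit.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);

        System.out.println(first);  // true
        System.out.println(second); // true
        System.out.println(third);  // false: limit reached, wait timed out
    }
}
```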

### Error Handling

The factory provides detailed error handling for common configuration and connectivity issues:

```java
try {
    FileSystem fs = factory.create(URI.create("hdfs://invalid-host:9000/"));
} catch (UnsupportedFileSystemSchemeException e) {
    // Scheme not supported or Hadoop classes missing
    System.err.println("Unsupported scheme: " + e.getMessage());
} catch (UnknownHostException e) {
    // Authority cannot be resolved
    System.err.println("Cannot resolve host: " + e.getMessage());
} catch (IOException e) {
    // General I/O error during file system creation
    System.err.println("File system creation failed: " + e.getMessage());
}
```
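Note that the catch order above matters: `UnsupportedFileSystemSchemeException` and `UnknownHostException` both extend `IOException`, so the subclass clauses must precede the general one. A self-contained sketch (with `UnsupportedSchemeException` as a stand-in for Flink's exception class) demonstrates the dispatch:

```java
import java.io.IOException;

public class CatchOrderDemo {
    // Stand-in for Flink's UnsupportedFileSystemSchemeException,
    // which also extends IOException.
    static class UnsupportedSchemeException extends IOException {
        UnsupportedSchemeException(String msg) { super(msg); }
    }

    public static void main(String[] args) {
        try {
            throw new UnsupportedSchemeException("no file system for scheme 'foo'");
        } catch (UnsupportedSchemeException e) {
            // Reached: the more specific clause wins when listed first
            System.out.println("unsupported scheme: " + e.getMessage());
        } catch (IOException e) {
            System.out.println("io error: " + e.getMessage());
        }
    }
}
```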

### Authority Resolution

When a URI doesn't specify an authority, the factory attempts to resolve it from the Hadoop configuration:

```java
Configuration config = new Configuration();
config.setString("fs.defaultFS", "hdfs://namenode:9000");
factory.configure(config);

// This URI has no authority
URI uriNoAuthority = URI.create("hdfs:///path/to/file");

// The factory uses fs.defaultFS to resolve the authority
FileSystem fs = factory.create(uriNoAuthority);
// Effective URI: hdfs://namenode:9000/path/to/file
```
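The fallback described above can be sketched in plain Java. `AuthorityResolver.resolve` is a hypothetical helper illustrating the logic, not Flink's actual code:

```java
import java.net.URI;

public class AuthorityResolver {
    // Hypothetical helper mirroring the described fallback: if the URI has
    // no authority, borrow scheme and authority from the default FS URI.
    static URI resolve(URI fsUri, String defaultFs) {
        if (fsUri.getAuthority() != null) {
            return fsUri;
        }
        URI dflt = URI.create(defaultFs);
        return URI.create(dflt.getScheme() + "://" + dflt.getAuthority() + fsUri.getPath());
    }

    public static void main(String[] args) {
        URI in = URI.create("hdfs:///path/to/file");
        System.out.println(resolve(in, "hdfs://namenode:9000"));
        // hdfs://namenode:9000/path/to/file
    }
}
```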

## Types

```java { .api }
// Flink's FileSystemFactory interface
public interface FileSystemFactory {
    String getScheme();
    void configure(Configuration config);
    FileSystem create(URI fsUri) throws IOException;
}

// Flink's Configuration class
public class Configuration {
    public void setString(String key, String value);
    public void setInteger(String key, int value);
    public void setLong(String key, long value);
    public String getString(String key, String defaultValue);
}

// Exceptions thrown by the factory
public class UnsupportedFileSystemSchemeException extends IOException {
    public UnsupportedFileSystemSchemeException(String message, Throwable cause);
}
```