# FileSystem Configuration

The Flink GS FileSystem plugin integrates Google Cloud Storage (GCS) with Flink applications through the `gs://` scheme. Its configuration covers authentication, HTTP and retry tuning for the GCS client, and the behavior of the recoverable writer. Filesystem instances are created through a factory.

## Capabilities

### GSFileSystemFactory

Factory class that creates and configures GS filesystem instances. Flink registers it automatically through its plugin mechanism.

```java { .api }
/**
 * Implementation of the Flink FileSystemFactory interface for Google Storage.
 * Automatically registered via META-INF/services/org.apache.flink.core.fs.FileSystemFactory
 */
public class GSFileSystemFactory implements FileSystemFactory {
    /** The scheme for the Google Storage file system */
    public static final String SCHEME = "gs";

    /** Constructs the Google Storage file system factory */
    public GSFileSystemFactory();

    /**
     * Configure the factory with Flink configuration
     * @param flinkConfig The Flink configuration
     */
    public void configure(Configuration flinkConfig);

    /**
     * Get the filesystem scheme
     * @return "gs"
     */
    public String getScheme();

    /**
     * Create a filesystem instance for the given URI
     * @param fsUri The filesystem URI (must have gs:// scheme)
     * @return GSFileSystem instance
     * @throws IOException If filesystem creation fails
     */
    public FileSystem create(URI fsUri) throws IOException;
}
```

**Usage Example:**

```java
import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.fs.gs.GSFileSystemFactory;
import org.apache.flink.fs.gs.GSFileSystemOptions;

// Flink normally instantiates and configures the factory itself;
// users typically don't interact with it directly
GSFileSystemFactory factory = new GSFileSystemFactory();
Configuration config = new Configuration();
// A Flink-level GS option; credentials come from the Hadoop configuration
// or from GOOGLE_APPLICATION_CREDENTIALS (see Authentication Configuration)
config.set(GSFileSystemOptions.WRITER_TEMPORARY_BUCKET_NAME, "my-temp-bucket");
factory.configure(config);

URI gcsUri = URI.create("gs://my-bucket/path");
FileSystem fs = factory.create(gcsUri); // throws IOException
```
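
The factory only serves URIs whose scheme is `gs`, and the bucket is the URI authority (`gs://<bucket>/<object>`). The plain-Java sketch below shows that check; `GsUris`, `bucketOf`, and `objectOf` are hypothetical helpers for illustration, not part of the plugin's API.

```java
import java.net.URI;

/** Hypothetical helper: validates a gs:// URI and splits it into bucket and object name. */
final class GsUris {
    static final String SCHEME = "gs";

    private GsUris() {}

    /** Returns the bucket of a gs:// URI, or throws if the URI cannot address GCS. */
    static String bucketOf(URI uri) {
        if (!SCHEME.equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException("Expected scheme 'gs' but got: " + uri);
        }
        String bucket = uri.getAuthority(); // gs://<bucket>/<object path>
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("Missing bucket in URI: " + uri);
        }
        return bucket;
    }

    /** Returns the object name (the path without its leading slash), possibly empty. */
    static String objectOf(URI uri) {
        String path = uri.getPath();
        return (path == null || path.isEmpty()) ? "" : path.substring(1);
    }
}
```

For `gs://my-bucket/path`, `bucketOf` returns `my-bucket` and `objectOf` returns `path`; a URI such as `s3://b/x` is rejected.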

### GSFileSystemOptions

Holds the Flink-level options for the GS filesystem and its recoverable writer, read from the Flink configuration.

```java { .api }
/**
 * Configuration options for the GS filesystem and recoverable writer
 */
public class GSFileSystemOptions {

    // Configuration option constants
    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME;
    public static final ConfigOption<MemorySize> WRITER_CHUNK_SIZE;
    public static final ConfigOption<Boolean> ENABLE_FILESINK_ENTROPY;
    public static final ConfigOption<Integer> GCS_HTTP_CONNECT_TIMEOUT;
    public static final ConfigOption<Integer> GCS_HTTP_READ_TIMEOUT;
    public static final ConfigOption<Integer> GCS_RETRY_MAX_ATTEMPT;
    public static final ConfigOption<java.time.Duration> GCS_RETRY_INIT_RPC_TIMEOUT;
    public static final ConfigOption<Double> GCS_RETRY_RPC_TIMEOUT_MULTIPLIER;
    public static final ConfigOption<java.time.Duration> GCS_RETRY_MAX_RPC_TIMEOUT;
    public static final ConfigOption<java.time.Duration> GCS_RETRY_TOTAL_TIMEOUT;

    /**
     * Constructs an options instance from Flink configuration
     * @param flinkConfig The Flink configuration
     */
    public GSFileSystemOptions(Configuration flinkConfig);

    /**
     * Get temporary bucket name for recoverable writes
     * @return Optional temporary bucket name
     */
    public Optional<String> getWriterTemporaryBucketName();

    /**
     * Get chunk size for writes to Google Storage
     * @return Optional chunk size (must be a multiple of 256 KiB)
     */
    public Optional<MemorySize> getWriterChunkSize();

    /**
     * Check if entropy injection is enabled for FileSink paths
     * @return true if entropy injection is enabled
     */
    public Boolean isFileSinkEntropyEnabled();

    /**
     * Get HTTP connection timeout
     * @return Optional connection timeout in milliseconds
     */
    public Optional<Integer> getHTTPConnectionTimeout();

    /**
     * Get HTTP read timeout
     * @return Optional read timeout in milliseconds
     */
    public Optional<Integer> getHTTPReadTimeout();

    /**
     * Get maximum retry attempts for operations
     * @return Optional maximum attempts
     */
    public Optional<Integer> getMaxAttempts();

    /**
     * Get initial RPC timeout for retry operations
     * @return Optional initial timeout duration
     */
    public Optional<org.threeten.bp.Duration> getInitialRpcTimeout();

    /**
     * Get RPC timeout multiplier for retry backoff
     * @return Optional timeout multiplier
     */
    public Optional<Double> getRpcTimeoutMultiplier();

    /**
     * Get maximum RPC timeout for retry operations
     * @return Optional maximum RPC timeout duration
     */
    public Optional<org.threeten.bp.Duration> getMaxRpcTimeout();

    /**
     * Get total timeout for all retry operations
     * @return Optional total timeout duration
     */
    public Optional<org.threeten.bp.Duration> getTotalTimeout();
}
```

**Configuration Examples:**

```java
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.GSFileSystemOptions;

// Create configuration
Configuration config = new Configuration();

// Writer configuration (typed options avoid key typos)
config.set(GSFileSystemOptions.WRITER_TEMPORARY_BUCKET_NAME, "temp-bucket");
config.set(GSFileSystemOptions.WRITER_CHUNK_SIZE, MemorySize.ofMebiBytes(8)); // multiple of 256 KiB
config.set(GSFileSystemOptions.ENABLE_FILESINK_ENTROPY, true);

// HTTP timeouts in milliseconds
config.set(GSFileSystemOptions.GCS_HTTP_CONNECT_TIMEOUT, 30_000);
config.set(GSFileSystemOptions.GCS_HTTP_READ_TIMEOUT, 60_000);

// Retry configuration
config.set(GSFileSystemOptions.GCS_RETRY_MAX_ATTEMPT, 10);
config.set(GSFileSystemOptions.GCS_RETRY_INIT_RPC_TIMEOUT, Duration.ofSeconds(5));
config.set(GSFileSystemOptions.GCS_RETRY_RPC_TIMEOUT_MULTIPLIER, 2.0);
config.set(GSFileSystemOptions.GCS_RETRY_MAX_RPC_TIMEOUT, Duration.ofSeconds(60));
config.set(GSFileSystemOptions.GCS_RETRY_TOTAL_TIMEOUT, Duration.ofSeconds(300));

// Create options instance
GSFileSystemOptions options = new GSFileSystemOptions(config);
```
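
Each getter on `GSFileSystemOptions` returns an `Optional` (except `isFileSinkEntropyEnabled`), so a caller can apply a setting only when it was actually configured. The sketch below mimics that pattern over a plain key/value map; `SketchOptions` is hypothetical, reads only three of the keys, and does none of Flink's conversion for memory sizes or durations.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical, simplified options holder backed by string key/value pairs. */
final class SketchOptions {
    private final Map<String, String> settings;

    SketchOptions(Map<String, String> settings) {
        this.settings = Map.copyOf(settings);
    }

    /** Looks up a key and converts its value; empty when the key was never set. */
    private <T> Optional<T> get(String key, Function<String, T> parser) {
        return Optional.ofNullable(settings.get(key)).map(String::trim).map(parser);
    }

    Optional<String> writerTemporaryBucketName() {
        return get("gs.writer.temporary.bucket.name", Function.identity());
    }

    Optional<Integer> httpConnectTimeoutMillis() {
        return get("gs.http.connect-timeout", Integer::valueOf);
    }

    Optional<Integer> maxAttempts() {
        return get("gs.retry.max-attempt", Integer::valueOf);
    }
}
```

A caller then uses `ifPresent(...)` on each getter, so absent keys leave the underlying client's defaults untouched.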

### GSFileSystem

Core filesystem implementation. It extends Flink's `HadoopFileSystem` wrapper around the GCS connector's `GoogleHadoopFileSystem` and adds recoverable writer support.

```java { .api }
/**
 * FileSystem implementation that wraps GoogleHadoopFileSystem and supports RecoverableWriter
 * Package-private - users interact through standard Flink FileSystem APIs
 */
class GSFileSystem extends HadoopFileSystem {

    /**
     * Create a recoverable writer for fault-tolerant streaming writes
     * @return GSRecoverableWriter instance
     */
    public RecoverableWriter createRecoverableWriter();
}
```

**Usage Example:**

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

// Resolve the filesystem for the gs:// scheme (created by GSFileSystemFactory)
Path outputPath = new Path("gs://my-bucket/output/part-0");
FileSystem fs = outputPath.getFileSystem(); // throws IOException

// Use the recoverable writer for fault-tolerant streaming writes
RecoverableWriter recoverableWriter = fs.createRecoverableWriter();
RecoverableFsDataOutputStream outputStream = recoverableWriter.open(outputPath);
outputStream.write("hello".getBytes(StandardCharsets.UTF_8));

// Close for commit, then publish the final object
outputStream.closeForCommit().commit();
```
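
Conceptually, the recoverable writer stages in-progress data as temporary blobs (the `gs.writer.temporary.bucket.name` option controls where they go) and combines them into the final object on commit. The in-memory sketch below models that stage-then-commit flow with a `Map` standing in for a bucket; `StagedWriter`, its methods, and the `.inprogress/` naming are hypothetical and do not reflect the plugin's real object names, API, or GCS limits.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a writer that stages temporary blobs and combines them on commit. */
final class StagedWriter {
    private final Map<String, byte[]> bucket; // stands in for a GCS bucket
    private final String finalName;
    private final List<String> staged = new ArrayList<>();

    StagedWriter(Map<String, byte[]> bucket, String finalName) {
        this.bucket = bucket;
        this.finalName = finalName;
    }

    /** Writes one chunk as a new temporary blob (hypothetical naming scheme). */
    void write(byte[] chunk) {
        String tempName = ".inprogress/" + finalName + "/" + staged.size();
        bucket.put(tempName, chunk.clone());
        staged.add(tempName);
    }

    /** The staged blob names: the state a checkpoint would need to resume or commit. */
    List<String> stagedNames() {
        return List.copyOf(staged);
    }

    /** Concatenates the staged blobs into the final object, then deletes the temporaries. */
    void commit() {
        ByteArrayOutputStream combined = new ByteArrayOutputStream();
        for (String name : staged) {
            combined.writeBytes(bucket.get(name));
        }
        bucket.put(finalName, combined.toByteArray());
        staged.forEach(bucket::remove);
        staged.clear();
    }
}
```

Because readers only ever see the final object name after `commit()`, a failure before commit leaves just temporary blobs behind, which is the property a fault-tolerant sink relies on.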

## Configuration Properties

### Authentication Configuration

Configure authentication through Hadoop configuration properties or through an environment variable:

```properties
# Enable service account authentication
google.cloud.auth.service.account.enable=true

# Path to service account JSON key file
google.cloud.auth.service.account.json.keyfile=/path/to/service-account.json

# Alternative: set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the key file path
```
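
These settings give more than one possible credential source. The sketch below shows one plausible resolution order under stated assumptions: the enable flag defaults to `true` and, when `false`, disables service-account credentials; `GOOGLE_APPLICATION_CREDENTIALS` is preferred over the key-file property. Treat that precedence as an assumption to verify against the plugin version you run; `CredentialSource` is hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical resolver for the service-account key file location. */
final class CredentialSource {
    static final String ENABLE_KEY = "google.cloud.auth.service.account.enable";
    static final String KEYFILE_KEY = "google.cloud.auth.service.account.json.keyfile";
    static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

    private CredentialSource() {}

    /**
     * Returns the key file path to use, or empty for "no explicit credentials".
     * Assumed order: enable flag, then the environment variable, then the Hadoop property.
     */
    static Optional<String> resolve(Map<String, String> hadoopConf, Map<String, String> env) {
        boolean enabled = Boolean.parseBoolean(hadoopConf.getOrDefault(ENABLE_KEY, "true"));
        if (!enabled) {
            return Optional.empty();
        }
        String fromEnv = env.get(ENV_VAR);
        if (fromEnv != null && !fromEnv.isBlank()) {
            return Optional.of(fromEnv);
        }
        return Optional.ofNullable(hadoopConf.get(KEYFILE_KEY)).filter(p -> !p.isBlank());
    }
}
```

In a real process the second argument would be `System.getenv()`.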

### Writer Configuration

Configure recoverable writer behavior:

```properties
# Bucket for temporary blobs of in-progress recoverable writes (optional;
# if unset, the bucket of the final object is used)
gs.writer.temporary.bucket.name=my-temp-bucket

# Upload chunk size; must be a multiple of 256 KiB (a GCS default applies if unset)
gs.writer.chunk.size=8MB

# Enable entropy injection in FileSink paths to reduce hotspots
gs.filesink.entropy.enabled=true
```
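
Flink's `MemorySize` uses binary units, so `8MB` is 8 MiB (8,388,608 bytes), which is 32 × 256 KiB and satisfies the rule. A small sketch for checking or rounding a candidate chunk size follows; `ChunkSizes` is a hypothetical helper, not part of the plugin.

```java
/** Hypothetical helper for GCS upload chunk sizes, which must be multiples of 256 KiB. */
final class ChunkSizes {
    static final long GRANULARITY = 256L * 1024; // 256 KiB = 262,144 bytes

    private ChunkSizes() {}

    /** True when the size is positive and an exact multiple of 256 KiB. */
    static boolean isValid(long bytes) {
        return bytes > 0 && bytes % GRANULARITY == 0;
    }

    /** Rounds a positive size up to the next multiple of 256 KiB. */
    static long roundUp(long bytes) {
        if (bytes <= 0) {
            throw new IllegalArgumentException("chunk size must be positive: " + bytes);
        }
        return ((bytes + GRANULARITY - 1) / GRANULARITY) * GRANULARITY;
    }
}
```

For example, a requested 1,000,000 bytes is not valid and rounds up to 1,048,576 bytes (4 × 256 KiB).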

### Network Configuration

Configure HTTP client behavior:

```properties
# Connection timeout in milliseconds
gs.http.connect-timeout=30000

# Read timeout in milliseconds
gs.http.read-timeout=60000
```
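
Both values are plain millisecond counts. The sketch below parses a fragment like the one above with `java.util.Properties` and converts each value to a `java.time.Duration`, leaving absent keys empty; `HttpTimeouts` is hypothetical and only illustrates the units.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical reader for the millisecond-based HTTP timeout keys. */
final class HttpTimeouts {
    private HttpTimeouts() {}

    /** Loads a properties fragment; '#' lines are treated as comments. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return props;
    }

    /** Parses a non-negative millisecond value; empty when the key is absent. */
    static Optional<Duration> millis(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            return Optional.empty();
        }
        long ms = Long.parseLong(raw.trim());
        if (ms < 0) {
            throw new IllegalArgumentException(key + " must not be negative: " + ms);
        }
        return Optional.of(Duration.ofMillis(ms));
    }
}
```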

### Retry Configuration

Configure retry behavior for transient failures:

```properties
# Maximum number of retry attempts
gs.retry.max-attempt=10

# Initial RPC timeout
gs.retry.init-rpc-timeout=5s

# Timeout multiplier for exponential backoff
gs.retry.rpc-timeout-multiplier=2.0

# Maximum RPC timeout
gs.retry.max-rpc-timeout=60s

# Total timeout for all retries
gs.retry.total-timeout=300s
```
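
Together these options describe an exponential schedule of per-attempt RPC timeouts. The sketch below is a simplified model, not the Google client's exact algorithm: each attempt's timeout starts at the initial value, grows by the multiplier, is capped at the maximum RPC timeout, and is trimmed so that the sum never exceeds the total timeout; it stops after the maximum number of attempts. It ignores backoff delays between attempts, and `RetrySchedule` is hypothetical. With the values above it yields 5, 10, 20, 40, 60, 60, 60 and a final trimmed 45 seconds.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of exponential per-attempt RPC timeouts. */
final class RetrySchedule {
    private RetrySchedule() {}

    static List<Duration> rpcTimeouts(
            int maxAttempts, Duration initial, double multiplier, Duration maxRpc, Duration total) {
        List<Duration> timeouts = new ArrayList<>();
        long maxRpcMs = maxRpc.toMillis();
        long totalMs = total.toMillis();
        long currentMs = initial.toMillis();
        long elapsedMs = 0;
        while (timeouts.size() < maxAttempts && elapsedMs < totalMs) {
            // Cap at the max RPC timeout and at what remains of the total budget
            long attemptMs = Math.min(Math.min(currentMs, maxRpcMs), totalMs - elapsedMs);
            timeouts.add(Duration.ofMillis(attemptMs));
            elapsedMs += attemptMs;
            // Grow for the next attempt, never beyond the max RPC timeout
            currentMs = Math.min(Math.round(currentMs * multiplier), maxRpcMs);
        }
        return timeouts;
    }
}
```

Lowering `gs.retry.max-attempt` to 3 in this model stops after 5, 10 and 20 seconds, well before the total timeout is reached.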

## Integration with Flink

### Service Discovery

The filesystem factory is automatically registered with Flink through the service provider interface:

```
META-INF/services/org.apache.flink.core.fs.FileSystemFactory
```

This file contains:
```
org.apache.flink.fs.gs.GSFileSystemFactory
```
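
A service file in this format is read through Java's `ServiceLoader` mechanism, after which a factory is picked by URI scheme. The sketch below models only the lookup step with a plain map; `SketchFsFactory` and `SchemeRegistry` are hypothetical stand-ins for Flink's `FileSystemFactory` and its internal registry.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a filesystem factory; only the scheme matters here. */
interface SketchFsFactory {
    String getScheme();
}

/** Hypothetical registry that picks a factory by the scheme of a URI. */
final class SchemeRegistry {
    private final Map<String, SketchFsFactory> byScheme = new HashMap<>();

    void register(SketchFsFactory factory) {
        byScheme.put(factory.getScheme().toLowerCase(Locale.ROOT), factory);
    }

    /** Empty for URIs without a scheme or with an unregistered scheme. */
    Optional<SketchFsFactory> lookup(URI uri) {
        String scheme = uri.getScheme();
        return scheme == null
                ? Optional.empty()
                : Optional.ofNullable(byScheme.get(scheme.toLowerCase(Locale.ROOT)));
    }
}
```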
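
### Configuration Loading

The factory integrates with Flink's configuration system to load settings from:

1. **Flink Configuration**: properties with the `gs.` prefix, which are forwarded to the GCS connector with an `fs.gs.` prefix (for example, `gs.http.connect-timeout` becomes `fs.gs.http.connect-timeout`)
2. **Hadoop Configuration**: `fs.gs.*` properties (and the `google.cloud.auth.*` properties shown above) from Hadoop configuration files
3. **Environment Variables**: `HADOOP_CONF_DIR` (location of the Hadoop configuration files) and `GOOGLE_APPLICATION_CREDENTIALS` (service account key file)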
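
The forwarding rule above, combined with Flink settings taking precedence over Hadoop files, can be modeled with plain maps. In this sketch `HadoopKeyOverlay` is hypothetical; it forwards every `gs.`-prefixed key and does not model any keys the real plugin might exclude or handle specially.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of mirroring gs.* Flink keys into the fs.gs.* Hadoop key space. */
final class HadoopKeyOverlay {
    static final String FLINK_PREFIX = "gs.";
    static final String HADOOP_PREFIX = "fs.gs.";

    private HadoopKeyOverlay() {}

    /** Starts from Hadoop file values, then overlays translated Flink keys (Flink wins). */
    static Map<String, String> merge(Map<String, String> hadoopFileConf, Map<String, String> flinkConf) {
        Map<String, String> merged = new HashMap<>(hadoopFileConf);
        for (Map.Entry<String, String> e : flinkConf.entrySet()) {
            if (e.getKey().startsWith(FLINK_PREFIX)) {
                String hadoopKey = HADOOP_PREFIX + e.getKey().substring(FLINK_PREFIX.length());
                merged.put(hadoopKey, e.getValue());
            }
        }
        return merged;
    }
}
```

Keys without the `gs.` prefix (for example `parallelism.default`) are left out of the Hadoop view entirely.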

### Hadoop Integration

The implementation leverages Hadoop's configuration system and integrates with:

- **HadoopConfigLoader**: For loading Hadoop-style configurations
- **GoogleHadoopFileSystem**: Underlying GCS Hadoop connector
- **Configuration overlays**: Flink config takes precedence over Hadoop config