# Flink GS FileSystem Hadoop

Flink GS FileSystem Hadoop provides a Google Cloud Storage (GCS) filesystem plugin for Apache Flink that enables reading from and writing to GCS buckets using the `gs://` URI scheme. The plugin integrates with Flink's FileSystem interface and provides fault-tolerant streaming writes through recoverable writers, making it suitable for checkpointing, state storage, and data processing workflows.
## Package Information

- **Package Name**: flink-gs-fs-hadoop
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-gs-fs-hadoop
- **Language**: Java
- **Version**: 2.1.0
- **License**: Apache License 2.0
- **Installation**: Copy the shaded JAR into Flink's `plugins/gs-fs-hadoop/` directory, or add it as a Maven dependency for local testing
## Core Usage

The filesystem is registered with Flink through the service provider interface and can be used directly with `gs://` URIs:

```java
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

// The filesystem for the gs:// scheme is discovered and configured automatically
Path gcsPath = new Path("gs://my-bucket/data/file.txt");
FileSystem fs = gcsPath.getFileSystem();

// Use for reading/writing files
FSDataInputStream inputStream = fs.open(gcsPath);
FSDataOutputStream outputStream = fs.create(gcsPath, WriteMode.OVERWRITE);
```
## Basic Configuration

Configure the filesystem through Flink's configuration file. Flink-specific options use the `gs.` prefix; Hadoop GCS connector options with the `fs.gs.` prefix are forwarded to the underlying connector:

```yaml
# Authentication via service account (Hadoop GCS connector options)
fs.gs.auth.service.account.enable: true
fs.gs.auth.service.account.json.keyfile: /path/to/service-account.json

# Performance tuning
gs.writer.chunk.size: 8mb
gs.filesink.entropy.enabled: true

# Retry configuration
gs.retry.max-attempt: 10
gs.retry.total-timeout: 300s
```
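Entropy injection, toggled by `gs.filesink.entropy.enabled` above, spreads object names across GCS's keyspace so that many concurrent writers do not pile onto one lexicographic prefix. A minimal sketch of the idea (the helper below is hypothetical, not the plugin's actual code):

```java
import java.util.Random;

public class EntropyDemo {
    // Hypothetical helper: prepend a short random hex segment to an object
    // name so writes fan out across the keyspace instead of sharing a
    // common prefix that can become a request hotspot.
    static String injectEntropy(String objectName, Random random) {
        String entropy = String.format("%08x", random.nextInt());
        return entropy + "/" + objectName;
    }

    public static void main(String[] args) {
        Random random = new Random();
        // e.g. "3d91c2f0/checkpoints/job-1/chk-17/part-0"
        System.out.println(injectEntropy("checkpoints/job-1/chk-17/part-0", random));
    }
}
```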
## Architecture

The plugin is built on several key components:

- **GSFileSystemFactory**: Main entry point that creates and configures filesystem instances
- **GSFileSystem**: Core filesystem implementation extending Hadoop FileSystem with recoverable writer support
- **GSRecoverableWriter**: Fault-tolerant writer system for streaming applications with exactly-once guarantees
- **GSBlobStorage**: Abstraction layer over Google Cloud Storage operations
- **Configuration System**: Comprehensive options for performance tuning, authentication, and retry behavior

The implementation leverages Google's Cloud Storage SDK and the GCS Connector for Hadoop, providing enterprise-grade reliability and performance optimizations.
## Capabilities

### FileSystem Factory and Configuration

Core filesystem factory and configuration management for integrating GCS with Flink applications.

```java { .api }
// Main factory class registered via META-INF services
public class GSFileSystemFactory implements FileSystemFactory {
    public static final String SCHEME = "gs";

    public void configure(Configuration flinkConfig);
    public String getScheme();
    public FileSystem create(URI fsUri) throws IOException;
}

// Configuration options container
public class GSFileSystemOptions {
    public Optional<String> getWriterTemporaryBucketName();
    public Optional<MemorySize> getWriterChunkSize();
    public Boolean isFileSinkEntropyEnabled();
    public Optional<Integer> getHTTPConnectionTimeout();
    public Optional<Integer> getHTTPReadTimeout();
    // ... retry configuration methods
}
```

[FileSystem Configuration](./filesystem-configuration.md)
### Recoverable Writer System

Fault-tolerant streaming write system providing exactly-once guarantees for Flink streaming applications.

```java { .api }
// Main recoverable writer implementation
public class GSRecoverableWriter implements RecoverableWriter {
    public boolean requiresCleanupOfRecoverableState();
    public boolean supportsResume();
    public RecoverableFsDataOutputStream open(Path path) throws IOException;
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}

// State objects for recovery
public class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
    public final GSBlobIdentifier finalBlobIdentifier;
    public final List<UUID> componentObjectIds;
}

public class GSResumeRecoverable extends GSCommitRecoverable
        implements RecoverableWriter.ResumeRecoverable {
    public final long position;
    public final boolean closed;
}
```

[Recoverable Writer](./recoverable-writer.md)
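The protocol behind these classes can be sketched with an in-memory model: each write lands in a temporary component blob, the recoverable state remembers the component ids and byte position, and commit stitches the components into the final blob and cleans up. This is an illustration of the idea, not the plugin's actual code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Illustrative in-memory model of the recoverable write protocol.
public class RecoverableWriteDemo {
    final Map<UUID, byte[]> componentBlobs = new HashMap<>();
    final List<UUID> componentIds = new ArrayList<>();  // cf. componentObjectIds
    long position = 0;                                  // cf. GSResumeRecoverable.position

    void write(byte[] chunk) {
        UUID id = UUID.randomUUID();    // each chunk becomes a temporary component blob
        componentBlobs.put(id, chunk);
        componentIds.add(id);
        position += chunk.length;       // recorded so an interrupted write can resume
    }

    byte[] commit() {                   // "compose" the components, then clean up
        byte[] result = new byte[(int) position];
        int offset = 0;
        for (UUID id : componentIds) {
            byte[] chunk = componentBlobs.remove(id);
            System.arraycopy(chunk, 0, result, offset, chunk.length);
            offset += chunk.length;
        }
        return result;
    }

    public static void main(String[] args) {
        RecoverableWriteDemo writer = new RecoverableWriteDemo();
        writer.write("hello ".getBytes());
        writer.write("world".getBytes());
        System.out.println(new String(writer.commit())); // prints "hello world"
    }
}
```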
### Storage Operations

Low-level Google Cloud Storage operations abstraction for blob management and data operations.

```java { .api }
// Storage abstraction interface
public interface GSBlobStorage {
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    void createBlob(GSBlobIdentifier blobIdentifier);
    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    List<GSBlobIdentifier> list(String bucketName, String prefix);
    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}

// Blob identifier abstraction
public class GSBlobIdentifier {
    public final String bucketName;
    public final String objectName;

    public GSBlobIdentifier(String bucketName, String objectName);
    public BlobId getBlobId();
    public static GSBlobIdentifier fromBlobId(BlobId blobId);
}
```

[Storage Operations](./storage-operations.md)
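One constraint worth knowing about `compose`: GCS accepts at most 32 source objects per compose call, so combining more components requires multiple rounds, each folding up to 32 objects into one intermediate. A hypothetical planner showing how many rounds that takes:

```java
public class ComposePlanner {
    // GCS's documented per-call limit on compose sources.
    static final int MAX_COMPOSE_SOURCES = 32;

    // Counts the compose rounds needed to reduce componentCount objects
    // to a single final object, folding up to 32 per call each round.
    static int composeRounds(int componentCount) {
        int rounds = 0;
        int remaining = componentCount;
        while (remaining > 1) {
            remaining = (remaining + MAX_COMPOSE_SOURCES - 1) / MAX_COMPOSE_SOURCES;
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println(composeRounds(10));   // prints 1
        System.out.println(composeRounds(100));  // 100 -> 4 -> 1: prints 2
        System.out.println(composeRounds(1025)); // 1025 -> 33 -> 2 -> 1: prints 3
    }
}
```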
## Configuration Options

All Flink-specific options use Flink's Configuration system with the `gs.` prefix:

| Option | Type | Description |
|--------|------|-------------|
| `gs.writer.temporary.bucket.name` | String | Bucket for temporary blobs during recoverable writes (defaults to the final blob's bucket) |
| `gs.writer.chunk.size` | MemorySize | Upload chunk size (must be a multiple of 256KB) |
| `gs.filesink.entropy.enabled` | Boolean | Enable entropy injection to reduce hotspots (default: false) |
| `gs.http.connect-timeout` | Integer | HTTP connection timeout (milliseconds) |
| `gs.http.read-timeout` | Integer | HTTP read timeout (milliseconds) |
| `gs.retry.max-attempt` | Integer | Maximum retry attempts |
| `gs.retry.init-rpc-timeout` | Duration | Initial RPC timeout |
| `gs.retry.rpc-timeout-multiplier` | Double | RPC timeout multiplier |
| `gs.retry.max-rpc-timeout` | Duration | Maximum RPC timeout |
| `gs.retry.total-timeout` | Duration | Total timeout for retries |
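The retry options interact as follows: each attempt's RPC timeout starts at `gs.retry.init-rpc-timeout`, grows by `gs.retry.rpc-timeout-multiplier` per attempt, and is capped at `gs.retry.max-rpc-timeout`; retrying stops after `gs.retry.max-attempt` attempts or once `gs.retry.total-timeout` elapses. A sketch of the resulting timeout schedule (illustrative, not the SDK's retry code):

```java
public class RetryScheduleDemo {
    // Computes the per-attempt RPC timeout schedule from the gs.retry.*
    // parameters: geometric growth from the initial timeout, capped at the
    // maximum RPC timeout.
    static long[] rpcTimeouts(long initMillis, double multiplier,
                              long maxRpcMillis, int maxAttempts) {
        long[] timeouts = new long[maxAttempts];
        double current = initMillis;
        for (int i = 0; i < maxAttempts; i++) {
            timeouts[i] = (long) Math.min(current, maxRpcMillis);
            current *= multiplier;
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // e.g. init 1s, multiplier 2.0, cap 10s, 6 attempts
        long[] schedule = rpcTimeouts(1000, 2.0, 10_000, 6);
        for (long t : schedule) {
            System.out.println(t); // prints 1000, 2000, 4000, 8000, 10000, 10000
        }
    }
}
```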
## Common Use Cases

- **Checkpointing**: Store Flink application checkpoints in GCS for fault tolerance
- **State Storage**: Store state backend snapshots in GCS for large-state applications
- **Data Ingestion**: Read data files from GCS buckets for batch and streaming processing
- **Data Output**: Write processed results to GCS with exactly-once guarantees
- **File Sink**: Use the FileSink connector to write streaming data to GCS in various formats
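For the checkpointing use case, a typical setup points Flink's checkpoint directory at a GCS bucket. A sketch of the relevant configuration, assuming Flink 2.x option names (`execution.checkpointing.dir` superseded `state.checkpoints.dir`; `my-bucket` and `my-temp-bucket` are placeholders):

```yaml
# Flink configuration file
execution.checkpointing.dir: gs://my-bucket/checkpoints
execution.checkpointing.interval: 60s

# Optional: keep temporary blobs for recoverable writes in a separate bucket
gs.writer.temporary.bucket.name: my-temp-bucket
```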
## Error Handling

The plugin handles various error scenarios:

- **Authentication Failures**: Clear error messages for credential issues
- **Network Timeouts**: Configurable retry policies with exponential backoff
- **Storage Errors**: Proper exception propagation with context information
- **Recovery Scenarios**: Automatic cleanup and recovery for interrupted operations