Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers
```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-gs-fs-hadoop@2.1.0
```

Flink GS FileSystem Hadoop provides a Google Cloud Storage (GCS) filesystem plugin for Apache Flink that enables reading from and writing to GCS buckets using the gs:// URI scheme. The plugin integrates seamlessly with Flink's FileSystem interface and provides fault-tolerant streaming through recoverable writers, making it ideal for checkpointing, state storage, and data processing workflows.
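For example, once the plugin is available to the cluster, checkpoints can be directed at GCS with a gs:// URI in `flink-conf.yaml` (the bucket name below is a placeholder):

```yaml
# Store checkpoints in a GCS bucket (bucket name is illustrative)
state.checkpoints.dir: gs://my-bucket/checkpoints
```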
The filesystem is automatically registered with Flink through the service provider interface and can be used directly with gs:// URIs:
```java
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

// The FileSystem is automatically discovered and configured
Path gcsPath = new Path("gs://my-bucket/data/file.txt");
FileSystem fs = gcsPath.getFileSystem();

// Use it for reading and writing files
FSDataInputStream inputStream = fs.open(gcsPath);
FSDataOutputStream outputStream = fs.create(gcsPath, WriteMode.OVERWRITE);
```

Configure the filesystem through Flink's configuration using `gs.*`-prefixed options:
```yaml
# Authentication via service account
fs.gs.auth.service.account.enable: true
fs.gs.auth.service.account.json.keyfile: /path/to/service-account.json

# Performance tuning
gs.writer.chunk.size: 8MB
gs.filesink.entropy.enabled: true

# Retry configuration
gs.retry.max-attempt: 10
gs.retry.total-timeout: 300s
```

The plugin is built on several key components.
The implementation leverages Google's Cloud Storage SDK and GCS Connector for Hadoop, providing enterprise-grade reliability and performance optimizations.
Core filesystem factory and configuration management for integrating GCS with Flink applications.
```java
// Main factory class, registered via META-INF/services
public class GSFileSystemFactory implements FileSystemFactory {
    public static final String SCHEME = "gs";

    public void configure(Configuration flinkConfig);
    public String getScheme();
    public FileSystem create(URI fsUri) throws IOException;
}

// Configuration options container
public class GSFileSystemOptions {
    public Optional<String> getWriterTemporaryBucketName();
    public Optional<MemorySize> getWriterChunkSize();
    public Boolean isFileSinkEntropyEnabled();
    public Optional<Integer> getHTTPConnectionTimeout();
    public Optional<Integer> getHTTPReadTimeout();
    // ... retry configuration methods
}
```

Fault-tolerant streaming write system providing exactly-once guarantees for Flink streaming applications.
```java
// Main recoverable writer implementation
public class GSRecoverableWriter implements RecoverableWriter {
    public boolean requiresCleanupOfRecoverableState();
    public boolean supportsResume();
    public RecoverableFsDataOutputStream open(Path path) throws IOException;
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}
```
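The commit protocol behind this writer can be illustrated with a small in-memory sketch. All names below are hypothetical, not the plugin's internals: written data is staged as temporary component objects, and committing composes the components into the final blob before deleting them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of a stage-then-compose commit flow.
public class CommitFlowSketch {
    // Simulated blob store: object name -> bytes
    static final Map<String, byte[]> store = new HashMap<>();

    // Each written chunk is staged as its own temporary component object.
    static String stageComponent(UUID componentId, byte[] data) {
        String name = "tmp/" + componentId;
        store.put(name, data);
        return name;
    }

    // Commit: compose the components into the final blob, then delete them.
    static void commit(List<String> componentNames, String finalName) {
        int total = 0;
        for (String name : componentNames) total += store.get(name).length;
        byte[] composed = new byte[total];
        int offset = 0;
        for (String name : componentNames) {
            byte[] part = store.get(name);
            System.arraycopy(part, 0, composed, offset, part.length);
            offset += part.length;
        }
        store.put(finalName, composed);
        componentNames.forEach(store::remove);
    }

    public static void main(String[] args) {
        List<String> components = new ArrayList<>();
        components.add(stageComponent(UUID.randomUUID(), "hello ".getBytes()));
        components.add(stageComponent(UUID.randomUUID(), "world".getBytes()));
        commit(components, "final/data.txt");
        System.out.println(new String(store.get("final/data.txt"))); // prints "hello world"
    }
}
```

Because the final blob only appears at commit time, a failed job never leaves a partially written final object visible to readers, which is what makes exactly-once file sinks possible.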
```java
// State objects for recovery
public class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
    public final GSBlobIdentifier finalBlobIdentifier;
    public final List<UUID> componentObjectIds;
}

public class GSResumeRecoverable extends GSCommitRecoverable
        implements RecoverableWriter.ResumeRecoverable {
    public final long position;
    public final boolean closed;
}
```

Low-level Google Cloud Storage operations abstraction for blob management and data operations.
```java
// Storage abstraction interface
public interface GSBlobStorage {
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    void createBlob(GSBlobIdentifier blobIdentifier);
    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    List<GSBlobIdentifier> list(String bucketName, String prefix);
    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}
```
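One detail worth knowing about `compose`: GCS composes at most 32 source objects per request, so a writer that accumulates many component blobs must compose in batches. The sketch below is illustrative only (plain strings stand in for blob identifiers, a map stands in for GCS); it folds batches of up to 32 sources into an accumulator object.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical batched-compose sketch over an in-memory store.
public class ComposeSketch {
    static final int MAX_COMPOSE_SOURCES = 32; // GCS per-request compose limit
    static final Map<String, byte[]> store = new HashMap<>();

    // Simulated single compose call: concatenates up to 32 sources into target.
    static void composeOnce(List<String> sources, String target) {
        if (sources.size() > MAX_COMPOSE_SOURCES)
            throw new IllegalArgumentException("too many sources");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String s : sources) {
            byte[] part = store.get(s);
            out.write(part, 0, part.length);
        }
        store.put(target, out.toByteArray());
    }

    // Fold an arbitrary number of sources into target, 32 at a time;
    // the accumulated target counts as one source in each later batch.
    static void composeAll(List<String> sources, String target) {
        List<String> remaining = new ArrayList<>(sources);
        List<String> first = remaining.subList(0, Math.min(MAX_COMPOSE_SOURCES, remaining.size()));
        composeOnce(new ArrayList<>(first), target);
        first.clear(); // removes the consumed prefix from `remaining`
        while (!remaining.isEmpty()) {
            List<String> batch = new ArrayList<>();
            batch.add(target);
            List<String> next = remaining.subList(0, Math.min(MAX_COMPOSE_SOURCES - 1, remaining.size()));
            batch.addAll(next);
            next.clear();
            composeOnce(batch, target);
        }
    }

    public static void main(String[] args) {
        List<String> parts = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            String name = "part-" + i;
            store.put(name, ("[" + i + "]").getBytes());
            parts.add(name);
        }
        composeAll(parts, "final");
        System.out.println(store.get("final").length); // total bytes of all 100 parts
    }
}
```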
```java
// Blob identifier abstraction
public class GSBlobIdentifier {
    public final String bucketName;
    public final String objectName;

    public GSBlobIdentifier(String bucketName, String objectName);
    public BlobId getBlobId();
    public static GSBlobIdentifier fromBlobId(BlobId blobId);
}
```

All configuration options use Flink's `Configuration` system with `gs.*` prefixes:
| Option | Type | Description |
|---|---|---|
| `gs.writer.temporary.bucket.name` | String | Bucket for temporary files during recoverable writes |
| `gs.writer.chunk.size` | MemorySize | Upload chunk size (must be a multiple of 256 KB) |
| `gs.filesink.entropy.enabled` | Boolean | Enable entropy injection to reduce hotspots (default: false) |
| `gs.http.connect-timeout` | Integer | HTTP connection timeout (milliseconds) |
| `gs.http.read-timeout` | Integer | HTTP read timeout (milliseconds) |
| `gs.retry.max-attempt` | Integer | Maximum retry attempts |
| `gs.retry.init-rpc-timeout` | Duration | Initial RPC timeout |
| `gs.retry.rpc-timeout-multiplier` | Double | RPC timeout multiplier |
| `gs.retry.max-rpc-timeout` | Duration | Maximum RPC timeout |
| `gs.retry.total-timeout` | Duration | Total timeout for all retries |
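The 256 KB constraint on `gs.writer.chunk.size` comes from GCS resumable uploads, which only accept chunk sizes in 256 KiB increments. A validation helper might look like this (illustrative, not plugin code):

```java
// Hypothetical validator for the gs.writer.chunk.size constraint.
public class ChunkSizeCheck {
    static final long CHUNK_GRANULARITY = 256 * 1024; // 256 KiB

    // A chunk size is usable only if positive and a multiple of 256 KiB.
    static boolean isValidChunkSize(long bytes) {
        return bytes > 0 && bytes % CHUNK_GRANULARITY == 0;
    }

    public static void main(String[] args) {
        System.out.println(isValidChunkSize(8 * 1024 * 1024)); // 8 MB -> true
        System.out.println(isValidChunkSize(1000000));         // not a multiple -> false
    }
}
```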
The plugin handles various error scenarios: