Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers
—
The Flink GS FileSystem plugin integrates Google Cloud Storage with Flink applications, handling authentication, performance tuning, and filesystem creation through a factory pattern.
Main factory class for creating and configuring GS filesystem instances. Registered automatically with Flink's plugin system.
/**
* Implementation of the Flink FileSystemFactory interface for Google Storage.
* Automatically registered via META-INF/services/org.apache.flink.core.fs.FileSystemFactory
*/
public class GSFileSystemFactory implements FileSystemFactory {
/** The scheme for the Google Storage file system */
public static final String SCHEME = "gs";
/** Constructs the Google Storage file system factory */
public GSFileSystemFactory();
/**
* Configure the factory with Flink configuration
* @param flinkConfig The Flink configuration
*/
public void configure(Configuration flinkConfig);
/**
* Get the filesystem scheme
* @return "gs"
*/
public String getScheme();
/**
* Create a filesystem instance for the given URI
* @param fsUri The filesystem URI (must have gs:// scheme)
* @return GSFileSystem instance
* @throws IOException If filesystem creation fails
*/
public FileSystem create(URI fsUri) throws IOException;
}
Usage Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.gs.GSFileSystemFactory;
// Factory is automatically instantiated and configured by Flink
// Users typically don't interact with it directly
GSFileSystemFactory factory = new GSFileSystemFactory();
Configuration config = new Configuration();
config.setString("gs.auth.service.account.json.keyfile", "/path/to/key.json");
factory.configure(config);
URI gcsUri = URI.create("gs://my-bucket/path");
FileSystem fs = factory.create(gcsUri);
Configuration options container that manages all filesystem and writer settings.
/**
* Configuration options for the GS filesystem and recoverable writer
*/
public class GSFileSystemOptions {
// Configuration option constants
public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME;
public static final ConfigOption<MemorySize> WRITER_CHUNK_SIZE;
public static final ConfigOption<Boolean> ENABLE_FILESINK_ENTROPY;
public static final ConfigOption<Integer> GCS_HTTP_CONNECT_TIMEOUT;
public static final ConfigOption<Integer> GCS_HTTP_READ_TIMEOUT;
public static final ConfigOption<Integer> GCS_RETRY_MAX_ATTEMPT;
public static final ConfigOption<Duration> GCS_RETRY_INIT_RPC_TIMEOUT;
public static final ConfigOption<Double> GCS_RETRY_RPC_TIMEOUT_MULTIPLIER;
public static final ConfigOption<Duration> GCS_RETRY_MAX_RPC_TIMEOUT;
public static final ConfigOption<Duration> GCS_RETRY_TOTAL_TIMEOUT;
/**
* Constructs an options instance from Flink configuration
* @param flinkConfig The Flink configuration
*/
public GSFileSystemOptions(Configuration flinkConfig);
/**
* Get temporary bucket name for recoverable writes
* @return Optional temporary bucket name
*/
public Optional<String> getWriterTemporaryBucketName();
/**
* Get chunk size for writes to Google Storage
* @return Optional chunk size (must be multiple of 256KB)
*/
public Optional<MemorySize> getWriterChunkSize();
/**
* Check if entropy injection is enabled for FileSink paths
* @return true if entropy injection is enabled
*/
public Boolean isFileSinkEntropyEnabled();
/**
* Get HTTP connection timeout
* @return Optional connection timeout in milliseconds
*/
public Optional<Integer> getHTTPConnectionTimeout();
/**
* Get HTTP read timeout
* @return Optional read timeout in milliseconds
*/
public Optional<Integer> getHTTPReadTimeout();
/**
* Get maximum retry attempts for operations
* @return Optional maximum attempts
*/
public Optional<Integer> getMaxAttempts();
/**
* Get initial RPC timeout for retry operations
* @return Optional initial timeout duration
*/
public Optional<org.threeten.bp.Duration> getInitialRpcTimeout();
/**
* Get RPC timeout multiplier for retry backoff
* @return Optional timeout multiplier
*/
public Optional<Double> getRpcTimeoutMultiplier();
/**
* Get maximum RPC timeout for retry operations
* @return Optional maximum RPC timeout duration
*/
public Optional<org.threeten.bp.Duration> getMaxRpcTimeout();
/**
* Get total timeout for all retry operations
* @return Optional total timeout duration
*/
public Optional<org.threeten.bp.Duration> getTotalTimeout();
}
Configuration Examples:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.GSFileSystemOptions;
// Create configuration
Configuration config = new Configuration();
// Writer configuration
config.setString("gs.writer.temporary.bucket.name", "temp-bucket");
config.set(GSFileSystemOptions.WRITER_CHUNK_SIZE, MemorySize.ofMebiBytes(8));
config.set(GSFileSystemOptions.ENABLE_FILESINK_ENTROPY, true);
// HTTP timeouts
config.setInteger("gs.http.connect-timeout", 30000);
config.setInteger("gs.http.read-timeout", 60000);
// Retry configuration
config.setInteger("gs.retry.max-attempt", 10);
config.setString("gs.retry.init-rpc-timeout", "5s");
config.setDouble("gs.retry.rpc-timeout-multiplier", 2.0);
config.setString("gs.retry.max-rpc-timeout", "60s");
config.setString("gs.retry.total-timeout", "300s");
// Create options instance
GSFileSystemOptions options = new GSFileSystemOptions(config);
Core filesystem implementation that extends Hadoop FileSystem with recoverable writer support.
/**
* FileSystem implementation that wraps GoogleHadoopFileSystem and supports RecoverableWriter
* Package-private - users interact through standard Flink FileSystem APIs
*/
class GSFileSystem extends HadoopFileSystem {
/**
* Create a recoverable writer for fault-tolerant streaming writes
* @return GSRecoverableWriter instance
*/
public RecoverableWriter createRecoverableWriter();
}
Usage Example:
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
// Get filesystem instance (automatically created by factory)
Path gcsPath = new Path("gs://my-bucket/data/");
FileSystem fs = gcsPath.getFileSystem();
// Use recoverable writer for fault-tolerant streaming writes
RecoverableWriter recoverableWriter = fs.createRecoverableWriter();
RecoverableFsDataOutputStream outputStream = recoverableWriter.open(
        new Path("gs://my-bucket/output/part-0"));
outputStream.write("record\n".getBytes());
// Persist progress at a checkpoint; the returned recoverable allows resuming
RecoverableWriter.ResumeRecoverable recoverable = outputStream.persist();
// Finalize the file once writing is complete
RecoverableFsDataOutputStream.Committer committer = outputStream.closeForCommit();
committer.commit();
Configure authentication through Hadoop configuration properties:
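Instead of keyfile properties, credentials can also be supplied through Application Default Credentials via an environment variable (the alternative noted in the comments below; the path here is a placeholder):

```shell
# Point Application Default Credentials at the service account key file
# (placeholder path; substitute your actual key location).
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
echo "$GOOGLE_APPLICATION_CREDENTIALS"
```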
# Enable service account authentication
google.cloud.auth.service.account.enable=true
# Path to service account JSON key file
google.cloud.auth.service.account.json.keyfile=/path/to/service-account.json
# Alternative: use environment variable GOOGLE_APPLICATION_CREDENTIALS
Configure recoverable writer behavior:
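One constraint worth checking up front: the chunk size set below must be a multiple of 256 KB. A standalone sketch of such a validation (illustrative only, not plugin code):

```java
// Illustrative check mirroring the documented constraint that
// gs.writer.chunk.size must be a positive multiple of 256 KB.
public class ChunkSizeCheck {
    static final long CHUNK_BASE_BYTES = 256 * 1024;

    static boolean isValidChunkSize(long bytes) {
        return bytes > 0 && bytes % CHUNK_BASE_BYTES == 0;
    }

    public static void main(String[] args) {
        System.out.println(isValidChunkSize(8L * 1024 * 1024)); // 8 MB: valid
        System.out.println(isValidChunkSize(1_000_000));        // not a multiple of 256 KB
    }
}
```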
# Temporary bucket for multi-part uploads (optional)
gs.writer.temporary.bucket.name=my-temp-bucket
# Upload chunk size - must be multiple of 256KB
gs.writer.chunk.size=8MB
# Enable entropy injection to reduce hotspots
gs.filesink.entropy.enabled=true
Configure HTTP client behavior:
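The two settings below are assumed to map onto the standard connect and read timeouts of the underlying HTTP client. For intuition, the same two knobs on a plain HttpURLConnection (illustrative only; no request is actually sent):

```java
import java.net.HttpURLConnection;
import java.net.URI;

public class HttpTimeoutDemo {
    public static void main(String[] args) throws Exception {
        // openConnection() does not contact the server, so this runs offline.
        HttpURLConnection conn = (HttpURLConnection)
                URI.create("https://storage.googleapis.com/").toURL().openConnection();
        conn.setConnectTimeout(30_000); // cf. gs.http.connect-timeout
        conn.setReadTimeout(60_000);    // cf. gs.http.read-timeout
        System.out.println(conn.getConnectTimeout() + " " + conn.getReadTimeout());
    }
}
```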
# Connection timeout in milliseconds
gs.http.connect-timeout=30000
# Read timeout in milliseconds
gs.http.read-timeout=60000
Configure retry behavior for transient failures:
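The retry settings below are assumed to combine as an exponential backoff of per-attempt RPC timeouts: each attempt's timeout grows by the multiplier, is capped at the max RPC timeout, and the whole sequence stops once the total timeout budget or attempt limit is reached. A standalone sketch of the resulting schedule under those assumptions (not plugin code):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class RetryScheduleSketch {
    // Compute the per-attempt RPC timeouts implied by the retry settings.
    static List<Duration> schedule(Duration init, double multiplier, Duration max,
                                   Duration total, int maxAttempts) {
        List<Duration> timeouts = new ArrayList<>();
        Duration current = init;
        Duration spent = Duration.ZERO;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (spent.plus(current).compareTo(total) > 0) break; // budget exhausted
            timeouts.add(current);
            spent = spent.plus(current);
            long scaledMs = (long) (current.toMillis() * multiplier);
            current = Duration.ofMillis(Math.min(scaledMs, max.toMillis()));
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // init-rpc-timeout=5s, multiplier=2.0, max-rpc-timeout=60s,
        // total-timeout=300s, max-attempt=10
        System.out.println(schedule(Duration.ofSeconds(5), 2.0,
                Duration.ofSeconds(60), Duration.ofSeconds(300), 10));
    }
}
```

With the values from the fragment below, the per-attempt timeouts grow as 5s, 10s, 20s, 40s, then stay capped at 60s until the 300s budget runs out.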
# Maximum number of retry attempts
gs.retry.max-attempt=10
# Initial RPC timeout
gs.retry.init-rpc-timeout=5s
# Timeout multiplier for exponential backoff
gs.retry.rpc-timeout-multiplier=2.0
# Maximum RPC timeout
gs.retry.max-rpc-timeout=60s
# Total timeout for all retries
gs.retry.total-timeout=300s
The filesystem factory is automatically registered with Flink through the service provider interface:
META-INF/services/org.apache.flink.core.fs.FileSystemFactory
This file contains:
org.apache.flink.fs.gs.GSFileSystemFactory
The factory integrates with Flink's configuration system to load settings from:
- gs.* properties in the Flink configuration
- fs.gs.* properties from Hadoop configuration files
- environment variables such as HADOOP_CONF_DIR and GOOGLE_APPLICATION_CREDENTIALS
The implementation leverages Hadoop's configuration system and the underlying GoogleHadoopFileSystem connector.
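The registration mechanism above is Java's standard ServiceLoader SPI: a file under META-INF/services names the implementation class. The JDK uses the same mechanism for its own filesystem providers, which makes for a runnable illustration of the discovery step:

```java
import java.nio.file.spi.FileSystemProvider;

public class SpiDiscoveryDemo {
    public static void main(String[] args) {
        // Each provider was discovered via a META-INF/services entry,
        // exactly as Flink discovers GSFileSystemFactory.
        for (FileSystemProvider p : FileSystemProvider.installedProviders()) {
            System.out.println(p.getScheme());
        }
    }
}
```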
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gs-fs-hadoop