tessl/maven-org-apache-flink--flink-gs-fs-hadoop

Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers

FileSystem Configuration

The Flink GS FileSystem plugin integrates Google Cloud Storage with Flink applications. Its configuration covers authentication, HTTP and retry tuning, and recoverable-writer behavior. Filesystem instances are created through a factory that is registered with Flink.

Capabilities

GSFileSystemFactory

Main factory class for creating and configuring GS filesystem instances. Registered automatically with Flink's plugin system.

/**
 * Implementation of the Flink FileSystemFactory interface for Google Storage.
 * Automatically registered via META-INF/services/org.apache.flink.core.fs.FileSystemFactory
 */
public class GSFileSystemFactory implements FileSystemFactory {
    /** The scheme for the Google Storage file system */
    public static final String SCHEME = "gs";
    
    /** Constructs the Google Storage file system factory */
    public GSFileSystemFactory();
    
    /**
     * Configure the factory with Flink configuration
     * @param flinkConfig The Flink configuration
     */
    public void configure(Configuration flinkConfig);
    
    /**
     * Get the filesystem scheme
     * @return "gs"
     */
    public String getScheme();
    
    /**
     * Create a filesystem instance for the given URI
     * @param fsUri The filesystem URI (must have gs:// scheme)
     * @return GSFileSystem instance
     * @throws IOException If filesystem creation fails
     */
    public FileSystem create(URI fsUri) throws IOException;
}

Usage Example:

import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.fs.gs.GSFileSystemFactory;

// Factory is automatically instantiated and configured by Flink
// Users typically don't interact with it directly
GSFileSystemFactory factory = new GSFileSystemFactory();
Configuration config = new Configuration();
config.setString("gs.auth.service.account.json.keyfile", "/path/to/key.json");
factory.configure(config);

// create() throws IOException if the filesystem cannot be created
URI gcsUri = URI.create("gs://my-bucket/path");
FileSystem fs = factory.create(gcsUri);
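`create` expects a `gs://` URI, so it helps to see how such a URI splits into a bucket and an object name. The following is a minimal JDK-only sketch. `GsUriSketch` and its methods are hypothetical helpers, not part of the plugin. They only illustrate the `gs://bucket/object` convention.

```java
import java.net.URI;

/** Hypothetical helper illustrating gs:// URIs; not part of flink-gs-fs-hadoop. */
public class GsUriSketch {

    /** Rejects anything that is not a gs:// URI with a bucket name. */
    static void requireGsScheme(URI uri) {
        if (!"gs".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Expected gs:// scheme but got: " + uri);
        }
        if (uri.getAuthority() == null || uri.getAuthority().isEmpty()) {
            throw new IllegalArgumentException("Missing bucket name in: " + uri);
        }
    }

    /** The bucket is the URI authority, e.g. "my-bucket" in gs://my-bucket/path. */
    static String bucket(URI uri) {
        requireGsScheme(uri);
        return uri.getAuthority();
    }

    /** The object name is the URI path without its leading slash. */
    static String objectName(URI uri) {
        requireGsScheme(uri);
        String path = uri.getPath();
        return path.startsWith("/") ? path.substring(1) : path;
    }

    public static void main(String[] args) {
        URI gcsUri = URI.create("gs://my-bucket/path/to/part-0");
        System.out.println(bucket(gcsUri));     // my-bucket
        System.out.println(objectName(gcsUri)); // path/to/part-0
    }
}
```

`getAuthority()` is used instead of `getHost()` because `java.net.URI` returns a null host for names containing underscores, which bucket names may include.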

GSFileSystemOptions

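Configuration options container. It defines the plugin's option constants and exposes typed accessors for the writer, HTTP, and retry settings. Options that are not set come back as empty Optionals.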

/**
 * Configuration options for the GS filesystem and recoverable writer
 */
public class GSFileSystemOptions {
    
    // Configuration option constants
    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME;
    public static final ConfigOption<MemorySize> WRITER_CHUNK_SIZE;
    public static final ConfigOption<Boolean> ENABLE_FILESINK_ENTROPY;
    public static final ConfigOption<Integer> GCS_HTTP_CONNECT_TIMEOUT;
    public static final ConfigOption<Integer> GCS_HTTP_READ_TIMEOUT;
    public static final ConfigOption<Integer> GCS_RETRY_MAX_ATTEMPT;
    public static final ConfigOption<Duration> GCS_RETRY_INIT_RPC_TIMEOUT;
    public static final ConfigOption<Double> GCS_RETRY_RPC_TIMEOUT_MULTIPLIER;
    public static final ConfigOption<Duration> GCS_RETRY_MAX_RPC_TIMEOUT;
    public static final ConfigOption<Duration> GCS_RETRY_TOTAL_TIMEOUT;
    
    /**
     * Constructs an options instance from Flink configuration
     * @param flinkConfig The Flink configuration
     */
    public GSFileSystemOptions(Configuration flinkConfig);
    
    /**
     * Get temporary bucket name for recoverable writes
     * @return Optional temporary bucket name
     */
    public Optional<String> getWriterTemporaryBucketName();
    
    /**
     * Get chunk size for writes to Google Storage
     * @return Optional chunk size (must be multiple of 256KB)
     */
    public Optional<MemorySize> getWriterChunkSize();
    
    /**
     * Check if entropy injection is enabled for FileSink paths
     * @return true if entropy injection is enabled
     */
    public Boolean isFileSinkEntropyEnabled();
    
    /**
     * Get HTTP connection timeout
     * @return Optional connection timeout in milliseconds
     */
    public Optional<Integer> getHTTPConnectionTimeout();
    
    /**
     * Get HTTP read timeout
     * @return Optional read timeout in milliseconds
     */
    public Optional<Integer> getHTTPReadTimeout();
    
    /**
     * Get maximum retry attempts for operations
     * @return Optional maximum attempts
     */
    public Optional<Integer> getMaxAttempts();
    
    /**
     * Get initial RPC timeout for retry operations
     * @return Optional initial timeout duration
     */
    public Optional<org.threeten.bp.Duration> getInitialRpcTimeout();
    
    /**
     * Get RPC timeout multiplier for retry backoff
     * @return Optional timeout multiplier
     */
    public Optional<Double> getRpcTimeoutMultiplier();
    
    /**
     * Get maximum RPC timeout for retry operations
     * @return Optional maximum RPC timeout duration
     */
    public Optional<org.threeten.bp.Duration> getMaxRpcTimeout();
    
    /**
     * Get total timeout for all retry operations
     * @return Optional total timeout duration
     */
    public Optional<org.threeten.bp.Duration> getTotalTimeout();
}
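The `getWriterChunkSize` contract above requires a multiple of 256KB. The following is a minimal sketch of that check, assuming the constraint applies to the raw byte count. `ChunkSizeCheck` is a hypothetical helper, not the plugin's validation code, and the positivity check is an added assumption.

```java
/** Hypothetical helper illustrating the 256 KiB chunk-size constraint. */
public class ChunkSizeCheck {

    /** 256 KiB, the granularity the writer chunk size must respect. */
    static final long CHUNK_GRANULARITY_BYTES = 256L * 1024L;

    /** True if the chunk size is positive and a whole multiple of 256 KiB. */
    static boolean isValidChunkSize(long bytes) {
        return bytes > 0 && bytes % CHUNK_GRANULARITY_BYTES == 0;
    }

    public static void main(String[] args) {
        long eightMiB = 8L * 1024L * 1024L;  // 8 MiB = 32 x 256 KiB
        long threeHundredKiB = 300L * 1024L; // not a multiple of 256 KiB
        System.out.println(isValidChunkSize(eightMiB));        // true
        System.out.println(isValidChunkSize(threeHundredKiB)); // false
    }
}
```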

Configuration Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.GSFileSystemOptions;

// Create configuration
Configuration config = new Configuration();

// Writer configuration
config.setString("gs.writer.temporary.bucket.name", "temp-bucket");
config.set(GSFileSystemOptions.WRITER_CHUNK_SIZE, MemorySize.ofMebiBytes(8));
config.set(GSFileSystemOptions.ENABLE_FILESINK_ENTROPY, true);

// HTTP timeouts
config.setInteger("gs.http.connect-timeout", 30000);
config.setInteger("gs.http.read-timeout", 60000);

// Retry configuration
config.setInteger("gs.retry.max-attempt", 10);
config.setString("gs.retry.init-rpc-timeout", "5s");
config.setDouble("gs.retry.rpc-timeout-multiplier", 2.0);
config.setString("gs.retry.max-rpc-timeout", "60s");
config.setString("gs.retry.total-timeout", "300s");

// Create options instance
GSFileSystemOptions options = new GSFileSystemOptions(config);

GSFileSystem

Core filesystem implementation. It extends Flink's HadoopFileSystem wrapper around the GCS connector's GoogleHadoopFileSystem and adds recoverable writer support.

/**
 * FileSystem implementation that wraps GoogleHadoopFileSystem and supports RecoverableWriter
 * Package-private - users interact through standard Flink FileSystem APIs
 */
class GSFileSystem extends HadoopFileSystem {
    
    /**
     * Create a recoverable writer for fault-tolerant streaming writes
     * @return GSRecoverableWriter instance
     */
    public RecoverableWriter createRecoverableWriter();
}

Usage Example:

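import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

// Get filesystem instance (automatically created by factory)
Path gcsPath = new Path("gs://my-bucket/data/");
FileSystem fs = gcsPath.getFileSystem();

// Use recoverable writer for streaming applications
RecoverableWriter recoverableWriter = fs.createRecoverableWriter();
RecoverableFsDataOutputStream outputStream = recoverableWriter.open(
    new Path("gs://my-bucket/output/part-0")
);

// Write data, then close and commit to publish the final object
outputStream.write("hello".getBytes(StandardCharsets.UTF_8));
RecoverableFsDataOutputStream.Committer committer = outputStream.closeForCommit();
committer.commit();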

Configuration Properties

Authentication Configuration

Configure authentication through Hadoop configuration properties:

# Enable service account authentication
google.cloud.auth.service.account.enable=true

# Path to service account JSON key file
google.cloud.auth.service.account.json.keyfile=/path/to/service-account.json

# Alternative: use environment variable GOOGLE_APPLICATION_CREDENTIALS
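The two authentication routes above amount to a lookup order, modeled below with plain maps in JDK-only code. `CredentialsLookup` is hypothetical. Its precedence rules are assumptions rather than the plugin's documented behavior:

- A missing `google.cloud.auth.service.account.enable` is treated as `true`.
- An explicit keyfile wins over `GOOGLE_APPLICATION_CREDENTIALS`.
- Disabling service accounts yields no explicit credentials path.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of credential-path resolution; not the plugin's implementation. */
public class CredentialsLookup {

    static final String ENABLE_KEY = "google.cloud.auth.service.account.enable";
    static final String KEYFILE_KEY = "google.cloud.auth.service.account.json.keyfile";
    static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

    static Optional<String> resolve(Map<String, String> hadoopConf, Map<String, String> env) {
        // Assumption: service account auth is on unless explicitly set to "false".
        boolean enabled = Boolean.parseBoolean(hadoopConf.getOrDefault(ENABLE_KEY, "true"));
        if (!enabled) {
            return Optional.empty();
        }
        // Assumption: an explicit keyfile in the Hadoop config takes precedence.
        String keyfile = hadoopConf.get(KEYFILE_KEY);
        if (keyfile != null && !keyfile.isEmpty()) {
            return Optional.of(keyfile);
        }
        // Otherwise fall back to the standard environment variable, if set.
        return Optional.ofNullable(env.get(ENV_VAR)).filter(p -> !p.isEmpty());
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(KEYFILE_KEY, "/path/to/service-account.json");
        // The keyfile is set, so the environment is not consulted here.
        System.out.println(resolve(conf, System.getenv())); // Optional[/path/to/service-account.json]
    }
}
```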

Writer Configuration

Configure recoverable writer behavior:

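# Bucket for temporary objects created by the recoverable writer (optional;
# if unset, the bucket of the final object is used)
gs.writer.temporary.bucket.name=my-temp-bucket

# Upload chunk size for the recoverable writer; must be a multiple of 256KB
gs.writer.chunk.size=8MB

# Inject entropy into temporary object paths to reduce GCS hotspotting
# (final object paths are unchanged)
gs.filesink.entropy.enabled=true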
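Entropy injection helps because Cloud Storage distributes load by object-name ranges. Many temporary objects sharing a long common prefix can therefore concentrate on one range. The sketch below illustrates only the idea: it prepends a random UUID to a temporary object name and leaves the final object name untouched. The `.inprogress/` layout and the `EntropySketch` helper are hypothetical and do not reproduce the plugin's actual naming scheme.

```java
import java.util.UUID;

/** Hypothetical illustration of entropy injection for temporary object names. */
public class EntropySketch {

    /** Builds a temporary object name for an in-progress write (illustrative layout only). */
    static String temporaryObjectName(String finalObjectName, UUID tempId, boolean entropyEnabled) {
        String base = ".inprogress/" + finalObjectName + "/" + tempId;
        // With entropy enabled, a random component leads the name so temporary
        // objects for neighbouring final paths no longer share a common prefix.
        return entropyEnabled ? tempId + "/" + base : base;
    }

    public static void main(String[] args) {
        String finalName = "output/part-0"; // the final object name never changes
        UUID tempId = UUID.randomUUID();
        System.out.println(temporaryObjectName(finalName, tempId, false));
        System.out.println(temporaryObjectName(finalName, tempId, true));
    }
}
```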

Network Configuration

Configure HTTP client behavior:

# Connection timeout in milliseconds
gs.http.connect-timeout=30000

# Read timeout in milliseconds
gs.http.read-timeout=60000

Retry Configuration

Configure retry behavior for transient failures:

# Maximum number of retry attempts
gs.retry.max-attempt=10

# Initial RPC timeout
gs.retry.init-rpc-timeout=5s

# Timeout multiplier for exponential backoff
gs.retry.rpc-timeout-multiplier=2.0

# Maximum RPC timeout
gs.retry.max-rpc-timeout=60s

# Total timeout for all retries
gs.retry.total-timeout=300s
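The following simplified model shows how the retry settings interact by computing the per-attempt RPC timeouts they imply:

- Each timeout starts at `init-rpc-timeout` and grows by `rpc-timeout-multiplier`.
- Each timeout is capped at `max-rpc-timeout`.
- Attempts stop once `max-attempt` or `total-timeout` is reached.

This approximates exponential RPC-timeout growth in the Google client libraries; it is not their exact algorithm. It ignores retry delays and jitter, and it assumes every attempt uses its full timeout. `RetryScheduleSketch` is hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of exponential RPC-timeout growth. */
public class RetryScheduleSketch {

    static List<Duration> rpcTimeouts(Duration initial, double multiplier, Duration max,
                                      Duration total, int maxAttempts) {
        List<Duration> timeouts = new ArrayList<>();
        long currentMs = initial.toMillis();
        long elapsedMs = 0;
        while (timeouts.size() < maxAttempts && elapsedMs < total.toMillis()) {
            // Cap by the configured maximum and by the remaining time budget.
            long timeoutMs = Math.min(Math.min(currentMs, max.toMillis()),
                                      total.toMillis() - elapsedMs);
            timeouts.add(Duration.ofMillis(timeoutMs));
            elapsedMs += timeoutMs; // assumes each attempt runs to its full timeout
            currentMs = Math.min((long) (currentMs * multiplier), max.toMillis());
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // Values from the retry configuration above.
        List<Duration> schedule = rpcTimeouts(Duration.ofSeconds(5), 2.0,
                Duration.ofSeconds(60), Duration.ofSeconds(300), 10);
        schedule.forEach(d -> System.out.print(d.getSeconds() + "s "));
        System.out.println();
        // prints: 5s 10s 20s 40s 60s 60s 60s 45s
    }
}
```

With these values, the 300s total budget is exhausted after eight attempts, before `max-attempt=10` is reached. In this model, raising `gs.retry.total-timeout` rather than `gs.retry.max-attempt` is what allows more attempts.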

Integration with Flink

Service Discovery

The filesystem factory is automatically registered with Flink through the service provider interface:

META-INF/services/org.apache.flink.core.fs.FileSystemFactory

This file contains:

org.apache.flink.fs.gs.GSFileSystemFactory

Configuration Loading

The factory integrates with Flink's configuration system to load settings from:

  1. Flink Configuration: Properties with gs.* prefixes
  2. Hadoop Configuration: Properties with fs.gs.* prefixes from Hadoop config files
  3. Environment Variables: HADOOP_CONF_DIR, GOOGLE_APPLICATION_CREDENTIALS

Hadoop Integration

The implementation leverages Hadoop's configuration system and integrates with:

  • HadoopConfigLoader: For loading Hadoop-style configurations
  • GoogleHadoopFileSystem: Underlying GCS Hadoop connector
  • Configuration overlays: Flink config takes precedence over Hadoop config
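The overlay rule can be modeled with plain maps. This sketch assumes two things:

- Flink keys starting with `gs.` are rewritten to the Hadoop `fs.gs.` prefix.
- The rewritten keys are written over values loaded from Hadoop config files, so Flink wins on conflicts.

`ConfigOverlaySketch` is a hypothetical stand-in for the plugin's use of HadoopConfigLoader and handles only the `gs.` prefix.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of Flink-over-Hadoop config overlay; not the plugin's code. */
public class ConfigOverlaySketch {

    static final String FLINK_PREFIX = "gs.";
    static final String HADOOP_PREFIX = "fs.gs.";

    static Map<String, String> overlay(Map<String, String> hadoopConf, Map<String, String> flinkConf) {
        // Start from the Hadoop values (e.g. loaded via HADOOP_CONF_DIR).
        Map<String, String> merged = new TreeMap<>(hadoopConf);
        for (Map.Entry<String, String> e : flinkConf.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(FLINK_PREFIX)) {
                // gs.foo -> fs.gs.foo; put() overwrites, so Flink takes precedence.
                merged.put(HADOOP_PREFIX + key.substring(FLINK_PREFIX.length()), e.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> hadoop = Map.of("fs.gs.project.id", "from-hadoop",
                                            "fs.gs.block.size", "67108864");
        Map<String, String> flink = Map.of("gs.project.id", "from-flink",
                                           "taskmanager.numberOfTaskSlots", "4");
        System.out.println(overlay(hadoop, flink));
        // prints: {fs.gs.block.size=67108864, fs.gs.project.id=from-flink}
    }
}
```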
