tessl/maven-org-apache-flink--flink-gs-fs-hadoop

Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-gs-fs-hadoop@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-gs-fs-hadoop@2.1.0


Flink GS FileSystem Hadoop

Flink GS FileSystem Hadoop provides a Google Cloud Storage (GCS) filesystem plugin for Apache Flink that enables reading from and writing to GCS buckets using the gs:// URI scheme. The plugin integrates seamlessly with Flink's FileSystem interface and provides fault-tolerant streaming through recoverable writers, making it ideal for checkpointing, state storage, and data processing workflows.

Package Information

  • Package Name: flink-gs-fs-hadoop
  • Package Type: Maven
  • Group ID: org.apache.flink
  • Artifact ID: flink-gs-fs-hadoop
  • Language: Java
  • Version: 2.1.0
  • License: Apache License 2.0
  • Installation: Add to Maven dependencies or use Flink's plugin system

Core Usage

The filesystem is automatically registered with Flink through the service provider interface and can be used directly with gs:// URIs:

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

// FileSystem is automatically discovered and configured
Path gcsPath = new Path("gs://my-bucket/data/file.txt");
FileSystem fs = gcsPath.getFileSystem();

// Use for reading/writing files
FSDataInputStream inputStream = fs.open(gcsPath);
FSDataOutputStream outputStream = fs.create(gcsPath, WriteMode.OVERWRITE);
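When running on a Flink cluster, the plugin jar must sit in its own directory under the distribution's plugins/ folder before gs:// URIs will resolve. A typical setup from the root of a Flink distribution (the jar location and version string are illustrative):

```shell
# Create a dedicated plugin directory and copy the bundled jar into it
mkdir -p ./plugins/gs-fs-hadoop
cp ./opt/flink-gs-fs-hadoop-2.1.0.jar ./plugins/gs-fs-hadoop/
```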

Basic Configuration

Configure the filesystem through Flink's configuration. Plugin-specific options use the gs. prefix, while options for the underlying Hadoop GCS connector (such as authentication) keep their fs.gs. keys:

# Authentication via service account
fs.gs.auth.service.account.enable=true
fs.gs.auth.service.account.json.keyfile=/path/to/service-account.json

# Performance tuning
gs.writer.chunk.size=8MB
gs.filesink.entropy.enabled=true

# Retry configuration
gs.retry.max-attempt=10
gs.retry.total-timeout=300s

Architecture

The plugin is built on several key components:

  • GSFileSystemFactory: Main entry point that creates and configures filesystem instances
  • GSFileSystem: Core filesystem implementation extending Hadoop FileSystem with recoverable writer support
  • GSRecoverableWriter: Fault-tolerant writer system for streaming applications with exactly-once guarantees
  • GSBlobStorage: Abstraction layer over Google Cloud Storage operations
  • Configuration System: Comprehensive options for performance tuning, authentication, and retry behavior

The implementation leverages Google's Cloud Storage SDK and GCS Connector for Hadoop, providing enterprise-grade reliability and performance optimizations.

Capabilities

FileSystem Factory and Configuration

Core filesystem factory and configuration management for integrating GCS with Flink applications.

// Main factory class registered via META-INF services
public class GSFileSystemFactory implements FileSystemFactory {
    public static final String SCHEME = "gs";
    
    public void configure(Configuration flinkConfig);
    public String getScheme();
    public FileSystem create(URI fsUri) throws IOException;
}

// Configuration options container
public class GSFileSystemOptions {
    public Optional<String> getWriterTemporaryBucketName();
    public Optional<MemorySize> getWriterChunkSize();
    public Boolean isFileSinkEntropyEnabled();
    public Optional<Integer> getHTTPConnectionTimeout();
    public Optional<Integer> getHTTPReadTimeout();
    // ... retry configuration methods
}
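The "registered via META-INF services" comment above refers to Java's ServiceLoader mechanism: the plugin jar ships a service descriptor naming the factory class, which is how Flink discovers it without explicit configuration. The descriptor looks roughly like this (the fully qualified class name is assumed from the plugin's org.apache.flink.fs.gs package):

```
# META-INF/services/org.apache.flink.core.fs.FileSystemFactory
org.apache.flink.fs.gs.GSFileSystemFactory
```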

FileSystem Configuration

Recoverable Writer System

Fault-tolerant streaming write system providing exactly-once guarantees for Flink streaming applications.

// Main recoverable writer interface
public class GSRecoverableWriter implements RecoverableWriter {
    public boolean requiresCleanupOfRecoverableState();
    public boolean supportsResume();
    public RecoverableFsDataOutputStream open(Path path) throws IOException;
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}

// State objects for recovery
public class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
    public final GSBlobIdentifier finalBlobIdentifier;
    public final List<UUID> componentObjectIds;
}

public class GSResumeRecoverable extends GSCommitRecoverable 
    implements RecoverableWriter.ResumeRecoverable {
    public final long position;
    public final boolean closed;
}
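Conceptually, the recoverable writer stages data as temporary component objects (tracked by the componentObjectIds above) and, on commit, composes them into the final blob, so a restart can resume from the recorded position without losing acknowledged data. A minimal stdlib-only model of that commit step (the in-memory "storage" map and helper names are illustrative, not the plugin's API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitModel {
    // Simulated blob store: object name -> bytes
    static final Map<String, byte[]> storage = new HashMap<>();

    // Commit: concatenate staged component objects into the final object,
    // then delete the temporaries (mirrors GCS compose + delete).
    static void commit(List<String> componentIds, String finalName) throws IOException {
        ByteArrayOutputStream composed = new ByteArrayOutputStream();
        for (String id : componentIds) {
            composed.write(storage.get(id));
        }
        storage.put(finalName, composed.toByteArray());
        componentIds.forEach(storage::remove);
    }

    public static void main(String[] args) throws IOException {
        storage.put("tmp-1", "hello ".getBytes());
        storage.put("tmp-2", "world".getBytes());
        commit(Arrays.asList("tmp-1", "tmp-2"), "final/part-0");
        System.out.println(new String(storage.get("final/part-0")));
        System.out.println(storage.size()); // temporaries are gone
    }
}
```

Because the component objects persist in GCS until commit, a failed job can be recovered from either a ResumeRecoverable (keep appending) or a CommitRecoverable (finish the compose), which is what gives the exactly-once behavior described above.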

Recoverable Writer

Storage Operations

Low-level Google Cloud Storage operations abstraction for blob management and data operations.

// Storage abstraction interface
public interface GSBlobStorage {
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    void createBlob(GSBlobIdentifier blobIdentifier);
    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    List<GSBlobIdentifier> list(String bucketName, String prefix);
    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}

// Blob identifier abstraction
public class GSBlobIdentifier {
    public final String bucketName;
    public final String objectName;
    
    public GSBlobIdentifier(String bucketName, String objectName);
    public BlobId getBlobId();
    public static GSBlobIdentifier fromBlobId(BlobId blobId);
}
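One practical constraint behind the compose method: GCS's compose operation accepts at most 32 source objects per call, so reducing a long list of component objects to one final blob takes multiple rounds. A stdlib-only sketch of counting those rounds (the 32-source limit is documented GCS behavior; the helper itself is illustrative, not part of GSBlobStorage):

```java
public class ComposeBatching {
    static final int MAX_COMPOSE_SOURCES = 32; // GCS compose limit per call

    // Number of compose calls needed to merge n component objects into
    // one object, merging up to 32 sources per call.
    static int composeCalls(int n) {
        int calls = 0;
        while (n > 1) {
            // Each round turns groups of up to 32 objects into 1 each.
            int groups = (n + MAX_COMPOSE_SOURCES - 1) / MAX_COMPOSE_SOURCES;
            calls += groups;
            n = groups;
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(composeCalls(10));  // fits in one call
        System.out.println(composeCalls(100)); // 4 calls, then 1 more to merge
    }
}
```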

Storage Operations

Configuration Options

All configuration options use Flink's Configuration system with gs.* prefixes:

| Option | Type | Description |
| --- | --- | --- |
| gs.writer.temporary.bucket.name | String | Bucket for temporary files during recoverable writes |
| gs.writer.chunk.size | MemorySize | Upload chunk size (must be a multiple of 256 KB) |
| gs.filesink.entropy.enabled | Boolean | Enable entropy injection to reduce hotspots (default: false) |
| gs.http.connect-timeout | Integer | HTTP connection timeout (milliseconds) |
| gs.http.read-timeout | Integer | HTTP read timeout (milliseconds) |
| gs.retry.max-attempt | Integer | Maximum retry attempts |
| gs.retry.init-rpc-timeout | Duration | Initial RPC timeout |
| gs.retry.rpc-timeout-multiplier | Double | RPC timeout multiplier |
| gs.retry.max-rpc-timeout | Duration | Maximum RPC timeout |
| gs.retry.total-timeout | Duration | Total timeout for retries |
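Since gs.writer.chunk.size must be a multiple of 256 KB, it is worth validating a candidate value before deploying. A quick stdlib check (the helper is illustrative, not part of the plugin):

```java
public class ChunkSizeCheck {
    // 256 KiB granularity required by gs.writer.chunk.size
    static final long CHUNK_GRANULARITY = 256L * 1024;

    static boolean isValidChunkSize(long bytes) {
        return bytes > 0 && bytes % CHUNK_GRANULARITY == 0;
    }

    public static void main(String[] args) {
        System.out.println(isValidChunkSize(8L * 1024 * 1024)); // 8 MB: 32 x 256 KiB
        System.out.println(isValidChunkSize(1_000_000L));       // not a 256 KiB multiple
    }
}
```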

Common Use Cases

  • Checkpointing: Store Flink application checkpoints in GCS for fault tolerance
  • State Backend: Use GCS as a distributed state backend for large-state applications
  • Data Ingestion: Read data files from GCS buckets for batch and streaming processing
  • Data Output: Write processed results to GCS with exactly-once guarantees
  • File Sink: Use FileSink connector to write streaming data to GCS in various formats

Error Handling

The plugin handles various error scenarios:

  • Authentication Failures: Clear error messages for credential issues
  • Network Timeouts: Configurable retry policies with exponential backoff
  • Storage Errors: Proper exception propagation with context information
  • Recovery Scenarios: Automatic cleanup and recovery for interrupted operations