Flink GS FileSystem Hadoop

Flink GS FileSystem Hadoop provides a Google Cloud Storage (GCS) filesystem plugin for Apache Flink that enables reading from and writing to GCS buckets using the gs:// URI scheme. The plugin integrates seamlessly with Flink's FileSystem interface and provides fault-tolerant streaming through recoverable writers, making it ideal for checkpointing, state storage, and data processing workflows.

Package Information

  • Package Name: flink-gs-fs-hadoop
  • Package Type: Maven
  • Group ID: org.apache.flink
  • Artifact ID: flink-gs-fs-hadoop
  • Language: Java
  • Version: 2.1.0
  • License: Apache License 2.0
  • Installation: Add to Maven dependencies, or copy the jar into Flink's plugins/gs-fs-hadoop/ directory to load it through Flink's plugin system

Core Usage

The filesystem is automatically registered with Flink through the service provider interface and can be used directly with gs:// URIs:

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

// The FileSystem is automatically discovered and configured for the gs:// scheme
Path gcsPath = new Path("gs://my-bucket/data/file.txt");
FileSystem fs = gcsPath.getFileSystem();

// Use for reading/writing files
FSDataInputStream inputStream = fs.open(gcsPath);
FSDataOutputStream outputStream = fs.create(gcsPath, WriteMode.OVERWRITE);
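
As an illustration, these calls compose into a small copy helper. This is a sketch built only on Flink's public FileSystem API, not part of the plugin; the buffer size is arbitrary:

import java.io.IOException;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

// Copies one object to another location, e.g. between gs:// paths
static void copy(Path src, Path dst) throws IOException {
    FileSystem srcFs = src.getFileSystem();
    FileSystem dstFs = dst.getFileSystem();
    try (FSDataInputStream in = srcFs.open(src);
         FSDataOutputStream out = dstFs.create(dst, WriteMode.OVERWRITE)) {
        byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
    }
}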

Basic Configuration

Configure the filesystem through Flink configuration using gs.* prefixes:

# Authentication via service account JSON keyfile
# (alternatively, set the GOOGLE_APPLICATION_CREDENTIALS environment variable)
gs.auth.type=SERVICE_ACCOUNT_JSON_KEYFILE
gs.auth.service.account.json.keyfile=/path/to/service-account.json

# Performance tuning
gs.writer.chunk.size=8MB
gs.filesink.entropy.enabled=true

# Retry configuration
gs.retry.max-attempt=10
gs.retry.total-timeout=300s
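
The same options can also be set programmatically before any filesystem is created, which is convenient in tests. A minimal sketch, assuming the bucket name is a placeholder:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

Configuration conf = new Configuration();
conf.setString("gs.writer.temporary.bucket.name", "my-temp-bucket"); // placeholder
conf.setString("gs.retry.max-attempt", "10");

// Re-initialize the filesystem factories with this configuration
// (a null PluginManager skips plugin isolation; Flink's runtime supplies a real one)
FileSystem.initialize(conf, null);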

Architecture

The plugin is built on several key components:

  • GSFileSystemFactory: Main entry point that creates and configures filesystem instances
  • GSFileSystem: Core filesystem implementation that wraps the Hadoop GCS filesystem and adds recoverable writer support
  • GSRecoverableWriter: Fault-tolerant writer system for streaming applications with exactly-once guarantees
  • GSBlobStorage: Abstraction layer over Google Cloud Storage operations
  • Configuration System: Comprehensive options for performance tuning, authentication, and retry behavior

The implementation builds on Google's Cloud Storage Java SDK (used by the recoverable writer) and the GCS Connector for Hadoop (used for standard filesystem operations), providing production-tested reliability and performance tuning options.

Capabilities

FileSystem Factory and Configuration

Core filesystem factory and configuration management for integrating GCS with Flink applications.

// Main factory class registered via META-INF services
public class GSFileSystemFactory implements FileSystemFactory {
    public static final String SCHEME = "gs";
    
    public void configure(Configuration flinkConfig);
    public String getScheme();
    public FileSystem create(URI fsUri) throws IOException;
}

// Configuration options container
public class GSFileSystemOptions {
    public Optional<String> getWriterTemporaryBucketName();
    public Optional<MemorySize> getWriterChunkSize();
    public Boolean isFileSinkEntropyEnabled();
    public Optional<Integer> getHTTPConnectionTimeout();
    public Optional<Integer> getHTTPReadTimeout();
    // ... retry configuration methods
}
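
Because the factory is registered under META-INF/services/org.apache.flink.core.fs.FileSystemFactory, it can be enumerated with a plain ServiceLoader. This is a simplified sketch of the discovery that Flink performs internally, ignoring plugin class loading:

import java.util.ServiceLoader;

import org.apache.flink.core.fs.FileSystemFactory;

// Enumerate all filesystem factories on the classpath and find the gs:// one
for (FileSystemFactory factory : ServiceLoader.load(FileSystemFactory.class)) {
    if ("gs".equals(factory.getScheme())) {
        System.out.println("Found GCS factory: " + factory.getClass().getName());
    }
}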


Recoverable Writer System

Fault-tolerant streaming write system providing exactly-once guarantees for Flink streaming applications.

// Main recoverable writer interface
public class GSRecoverableWriter implements RecoverableWriter {
    public boolean requiresCleanupOfRecoverableState();
    public boolean supportsResume();
    public RecoverableFsDataOutputStream open(Path path) throws IOException;
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}

// State objects for recovery
public class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
    public final GSBlobIdentifier finalBlobIdentifier;
    public final List<UUID> componentObjectIds;
}

public class GSResumeRecoverable extends GSCommitRecoverable 
    implements RecoverableWriter.ResumeRecoverable {
    public final long position;
    public final boolean closed;
}
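
The typical lifecycle, as driven by Flink's sinks: write data, call persist() at each checkpoint to obtain a resumable, then closeForCommit() and commit once the checkpoint completes. A sketch against Flink's public RecoverableWriter API; the path and payload are placeholders:

import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

Path target = new Path("gs://my-bucket/output/part-0"); // placeholder path
FileSystem fs = target.getFileSystem();

RecoverableWriter writer = fs.createRecoverableWriter();
RecoverableFsDataOutputStream out = writer.open(target);
out.write("some records".getBytes(StandardCharsets.UTF_8));

// At a checkpoint: persist durable state so the write can resume after a failure
RecoverableWriter.ResumeRecoverable resumable = out.persist();

// When the checkpoint completes: finalize the file exactly once
RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
committer.commit();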


Storage Operations

Low-level Google Cloud Storage operations abstraction for blob management and data operations.

// Storage abstraction interface
public interface GSBlobStorage {
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    void createBlob(GSBlobIdentifier blobIdentifier);
    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    List<GSBlobIdentifier> list(String bucketName, String prefix);
    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}

// Blob identifier abstraction
public class GSBlobIdentifier {
    public final String bucketName;
    public final String objectName;
    
    public GSBlobIdentifier(String bucketName, String objectName);
    public BlobId getBlobId();
    public static GSBlobIdentifier fromBlobId(BlobId blobId);
}
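
These primitives map directly onto how the recoverable writer commits: component objects written during streaming are merged via compose, then deleted. A hypothetical sketch of that commit step, using only the interface above; the method and argument names are illustrative, not the plugin's internals:

import java.util.List;

// Illustrative commit step: merge temporary component blobs into the final blob
static void commitBlobs(
        GSBlobStorage storage,
        List<GSBlobIdentifier> components,
        GSBlobIdentifier finalBlob) {
    // GCS compose merges up to 32 source objects per call into one target object
    storage.compose(components, finalBlob);
    // Remove the now-redundant temporary component objects
    storage.delete(components);
}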


Configuration Options

All configuration options use Flink's Configuration system with gs.* prefixes:

Option | Type | Description
------ | ---- | -----------
gs.writer.temporary.bucket.name | String | Bucket for temporary files during recoverable writes
gs.writer.chunk.size | MemorySize | Upload chunk size (must be a multiple of 256 KB)
gs.filesink.entropy.enabled | Boolean | Enable entropy injection to reduce hotspots (default: false)
gs.http.connect-timeout | Integer | HTTP connection timeout (milliseconds)
gs.http.read-timeout | Integer | HTTP read timeout (milliseconds)
gs.retry.max-attempt | Integer | Maximum retry attempts
gs.retry.init-rpc-timeout | Duration | Initial RPC timeout
gs.retry.rpc-timeout-multiplier | Double | RPC timeout multiplier
gs.retry.max-rpc-timeout | Duration | Maximum RPC timeout
gs.retry.total-timeout | Duration | Total timeout for retries

Common Use Cases

  • Checkpointing: Store Flink application checkpoints in GCS for fault tolerance
  • State Backend: Use GCS as a distributed state backend for large-state applications
  • Data Ingestion: Read data files from GCS buckets for batch and streaming processing
  • Data Output: Write processed results to GCS with exactly-once guarantees
  • File Sink: Use the FileSink connector to write streaming data to GCS in various formats (see the sketch below)
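
A minimal FileSink sketch writing newline-delimited strings to a GCS bucket; the bucket and path are placeholders. This uses the standard flink-connector-files API, with the gs:// scheme resolved by this plugin:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;

// Attach a GCS file sink to an existing stream of strings
static void addGcsSink(DataStream<String> stream) {
    FileSink<String> sink = FileSink
        .forRowFormat(new Path("gs://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .build();
    stream.sinkTo(sink);
}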

Error Handling

The plugin handles various error scenarios:

  • Authentication Failures: Clear error messages for credential issues
  • Network Timeouts: Configurable retry policies with exponential backoff
  • Storage Errors: Proper exception propagation with context information
  • Recovery Scenarios: Automatic cleanup and recovery for interrupted operations