
tessl/maven-org-apache-flink--flink-gs-fs-hadoop

Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers


Storage Operations

The Flink GS FileSystem plugin wraps Google Cloud Storage operations behind the GSBlobStorage interface. The interface covers blob writes, metadata lookup, listing, copy, compose, and batch delete, and it keeps the rest of the plugin independent of the Google SDK types so that storage access can be replaced in tests.

Capabilities

GSBlobStorage Interface

Main abstraction interface for Google Cloud Storage operations providing all necessary blob management functionality.

/**
 * Abstract blob storage interface for Google storage operations
 * Provides clean abstraction over Google Cloud Storage SDK
 */
public interface GSBlobStorage {
    
    /**
     * Creates a write channel with the default chunk size
     * @param blobIdentifier The blob identifier to which to write
     * @return The WriteChannel helper for streaming writes
     */
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    
    /**
     * Creates a write channel with the specified chunk size
     * @param blobIdentifier The blob identifier to which to write
     * @param chunkSize The chunk size, must be > 0 and multiple of 256KB
     * @return The WriteChannel helper for streaming writes
     */
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    
    /**
     * Create an empty blob
     * @param blobIdentifier The blob to create
     */
    void createBlob(GSBlobIdentifier blobIdentifier);
    
    /**
     * Gets blob metadata
     * @param blobIdentifier The blob identifier
     * @return The blob metadata, if the blob exists. Empty if the blob doesn't exist.
     */
    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    
    /**
     * Lists all the blobs in a bucket matching a given prefix
     * @param bucketName The bucket name
     * @param prefix The object prefix
     * @return The found blob identifiers
     */
    List<GSBlobIdentifier> list(String bucketName, String prefix);
    
    /**
     * Copies from a source blob id to a target blob id. Does not delete the source blob.
     * @param sourceBlobIdentifier The source blob identifier
     * @param targetBlobIdentifier The target blob identifier
     */
    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    
    /**
     * Composes multiple blobs into one. Does not delete any of the source blobs.
     * @param sourceBlobIdentifiers The source blob identifiers to combine, max of 32
     * @param targetBlobIdentifier The target blob identifier
     */
    void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    
    /**
     * Deletes blobs. Note that this does not fail if blobs don't exist.
     * @param blobIdentifiers The blob identifiers to delete
     * @return The results of each delete operation
     */
    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}

Usage Example:

import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
import org.apache.flink.fs.gs.storage.GSBlobStorage;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Get storage instance (typically through GSFileSystem)
GSBlobStorage storage = ...; // Obtained from filesystem implementation

// Create blob identifier
GSBlobIdentifier blobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");

// Write data to blob, using an explicit charset and the actual array length
byte[] payload = "Hello World".getBytes(StandardCharsets.UTF_8);
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId, MemorySize.ofMebiBytes(8));
channel.write(payload, 0, payload.length);
channel.close();

// Get blob metadata
Optional<GSBlobStorage.BlobMetadata> metadata = storage.getMetadata(blobId);
if (metadata.isPresent()) {
    String checksum = metadata.get().getChecksum();
    System.out.println("Blob checksum: " + checksum);
}

// List blobs with prefix
List<GSBlobIdentifier> blobs = storage.list("my-bucket", "path/to/");

// Copy blob
GSBlobIdentifier targetId = new GSBlobIdentifier("my-bucket", "path/to/copy.txt");
storage.copy(blobId, targetId);

// Delete blobs
List<Boolean> results = storage.delete(Arrays.asList(blobId, targetId));
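
The chunk size rule documented on writeBlob (greater than 0 and a multiple of 256KB) can be checked before a channel is opened. The following is a minimal sketch, not part of the plugin: the class and method names are hypothetical, and it assumes "256KB" means 256 KiB (262,144 bytes).

```java
// Hypothetical helper, not part of flink-gs-fs-hadoop.
final class ChunkSizeCheck {
    /** Assumed chunk granularity: 256 KiB. */
    public static final long CHUNK_UNIT_BYTES = 256L * 1024L;

    private ChunkSizeCheck() {}

    /** Returns true if the size is positive and a whole multiple of 256 KiB. */
    public static boolean isValidChunkSize(long bytes) {
        return bytes > 0 && bytes % CHUNK_UNIT_BYTES == 0;
    }

    /** Rounds a desired size up to the nearest valid chunk size (at least one unit). */
    public static long roundUpToChunkUnit(long bytes) {
        if (bytes <= 0) {
            return CHUNK_UNIT_BYTES;
        }
        long units = (bytes + CHUNK_UNIT_BYTES - 1) / CHUNK_UNIT_BYTES;
        return units * CHUNK_UNIT_BYTES;
    }
}
```

Under this assumption, the 8 MiB value used in the example above passes the check, while a size such as 1000 bytes would need to be rounded up first.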

GSBlobStorageImpl

Concrete implementation of GSBlobStorage using Google Cloud Storage client libraries.

/**
 * Concrete implementation of the GSBlobStorage interface for Google Cloud Storage operations
 * Uses Google Cloud Storage SDK internally
 */
public class GSBlobStorageImpl implements GSBlobStorage {
    
    /**
     * Construct blob storage implementation
     * @param storage The Google Cloud Storage service instance
     */
    public GSBlobStorageImpl(Storage storage);
    
    // Implements all GSBlobStorage interface methods
    public WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    public WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    public void createBlob(GSBlobIdentifier blobIdentifier);
    public Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    public List<GSBlobIdentifier> list(String bucketName, String prefix);
    public void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    public void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    public List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}

Nested Interfaces

BlobMetadata Interface

Provides access to blob metadata information.

/**
 * Abstract blob metadata interface
 */
public interface BlobMetadata {
    /**
     * The crc32c checksum for the blob
     * @return The checksum in base64 format
     */
    String getChecksum();
}

WriteChannel Interface

Provides streaming write access to blobs.

/**
 * Abstract blob write channel interface
 */
public interface WriteChannel {
    /**
     * Writes data to the channel
     * @param content The data buffer
     * @param start Start offset in the data buffer
     * @param length Number of bytes to write
     * @return The number of bytes written
     * @throws IOException On underlying failure
     */
    int write(byte[] content, int start, int length) throws IOException;
    
    /**
     * Closes the channel and commits the write
     * @throws IOException On underlying failure
     */
    void close() throws IOException;
}

Usage Example:

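import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Write large data using a streaming approach
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId);

byte[] buffer = new byte[8192];
// try-with-resources closes the input stream even if a read or write fails
try (InputStream inputStream = new FileInputStream("large-file.dat")) {
    int bytesRead;
    while ((bytesRead = inputStream.read(buffer)) != -1) {
        int written = channel.write(buffer, 0, bytesRead);
        // An explicit check still runs when JVM assertions are disabled
        if (written != bytesRead) {
            throw new IOException("Short write: " + written + " of " + bytesRead + " bytes");
        }
    }
}

channel.close(); // Commits the write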

Core Data Types

GSBlobIdentifier

Abstraction for Google Cloud Storage blob identifiers providing clean separation from Google SDK types.

/**
 * An abstraction for the Google BlobId type
 * Provides clean separation from Google Cloud Storage SDK
 */
public class GSBlobIdentifier {
    /** The bucket name */
    public final String bucketName;
    
    /** The object name, within the bucket */
    public final String objectName;
    
    /**
     * Construct an abstract blob identifier
     * @param bucketName The bucket name
     * @param objectName The object name
     */
    public GSBlobIdentifier(String bucketName, String objectName);
    
    /**
     * Get a Google blob id for this identifier, with generation=null
     * @return The BlobId for use with Google Cloud Storage SDK
     */
    public BlobId getBlobId();
    
    /**
     * Construct an abstract blob identifier from a Google BlobId
     * @param blobId The Google BlobId
     * @return The abstract blob identifier
     */
    public static GSBlobIdentifier fromBlobId(BlobId blobId);
    
    /**
     * Standard equals method for identifier comparison
     * @param o Object to compare
     * @return true if identifiers are equal
     */
    public boolean equals(Object o);
    
    /**
     * Standard hashCode method for hash-based collections
     * @return Hash code for this identifier
     */
    public int hashCode();
    
    /**
     * String representation of the identifier
     * @return String representation showing bucket and object names
     */
    public String toString();
}

Usage Examples:

// Create blob identifier
GSBlobIdentifier blobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");

// Convert to Google SDK BlobId
BlobId googleBlobId = blobId.getBlobId();

// Create from Google SDK BlobId
BlobId existingBlobId = BlobId.of("another-bucket", "another/path");
GSBlobIdentifier convertedId = GSBlobIdentifier.fromBlobId(existingBlobId);

// Use in collections
Set<GSBlobIdentifier> blobSet = new HashSet<>();
blobSet.add(blobId);
blobSet.add(convertedId);

// Comparison
GSBlobIdentifier sameBlobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");
assert blobId.equals(sameBlobId);

Utility Classes

BlobUtils

Utility functions for blob operations and URI parsing.

/**
 * Utility functions related to blobs
 */
public class BlobUtils {
    /** The temporary object prefix */
    private static final String TEMPORARY_OBJECT_PREFIX = ".inprogress";
    
    /** The maximum number of blobs that can be composed in a single operation */
    public static final int COMPOSE_MAX_BLOBS = 32;
    
    /**
     * Parses a blob id from a Google storage uri
     * gs://bucket/foo/bar yields a blob with bucket name "bucket" and object name "foo/bar"
     * @param uri The gs:// URI
     * @return The blob identifier
     * @throws IllegalArgumentException if URI format is invalid
     */
    public static GSBlobIdentifier parseUri(URI uri);
    
    /**
     * Returns the temporary bucket name
     * If options specifies a temporary bucket name, use that; otherwise, use the final bucket
     * @param finalBlobIdentifier The final blob identifier
     * @param options The file system options
     * @return The temporary bucket name
     */
    public static String getTemporaryBucketName(
            GSBlobIdentifier finalBlobIdentifier, GSFileSystemOptions options);
    
    /**
     * Returns a temporary object partial name for organizing temporary files
     * Format: .inprogress/bucket/object/ (with trailing slash)
     * @param finalBlobIdentifier The final blob identifier
     * @return The temporary object partial name
     */
    public static String getTemporaryObjectPartialName(GSBlobIdentifier finalBlobIdentifier);
    
    /**
     * Returns a temporary object name by appending UUID to partial name
     * Format: .inprogress/bucket/object/uuid
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @return The complete temporary object name
     */
    public static String getTemporaryObjectName(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId);
    
    /**
     * Returns a temporary object name with entropy injection
     * Format: uuid.inprogress/bucket/object/uuid (for hotspot reduction)
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @return The complete temporary object name with entropy
     */
    public static String getTemporaryObjectNameWithEntropy(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId);
    
    /**
     * Resolves a temporary blob identifier for provided temporary object id and options
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @param options The file system options
     * @return The temporary blob identifier
     */
    public static GSBlobIdentifier getTemporaryBlobIdentifier(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId, GSFileSystemOptions options);
}

Usage Examples:

import java.net.URI;
import java.util.UUID;

// Parse GCS URI
URI gcsUri = URI.create("gs://my-bucket/data/input.txt");
GSBlobIdentifier blobId = BlobUtils.parseUri(gcsUri);
// Result: blobId.bucketName = "my-bucket", blobId.objectName = "data/input.txt"

// Get temporary bucket name
// (config is assumed to be a Flink Configuration instance already in scope)
GSFileSystemOptions options = new GSFileSystemOptions(config);
String tempBucket = BlobUtils.getTemporaryBucketName(blobId, options);

// Generate temporary object names
String partialName = BlobUtils.getTemporaryObjectPartialName(blobId);
// Result: ".inprogress/my-bucket/data/input.txt/"

// Generate temporary object id
UUID tempId = UUID.randomUUID();
String tempObjectName = BlobUtils.getTemporaryObjectName(blobId, tempId);
// Result: ".inprogress/my-bucket/data/input.txt/550e8400-e29b-41d4-a716-446655440000"

// Or with entropy for hotspot reduction
String tempObjectNameWithEntropy = BlobUtils.getTemporaryObjectNameWithEntropy(blobId, tempId);
// Result: "550e8400-e29b-41d4-a716-446655440000.inprogress/my-bucket/data/input.txt/550e8400-e29b-41d4-a716-446655440000"
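
To make the naming formats above concrete, here is a minimal sketch that reproduces them with plain strings. It is not the plugin's BlobUtils implementation: the class and method names are hypothetical, and the parsing rules (scheme must be gs, bucket taken from the URI authority, object name taken from the path without its leading slash) are assumptions based on the documented gs://bucket/foo/bar example.

```java
import java.net.URI;
import java.util.UUID;

// Hypothetical stand-in that mirrors the documented name formats.
final class GsNamingSketch {
    public static final String TEMPORARY_OBJECT_PREFIX = ".inprogress";

    private GsNamingSketch() {}

    /** Splits gs://bucket/object into {bucket, object}; rejects other shapes. */
    public static String[] parseGsUri(URI uri) {
        if (!"gs".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Not a gs:// URI: " + uri);
        }
        String bucket = uri.getAuthority();
        String path = uri.getPath();
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("Missing bucket name: " + uri);
        }
        // The path must hold "/" plus at least one character of object name
        if (path == null || path.length() < 2) {
            throw new IllegalArgumentException("Missing object name: " + uri);
        }
        return new String[] {bucket, path.substring(1)};
    }

    /** Format: .inprogress/bucket/object/ (with trailing slash). */
    public static String partialName(String bucket, String object) {
        return TEMPORARY_OBJECT_PREFIX + "/" + bucket + "/" + object + "/";
    }

    /** Format: .inprogress/bucket/object/uuid. */
    public static String objectName(String bucket, String object, UUID id) {
        return partialName(bucket, object) + id;
    }

    /** Format: uuid.inprogress/bucket/object/uuid. */
    public static String objectNameWithEntropy(String bucket, String object, UUID id) {
        return id + objectName(bucket, object, id);
    }
}
```

Putting the UUID at the front of the name in the entropy variant spreads temporary objects across the key space, which matches the hotspot-reduction purpose stated in the BlobUtils documentation.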

ChecksumUtils

Utilities for CRC32C checksum operations used by Google Cloud Storage.

/**
 * Utility class for checksum operations, particularly CRC32C checksums used by Google Storage
 */
public class ChecksumUtils {
    /** The CRC32C hash function used by Google storage */
    public static final HashFunction CRC_HASH_FUNCTION = Hashing.crc32c();
    
    /**
     * Converts an int CRC32C checksum to Google Storage's base64 string format
     * @param checksum The integer checksum value
     * @return Base64-encoded checksum string
     */
    public static String convertChecksumToString(int checksum);
}

Usage Example:

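import com.google.common.hash.Hasher;
import java.nio.charset.StandardCharsets;

// Compute checksum for data (explicit charset keeps the bytes platform-independent)
byte[] data = "Hello World".getBytes(StandardCharsets.UTF_8);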
Hasher hasher = ChecksumUtils.CRC_HASH_FUNCTION.newHasher();
hasher.putBytes(data);
int checksum = hasher.hash().asInt();

// Convert to Google Storage format
String checksumString = ChecksumUtils.convertChecksumToString(checksum);
System.out.println("Checksum: " + checksumString);
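
For readers who want to see what the base64 conversion involves, the sketch below encodes a CRC32C value as base64 over its four big-endian bytes, which is the encoding Google Cloud Storage documents for its crc32c field. It is an illustration only: the class name is hypothetical, and it uses the JDK's java.util.zip.CRC32C (Java 9+) in place of the Guava hash function shown above.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.zip.CRC32C;

// Hypothetical illustration, not the plugin's ChecksumUtils.
final class Crc32cBase64Sketch {
    private Crc32cBase64Sketch() {}

    /** Encodes a CRC32C value as base64 over its four big-endian bytes. */
    public static String encode(int checksum) {
        // ByteBuffer uses big-endian byte order by default
        byte[] bytes = ByteBuffer.allocate(Integer.BYTES).putInt(checksum).array();
        return Base64.getEncoder().encodeToString(bytes);
    }

    /** Computes the CRC32C of the given bytes using the JDK implementation. */
    public static int crc32c(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return (int) crc.getValue();
    }
}
```

The standard CRC32C check input "123456789" has the checksum 0xE3069283, which this encoding turns into the string "4waSgw==".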

Batch Operations

Composition Operations

The compose operation allows combining up to 32 source blobs into a single target blob:

List<GSBlobIdentifier> sourceBlobs = Arrays.asList(
    new GSBlobIdentifier("bucket", "part-1"),
    new GSBlobIdentifier("bucket", "part-2"),
    new GSBlobIdentifier("bucket", "part-3")
);

GSBlobIdentifier targetBlob = new GSBlobIdentifier("bucket", "combined-file");

// Compose all parts into single blob
storage.compose(sourceBlobs, targetBlob);

// Important: Source blobs are NOT deleted - must be cleaned up separately
storage.delete(sourceBlobs);
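
When there are more than 32 parts, a single compose call is not enough. One possible approach, sketched below under stated assumptions, is to fold the parts together: compose the first 32 into an intermediate blob, then treat that intermediate as the first source of the next round. The class, the compose callback, and the intermediate naming function are all hypothetical; the sketch is generic over the identifier type so it can run without the plugin on the classpath, and it is not presented as the plugin's own commit logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.IntFunction;

// Hypothetical helper showing one way to respect the 32-source limit.
final class ComposeBatchingSketch {
    public static final int COMPOSE_MAX_BLOBS = 32;

    private ComposeBatchingSketch() {}

    /**
     * Composes all sources, in order, into target using calls of at most 32 sources.
     * Returns the intermediate blobs that were created so the caller can delete them.
     */
    public static <T> List<T> composeAll(
            List<T> sources,
            T target,
            BiConsumer<List<T>, T> compose,
            IntFunction<T> intermediateName) {
        if (sources.isEmpty()) {
            throw new IllegalArgumentException("At least one source blob is required");
        }
        List<T> intermediates = new ArrayList<>();
        List<T> remaining = new ArrayList<>(sources);
        while (remaining.size() > COMPOSE_MAX_BLOBS) {
            T intermediate = intermediateName.apply(intermediates.size());
            compose.accept(
                    new ArrayList<>(remaining.subList(0, COMPOSE_MAX_BLOBS)), intermediate);
            intermediates.add(intermediate);
            // Replace the 32 composed entries with the single intermediate blob
            List<T> next = new ArrayList<>();
            next.add(intermediate);
            next.addAll(remaining.subList(COMPOSE_MAX_BLOBS, remaining.size()));
            remaining = next;
        }
        compose.accept(remaining, target);
        return intermediates;
    }
}
```

With GSBlobStorage, the compose callback could be `storage::compose`, and the returned intermediates can be passed to `storage.delete` together with the original parts once the target exists.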

Batch Delete Operations

Delete operations accept multiple blob identifiers and return individual results:

List<GSBlobIdentifier> blobsToDelete = Arrays.asList(
    new GSBlobIdentifier("bucket", "temp-1"),
    new GSBlobIdentifier("bucket", "temp-2"),
    new GSBlobIdentifier("bucket", "temp-3")
);

// Delete all blobs - does not fail if some don't exist
List<Boolean> deleteResults = storage.delete(blobsToDelete);

// Check results
for (int i = 0; i < deleteResults.size(); i++) {
    if (deleteResults.get(i)) {
        System.out.println("Successfully deleted: " + blobsToDelete.get(i));
    } else {
        System.out.println("Failed to delete or didn't exist: " + blobsToDelete.get(i));
    }
}
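
If only the failures matter, the result list can be reduced to the identifiers that were not deleted. The helper below is a small sketch with hypothetical names; like the loop above, it assumes the results are positionally aligned with the requested identifiers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for post-processing batch delete results.
final class DeleteResultsSketch {
    private DeleteResultsSketch() {}

    /** Returns the requested items whose delete result was not true. */
    public static <T> List<T> notDeleted(List<T> requested, List<Boolean> results) {
        if (requested.size() != results.size()) {
            throw new IllegalArgumentException("Result count does not match request count");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < requested.size(); i++) {
            // Treat null the same as false rather than unboxing it
            if (!Boolean.TRUE.equals(results.get(i))) {
                failed.add(requested.get(i));
            }
        }
        return failed;
    }
}
```

A false result covers both "delete failed" and "blob did not exist", so a follow-up getMetadata call would be needed to tell the two cases apart.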

Error Handling

Common Exceptions

  • IOException: Network failures, authentication issues, storage errors
  • IllegalArgumentException: Invalid blob identifiers, malformed URIs
  • StorageException: Google Cloud Storage specific errors (wrapped in IOException)

Best Practices

  • Retry Logic: Use Flink's retry configuration for transient failures
  • Batch Operations: Prefer batch delete over individual operations
  • Resource Cleanup: Always close WriteChannel instances
  • Temporary Object Management: Clean up temporary objects after successful operations

Checksum Validation

The storage layer automatically validates checksums during write operations to ensure data integrity:

// Checksum validation happens automatically
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId);
channel.write(data, 0, data.length);
channel.close(); // Validates checksum before completing write
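
As an illustration of how checksum validation can be layered on a write channel, the sketch below wraps a channel, tracks the CRC32C of the bytes it passes through, and compares that value with the checksum reported by the store when the channel is closed. Everything here is an assumption made for the example: the nested WriteChannel interface only mirrors the documented method signatures, StoredChecksum is a hypothetical lookup (in practice it could decode the value from getMetadata), and the sketch does not claim to be how the plugin performs its validation.

```java
import java.io.IOException;
import java.util.zip.CRC32C;

// Hypothetical sketch of a checksum-validating channel wrapper.
final class ChecksumWriteSketch {
    private ChecksumWriteSketch() {}

    /** Minimal stand-in mirroring the documented WriteChannel methods. */
    interface WriteChannel {
        int write(byte[] content, int start, int length) throws IOException;

        void close() throws IOException;
    }

    /** Hypothetical lookup of the CRC32C the store recorded for the blob. */
    interface StoredChecksum {
        int get() throws IOException;
    }

    /** Tracks the CRC32C of written bytes and verifies it on close. */
    static final class ValidatingChannel implements WriteChannel {
        private final WriteChannel delegate;
        private final StoredChecksum stored;
        private final CRC32C crc = new CRC32C();

        ValidatingChannel(WriteChannel delegate, StoredChecksum stored) {
            this.delegate = delegate;
            this.stored = stored;
        }

        @Override
        public int write(byte[] content, int start, int length) throws IOException {
            int written = delegate.write(content, start, length);
            // Only hash the bytes the delegate actually accepted
            crc.update(content, start, written);
            return written;
        }

        @Override
        public void close() throws IOException {
            delegate.close();
            int expected = (int) crc.getValue();
            int actual = stored.get();
            if (expected != actual) {
                throw new IOException(
                        String.format(
                                "Checksum mismatch: wrote %08x, store reports %08x",
                                expected, actual));
            }
        }
    }
}
```

Comparing after the delegate is closed means a mismatch is detected only once the blob has been committed, so a caller using this pattern would typically delete the blob when the IOException is thrown.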

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-gs-fs-hadoop
