Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers
—
The Flink GS FileSystem plugin provides a comprehensive abstraction layer over Google Cloud Storage operations through the GSBlobStorage interface. This abstraction enables efficient blob management, batch operations, and low-level storage access while maintaining testability and clean separation of concerns.
Main abstraction interface for Google Cloud Storage operations providing all necessary blob management functionality.
/**
* Abstract blob storage interface for Google storage operations
* Provides clean abstraction over Google Cloud Storage SDK
*/
public interface GSBlobStorage {
/**
* Creates a write channel with the default chunk size
* @param blobIdentifier The blob identifier to which to write
* @return The WriteChannel helper for streaming writes
*/
WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
/**
* Creates a write channel with the specified chunk size
* @param blobIdentifier The blob identifier to which to write
* @param chunkSize The chunk size, must be > 0 and multiple of 256KB
* @return The WriteChannel helper for streaming writes
*/
WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
/**
* Create an empty blob
* @param blobIdentifier The blob to create
*/
void createBlob(GSBlobIdentifier blobIdentifier);
/**
* Gets blob metadata
* @param blobIdentifier The blob identifier
* @return The blob metadata, if the blob exists. Empty if the blob doesn't exist.
*/
Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
/**
* Lists all the blobs in a bucket matching a given prefix
* @param bucketName The bucket name
* @param prefix The object prefix
* @return The found blob identifiers
*/
List<GSBlobIdentifier> list(String bucketName, String prefix);
/**
* Copies from a source blob id to a target blob id. Does not delete the source blob.
* @param sourceBlobIdentifier The source blob identifier
* @param targetBlobIdentifier The target blob identifier
*/
void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
/**
* Composes multiple blobs into one. Does not delete any of the source blobs.
* @param sourceBlobIdentifiers The source blob identifiers to combine, max of 32
* @param targetBlobIdentifier The target blob identifier
*/
void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
/**
* Deletes blobs. Note that this does not fail if blobs don't exist.
* @param blobIdentifiers The blob identifiers to delete
* @return The results of each delete operation
*/
List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}

Usage Example:
import org.apache.flink.fs.gs.storage.GSBlobStorage;
import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
import org.apache.flink.configuration.MemorySize;
// Get storage instance (typically through GSFileSystem)
GSBlobStorage storage = ...; // Obtained from filesystem implementation
// Create blob identifier
GSBlobIdentifier blobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");
// Write data to blob
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId, MemorySize.ofMebiBytes(8));
channel.write("Hello World".getBytes(), 0, 11);
channel.close();
// Get blob metadata
Optional<GSBlobStorage.BlobMetadata> metadata = storage.getMetadata(blobId);
if (metadata.isPresent()) {
String checksum = metadata.get().getChecksum();
System.out.println("Blob checksum: " + checksum);
}
// List blobs with prefix
List<GSBlobIdentifier> blobs = storage.list("my-bucket", "path/to/");
// Copy blob
GSBlobIdentifier targetId = new GSBlobIdentifier("my-bucket", "path/to/copy.txt");
storage.copy(blobId, targetId);
// Delete blobs
List<Boolean> results = storage.delete(Arrays.asList(blobId, targetId));

Concrete implementation of GSBlobStorage using Google Cloud Storage client libraries.
/**
* Concrete implementation of the GSBlobStorage interface for Google Cloud Storage operations
* Uses Google Cloud Storage SDK internally
*/
public class GSBlobStorageImpl implements GSBlobStorage {
/**
* Construct blob storage implementation
* @param storage The Google Cloud Storage service instance
*/
public GSBlobStorageImpl(Storage storage);
// Implements all GSBlobStorage interface methods
public WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
public WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
public void createBlob(GSBlobIdentifier blobIdentifier);
public Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
public List<GSBlobIdentifier> list(String bucketName, String prefix);
public void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
public void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
public List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}

Provides access to blob metadata information.
/**
* Abstract blob metadata interface
*/
public interface BlobMetadata {
/**
* The crc32c checksum for the blob
* @return The checksum in base64 format
*/
String getChecksum();
}

Provides streaming write access to blobs.
/**
* Abstract blob write channel interface
*/
public interface WriteChannel {
/**
* Writes data to the channel
* @param content The data buffer
* @param start Start offset in the data buffer
* @param length Number of bytes to write
* @return The number of bytes written
* @throws IOException On underlying failure
*/
int write(byte[] content, int start, int length) throws IOException;
/**
* Closes the channel and commits the write
* @throws IOException On underlying failure
*/
void close() throws IOException;
}

Usage Example:
// Write large data using streaming approach
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId);
byte[] buffer = new byte[8192];
InputStream inputStream = new FileInputStream("large-file.dat");
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
int written = channel.write(buffer, 0, bytesRead);
assert written == bytesRead; // Should write all bytes
}
channel.close(); // Commits the write
inputStream.close();

Abstraction for Google Cloud Storage blob identifiers providing clean separation from Google SDK types.
/**
* An abstraction for the Google BlobId type
* Provides clean separation from Google Cloud Storage SDK
*/
public class GSBlobIdentifier {
/** The bucket name */
public final String bucketName;
/** The object name, within the bucket */
public final String objectName;
/**
* Construct an abstract blob identifier
* @param bucketName The bucket name
* @param objectName The object name
*/
public GSBlobIdentifier(String bucketName, String objectName);
/**
* Get a Google blob id for this identifier, with generation=null
* @return The BlobId for use with Google Cloud Storage SDK
*/
public BlobId getBlobId();
/**
* Construct an abstract blob identifier from a Google BlobId
* @param blobId The Google BlobId
* @return The abstract blob identifier
*/
public static GSBlobIdentifier fromBlobId(BlobId blobId);
/**
* Standard equals method for identifier comparison
* @param o Object to compare
* @return true if identifiers are equal
*/
public boolean equals(Object o);
/**
* Standard hashCode method for hash-based collections
* @return Hash code for this identifier
*/
public int hashCode();
/**
* String representation of the identifier
* @return String representation showing bucket and object names
*/
public String toString();
}

Usage Examples:
// Create blob identifier
GSBlobIdentifier blobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");
// Convert to Google SDK BlobId
BlobId googleBlobId = blobId.getBlobId();
// Create from Google SDK BlobId
BlobId existingBlobId = BlobId.of("another-bucket", "another/path");
GSBlobIdentifier convertedId = GSBlobIdentifier.fromBlobId(existingBlobId);
// Use in collections
Set<GSBlobIdentifier> blobSet = new HashSet<>();
blobSet.add(blobId);
blobSet.add(convertedId);
// Comparison
GSBlobIdentifier sameBlobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");
assert blobId.equals(sameBlobId);

Utility functions for blob operations and URI parsing.
/**
* Utility functions related to blobs
*/
public class BlobUtils {
/** The temporary object prefix */
private static final String TEMPORARY_OBJECT_PREFIX = ".inprogress";
/** The maximum number of blobs that can be composed in a single operation */
public static final int COMPOSE_MAX_BLOBS = 32;
/**
* Parses a blob id from a Google storage uri
* gs://bucket/foo/bar yields a blob with bucket name "bucket" and object name "foo/bar"
* @param uri The gs:// URI
* @return The blob identifier
* @throws IllegalArgumentException if URI format is invalid
*/
public static GSBlobIdentifier parseUri(URI uri);
/**
* Returns the temporary bucket name
* If options specifies a temporary bucket name, use that; otherwise, use the final bucket
* @param finalBlobIdentifier The final blob identifier
* @param options The file system options
* @return The temporary bucket name
*/
public static String getTemporaryBucketName(
GSBlobIdentifier finalBlobIdentifier, GSFileSystemOptions options);
/**
* Returns a temporary object partial name for organizing temporary files
* Format: .inprogress/bucket/object/ (with trailing slash)
* @param finalBlobIdentifier The final blob identifier
* @return The temporary object partial name
*/
public static String getTemporaryObjectPartialName(GSBlobIdentifier finalBlobIdentifier);
/**
* Returns a temporary object name by appending UUID to partial name
* Format: .inprogress/bucket/object/uuid
* @param finalBlobIdentifier The final blob identifier
* @param temporaryObjectId The UUID for this temporary object
* @return The complete temporary object name
*/
public static String getTemporaryObjectName(
GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId);
/**
* Returns a temporary object name with entropy injection
* Format: uuid.inprogress/bucket/object/uuid (for hotspot reduction)
* @param finalBlobIdentifier The final blob identifier
* @param temporaryObjectId The UUID for this temporary object
* @return The complete temporary object name with entropy
*/
public static String getTemporaryObjectNameWithEntropy(
GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId);
/**
* Resolves a temporary blob identifier for provided temporary object id and options
* @param finalBlobIdentifier The final blob identifier
* @param temporaryObjectId The UUID for this temporary object
* @param options The file system options
* @return The temporary blob identifier
*/
public static GSBlobIdentifier getTemporaryBlobIdentifier(
GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId, GSFileSystemOptions options);
}

Usage Examples:
import java.net.URI;
import java.util.UUID;
// Parse GCS URI
URI gcsUri = URI.create("gs://my-bucket/data/input.txt");
GSBlobIdentifier blobId = BlobUtils.parseUri(gcsUri);
// Result: blobId.bucketName = "my-bucket", blobId.objectName = "data/input.txt"
// Get temporary bucket name
GSFileSystemOptions options = new GSFileSystemOptions(config);
String tempBucket = BlobUtils.getTemporaryBucketName(blobId, options);
// Generate temporary object names
String partialName = BlobUtils.getTemporaryObjectPartialName(blobId);
// Result: ".inprogress/my-bucket/data/input.txt/"
// Generate temporary object id
UUID tempId = UUID.randomUUID();
String tempObjectName = BlobUtils.getTemporaryObjectName(blobId, tempId);
// Result: ".inprogress/my-bucket/data/input.txt/550e8400-e29b-41d4-a716-446655440000"
// Or with entropy for hotspot reduction
String tempObjectNameWithEntropy = BlobUtils.getTemporaryObjectNameWithEntropy(blobId, tempId);
// Result: "550e8400-e29b-41d4-a716-446655440000.inprogress/my-bucket/data/input.txt/550e8400-e29b-41d4-a716-446655440000"

Utilities for CRC32C checksum operations used by Google Cloud Storage.
/**
* Utility class for checksum operations, particularly CRC32C checksums used by Google Storage
*/
public class ChecksumUtils {
/** The CRC32C hash function used by Google Cloud Storage */
public static final HashFunction CRC_HASH_FUNCTION = Hashing.crc32c();
/**
* Converts int CRC32 checksum to Google Storage's base64 string format
* @param checksum The integer checksum value
* @return Base64-encoded checksum string
*/
public static String convertChecksumToString(int checksum);
}

Usage Example:
import com.google.common.hash.Hasher;
// Compute checksum for data
byte[] data = "Hello World".getBytes();
Hasher hasher = ChecksumUtils.CRC_HASH_FUNCTION.newHasher();
hasher.putBytes(data);
int checksum = hasher.hash().asInt();
// Convert to Google Storage format
String checksumString = ChecksumUtils.convertChecksumToString(checksum);
System.out.println("Checksum: " + checksumString);

The compose operation allows combining up to 32 source blobs into a single target blob:
List<GSBlobIdentifier> sourceBlobs = Arrays.asList(
new GSBlobIdentifier("bucket", "part-1"),
new GSBlobIdentifier("bucket", "part-2"),
new GSBlobIdentifier("bucket", "part-3")
);
GSBlobIdentifier targetBlob = new GSBlobIdentifier("bucket", "combined-file");
// Compose all parts into single blob
storage.compose(sourceBlobs, targetBlob);
// Important: Source blobs are NOT deleted - must be cleaned up separately
storage.delete(sourceBlobs);

Delete operations accept multiple blob identifiers and return individual results:
List<GSBlobIdentifier> blobsToDelete = Arrays.asList(
new GSBlobIdentifier("bucket", "temp-1"),
new GSBlobIdentifier("bucket", "temp-2"),
new GSBlobIdentifier("bucket", "temp-3")
);
// Delete all blobs - does not fail if some don't exist
List<Boolean> deleteResults = storage.delete(blobsToDelete);
// Check results
for (int i = 0; i < deleteResults.size(); i++) {
if (deleteResults.get(i)) {
System.out.println("Successfully deleted: " + blobsToDelete.get(i));
} else {
System.out.println("Failed to delete or didn't exist: " + blobsToDelete.get(i));
}
}

The storage layer automatically validates checksums during write operations to ensure data integrity:
// Checksum validation happens automatically
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId);
channel.write(data, 0, data.length);
channel.close(); // Validates checksum before completing write

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gs-fs-hadoop