# Storage Operations

The Flink GS FileSystem plugin accesses Google Cloud Storage through the GSBlobStorage interface. The interface covers streaming writes, metadata lookup, listing, copying, composing, and batch deletion. It also keeps Google SDK types out of the rest of the plugin, so storage access can be replaced with a test double.

## Capabilities

### GSBlobStorage Interface

The main abstraction over Google Cloud Storage, defining the blob management operations the plugin relies on.

```java { .api }
/**
 * Abstract blob storage interface for Google storage operations
 * Provides clean abstraction over Google Cloud Storage SDK
 */
public interface GSBlobStorage {

    /**
     * Creates a write channel with the default chunk size
     * @param blobIdentifier The blob identifier to which to write
     * @return The WriteChannel helper for streaming writes
     */
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);

    /**
     * Creates a write channel with the specified chunk size
     * @param blobIdentifier The blob identifier to which to write
     * @param chunkSize The chunk size, must be > 0 and multiple of 256KB
     * @return The WriteChannel helper for streaming writes
     */
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);

    /**
     * Create an empty blob
     * @param blobIdentifier The blob to create
     */
    void createBlob(GSBlobIdentifier blobIdentifier);

    /**
     * Gets blob metadata
     * @param blobIdentifier The blob identifier
     * @return The blob metadata, if the blob exists. Empty if the blob doesn't exist.
     */
    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);

    /**
     * Lists all the blobs in a bucket matching a given prefix
     * @param bucketName The bucket name
     * @param prefix The object prefix
     * @return The found blob identifiers
     */
    List<GSBlobIdentifier> list(String bucketName, String prefix);

    /**
     * Copies from a source blob id to a target blob id. Does not delete the source blob.
     * @param sourceBlobIdentifier The source blob identifier
     * @param targetBlobIdentifier The target blob identifier
     */
    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);

    /**
     * Composes multiple blobs into one. Does not delete any of the source blobs.
     * @param sourceBlobIdentifiers The source blob identifiers to combine, max of 32
     * @param targetBlobIdentifier The target blob identifier
     */
    void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);

    /**
     * Deletes blobs. Note that this does not fail if blobs don't exist.
     * @param blobIdentifiers The blob identifiers to delete
     * @return The results of each delete operation
     */
    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}
```
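
The chunkSize overload of writeBlob accepts only a positive multiple of 256 KB. When the chunk size comes from user input, it can help to check or round it before calling writeBlob. The sketch below is a hypothetical helper: ChunkSizes is not a plugin class. It works on raw byte counts, so it runs without Flink on the classpath. With Flink's MemorySize you would pass chunkSize.getBytes().

```java
/** Hypothetical helper (not part of the plugin): checks sizes against the writeBlob chunkSize contract. */
final class ChunkSizes {
    /** The writeBlob contract requires multiples of 256 KiB (262,144 bytes). */
    static final long CHUNK_GRANULARITY_BYTES = 256L * 1024L;

    private ChunkSizes() {}

    /** Returns true if the size is positive and a multiple of 256 KiB. */
    static boolean isValid(long chunkSizeBytes) {
        return chunkSizeBytes > 0 && chunkSizeBytes % CHUNK_GRANULARITY_BYTES == 0;
    }

    /** Rounds a requested size up to the nearest valid chunk size (at least 256 KiB). */
    static long roundUp(long requestedBytes) {
        if (requestedBytes <= CHUNK_GRANULARITY_BYTES) {
            return CHUNK_GRANULARITY_BYTES;
        }
        long chunks = (requestedBytes + CHUNK_GRANULARITY_BYTES - 1) / CHUNK_GRANULARITY_BYTES;
        return chunks * CHUNK_GRANULARITY_BYTES;
    }

    /** Throws IllegalArgumentException for sizes that violate the documented contract. */
    static long requireValid(long chunkSizeBytes) {
        if (!isValid(chunkSizeBytes)) {
            throw new IllegalArgumentException(
                    "Chunk size must be > 0 and a multiple of 256 KiB, got " + chunkSizeBytes);
        }
        return chunkSizeBytes;
    }
}
```

For example, ChunkSizes.isValid(1_000_000) is false, and ChunkSizes.roundUp(1_000_000) returns 1,048,576 (four 256 KiB chunks).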

**Usage Example:**

```java
import com.google.cloud.storage.StorageOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
import org.apache.flink.fs.gs.storage.GSBlobStorage;
import org.apache.flink.fs.gs.storage.GSBlobStorageImpl;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Create a storage instance backed by the Google Cloud Storage client
// (StorageOptions.getDefaultInstance() uses Application Default Credentials)
GSBlobStorage storage = new GSBlobStorageImpl(StorageOptions.getDefaultInstance().getService());

// Create blob identifier
GSBlobIdentifier blobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");

// Write data to the blob; write() and close() throw IOException
byte[] data = "Hello World".getBytes(StandardCharsets.UTF_8);
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId, MemorySize.ofMebiBytes(8));
channel.write(data, 0, data.length);
channel.close(); // Commits the write

// Get blob metadata
Optional<GSBlobStorage.BlobMetadata> metadata = storage.getMetadata(blobId);
metadata.ifPresent(m -> System.out.println("Blob checksum: " + m.getChecksum()));

// List blobs with prefix
List<GSBlobIdentifier> blobs = storage.list("my-bucket", "path/to/");

// Copy blob (the source is not deleted)
GSBlobIdentifier targetId = new GSBlobIdentifier("my-bucket", "path/to/copy.txt");
storage.copy(blobId, targetId);

// Delete blobs; one result per identifier, in the same order
List<Boolean> results = storage.delete(Arrays.asList(blobId, targetId));
```

### GSBlobStorageImpl

Concrete implementation of GSBlobStorage using Google Cloud Storage client libraries.

```java { .api }
/**
 * Concrete implementation of the GSBlobStorage interface for Google Cloud Storage operations
 * Uses Google Cloud Storage SDK internally
 */
public class GSBlobStorageImpl implements GSBlobStorage {

    /**
     * Construct blob storage implementation
     * @param storage The Google Cloud Storage service instance
     */
    public GSBlobStorageImpl(Storage storage);

    // Implements all GSBlobStorage interface methods
    public WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    public WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    public void createBlob(GSBlobIdentifier blobIdentifier);
    public Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    public List<GSBlobIdentifier> list(String bucketName, String prefix);
    public void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    public void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    public List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}
```

## Nested Interfaces

### BlobMetadata Interface

Provides access to the metadata of an existing blob, as returned by getMetadata. It is nested in GSBlobStorage and referenced as GSBlobStorage.BlobMetadata.

```java { .api }
/**
 * Abstract blob metadata interface
 */
public interface BlobMetadata {
    /**
     * The crc32c checksum for the blob
     * @return The checksum in base64 format
     */
    String getChecksum();
}
```
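
### WriteChannel Interface

Provides streaming write access to a blob, as returned by writeBlob. It is nested in GSBlobStorage and referenced as GSBlobStorage.WriteChannel. Written data is committed when close() is called.
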
```java { .api }
/**
 * Abstract blob write channel interface
 */
public interface WriteChannel {
    /**
     * Writes data to the channel
     * @param content The data buffer
     * @param start Start offset in the data buffer
     * @param length Number of bytes to write
     * @return The number of bytes written
     * @throws IOException On underlying failure
     */
    int write(byte[] content, int start, int length) throws IOException;

    /**
     * Closes the channel and commits the write
     * @throws IOException On underlying failure
     */
    void close() throws IOException;
}
```

**Usage Example:**

```java
import java.io.FileInputStream;
import java.io.InputStream;

// Stream a large local file into a blob
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId);

byte[] buffer = new byte[8192];
try (InputStream inputStream = new FileInputStream("large-file.dat")) {
    int bytesRead;
    while ((bytesRead = inputStream.read(buffer)) != -1) {
        // write() returns the number of bytes accepted; keep writing until the whole read is sent
        int offset = 0;
        while (offset < bytesRead) {
            offset += channel.write(buffer, offset, bytesRead - offset);
        }
    }
}

channel.close(); // Commits the write (not reached if reading fails)
```

## Core Data Types

### GSBlobIdentifier

Identifies a blob by bucket name and object name, keeping the Google SDK's BlobId type out of the rest of the plugin.

```java { .api }
/**
 * An abstraction for the Google BlobId type
 * Provides clean separation from Google Cloud Storage SDK
 */
public class GSBlobIdentifier {
    /** The bucket name */
    public final String bucketName;

    /** The object name, within the bucket */
    public final String objectName;

    /**
     * Construct an abstract blob identifier
     * @param bucketName The bucket name
     * @param objectName The object name
     */
    public GSBlobIdentifier(String bucketName, String objectName);

    /**
     * Get a Google blob id for this identifier, with generation=null
     * @return The BlobId for use with Google Cloud Storage SDK
     */
    public BlobId getBlobId();

    /**
     * Construct an abstract blob identifier from a Google BlobId
     * @param blobId The Google BlobId
     * @return The abstract blob identifier
     */
    public static GSBlobIdentifier fromBlobId(BlobId blobId);

    /**
     * Standard equals method for identifier comparison
     * @param o Object to compare
     * @return true if identifiers are equal
     */
    public boolean equals(Object o);

    /**
     * Standard hashCode method for hash-based collections
     * @return Hash code for this identifier
     */
    public int hashCode();

    /**
     * String representation of the identifier
     * @return String representation showing bucket and object names
     */
    public String toString();
}
```
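
GSBlobIdentifier works as a HashSet key because equality depends only on the bucket and object names. The hypothetical BlobKey class below is not part of the plugin. It sketches that value-type contract. Rejecting null names and rendering toString() as a gs:// URI are choices made for this sketch, not documented behavior of GSBlobIdentifier.

```java
import java.util.Objects;

/** Hypothetical stand-in for GSBlobIdentifier: an immutable (bucket, object) value type. */
final class BlobKey {
    final String bucketName;
    final String objectName;

    BlobKey(String bucketName, String objectName) {
        // Sketch choice: reject nulls up front
        this.bucketName = Objects.requireNonNull(bucketName, "bucketName");
        this.objectName = Objects.requireNonNull(objectName, "objectName");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof BlobKey)) {
            return false;
        }
        BlobKey other = (BlobKey) o;
        // Two keys name the same blob only if both the bucket and the object name match
        return bucketName.equals(other.bucketName) && objectName.equals(other.objectName);
    }

    @Override
    public int hashCode() {
        // Must agree with equals so HashSet/HashMap lookups find equal keys
        return Objects.hash(bucketName, objectName);
    }

    @Override
    public String toString() {
        return "gs://" + bucketName + "/" + objectName;
    }
}
```

Comparing the two fields separately, rather than comparing a concatenated string, keeps keys such as ("a", "bc") and ("ab", "c") distinct.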

**Usage Examples:**

```java
import com.google.cloud.storage.BlobId;
import org.apache.flink.fs.gs.storage.GSBlobIdentifier;

import java.util.HashSet;
import java.util.Set;

// Create blob identifier
GSBlobIdentifier blobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");

// Convert to Google SDK BlobId (generation is null)
BlobId googleBlobId = blobId.getBlobId();

// Create from Google SDK BlobId
BlobId existingBlobId = BlobId.of("another-bucket", "another/path");
GSBlobIdentifier convertedId = GSBlobIdentifier.fromBlobId(existingBlobId);

// Use in hash-based collections
Set<GSBlobIdentifier> blobSet = new HashSet<>();
blobSet.add(blobId);
blobSet.add(convertedId);

// Identifiers with the same bucket and object name are equal
GSBlobIdentifier sameBlobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");
assert blobId.equals(sameBlobId);
```

## Utility Classes

### BlobUtils

Utility functions for blob operations and URI parsing.

```java { .api }
/**
 * Utility functions related to blobs
 */
public class BlobUtils {
    /** The temporary object prefix */
    private static final String TEMPORARY_OBJECT_PREFIX = ".inprogress";

    /** The maximum number of blobs that can be composed in a single operation */
    public static final int COMPOSE_MAX_BLOBS = 32;

    /**
     * Parses a blob id from a Google storage uri
     * gs://bucket/foo/bar yields a blob with bucket name "bucket" and object name "foo/bar"
     * @param uri The gs:// URI
     * @return The blob identifier
     * @throws IllegalArgumentException if URI format is invalid
     */
    public static GSBlobIdentifier parseUri(URI uri);

    /**
     * Returns the temporary bucket name
     * If options specifies a temporary bucket name, use that; otherwise, use the final bucket
     * @param finalBlobIdentifier The final blob identifier
     * @param options The file system options
     * @return The temporary bucket name
     */
    public static String getTemporaryBucketName(
            GSBlobIdentifier finalBlobIdentifier, GSFileSystemOptions options);

    /**
     * Returns a temporary object partial name for organizing temporary files
     * Format: .inprogress/bucket/object/ (with trailing slash)
     * @param finalBlobIdentifier The final blob identifier
     * @return The temporary object partial name
     */
    public static String getTemporaryObjectPartialName(GSBlobIdentifier finalBlobIdentifier);

    /**
     * Returns a temporary object name by appending UUID to partial name
     * Format: .inprogress/bucket/object/uuid
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @return The complete temporary object name
     */
    public static String getTemporaryObjectName(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId);

    /**
     * Returns a temporary object name with entropy injection
     * Format: uuid.inprogress/bucket/object/uuid (for hotspot reduction)
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @return The complete temporary object name with entropy
     */
    public static String getTemporaryObjectNameWithEntropy(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId);

    /**
     * Resolves a temporary blob identifier for provided temporary object id and options
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @param options The file system options
     * @return The temporary blob identifier
     */
    public static GSBlobIdentifier getTemporaryBlobIdentifier(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId, GSFileSystemOptions options);
}
```
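
parseUri maps the URI authority to the bucket name and maps the path, minus its leading slash, to the object name. The sketch below reproduces that mapping using only java.net.URI. GsUriParser and ParsedBlob are hypothetical names. The specific checks are assumptions about what counts as an invalid URI format: the scheme must be gs, and the bucket and object name must be non-empty.

```java
import java.net.URI;

/** Hypothetical sketch of the documented BlobUtils.parseUri mapping, using only java.net.URI. */
final class GsUriParser {
    private GsUriParser() {}

    /** Bucket and object name extracted from a gs:// URI. */
    static final class ParsedBlob {
        final String bucketName;
        final String objectName;

        ParsedBlob(String bucketName, String objectName) {
            this.bucketName = bucketName;
            this.objectName = objectName;
        }
    }

    static ParsedBlob parse(URI uri) {
        // Assumed check: only the gs scheme is accepted
        if (!"gs".equals(uri.getScheme())) {
            throw new IllegalArgumentException("URI scheme must be gs: " + uri);
        }
        // For gs://bucket/foo/bar the authority is "bucket"
        String bucket = uri.getAuthority();
        if (bucket == null || bucket.isBlank()) {
            throw new IllegalArgumentException("Missing bucket name in " + uri);
        }
        // The path is "/foo/bar"; drop the leading slash to get the object name
        String path = uri.getPath();
        String object = (path == null || path.isEmpty()) ? "" : path.substring(1);
        if (object.isBlank()) {
            throw new IllegalArgumentException("Missing object name in " + uri);
        }
        return new ParsedBlob(bucket, object);
    }
}
```

Reading the bucket from getAuthority() rather than getHost() matters because getHost() returns null for names that are not valid hostnames, such as bucket names containing underscores.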

**Usage Examples:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.gs.GSFileSystemOptions;
import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
import org.apache.flink.fs.gs.utils.BlobUtils;

import java.net.URI;
import java.util.UUID;

// Parse GCS URI
URI gcsUri = URI.create("gs://my-bucket/data/input.txt");
GSBlobIdentifier blobId = BlobUtils.parseUri(gcsUri);
// Result: blobId.bucketName = "my-bucket", blobId.objectName = "data/input.txt"

// Get temporary bucket name; no temporary bucket is configured here, so this is "my-bucket"
GSFileSystemOptions options = new GSFileSystemOptions(new Configuration());
String tempBucket = BlobUtils.getTemporaryBucketName(blobId, options);

// Generate temporary object names
String partialName = BlobUtils.getTemporaryObjectPartialName(blobId);
// Result: ".inprogress/my-bucket/data/input.txt/"

// Generate a temporary object id
UUID tempId = UUID.randomUUID();
String tempObjectName = BlobUtils.getTemporaryObjectName(blobId, tempId);
// e.g. ".inprogress/my-bucket/data/input.txt/550e8400-e29b-41d4-a716-446655440000"

// Or with entropy for hotspot reduction
String tempObjectNameWithEntropy = BlobUtils.getTemporaryObjectNameWithEntropy(blobId, tempId);
// e.g. "550e8400-e29b-41d4-a716-446655440000.inprogress/my-bucket/data/input.txt/550e8400-e29b-41d4-a716-446655440000"
```
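
The temporary names follow plain string layouts, so they are easy to reproduce when inspecting a bucket for leftover in-progress objects. TempNames below is a hypothetical re-implementation of the formats given in the BlobUtils javadoc. It uses raw bucket and object strings instead of GSBlobIdentifier. It mirrors the documented formats and is not the plugin's code.

```java
import java.util.UUID;

/** Hypothetical sketch of the temporary-object naming formats documented for BlobUtils. */
final class TempNames {
    static final String TEMPORARY_OBJECT_PREFIX = ".inprogress";

    private TempNames() {}

    /** Format: .inprogress/bucket/object/ (with trailing slash) */
    static String partialName(String bucket, String object) {
        return TEMPORARY_OBJECT_PREFIX + "/" + bucket + "/" + object + "/";
    }

    /** Format: .inprogress/bucket/object/uuid */
    static String objectName(String bucket, String object, UUID id) {
        return partialName(bucket, object) + id;
    }

    /** Format: uuid.inprogress/bucket/object/uuid, with the UUID also used as a leading prefix */
    static String objectNameWithEntropy(String bucket, String object, UUID id) {
        return id + objectName(bucket, object, id);
    }
}
```

With the entropy variant, temporary objects no longer all start with .inprogress/. That is the hotspot-reduction idea named in the javadoc.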

### ChecksumUtils

Utilities for CRC32C checksum operations used by Google Cloud Storage.

```java { .api }
/**
 * Utility class for checksum operations, particularly CRC32C checksums used by Google Storage
 */
public class ChecksumUtils {
    /** The CRC hash function used by Google Storage */
    public static final HashFunction CRC_HASH_FUNCTION = Hashing.crc32c();

    /**
     * Converts an int CRC32C checksum to Google Storage's base64 string format
     * @param checksum The integer checksum value
     * @return Base64-encoded checksum string
     */
    public static String convertChecksumToString(int checksum);
}
```

**Usage Example:**

```java
import com.google.common.hash.Hasher;
import org.apache.flink.fs.gs.utils.ChecksumUtils;

import java.nio.charset.StandardCharsets;

// Compute the CRC32C checksum of some data
byte[] data = "Hello World".getBytes(StandardCharsets.UTF_8);
Hasher hasher = ChecksumUtils.CRC_HASH_FUNCTION.newHasher();
hasher.putBytes(data);
int checksum = hasher.hash().asInt();

// Convert to Google Storage format
String checksumString = ChecksumUtils.convertChecksumToString(checksum);
System.out.println("Checksum: " + checksumString);
```
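
Google Cloud Storage reports crc32c values as base64 over the four checksum bytes in big-endian order. Without Guava, the same kind of string can be computed with the JDK's java.util.zip.CRC32C (Java 9 and later). GcsChecksums is a hypothetical helper. For the same data it should match ChecksumUtils.convertChecksumToString, assuming that method uses the same big-endian encoding.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.zip.CRC32C;

/** Hypothetical JDK-only helper producing Google Cloud Storage style crc32c strings. */
final class GcsChecksums {
    private GcsChecksums() {}

    /** Encodes a 32-bit CRC as base64 over its 4 big-endian bytes. */
    static String toGcsString(int checksum) {
        // ByteBuffer is big-endian by default
        byte[] bytes = ByteBuffer.allocate(Integer.BYTES).putInt(checksum).array();
        return Base64.getEncoder().encodeToString(bytes);
    }

    /** Computes the CRC32C of the data and returns it in Google Cloud Storage format. */
    static String crc32cOf(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        // getValue() holds the 32-bit CRC in the low bits of a long
        return toGcsString((int) crc.getValue());
    }
}
```

For the standard CRC-32C check input "123456789", crc32cOf returns "4waSgw==", which is the base64 form of 0xE3069283. An empty array gives "AAAAAA==".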

## Batch Operations

### Composition Operations

The compose operation combines up to 32 source blobs (BlobUtils.COMPOSE_MAX_BLOBS) into a single target blob, concatenating them in the order listed:

```java
import java.util.Arrays;
import java.util.List;

List<GSBlobIdentifier> sourceBlobs = Arrays.asList(
        new GSBlobIdentifier("bucket", "part-1"),
        new GSBlobIdentifier("bucket", "part-2"),
        new GSBlobIdentifier("bucket", "part-3"));

GSBlobIdentifier targetBlob = new GSBlobIdentifier("bucket", "combined-file");

// Compose all parts into a single blob
storage.compose(sourceBlobs, targetBlob);

// Important: source blobs are NOT deleted and must be cleaned up separately
storage.delete(sourceBlobs);
```
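
One compose call takes at most 32 sources, so combining more parts requires several calls. The sketch below plans one possible strategy, which is not necessarily how the plugin's own writer composes. Each round concatenates the previous round's result and up to 31 new parts into a fresh intermediate blob, and the last round writes the final target. ComposePlanner, Round, and the intermediate naming are hypothetical. The plan uses plain object names so it runs without GCS, and each Round corresponds to one storage.compose(sources, target) call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical planner that splits a large compose into calls of at most maxSources each. */
final class ComposePlanner {
    /** One compose call: concatenate sources, in order, into target. */
    static final class Round {
        final List<String> sources;
        final String target;

        Round(List<String> sources, String target) {
            this.sources = sources;
            this.target = target;
        }
    }

    private ComposePlanner() {}

    static List<Round> plan(List<String> parts, String finalTarget, String intermediatePrefix, int maxSources) {
        if (parts.isEmpty()) {
            throw new IllegalArgumentException("Nothing to compose");
        }
        if (maxSources < 2) {
            throw new IllegalArgumentException("maxSources must be at least 2");
        }
        List<Round> rounds = new ArrayList<>();
        String previous = null; // result of the previous round, if any
        int next = 0;           // index of the first part not yet composed
        while (next < parts.size()) {
            List<String> sources = new ArrayList<>();
            if (previous != null) {
                sources.add(previous); // earlier data goes first to preserve order
            }
            int take = Math.min(maxSources - sources.size(), parts.size() - next);
            sources.addAll(parts.subList(next, next + take));
            next += take;
            // The last round writes the final blob; earlier rounds write intermediates
            String target = next == parts.size() ? finalTarget : intermediatePrefix + rounds.size();
            rounds.add(new Round(sources, target));
            previous = target;
        }
        return rounds;
    }
}
```

With a limit of 32, 70 parts need three rounds: 32 sources, then 1 + 31, then 1 + 7. Because compose deletes nothing, the intermediate blobs and the original parts still need a delete call afterwards.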

### Batch Delete Operations

The delete operation accepts multiple blob identifiers and returns one Boolean per identifier, in the same order. A false entry means the blob did not exist or was not deleted; the call itself does not fail for missing blobs:

```java
List<GSBlobIdentifier> blobsToDelete = Arrays.asList(
        new GSBlobIdentifier("bucket", "temp-1"),
        new GSBlobIdentifier("bucket", "temp-2"),
        new GSBlobIdentifier("bucket", "temp-3"));

// Delete all blobs; does not fail if some don't exist
List<Boolean> deleteResults = storage.delete(blobsToDelete);

// Check results; each entry corresponds to the identifier at the same index
for (int i = 0; i < deleteResults.size(); i++) {
    if (deleteResults.get(i)) {
        System.out.println("Successfully deleted: " + blobsToDelete.get(i));
    } else {
        System.out.println("Failed to delete or didn't exist: " + blobsToDelete.get(i));
    }
}
```
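
## Error Handling

### Common Exceptions

- **IOException**: Network failures, authentication issues, and storage errors raised while writing; in this API only WriteChannel.write and WriteChannel.close declare it
- **IllegalArgumentException**: Invalid blob identifiers, malformed URIs (for example, in BlobUtils.parseUri)
- **StorageException**: Google Cloud Storage specific errors; it is unchecked, so it can surface directly from methods that do not declare IOException, such as copy, compose, list, and delete
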
### Best Practices

- **Retry Logic**: Use Flink's retry configuration for transient failures
- **Batch Operations**: Prefer one batch delete call over a separate delete call per blob
- **Resource Cleanup**: Close every WriteChannel once all data is written; close() is what commits the write
- **Temporary Object Management**: Clean up temporary objects after successful operations, including compose sources, which are not deleted automatically

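### Checksum Validation

A WriteChannel returned by writeBlob does not validate checksums itself. Its close() commits the write. Integrity is confirmed by comparing a locally computed CRC32C with the checksum that getMetadata reports for the committed blob. Within the plugin, this comparison belongs to the recoverable writer's write path rather than to GSBlobStorage.

```java
// close() commits the write; it does not compare checksums
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId);
channel.write(data, 0, data.length);
channel.close();

// Compare a locally computed CRC32C with the checksum Google Storage reports
String expected = ChecksumUtils.convertChecksumToString(ChecksumUtils.CRC_HASH_FUNCTION.hashBytes(data).asInt());
boolean valid = storage.getMetadata(blobId).map(m -> expected.equals(m.getChecksum())).orElse(false);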
```
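
When data is written in several chunks, the CRC32C can be accumulated as each chunk is handed to the channel. It can then be compared with BlobMetadata.getChecksum() once close() has committed the blob. ChecksumTracker below is a hypothetical helper that shows this bookkeeping with the JDK's java.util.zip.CRC32C. The remote checksum is passed in as a string instead of being read from getMetadata. To keep the sketch short, a mismatch raises an unchecked exception rather than an IOException.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.zip.CRC32C;

/** Hypothetical helper: tracks the CRC32C of every byte handed to a write channel. */
final class ChecksumTracker {
    private final CRC32C crc = new CRC32C();

    /** Call with the bytes actually accepted by WriteChannel.write(content, start, length). */
    void update(byte[] content, int start, int length) {
        crc.update(content, start, length);
    }

    /** Local checksum in Google Cloud Storage format (base64 of 4 big-endian bytes). */
    String checksum() {
        byte[] bytes = ByteBuffer.allocate(Integer.BYTES).putInt((int) crc.getValue()).array();
        return Base64.getEncoder().encodeToString(bytes);
    }

    /** Throws if the checksum reported for the committed blob differs from what was written. */
    void verify(String remoteChecksum) {
        String local = checksum();
        if (!local.equals(remoteChecksum)) {
            throw new IllegalStateException(
                    "Checksum mismatch: wrote " + local + " but storage reports " + remoteChecksum);
        }
    }
}
```

CRC32C can be updated incrementally. Feeding "1234" and then "56789" therefore yields the same "4waSgw==" as hashing "123456789" in one call.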