
# Storage Operations

The Flink GS FileSystem plugin accesses Google Cloud Storage (GCS) through the `GSBlobStorage` interface. This layer covers streaming writes, metadata lookups, listing, copy, compose, and batch delete. It also keeps Google SDK types out of the rest of the plugin, so tests can substitute their own implementation of the storage layer.

## Capabilities

### GSBlobStorage Interface

The main abstraction for Google Cloud Storage operations. It declares all of the blob-management functionality the plugin needs.

```java { .api }
/**
 * Abstract blob storage interface for Google storage operations.
 * Provides a clean abstraction over the Google Cloud Storage SDK.
 */
public interface GSBlobStorage {

    /**
     * Creates a write channel with the default chunk size.
     * @param blobIdentifier The blob identifier to which to write
     * @return The WriteChannel helper for streaming writes
     */
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);

    /**
     * Creates a write channel with the specified chunk size.
     * @param blobIdentifier The blob identifier to which to write
     * @param chunkSize The chunk size; must be > 0 and a multiple of 256 KiB
     * @return The WriteChannel helper for streaming writes
     */
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);

    /**
     * Creates an empty blob.
     * @param blobIdentifier The blob to create
     */
    void createBlob(GSBlobIdentifier blobIdentifier);

    /**
     * Gets blob metadata.
     * @param blobIdentifier The blob identifier
     * @return The blob metadata if the blob exists; empty if it doesn't
     */
    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);

    /**
     * Lists all the blobs in a bucket matching a given prefix.
     * @param bucketName The bucket name
     * @param prefix The object prefix
     * @return The found blob identifiers
     */
    List<GSBlobIdentifier> list(String bucketName, String prefix);

    /**
     * Copies from a source blob id to a target blob id. Does not delete the source blob.
     * @param sourceBlobIdentifier The source blob identifier
     * @param targetBlobIdentifier The target blob identifier
     */
    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);

    /**
     * Composes multiple blobs into one. Does not delete any of the source blobs.
     * @param sourceBlobIdentifiers The source blob identifiers to combine, max of 32
     * @param targetBlobIdentifier The target blob identifier
     */
    void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);

    /**
     * Deletes blobs. Does not fail if blobs don't exist.
     * @param blobIdentifiers The blob identifiers to delete
     * @return The results of each delete operation
     */
    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}
```

**Usage Example:**

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import com.google.cloud.storage.StorageOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
import org.apache.flink.fs.gs.storage.GSBlobStorage;
import org.apache.flink.fs.gs.storage.GSBlobStorageImpl;

// The storage instance is normally obtained through GSFileSystem;
// here one is built directly from the default Google Cloud credentials.
GSBlobStorage storage = new GSBlobStorageImpl(StorageOptions.getDefaultInstance().getService());

// Create blob identifier
GSBlobIdentifier blobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");

// Write data with an 8 MiB chunk size (a multiple of 256 KiB).
// write() and close() throw IOException; close() commits the write.
byte[] content = "Hello World".getBytes(StandardCharsets.UTF_8);
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId, MemorySize.ofMebiBytes(8));
channel.write(content, 0, content.length);
channel.close();

// Get blob metadata (empty if the blob does not exist)
Optional<GSBlobStorage.BlobMetadata> metadata = storage.getMetadata(blobId);
metadata.ifPresent(m -> System.out.println("Blob checksum: " + m.getChecksum()));

// List blobs with prefix
List<GSBlobIdentifier> blobs = storage.list("my-bucket", "path/to/");

// Copy blob (the source is kept)
GSBlobIdentifier targetId = new GSBlobIdentifier("my-bucket", "path/to/copy.txt");
storage.copy(blobId, targetId);

// Delete blobs; one result per identifier
List<Boolean> results = storage.delete(Arrays.asList(blobId, targetId));
```
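
The `chunkSize` rule on `writeBlob` (positive and a multiple of 256 KiB) can be checked before a channel is opened. The sketch below is a hypothetical helper, not part of the plugin; it works on plain byte counts instead of Flink's `MemorySize` so that it runs with the JDK alone.

```java
/** Hypothetical helper mirroring the documented chunk-size rule for writeBlob. */
final class ChunkSizeCheck {
    /** 256 KiB, the granularity required for write chunk sizes. */
    static final long CHUNK_GRANULARITY_BYTES = 256L * 1024L;

    private ChunkSizeCheck() {}

    /** Returns true if the size is positive and a whole multiple of 256 KiB. */
    static boolean isValidChunkSize(long bytes) {
        return bytes > 0 && bytes % CHUNK_GRANULARITY_BYTES == 0;
    }

    /** Rounds a requested size up to the nearest valid chunk size (at least 256 KiB). */
    static long roundUpToValidChunkSize(long requestedBytes) {
        if (requestedBytes <= CHUNK_GRANULARITY_BYTES) {
            return CHUNK_GRANULARITY_BYTES;
        }
        long chunks = (requestedBytes + CHUNK_GRANULARITY_BYTES - 1) / CHUNK_GRANULARITY_BYTES;
        return chunks * CHUNK_GRANULARITY_BYTES;
    }
}
```

For example, 8 MiB (32 × 256 KiB) passes the check, while a request for 1,000,000 bytes would be rounded up to 1,048,576 bytes (4 × 256 KiB).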

### GSBlobStorageImpl

Concrete implementation of `GSBlobStorage` using the Google Cloud Storage client libraries.

```java { .api }
/**
 * Concrete implementation of the GSBlobStorage interface for Google Cloud Storage operations.
 * Uses the Google Cloud Storage SDK internally.
 */
public class GSBlobStorageImpl implements GSBlobStorage {

    /**
     * Constructs the blob storage implementation.
     * @param storage The Google Cloud Storage service instance
     */
    public GSBlobStorageImpl(Storage storage);

    // Implements all GSBlobStorage interface methods
    public WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    public WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    public void createBlob(GSBlobIdentifier blobIdentifier);
    public Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    public List<GSBlobIdentifier> list(String bucketName, String prefix);
    public void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    public void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    public List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}
```
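
Because callers depend only on the `GSBlobStorage` interface, tests can swap in an in-memory implementation for `GSBlobStorageImpl`. The sketch below assumes a hypothetical, simplified `BlobStore` interface (string `bucket/object` keys, whole byte-array writes) instead of the real plugin types, so it compiles with the JDK alone. It reproduces the documented semantics: `copy` and `compose` keep their sources, `compose` accepts at most 32 sources, and `delete` returns `false` for missing blobs instead of failing.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for GSBlobStorage; keys are "bucket/object". */
interface BlobStore {
    void put(String key, byte[] content);
    Optional<byte[]> get(String key);
    void copy(String sourceKey, String targetKey);
    void compose(List<String> sourceKeys, String targetKey);
    List<Boolean> delete(Iterable<String> keys);
}

/** In-memory fake suitable for unit tests. */
final class InMemoryBlobStore implements BlobStore {
    static final int COMPOSE_MAX_BLOBS = 32;
    private final Map<String, byte[]> blobs = new HashMap<>();

    @Override
    public void put(String key, byte[] content) {
        blobs.put(key, content.clone());
    }

    @Override
    public Optional<byte[]> get(String key) {
        return Optional.ofNullable(blobs.get(key)).map(b -> b.clone());
    }

    @Override
    public void copy(String sourceKey, String targetKey) {
        // The source is left in place, as with GSBlobStorage.copy
        put(targetKey, require(sourceKey));
    }

    @Override
    public void compose(List<String> sourceKeys, String targetKey) {
        if (sourceKeys.isEmpty() || sourceKeys.size() > COMPOSE_MAX_BLOBS) {
            throw new IllegalArgumentException("compose needs 1.." + COMPOSE_MAX_BLOBS + " sources");
        }
        // Concatenate the sources in list order; none of them is deleted
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String key : sourceKeys) {
            out.writeBytes(require(key));
        }
        blobs.put(targetKey, out.toByteArray());
    }

    @Override
    public List<Boolean> delete(Iterable<String> keys) {
        // Missing blobs yield false rather than an exception
        List<Boolean> results = new ArrayList<>();
        for (String key : keys) {
            results.add(blobs.remove(key) != null);
        }
        return results;
    }

    private byte[] require(String key) {
        byte[] content = blobs.get(key);
        if (content == null) {
            throw new IllegalArgumentException("No such blob: " + key);
        }
        return content;
    }
}
```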

## Nested Interfaces

These interfaces are nested inside `GSBlobStorage` and are referenced as `GSBlobStorage.BlobMetadata` and `GSBlobStorage.WriteChannel`.

### BlobMetadata Interface

Provides access to a blob's metadata, namely its CRC32C checksum.

```java { .api }
/**
 * Abstract blob metadata interface
 */
public interface BlobMetadata {
    /**
     * The crc32c checksum for the blob
     * @return The checksum in base64 format
     */
    String getChecksum();
}
```

### WriteChannel Interface

Provides streaming write access to blobs.

```java { .api }
/**
 * Abstract blob write channel interface
 */
public interface WriteChannel {
    /**
     * Writes data to the channel
     * @param content The data buffer
     * @param start Start offset in the data buffer
     * @param length Number of bytes to write
     * @return The number of bytes written
     * @throws IOException On underlying failure
     */
    int write(byte[] content, int start, int length) throws IOException;

    /**
     * Closes the channel and commits the write
     * @throws IOException On underlying failure
     */
    void close() throws IOException;
}
```

**Usage Example:**

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Stream a large local file to the blob in 8 KiB pieces
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId);

try (InputStream inputStream = new FileInputStream("large-file.dat")) {
    byte[] buffer = new byte[8192];
    int bytesRead;
    while ((bytesRead = inputStream.read(buffer)) != -1) {
        int written = channel.write(buffer, 0, bytesRead);
        // Java assertions are disabled by default, so check explicitly
        if (written != bytesRead) {
            throw new IOException("Short write: " + written + " of " + bytesRead + " bytes");
        }
    }
}

channel.close(); // Commits the write
```
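
Code that already works with `java.io.OutputStream` (for example `InputStream.transferTo`) can drive a write channel through a small adapter. The sketch below assumes a hypothetical `ByteChannelSink` interface with the same `write`/`close` contract as `GSBlobStorage.WriteChannel`, and treats a short write as an error, as the streaming example above does.

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical interface with the same contract as GSBlobStorage.WriteChannel. */
interface ByteChannelSink {
    int write(byte[] content, int start, int length) throws IOException;
    void close() throws IOException;
}

/** Adapts a write channel to OutputStream; close() commits through the channel. */
final class ChannelOutputStream extends OutputStream {
    private final ByteChannelSink channel;
    private boolean closed;

    ChannelOutputStream(ByteChannelSink channel) {
        this.channel = channel;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (closed) {
            throw new IOException("Stream already closed");
        }
        int written = channel.write(b, off, len);
        if (written != len) {
            throw new IOException("Short write: " + written + " of " + len + " bytes");
        }
    }

    @Override
    public void close() throws IOException {
        // Close (and therefore commit) at most once
        if (!closed) {
            closed = true;
            channel.close();
        }
    }
}
```

Usage would look like `try (OutputStream out = new ChannelOutputStream(sink)) { in.transferTo(out); }`, where closing the stream commits the write.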

## Core Data Types

### GSBlobIdentifier

Abstraction for Google Cloud Storage blob identifiers, keeping calling code separate from Google SDK types.

```java { .api }
/**
 * An abstraction for the Google BlobId type.
 * Provides clean separation from the Google Cloud Storage SDK.
 */
public class GSBlobIdentifier {
    /** The bucket name */
    public final String bucketName;

    /** The object name, within the bucket */
    public final String objectName;

    /**
     * Constructs an abstract blob identifier.
     * @param bucketName The bucket name
     * @param objectName The object name
     */
    public GSBlobIdentifier(String bucketName, String objectName);

    /**
     * Gets a Google BlobId for this identifier, with generation = null.
     * @return The BlobId for use with the Google Cloud Storage SDK
     */
    public BlobId getBlobId();

    /**
     * Constructs an abstract blob identifier from a Google BlobId.
     * @param blobId The Google BlobId
     * @return The abstract blob identifier
     */
    public static GSBlobIdentifier fromBlobId(BlobId blobId);

    /**
     * Standard equals method for identifier comparison.
     * @param o Object to compare
     * @return true if identifiers are equal
     */
    public boolean equals(Object o);

    /**
     * Standard hashCode method for hash-based collections.
     * @return Hash code for this identifier
     */
    public int hashCode();

    /**
     * String representation of the identifier.
     * @return String representation showing bucket and object names
     */
    public String toString();
}
```

**Usage Examples:**

```java
import java.util.HashSet;
import java.util.Set;

import com.google.cloud.storage.BlobId;
import org.apache.flink.fs.gs.storage.GSBlobIdentifier;

// Create blob identifier
GSBlobIdentifier blobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");

// Convert to Google SDK BlobId
BlobId googleBlobId = blobId.getBlobId();

// Create from Google SDK BlobId
BlobId existingBlobId = BlobId.of("another-bucket", "another/path");
GSBlobIdentifier convertedId = GSBlobIdentifier.fromBlobId(existingBlobId);

// Use in hash-based collections (relies on equals/hashCode)
Set<GSBlobIdentifier> blobSet = new HashSet<>();
blobSet.add(blobId);
blobSet.add(convertedId);

// Identifiers with the same bucket and object name compare equal
GSBlobIdentifier sameBlobId = new GSBlobIdentifier("my-bucket", "path/to/file.txt");
boolean same = blobId.equals(sameBlobId); // true
```

## Utility Classes

### BlobUtils

Utility functions for blob operations and URI parsing.

```java { .api }
/**
 * Utility functions related to blobs
 */
public class BlobUtils {
    /** The temporary object prefix */
    private static final String TEMPORARY_OBJECT_PREFIX = ".inprogress";

    /** The maximum number of blobs that can be composed in a single operation */
    public static final int COMPOSE_MAX_BLOBS = 32;

    /**
     * Parses a blob id from a Google storage URI.
     * gs://bucket/foo/bar yields a blob with bucket name "bucket" and object name "foo/bar".
     * @param uri The gs:// URI
     * @return The blob identifier
     * @throws IllegalArgumentException if the URI format is invalid
     */
    public static GSBlobIdentifier parseUri(URI uri);

    /**
     * Returns the temporary bucket name.
     * If the options specify a temporary bucket name, that is used; otherwise, the final bucket.
     * @param finalBlobIdentifier The final blob identifier
     * @param options The file system options
     * @return The temporary bucket name
     */
    public static String getTemporaryBucketName(
            GSBlobIdentifier finalBlobIdentifier, GSFileSystemOptions options);

    /**
     * Returns a temporary object partial name for organizing temporary files.
     * Format: .inprogress/bucket/object/ (with trailing slash)
     * @param finalBlobIdentifier The final blob identifier
     * @return The temporary object partial name
     */
    public static String getTemporaryObjectPartialName(GSBlobIdentifier finalBlobIdentifier);

    /**
     * Returns a temporary object name by appending the UUID to the partial name.
     * Format: .inprogress/bucket/object/uuid
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @return The complete temporary object name
     */
    public static String getTemporaryObjectName(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId);

    /**
     * Returns a temporary object name with entropy injection.
     * Format: uuid.inprogress/bucket/object/uuid (for hotspot reduction)
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @return The complete temporary object name with entropy
     */
    public static String getTemporaryObjectNameWithEntropy(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId);

    /**
     * Resolves a temporary blob identifier for the provided temporary object id and options.
     * @param finalBlobIdentifier The final blob identifier
     * @param temporaryObjectId The UUID for this temporary object
     * @param options The file system options
     * @return The temporary blob identifier
     */
    public static GSBlobIdentifier getTemporaryBlobIdentifier(
            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId, GSFileSystemOptions options);
}
```

**Usage Examples:**

```java
import java.net.URI;
import java.util.UUID;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.gs.GSFileSystemOptions;
import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
import org.apache.flink.fs.gs.utils.BlobUtils;

// Parse GCS URI
URI gcsUri = URI.create("gs://my-bucket/data/input.txt");
GSBlobIdentifier blobId = BlobUtils.parseUri(gcsUri);
// Result: blobId.bucketName = "my-bucket", blobId.objectName = "data/input.txt"

// Get temporary bucket name; with no temporary bucket configured, this is "my-bucket"
Configuration config = new Configuration();
GSFileSystemOptions options = new GSFileSystemOptions(config);
String tempBucket = BlobUtils.getTemporaryBucketName(blobId, options);

// Generate temporary object names
String partialName = BlobUtils.getTemporaryObjectPartialName(blobId);
// Result: ".inprogress/my-bucket/data/input.txt/"

// Generate a temporary object id (random, so the UUIDs below are only examples)
UUID tempId = UUID.randomUUID();
String tempObjectName = BlobUtils.getTemporaryObjectName(blobId, tempId);
// e.g. ".inprogress/my-bucket/data/input.txt/550e8400-e29b-41d4-a716-446655440000"

// Or with entropy for hotspot reduction
String tempObjectNameWithEntropy = BlobUtils.getTemporaryObjectNameWithEntropy(blobId, tempId);
// e.g. "550e8400-e29b-41d4-a716-446655440000.inprogress/my-bucket/data/input.txt/550e8400-e29b-41d4-a716-446655440000"
```
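
The naming rules above are plain string formatting. The JDK-only sketch below reproduces the documented formats so they can be checked in isolation. The class and method names are hypothetical, and the plugin's own `BlobUtils` remains the reference implementation; in particular, its validation of malformed URIs may differ from the checks assumed here.

```java
import java.net.URI;
import java.util.UUID;

/** Hypothetical re-creation of the documented BlobUtils naming formats. */
final class TempNames {
    static final String TEMPORARY_OBJECT_PREFIX = ".inprogress";

    private TempNames() {}

    /** Splits gs://bucket/foo/bar into {"bucket", "foo/bar"}. */
    static String[] parseGsUri(URI uri) {
        if (!"gs".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Not a gs:// URI: " + uri);
        }
        String bucket = uri.getAuthority();
        String path = uri.getPath();
        if (bucket == null || bucket.isEmpty() || path == null || path.length() <= 1) {
            throw new IllegalArgumentException("Missing bucket or object name: " + uri);
        }
        // Drop the leading slash of the path to get the object name
        return new String[] {bucket, path.substring(1)};
    }

    /** Format: .inprogress/bucket/object/ (with trailing slash). */
    static String partialName(String bucket, String object) {
        return TEMPORARY_OBJECT_PREFIX + "/" + bucket + "/" + object + "/";
    }

    /** Format: .inprogress/bucket/object/uuid */
    static String objectName(String bucket, String object, UUID id) {
        return partialName(bucket, object) + id;
    }

    /** Format: uuid.inprogress/bucket/object/uuid; the random leading UUID spreads names apart. */
    static String objectNameWithEntropy(String bucket, String object, UUID id) {
        return id + objectName(bucket, object, id);
    }
}
```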

### ChecksumUtils

Utilities for the CRC32C checksums used by Google Cloud Storage.

```java { .api }
/**
 * Utility class for checksum operations, particularly CRC32C checksums used by Google Storage
 */
public class ChecksumUtils {
    /** The CRC hash function used by Google Storage */
    public static final HashFunction CRC_HASH_FUNCTION = Hashing.crc32c();

    /**
     * Converts an int CRC32C checksum to Google Storage's base64 string format
     * @param checksum The integer checksum value
     * @return Base64-encoded checksum string
     */
    public static String convertChecksumToString(int checksum);
}
```

**Usage Example:**

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.fs.gs.utils.ChecksumUtils;

// Compute the CRC32C checksum of the data
byte[] data = "Hello World".getBytes(StandardCharsets.UTF_8);
int checksum = ChecksumUtils.CRC_HASH_FUNCTION.hashBytes(data).asInt();

// Convert to Google Storage format
String checksumString = ChecksumUtils.convertChecksumToString(checksum);
System.out.println("Checksum: " + checksumString);
```
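
Google Cloud Storage reports an object's `crc32c` as the four checksum bytes in big-endian order, Base64-encoded. The JDK-only sketch below produces that encoding without Guava, assuming Java 9 or later for `java.util.zip.CRC32C`. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.zip.CRC32C;

/** Hypothetical helper: CRC32C checksums in Google Cloud Storage's base64 format. */
final class GcsCrc32c {
    private GcsCrc32c() {}

    /** CRC32C of the given range, as an int (same bits as the unsigned value). */
    static int crc32c(byte[] data, int offset, int length) {
        CRC32C crc = new CRC32C();
        crc.update(data, offset, length);
        return (int) crc.getValue();
    }

    /** Encodes the checksum as 4 big-endian bytes in base64. */
    static String toGcsString(int checksum) {
        // ByteBuffer uses big-endian byte order by default
        byte[] bytes = ByteBuffer.allocate(Integer.BYTES).putInt(checksum).array();
        return Base64.getEncoder().encodeToString(bytes);
    }
}
```

As a sanity check, the standard CRC32C check value for the ASCII string `123456789` is `0xE3069283`, which encodes as `4waSgw==`.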

## Batch Operations

### Composition Operations

The compose operation combines up to 32 source blobs (`BlobUtils.COMPOSE_MAX_BLOBS`) into a single target blob, concatenated in list order. Google Cloud Storage requires the source blobs to be in the same bucket as the target.

```java
import java.util.Arrays;
import java.util.List;

List<GSBlobIdentifier> sourceBlobs = Arrays.asList(
        new GSBlobIdentifier("bucket", "part-1"),
        new GSBlobIdentifier("bucket", "part-2"),
        new GSBlobIdentifier("bucket", "part-3"));

GSBlobIdentifier targetBlob = new GSBlobIdentifier("bucket", "combined-file");

// Compose all parts, in order, into a single blob
storage.compose(sourceBlobs, targetBlob);

// Important: source blobs are NOT deleted and must be cleaned up separately
storage.delete(sourceBlobs);
```
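
Because one compose call accepts at most 32 sources, combining more parts takes several rounds. One common approach, sketched below, folds the parts in sequence: each round composes the previous intermediate result plus up to 31 new parts. This plans calls over object names only and is not necessarily the exact strategy the plugin uses; the `.compose-N` naming of intermediate blobs is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plans a sequence of compose calls, each with at most maxSources inputs. */
final class ComposePlanner {
    /** One compose call: sources (in order) composed into target. */
    static final class Step {
        final List<String> sources;
        final String target;

        Step(List<String> sources, String target) {
            this.sources = sources;
            this.target = target;
        }
    }

    private ComposePlanner() {}

    static List<Step> plan(List<String> parts, String finalTarget, int maxSources) {
        if (parts.isEmpty() || maxSources < 2) {
            throw new IllegalArgumentException("need at least one part and maxSources >= 2");
        }
        List<Step> steps = new ArrayList<>();
        String accumulated = null; // result of the previous round, if any
        int next = 0;
        int round = 0;
        while (next < parts.size()) {
            List<String> sources = new ArrayList<>();
            if (accumulated != null) {
                sources.add(accumulated); // carry forward what has been composed so far
            }
            while (sources.size() < maxSources && next < parts.size()) {
                sources.add(parts.get(next++));
            }
            boolean last = next == parts.size();
            // Hypothetical naming for intermediate blobs
            String target = last ? finalTarget : finalTarget + ".compose-" + round++;
            steps.add(new Step(List.copyOf(sources), target));
            accumulated = target;
        }
        return steps;
    }
}
```

For 70 parts this yields three calls with 32, 32, and 8 sources. Since compose never deletes its inputs, the intermediate blobs have to be deleted afterwards along with the parts.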

### Batch Delete Operations

`delete` accepts multiple blob identifiers and returns one result per identifier, in the same order:

```java
import java.util.Arrays;
import java.util.List;

List<GSBlobIdentifier> blobsToDelete = Arrays.asList(
        new GSBlobIdentifier("bucket", "temp-1"),
        new GSBlobIdentifier("bucket", "temp-2"),
        new GSBlobIdentifier("bucket", "temp-3"));

// Delete all blobs; missing blobs do not cause a failure
List<Boolean> deleteResults = storage.delete(blobsToDelete);

// Results are positional: deleteResults.get(i) belongs to blobsToDelete.get(i)
for (int i = 0; i < deleteResults.size(); i++) {
    if (deleteResults.get(i)) {
        System.out.println("Successfully deleted: " + blobsToDelete.get(i));
    } else {
        System.out.println("Failed to delete or didn't exist: " + blobsToDelete.get(i));
    }
}
```

## Error Handling

### Common Exceptions

- **IOException**: Network failures, authentication issues, and other storage errors raised while writing (declared by `WriteChannel.write` and `WriteChannel.close`)
- **IllegalArgumentException**: Invalid blob identifiers or malformed URIs, for example a bad URI passed to `BlobUtils.parseUri`
- **StorageException**: Errors reported by the Google Cloud Storage client. It is unchecked: where a method declares `IOException` it may surface wrapped in one, while methods that do not (such as `copy`, `compose`, and `delete`) can propagate it directly

### Best Practices

- **Retry Logic**: Use Flink's retry configuration for transient failures
- **Batch Operations**: Prefer one batch `delete` call over many single-blob calls
- **Resource Cleanup**: Always close `WriteChannel` instances; closing is what commits the write
- **Temporary Object Management**: Clean up temporary objects after successful operations

### Checksum Validation

The plugin validates checksums during write operations to ensure data integrity: it computes a CRC32C checksum over the bytes written and, once the write has been committed on close, compares it with the checksum reported in the blob's metadata.

```java
// Checksum validation happens as part of the write
GSBlobStorage.WriteChannel channel = storage.writeBlob(blobId);
channel.write(data, 0, data.length);
channel.close(); // Commits the write; the checksum is then compared with the blob's metadata
```
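
The validation described above can be expressed as a wrapper around a write channel. The sketch below is hypothetical: it defines its own minimal `RawWriteChannel` interface with the same contract as `GSBlobStorage.WriteChannel`, and takes a `Supplier` standing in for a metadata lookup such as `storage.getMetadata(blobId)`. It hashes every byte it forwards with `java.util.zip.CRC32C`. After the underlying channel is closed, it compares the big-endian Base64 encoding of that checksum with the checksum reported for the blob.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.zip.CRC32C;

/** Hypothetical minimal channel contract, mirroring GSBlobStorage.WriteChannel. */
interface RawWriteChannel {
    int write(byte[] content, int start, int length) throws IOException;
    void close() throws IOException;
}

/** Forwards writes, tracks their CRC32C, and verifies it against the stored checksum on close. */
final class ChecksumVerifyingChannel implements RawWriteChannel {
    private final RawWriteChannel delegate;
    private final Supplier<Optional<String>> storedChecksum; // e.g. a metadata lookup after commit
    private final CRC32C crc = new CRC32C();

    ChecksumVerifyingChannel(RawWriteChannel delegate, Supplier<Optional<String>> storedChecksum) {
        this.delegate = delegate;
        this.storedChecksum = storedChecksum;
    }

    @Override
    public int write(byte[] content, int start, int length) throws IOException {
        int written = delegate.write(content, start, length);
        // Hash only the bytes the delegate accepted
        crc.update(content, start, written);
        return written;
    }

    @Override
    public void close() throws IOException {
        delegate.close(); // commit first, then compare with what storage reports
        byte[] bytes = ByteBuffer.allocate(Integer.BYTES).putInt((int) crc.getValue()).array();
        String expected = Base64.getEncoder().encodeToString(bytes);
        String actual = storedChecksum.get()
                .orElseThrow(() -> new IOException("No metadata for blob after write"));
        if (!expected.equals(actual)) {
            throw new IOException("Checksum mismatch: expected " + expected + " but storage reported " + actual);
        }
    }
}
```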