# Google Cloud Storage Integration

Full Google Cloud Storage support with service account authentication, custom retry policies, concurrent downloads, and GCS-specific features. This implementation provides comprehensive access to Google Cloud Storage capabilities through a pathlib-compatible interface.

## Capabilities

### GSPath Class

GCS-specific path implementation with access to Google Cloud Storage metadata.

```python { .api }
class GSPath(CloudPath):
    """Google Cloud Storage path implementation."""

    @property
    def bucket(self) -> str:
        """
        GCS bucket name.

        Returns:
            Bucket name from the GCS URI
        """

    @property
    def blob(self) -> str:
        """
        GCS object name (path within bucket).

        Returns:
            Object name string
        """

    @property
    def etag(self) -> str:
        """
        GCS object ETag identifier.

        Returns:
            ETag string for the object
        """

    @property
    def md5(self) -> str:
        """
        MD5 hash of the object content.

        Returns:
            MD5 hash string
        """
```
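
Because `GSPath` is pathlib-compatible, paths can also be composed with the `/` operator before reading the properties above. A minimal sketch (bucket and object names are placeholders):

```python
from cloudpathlib import GSPath

# Compose a path with pathlib-style joining (names below are placeholders)
base = GSPath("gs://my-bucket")
report = base / "reports" / "2024" / "summary.csv"

print(report.bucket)  # "my-bucket"
print(report.blob)    # "reports/2024/summary.csv"
```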

### GSClient Class

Google Cloud Storage client with comprehensive authentication and configuration options.

```python { .api }
class GSClient:
    """Google Cloud Storage client."""

    def __init__(
        self,
        application_credentials: str = None,
        credentials=None,
        project: str = None,
        storage_client=None,
        file_cache_mode: FileCacheMode = None,
        local_cache_dir: str = None,
        content_type_method=None,
        download_chunks_concurrently_kwargs: dict = None,
        timeout: float = None,
        retry=None
    ):
        """
        Initialize GCS client.

        Args:
            application_credentials: Path to service account JSON file
            credentials: Google auth credentials object
            project: GCP project ID
            storage_client: Custom google.cloud.storage.Client instance
            file_cache_mode: Cache management strategy
            local_cache_dir: Local directory for file cache
            content_type_method: Function to determine MIME types
            download_chunks_concurrently_kwargs: Concurrent download settings
            timeout: Request timeout in seconds
            retry: Retry policy for failed requests
        """
```
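
A client can be associated with paths in two ways, both used throughout the examples below: passed explicitly via the `client=` argument, or registered as the default so that subsequently created `GSPath` objects pick it up automatically. A minimal sketch (the credentials path is a placeholder):

```python
from cloudpathlib import GSClient, GSPath

# Explicit client: only this path uses it
client = GSClient(application_credentials="service-account.json")  # placeholder path
explicit_path = GSPath("gs://my-bucket/a.txt", client=client)

# Default client: GSPath objects created afterwards without client= use it
client.set_as_default_client()
implicit_path = GSPath("gs://my-bucket/b.txt")
```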

## Usage Examples

### Basic GCS Operations

```python
from cloudpathlib import GSPath, GSClient

# Create GCS path (uses default client)
gs_path = GSPath("gs://my-bucket/data/file.txt")

# Access GCS-specific properties
print(f"Bucket: {gs_path.bucket}")  # "my-bucket"
print(f"Blob: {gs_path.blob}")  # "data/file.txt"

# Check if object exists and get metadata
if gs_path.exists():
    print(f"ETag: {gs_path.etag}")
    print(f"MD5: {gs_path.md5}")
```

### Service Account Authentication

```python
# Use service account key file
client = GSClient(application_credentials="path/to/service-account.json")
client.set_as_default_client()

# Create paths using service account
gs_path = GSPath("gs://my-bucket/data.json")
content = gs_path.read_text()
```

### Credentials Object Authentication

```python
from google.oauth2 import service_account

# Load credentials from service account
credentials = service_account.Credentials.from_service_account_file(
    "service-account.json",
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

client = GSClient(
    credentials=credentials,
    project="my-gcp-project"
)

gs_path = GSPath("gs://my-bucket/file.txt", client=client)
```

### Application Default Credentials

```python
# Use Application Default Credentials (ADC)
# Works when running on GCE, Cloud Run, or with gcloud auth
client = GSClient(project="my-gcp-project")

# ADC automatically handles authentication
gs_path = GSPath("gs://my-bucket/data.csv", client=client)
data = gs_path.read_text()
```

### Custom Storage Client

```python
from google.cloud import storage

# Create custom storage client with specific settings
storage_client = storage.Client(
    project="my-project",
    credentials=credentials
)

client = GSClient(storage_client=storage_client)

# Use custom client
gs_path = GSPath("gs://my-bucket/file.txt", client=client)
```

### Concurrent Downloads

```python
# Configure concurrent download settings
client = GSClient(
    download_chunks_concurrently_kwargs={
        "max_workers": 8,
        "chunk_size": 1024 * 1024  # 1MB chunks
    }
)

# Download large file with concurrent chunks
large_file = GSPath("gs://my-bucket/large-dataset.zip", client=client)
large_file.download_to("local-dataset.zip")
```

### Timeout and Retry Configuration

```python
from google.api_core import retry
import google.api_core.exceptions

# Configure custom retry policy
custom_retry = retry.Retry(
    initial=1.0,
    maximum=10.0,
    multiplier=2.0,
    predicate=retry.if_exception_type(
        google.api_core.exceptions.ServiceUnavailable,
        google.api_core.exceptions.TooManyRequests
    )
)

client = GSClient(
    timeout=60.0,  # 60 second timeout
    retry=custom_retry  # Custom retry policy
)

# Operations use configured timeout and retry
gs_path = GSPath("gs://my-bucket/important.txt", client=client)
```

### Storage Classes

```python
# Upload with specific storage class
def upload_with_storage_class(local_path, gs_path, storage_class):
    """Upload file with specific GCS storage class."""

    # Note: Storage class is set via direct client usage
    blob = gs_path.client.storage_client.bucket(gs_path.bucket).blob(gs_path.blob)
    blob.storage_class = storage_class

    with open(local_path, 'rb') as f:
        blob.upload_from_file(f)

# Usage examples
gs_path = GSPath("gs://my-bucket/archive.zip")
upload_with_storage_class("data.zip", gs_path, "COLDLINE")

# Different storage classes
storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"]
```

### Lifecycle Management

```python
# Work with object lifecycle
def archive_old_files(bucket_name, days_old=365):
    """Archive files older than specified days to ARCHIVE storage class."""
    from datetime import datetime, timedelta

    cutoff_date = datetime.now() - timedelta(days=days_old)
    bucket_path = GSPath(f"gs://{bucket_name}/")

    for gs_file in bucket_path.rglob("*"):
        if gs_file.is_file():
            stats = gs_file.stat()
            if datetime.fromtimestamp(stats.st_mtime) < cutoff_date:
                # Move to archive storage class (changing the class of an existing
                # object requires a rewrite, which update_storage_class performs)
                blob = gs_file.client.storage_client.bucket(gs_file.bucket).blob(gs_file.blob)
                blob.update_storage_class("ARCHIVE")
                print(f"Archived: {gs_file}")

# Usage
archive_old_files("my-backup-bucket")
```

### Signed URLs

```python
# Generate signed URLs for temporary access
gs_path = GSPath("gs://private-bucket/confidential.pdf")

# Generate download URL (valid for 1 hour)
download_url = gs_path.as_url(presign=True, expire_seconds=3600)
print(f"Download URL: {download_url}")

# Generate upload URL
upload_path = GSPath("gs://uploads-bucket/new-file.txt")
upload_url = upload_path.as_url(presign=True, expire_seconds=1800)  # 30 minutes
```
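
The returned URL is a plain HTTPS link, so it can be fetched without GCP credentials until it expires. A minimal sketch using only the standard library (the local filename is illustrative):

```python
import urllib.request

# Fetch the object through the signed URL; no GCP credentials required
with urllib.request.urlopen(download_url) as response:
    data = response.read()

with open("confidential.pdf", "wb") as f:
    f.write(data)
```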

### Metadata Operations

```python
# Access and modify object metadata
def set_custom_metadata(gs_path, metadata_dict):
    """Set custom metadata on GCS object."""
    blob = gs_path.client.storage_client.bucket(gs_path.bucket).blob(gs_path.blob)
    blob.metadata = metadata_dict
    blob.patch()

def get_custom_metadata(gs_path):
    """Get custom metadata from GCS object."""
    blob = gs_path.client.storage_client.bucket(gs_path.bucket).blob(gs_path.blob)
    blob.reload()
    return blob.metadata

# Usage
gs_path = GSPath("gs://my-bucket/document.pdf")

# Set metadata
set_custom_metadata(gs_path, {
    "author": "Data Team",
    "project": "Analytics",
    "version": "1.0"
})

# Read metadata
metadata = get_custom_metadata(gs_path)
print(f"Metadata: {metadata}")
```

### Batch Operations

```python
import concurrent.futures
from pathlib import Path

def upload_file_parallel(local_path, gs_base):
    """Upload single file to GCS."""
    gs_path = gs_base / local_path.name
    gs_path.upload_from(local_path)
    return gs_path

# Parallel upload of multiple files
local_files = list(Path("data/").glob("*.json"))
gs_base = GSPath("gs://my-bucket/json-data/")

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(upload_file_parallel, f, gs_base) for f in local_files]

    for future in concurrent.futures.as_completed(futures):
        try:
            gs_path = future.result()
            print(f"Uploaded: {gs_path}")
        except Exception as e:
            print(f"Upload failed: {e}")
```

### Object Versioning

```python
# Work with object versions (requires versioned bucket)
def list_object_versions(gs_path):
    """List all versions of an object."""
    bucket = gs_path.client.storage_client.bucket(gs_path.bucket)

    versions = []
    for blob in bucket.list_blobs(prefix=gs_path.blob, versions=True):
        if blob.name == gs_path.blob:
            versions.append({
                "generation": blob.generation,
                "time_created": blob.time_created,
                "size": blob.size,
                "etag": blob.etag
            })

    return sorted(versions, key=lambda x: x["time_created"], reverse=True)

# Usage
gs_path = GSPath("gs://versioned-bucket/important.txt")
versions = list_object_versions(gs_path)
for version in versions:
    print(f"Generation {version['generation']}: {version['time_created']}")
```

### Cross-Project Operations

```python
# Work with buckets in different projects
project_a_client = GSClient(
    project="project-a",
    application_credentials="project-a-credentials.json"
)

project_b_client = GSClient(
    project="project-b",
    application_credentials="project-b-credentials.json"
)

# Copy between projects
source = GSPath("gs://project-a-bucket/data.txt", client=project_a_client)
destination = GSPath("gs://project-b-bucket/data.txt", client=project_b_client)

source.copy(destination)
```

### Streaming Operations

```python
# Stream large files without downloading entirely
def process_large_csv(gs_path):
    """Process large CSV file by streaming."""
    import csv

    with gs_path.open('r') as f:
        reader = csv.DictReader(f)
        for row_num, row in enumerate(reader):
            process_row(row)

            if row_num % 10000 == 0:
                print(f"Processed {row_num} rows")

# Usage
large_csv = GSPath("gs://data-bucket/huge-dataset.csv")
process_large_csv(large_csv)
```

### IAM and Permissions

```python
# Check object permissions (requires direct client usage)
def check_object_permissions(gs_path, permissions):
    """Check if current credentials have specified permissions."""
    bucket = gs_path.client.storage_client.bucket(gs_path.bucket)
    blob = bucket.blob(gs_path.blob)

    try:
        result = blob.test_iam_permissions(permissions)
        return result
    except Exception as e:
        print(f"Permission check failed: {e}")
        return []

# Usage
gs_path = GSPath("gs://my-bucket/file.txt")
permissions = ["storage.objects.get", "storage.objects.delete"]
allowed = check_object_permissions(gs_path, permissions)
print(f"Allowed permissions: {allowed}")
```

### Error Handling

```python
from cloudpathlib import (
    CloudPathFileNotFoundError,
    MissingCredentialsError
)
from google.api_core import exceptions
import google.auth.exceptions

try:
    gs_path = GSPath("gs://nonexistent-bucket/file.txt")
    content = gs_path.read_text()
except CloudPathFileNotFoundError:
    print("GCS object not found")
except google.auth.exceptions.DefaultCredentialsError:
    print("GCP credentials not configured")
except exceptions.PermissionDenied:
    print("Access denied")
except exceptions.GoogleAPIError as e:
    print(f"GCP API error: {e}")
```

### Performance Optimization

```python
# Optimize for large file operations
client = GSClient(
    download_chunks_concurrently_kwargs={
        "max_workers": 16,  # More concurrent workers
        "chunk_size": 8 * 1024 * 1024  # 8MB chunks
    },
    timeout=300.0,  # 5 minute timeout for large operations
)

# Configure client for high-throughput operations
gs_path = GSPath("gs://big-data-bucket/huge-file.dat", client=client)

# Performance monitoring
import time
start_time = time.time()
gs_path.download_to("local-huge-file.dat")
duration = time.time() - start_time
print(f"Download completed in {duration:.2f} seconds")
```