# Google Cloud Storage Integration

Full Google Cloud Storage support with service account authentication, custom retry policies, concurrent downloads, and other GCS-specific features, exposed through a pathlib-compatible interface.

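A minimal quick-start sketch of the pathlib-style interface; it assumes default credentials are configured and that the example bucket `my-bucket` is writable by them:

```python
from cloudpathlib import GSPath

# GSPath objects behave like pathlib.Path objects backed by GCS
report = GSPath("gs://my-bucket/reports/2024/summary.txt")

# Write and read text just like a local path
report.write_text("quarterly summary")
print(report.read_text())

# Navigate and enumerate with pathlib-style operations
for path in report.parent.glob("*.txt"):
    print(path.name)
```
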
## Capabilities

### GSPath Class

GCS-specific path implementation with access to Google Cloud Storage metadata.

```python { .api }
class GSPath(CloudPath):
    """Google Cloud Storage path implementation."""

    @property
    def bucket(self) -> str:
        """
        GCS bucket name.

        Returns:
            Bucket name from the GCS URI
        """

    @property
    def blob(self) -> str:
        """
        GCS object name (path within bucket).

        Returns:
            Object name string
        """

    @property
    def etag(self) -> str:
        """
        GCS object ETag identifier.

        Returns:
            ETag string for the object
        """

    @property
    def md5(self) -> str:
        """
        MD5 hash of the object content.

        Returns:
            MD5 hash string
        """
```
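Beyond the GCS-specific properties above, a `GSPath` supports the usual pathlib-style parts and joining inherited from `CloudPath`; a brief sketch with an illustrative path:

```python
from cloudpathlib import GSPath

path = GSPath("gs://my-bucket/data/2024/file.txt")

print(path.bucket)                # "my-bucket"
print(path.blob)                  # "data/2024/file.txt"
print(path.name)                  # "file.txt"
print(path.suffix)                # ".txt"
print(path.parent)                # gs://my-bucket/data/2024
print(path.parent / "other.txt")  # gs://my-bucket/data/2024/other.txt
```
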
### GSClient Class

Google Cloud Storage client with comprehensive authentication and configuration options.

```python { .api }
class GSClient:
    """Google Cloud Storage client."""

    def __init__(
        self,
        application_credentials: str = None,
        credentials=None,
        project: str = None,
        storage_client=None,
        file_cache_mode: FileCacheMode = None,
        local_cache_dir: str = None,
        content_type_method=None,
        download_chunks_concurrently_kwargs: dict = None,
        timeout: float = None,
        retry=None,
    ):
        """
        Initialize GCS client.

        Args:
            application_credentials: Path to service account JSON file
            credentials: Google auth credentials object
            project: GCP project ID
            storage_client: Custom google.cloud.storage.Client instance
            file_cache_mode: Cache management strategy
            local_cache_dir: Local directory for file cache
            content_type_method: Function to determine MIME types
            download_chunks_concurrently_kwargs: Concurrent download settings
            timeout: Request timeout in seconds
            retry: Retry policy for failed requests
        """
```
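The cache-related parameters above can be combined to control where downloaded copies live; a minimal sketch, assuming cloudpathlib's `FileCacheMode` enum from `cloudpathlib.enums` and a writable local cache directory:

```python
from cloudpathlib import GSClient
from cloudpathlib.enums import FileCacheMode

# Keep downloaded copies in a fixed directory and reuse them across runs
client = GSClient(
    file_cache_mode=FileCacheMode.persistent,
    local_cache_dir="/tmp/gs-cache",
)
client.set_as_default_client()
```
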
## Usage Examples

### Basic GCS Operations

```python
from cloudpathlib import GSPath, GSClient

# Create GCS path (uses default client)
gs_path = GSPath("gs://my-bucket/data/file.txt")

# Access GCS-specific properties
print(f"Bucket: {gs_path.bucket}")  # "my-bucket"
print(f"Blob: {gs_path.blob}")      # "data/file.txt"

# Check if object exists and get metadata
if gs_path.exists():
    print(f"ETag: {gs_path.etag}")
    print(f"MD5: {gs_path.md5}")
```

### Service Account Authentication

```python
# Use service account key file
client = GSClient(application_credentials="path/to/service-account.json")
client.set_as_default_client()

# Create paths using service account
gs_path = GSPath("gs://my-bucket/data.json")
content = gs_path.read_text()
```

### Credentials Object Authentication

```python
from google.oauth2 import service_account

# Load credentials from service account
credentials = service_account.Credentials.from_service_account_file(
    "service-account.json",
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = GSClient(
    credentials=credentials,
    project="my-gcp-project",
)

gs_path = GSPath("gs://my-bucket/file.txt", client=client)
```

### Application Default Credentials

```python
# Use Application Default Credentials (ADC)
# Works when running on GCE, Cloud Run, or after `gcloud auth application-default login`
client = GSClient(project="my-gcp-project")

# ADC automatically handles authentication
gs_path = GSPath("gs://my-bucket/data.csv", client=client)
data = gs_path.read_text()
```

### Custom Storage Client

```python
from google.cloud import storage

# Create custom storage client with specific settings
storage_client = storage.Client(
    project="my-project",
    credentials=credentials,
)

client = GSClient(storage_client=storage_client)

# Use custom client
gs_path = GSPath("gs://my-bucket/file.txt", client=client)
```

### Concurrent Downloads

```python
# Configure concurrent download settings
client = GSClient(
    download_chunks_concurrently_kwargs={
        "max_workers": 8,
        "chunk_size": 1024 * 1024,  # 1MB chunks
    }
)

# Download large file with concurrent chunks
large_file = GSPath("gs://my-bucket/large-dataset.zip", client=client)
large_file.download_to("local-dataset.zip")
```

### Timeout and Retry Configuration

```python
from google.api_core import retry
import google.api_core.exceptions

# Configure custom retry policy
custom_retry = retry.Retry(
    initial=1.0,
    maximum=10.0,
    multiplier=2.0,
    predicate=retry.if_exception_type(
        google.api_core.exceptions.ServiceUnavailable,
        google.api_core.exceptions.TooManyRequests,
    ),
)

client = GSClient(
    timeout=60.0,        # 60 second timeout
    retry=custom_retry,  # Custom retry policy
)

# Operations use configured timeout and retry
gs_path = GSPath("gs://my-bucket/important.txt", client=client)
```

### Storage Classes

```python
# Upload with specific storage class
def upload_with_storage_class(local_path, gs_path, storage_class):
    """Upload file with specific GCS storage class."""
    # Note: Storage class is set via direct client usage
    blob = gs_path.client.storage_client.bucket(gs_path.bucket).blob(gs_path.blob)
    blob.storage_class = storage_class

    with open(local_path, 'rb') as f:
        blob.upload_from_file(f)

# Usage examples
gs_path = GSPath("gs://my-bucket/archive.zip")
upload_with_storage_class("data.zip", gs_path, "COLDLINE")

# Different storage classes
storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"]
```

### Lifecycle Management

```python
# Work with object lifecycle
def archive_old_files(bucket_name, days_old=365):
    """Archive files older than specified days to ARCHIVE storage class."""
    from datetime import datetime, timedelta

    cutoff_date = datetime.now() - timedelta(days=days_old)
    bucket_path = GSPath(f"gs://{bucket_name}/")

    for gs_file in bucket_path.rglob("*"):
        if gs_file.is_file():
            stats = gs_file.stat()
            if datetime.fromtimestamp(stats.st_mtime) < cutoff_date:
                # Move to ARCHIVE storage class (rewrites the object;
                # the storage class of an existing object cannot be patched)
                blob = gs_file.client.storage_client.bucket(gs_file.bucket).blob(gs_file.blob)
                blob.update_storage_class("ARCHIVE")
                print(f"Archived: {gs_file}")

# Usage
archive_old_files("my-backup-bucket")
```

### Signed URLs

```python
from datetime import timedelta

# Generate signed URLs for temporary access
gs_path = GSPath("gs://private-bucket/confidential.pdf")

# Generate download URL (valid for 1 hour)
download_url = gs_path.as_url(presign=True, expire_seconds=3600)
print(f"Download URL: {download_url}")

# as_url() presigning issues download (GET) URLs; for a signed upload URL,
# see the sketch after this block
upload_path = GSPath("gs://uploads-bucket/new-file.txt")
signed_url = upload_path.as_url(presign=True, expire_seconds=1800)  # 30 minutes
```

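For uploads, a signed PUT URL can be generated through the underlying google-cloud-storage blob. A minimal sketch, assuming the wrapped storage client is reachable via the client's `storage_client` attribute as in the other examples here:

```python
from datetime import timedelta

upload_path = GSPath("gs://uploads-bucket/new-file.txt")
blob = upload_path.client.storage_client.bucket(upload_path.bucket).blob(upload_path.blob)

# V4 signed URL that allows an HTTP PUT of this object for 30 minutes
upload_url = blob.generate_signed_url(
    version="v4",
    expiration=timedelta(minutes=30),
    method="PUT",
    content_type="text/plain",
)
print(f"Upload URL: {upload_url}")
```
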
### Metadata Operations

```python
# Access and modify object metadata
def set_custom_metadata(gs_path, metadata_dict):
    """Set custom metadata on GCS object."""
    blob = gs_path.client.storage_client.bucket(gs_path.bucket).blob(gs_path.blob)
    blob.metadata = metadata_dict
    blob.patch()

def get_custom_metadata(gs_path):
    """Get custom metadata from GCS object."""
    blob = gs_path.client.storage_client.bucket(gs_path.bucket).blob(gs_path.blob)
    blob.reload()
    return blob.metadata

# Usage
gs_path = GSPath("gs://my-bucket/document.pdf")

# Set metadata
set_custom_metadata(gs_path, {
    "author": "Data Team",
    "project": "Analytics",
    "version": "1.0",
})

# Read metadata
metadata = get_custom_metadata(gs_path)
print(f"Metadata: {metadata}")
```

### Batch Operations

```python
import concurrent.futures
from pathlib import Path

def upload_file_parallel(local_path, gs_base):
    """Upload single file to GCS."""
    gs_path = gs_base / local_path.name
    gs_path.upload_from(local_path)
    return gs_path

# Parallel upload of multiple files
local_files = list(Path("data/").glob("*.json"))
gs_base = GSPath("gs://my-bucket/json-data/")

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(upload_file_parallel, f, gs_base) for f in local_files]

    for future in concurrent.futures.as_completed(futures):
        try:
            gs_path = future.result()
            print(f"Uploaded: {gs_path}")
        except Exception as e:
            print(f"Upload failed: {e}")
```

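For large batches, google-cloud-storage's transfer manager can also upload many local files in a single call with a worker pool. A sketch through the underlying client, assuming a google-cloud-storage version that ships `transfer_manager` and that the wrapped client is exposed as `storage_client`:

```python
from pathlib import Path
from google.cloud.storage import transfer_manager

bucket = gs_base.client.storage_client.bucket(gs_base.bucket)
filenames = [p.name for p in Path("data/").glob("*.json")]

# Upload data/<name>.json to gs://my-bucket/json-data/<name>.json with 5 workers
results = transfer_manager.upload_many_from_filenames(
    bucket,
    filenames,
    source_directory="data",
    blob_name_prefix="json-data/",
    max_workers=5,
)

for name, result in zip(filenames, results):
    # Each result is None on success, or the exception raised for that file
    status = "uploaded" if result is None else f"failed: {result}"
    print(f"{name}: {status}")
```
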
### Object Versioning

```python
# Work with object versions (requires versioned bucket)
def list_object_versions(gs_path):
    """List all versions of an object."""
    bucket = gs_path.client.storage_client.bucket(gs_path.bucket)

    versions = []
    for blob in bucket.list_blobs(prefix=gs_path.blob, versions=True):
        if blob.name == gs_path.blob:
            versions.append({
                "generation": blob.generation,
                "time_created": blob.time_created,
                "size": blob.size,
                "etag": blob.etag,
            })

    return sorted(versions, key=lambda x: x["time_created"], reverse=True)

# Usage
gs_path = GSPath("gs://versioned-bucket/important.txt")
versions = list_object_versions(gs_path)
for version in versions:
    print(f"Generation {version['generation']}: {version['time_created']}")
```

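A specific generation can then be read back through the underlying client. A sketch with a hypothetical `read_version` helper; it assumes your google-cloud-storage version accepts a `generation` argument to `Bucket.blob` and that the wrapped client is exposed as `storage_client`:

```python
def read_version(gs_path, generation):
    """Hypothetical helper: download the bytes of one generation of an object."""
    bucket = gs_path.client.storage_client.bucket(gs_path.bucket)
    blob = bucket.blob(gs_path.blob, generation=generation)
    return blob.download_as_bytes()

# Read the most recent prior version, if one exists
if len(versions) > 1:
    previous_bytes = read_version(gs_path, versions[1]["generation"])
    print(f"Previous version is {len(previous_bytes)} bytes")
```
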
### Cross-Project Operations

```python
# Work with buckets in different projects
project_a_client = GSClient(
    project="project-a",
    application_credentials="project-a-credentials.json",
)

project_b_client = GSClient(
    project="project-b",
    application_credentials="project-b-credentials.json",
)

# Copy between projects
source = GSPath("gs://project-a-bucket/data.txt", client=project_a_client)
destination = GSPath("gs://project-b-bucket/data.txt", client=project_b_client)

source.copy(destination)
```

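If a single identity has read access to the source bucket and write access to the destination bucket, a server-side copy through the underlying client avoids moving the bytes through the local machine. A sketch under that assumption, again using the `storage_client` attribute from the examples above:

```python
# One storage client whose credentials can see both buckets
storage_client = project_a_client.storage_client

src_bucket = storage_client.bucket(source.bucket)
dst_bucket = storage_client.bucket(destination.bucket)
src_blob = src_bucket.blob(source.blob)

# copy_blob issues a server-side copy within GCS
copied = src_bucket.copy_blob(src_blob, dst_bucket, new_name=destination.blob)
print(f"Copied to gs://{copied.bucket.name}/{copied.name}")
```
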
### Streaming Operations

```python
# Stream large files without downloading entirely
def process_large_csv(gs_path):
    """Process large CSV file by streaming."""
    import csv

    with gs_path.open('r') as f:
        reader = csv.DictReader(f)
        for row_num, row in enumerate(reader):
            process_row(row)  # user-defined handler for a single row

            if row_num % 10000 == 0:
                print(f"Processed {row_num} rows")

# Usage
large_csv = GSPath("gs://data-bucket/huge-dataset.csv")
process_large_csv(large_csv)
```

### IAM and Permissions

```python
# Check object permissions (requires direct client usage)
def check_object_permissions(gs_path, permissions):
    """Check if current credentials have specified permissions."""
    # GCS applies IAM at the bucket level, so object permissions
    # (storage.objects.*) are tested against the bucket
    bucket = gs_path.client.storage_client.bucket(gs_path.bucket)

    try:
        result = bucket.test_iam_permissions(permissions)
        return result
    except Exception as e:
        print(f"Permission check failed: {e}")
        return []

# Usage
gs_path = GSPath("gs://my-bucket/file.txt")
permissions = ["storage.objects.get", "storage.objects.delete"]
allowed = check_object_permissions(gs_path, permissions)
print(f"Allowed permissions: {allowed}")
```

### Error Handling

```python
from cloudpathlib.exceptions import (
    CloudPathNotExistsError,
    MissingCredentialsError,
)
from google.api_core import exceptions
import google.auth.exceptions

try:
    gs_path = GSPath("gs://nonexistent-bucket/file.txt")
    content = gs_path.read_text()
except CloudPathNotExistsError:
    print("GCS object not found")
except google.auth.exceptions.DefaultCredentialsError:
    print("GCP credentials not configured")
except exceptions.Forbidden:
    print("Access denied")
except exceptions.GoogleAPIError as e:
    print(f"GCP API error: {e}")
```

### Performance Optimization

```python
# Optimize for large file operations
client = GSClient(
    download_chunks_concurrently_kwargs={
        "max_workers": 16,              # More concurrent workers
        "chunk_size": 8 * 1024 * 1024,  # 8MB chunks
    },
    timeout=300.0,  # 5 minute timeout for large operations
)

# Configure client for high-throughput operations
gs_path = GSPath("gs://big-data-bucket/huge-file.dat", client=client)

# Performance monitoring
import time
start_time = time.time()
gs_path.download_to("local-huge-file.dat")
duration = time.time() - start_time
print(f"Download completed in {duration:.2f} seconds")
```