# Client Management

Base client functionality for authentication, caching configuration, and cloud service connection management. The client system provides a unified interface for managing connections to different cloud providers while handling authentication, caching, and service-specific configurations.

## Capabilities

### Base Client Class

Abstract base class that defines the common interface for all cloud clients.
```python { .api }
class Client:
    """Base class for all cloud storage clients.

    Defines the interface shared by all provider-specific clients:
    authentication setup, local file caching, and MIME-type detection.
    """

    def __init__(
        self,
        file_cache_mode: "FileCacheMode" = None,
        local_cache_dir: str = None,
        content_type_method=None,
    ):
        """
        Initialize base client.

        Args:
            file_cache_mode: Cache management strategy
            local_cache_dir: Local directory for file caching
            content_type_method: Function to determine MIME types
        """

    @classmethod
    def get_default_client(cls):
        """
        Get the default client instance for this client type.

        Returns:
            Default client instance or None if not set
        """

    def set_as_default_client(self) -> None:
        """
        Set this client as the default for its type.

        All paths created without explicit client will use this client.
        """

    def CloudPath(
        self,
        cloud_path: str,
        *parts: str,
    ) -> "CloudPath":
        """
        Create CloudPath associated with this client.

        Args:
            cloud_path: Cloud storage URI
            *parts: Additional path segments

        Returns:
            CloudPath instance using this client
        """

    def clear_cache(self) -> None:
        """
        Clear all cached files for this client.
        """

    @property
    def file_cache_mode(self) -> "FileCacheMode":
        """Cache management mode for this client."""

    @property
    def content_type_method(self):
        """Function used to determine MIME types."""
```
## Usage Examples

### Default Client Management
```python
from cloudpathlib import S3Client, GSClient, AzureBlobClient, CloudPath

# Configure default clients for each provider
s3_client = S3Client(
    aws_access_key_id="your-key",
    aws_secret_access_key="your-secret",
)
s3_client.set_as_default_client()

gs_client = GSClient(
    application_credentials="path/to/service-account.json"
)
gs_client.set_as_default_client()

azure_client = AzureBlobClient(
    connection_string="your-connection-string"
)
azure_client.set_as_default_client()

# Now all paths use the configured default clients
s3_path = CloudPath("s3://my-bucket/file.txt")  # Uses s3_client
gs_path = CloudPath("gs://my-bucket/file.txt")  # Uses gs_client
azure_path = CloudPath("az://my-container/file.txt")  # Uses azure_client

# Check which client is being used
print(f"S3 client: {s3_path.client}")
print(f"GS client: {gs_path.client}")
```
### Multiple Client Configurations
```python
# Configure different clients for different environments
prod_s3_client = S3Client(
    profile_name="production",
    file_cache_mode=FileCacheMode.persistent,
)

dev_s3_client = S3Client(
    profile_name="development",
    file_cache_mode=FileCacheMode.tmp_dir,
)

# Use specific clients explicitly
prod_path = CloudPath("s3://prod-bucket/data.txt", client=prod_s3_client)
dev_path = CloudPath("s3://dev-bucket/data.txt", client=dev_s3_client)

# Or create paths using client method
prod_path = prod_s3_client.CloudPath("s3://prod-bucket/data.txt")
dev_path = dev_s3_client.CloudPath("s3://dev-bucket/data.txt")
```
### Cache Management
```python
from cloudpathlib import FileCacheMode

# Configure client with persistent cache
cache_dir = "/tmp/cloudpathlib-cache"
client = S3Client(
    file_cache_mode=FileCacheMode.persistent,
    local_cache_dir=cache_dir,
)

# Create paths with configured caching
path = CloudPath("s3://my-bucket/large-file.dat", client=client)

# File is cached locally on first access
content = path.read_bytes()  # Downloads and caches
content = path.read_bytes()  # Uses cached version

# Clear cache for specific client
client.clear_cache()

# Clear cache for specific path
path.clear_cache()
```
### Content Type Detection
```python
import mimetypes


def custom_content_type(path):
    """Custom MIME type detection.

    Args:
        path: Path-like or string whose extension determines the type.

    Returns:
        MIME type string; falls back to 'application/octet-stream'
        when the type cannot be guessed.
    """
    # Single conversion; reused for both guessing and the custom checks
    name = str(path)
    mime_type, _ = mimetypes.guess_type(name)

    # Custom mappings for formats mimetypes does not know about
    if name.endswith('.parquet'):
        return 'application/octet-stream'
    elif name.endswith('.jsonl'):
        return 'application/x-jsonlines'

    return mime_type or 'application/octet-stream'
# Configure client with custom content type detection
client = S3Client(content_type_method=custom_content_type)

# Uploads will use custom MIME type detection
path = CloudPath("s3://my-bucket/data.parquet", client=client)
path.upload_from("local_data.parquet")  # Uses custom content type
```
### Client Factory Pattern
```python
class CloudClientFactory:
    """Factory for creating configured cloud clients."""

    @staticmethod
    def create_s3_client(environment="production"):
        """Create S3 client for specific environment.

        Raises:
            ValueError: If *environment* is not recognized.
        """
        if environment == "production":
            return S3Client(
                profile_name="prod",
                file_cache_mode=FileCacheMode.persistent,
                local_cache_dir="/var/cache/cloudpathlib",
            )
        elif environment == "development":
            return S3Client(
                profile_name="dev",
                file_cache_mode=FileCacheMode.tmp_dir,
            )
        elif environment == "testing":
            return S3Client(
                no_sign_request=True,  # For public buckets
                file_cache_mode=FileCacheMode.close_file,
            )
        else:
            raise ValueError(f"Unknown environment: {environment}")

    @staticmethod
    def create_gs_client(environment="production"):
        """Create GCS client for specific environment.

        Raises:
            ValueError: If *environment* is not recognized.
        """
        if environment == "production":
            return GSClient(
                project="my-prod-project",
                file_cache_mode=FileCacheMode.persistent,
            )
        elif environment == "development":
            return GSClient(
                application_credentials="dev-service-account.json",
                file_cache_mode=FileCacheMode.tmp_dir,
            )
        else:
            raise ValueError(f"Unknown environment: {environment}")
# Usage
import os

env = os.getenv("ENVIRONMENT", "development")

s3_client = CloudClientFactory.create_s3_client(env)
s3_client.set_as_default_client()

gs_client = CloudClientFactory.create_gs_client(env)
gs_client.set_as_default_client()
```
### Configuration from Environment
```python
242
import os
243
from cloudpathlib import S3Client, GSClient, FileCacheMode
244
245
def configure_clients_from_env():
    """Configure clients from environment variables.

    Reads AWS/GCP credential variables plus the shared
    CLOUDPATHLIB_CACHE_MODE / CLOUDPATHLIB_CACHE_DIR cache settings,
    builds one client per provider, and registers each as the default.

    Returns:
        Tuple of (s3_client, gs_client).
    """
    # S3 client configuration: pass along only variables that are set.
    # Each variable is read once (the original read each twice).
    s3_config = {}
    for env_var, key in [
        ("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
        ("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
        ("AWS_PROFILE", "profile_name"),
        ("S3_ENDPOINT_URL", "endpoint_url"),
    ]:
        value = os.getenv(env_var)
        if value:
            s3_config[key] = value

    # Cache configuration (shared between providers)
    cache_mode = os.getenv("CLOUDPATHLIB_CACHE_MODE", "tmp_dir")
    cache_dir = os.getenv("CLOUDPATHLIB_CACHE_DIR")

    s3_config["file_cache_mode"] = FileCacheMode(cache_mode)
    if cache_dir:
        s3_config["local_cache_dir"] = cache_dir

    s3_client = S3Client(**s3_config)
    s3_client.set_as_default_client()

    # GCS client configuration
    gs_config = {}
    for env_var, key in [
        ("GOOGLE_APPLICATION_CREDENTIALS", "application_credentials"),
        ("GCP_PROJECT", "project"),
    ]:
        value = os.getenv(env_var)
        if value:
            gs_config[key] = value

    gs_config["file_cache_mode"] = FileCacheMode(cache_mode)
    if cache_dir:
        gs_config["local_cache_dir"] = cache_dir

    gs_client = GSClient(**gs_config)
    gs_client.set_as_default_client()

    return s3_client, gs_client
# Configure from environment
s3_client, gs_client = configure_clients_from_env()
```
### Client Context Managers
```python
class TemporaryClient:
    """Context manager for temporary client configuration.

    Installs *client* as its type's default on entry and restores the
    previous default on exit.

    NOTE(review): if no default existed on entry, the temporary client
    remains the default after exit — the client API shown here offers no
    way to unset a default.
    """

    def __init__(self, client):
        self.client = client
        self.original_default = None  # captured on __enter__

    def __enter__(self):
        # Save current default for this client type
        self.original_default = self.client.__class__.get_default_client()
        # Set temporary default
        self.client.set_as_default_client()
        return self.client

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore original default (when one existed)
        if self.original_default:
            self.original_default.set_as_default_client()
# Usage
temp_client = S3Client(profile_name="temporary-profile")

with TemporaryClient(temp_client):
    # Inside context, paths use temporary client
    path = CloudPath("s3://temp-bucket/file.txt")
    content = path.read_text()

# Outside context, original default is restored
```
### Client Health Checks
```python
def check_client_connectivity(client, test_uri="s3://test-bucket/"):
    """Check if client can connect to cloud service.

    Listing a directory exercises both authentication and connectivity.

    Args:
        client: Client whose connection is being probed.
        test_uri: URI to list. Defaults to an S3 bucket for backward
            compatibility; pass a provider-appropriate URI (e.g. "gs://...")
            for non-S3 clients.

    Returns:
        Tuple of (is_healthy, message).
    """
    try:
        # Try to list a path (this tests authentication and connectivity)
        test_path = client.CloudPath(test_uri)
        list(test_path.iterdir())
        return True, "Connection successful"
    except Exception as e:  # broad by design: report any failure as unhealthy
        return False, str(e)
# Check all configured clients
clients = {
    "S3": S3Client.get_default_client(),
    "GCS": GSClient.get_default_client(),
    "Azure": AzureBlobClient.get_default_client(),
}

for name, client in clients.items():
    if client:
        is_healthy, message = check_client_connectivity(client)
        print(f"{name} client: {'✓' if is_healthy else '✗'} {message}")
    else:
        print(f"{name} client: Not configured")
```
### Advanced Cache Configuration
```python
354
import tempfile
355
import shutil
356
from pathlib import Path
357
358
class ManagedCacheClient:
    """Client wrapper with advanced cache management.

    Owns a temporary cache directory and evicts the oldest cached files
    whenever the cache grows past *max_cache_size_mb*.
    """

    def __init__(self, base_client, max_cache_size_mb=1000):
        self.base_client = base_client
        self.max_cache_size_mb = max_cache_size_mb
        self.cache_dir = Path(tempfile.mkdtemp(prefix="cloudpath_"))

        # Point the wrapped client at the managed cache directory.
        # NOTE(review): assumes the client honors attribute assignment
        # after construction — confirm against the client implementation.
        self.base_client.local_cache_dir = str(self.cache_dir)
        self.base_client.file_cache_mode = FileCacheMode.persistent

    def get_cache_size_mb(self):
        """Get current cache size in MB."""
        total_size = sum(
            f.stat().st_size for f in self.cache_dir.rglob('*') if f.is_file()
        )
        return total_size / (1024 * 1024)

    def cleanup_old_files(self):
        """Remove old cached files if cache is too large."""
        if self.get_cache_size_mb() <= self.max_cache_size_mb:
            return

        # All cached files, oldest (by modification time) first
        cached_files = sorted(
            (f for f in self.cache_dir.rglob('*') if f.is_file()),
            key=lambda f: f.stat().st_mtime,
        )

        # Remove files until under size limit
        for file_path in cached_files:
            file_path.unlink()
            if self.get_cache_size_mb() <= self.max_cache_size_mb:
                break

    def CloudPath(self, *args, **kwargs):
        """Create CloudPath and manage cache."""
        self.cleanup_old_files()
        return self.base_client.CloudPath(*args, **kwargs)

    def __del__(self):
        """Clean up temporary cache directory (best effort)."""
        # __del__ may run during interpreter shutdown or after the
        # directory is gone, so never let cleanup raise.
        if self.cache_dir.exists():
            shutil.rmtree(self.cache_dir, ignore_errors=True)
# Usage
base_s3_client = S3Client(profile_name="default")
managed_client = ManagedCacheClient(base_s3_client, max_cache_size_mb=500)

# Paths automatically benefit from managed caching
path = managed_client.CloudPath("s3://large-data-bucket/dataset.csv")
data = path.read_text()  # Cached with size management
```
### Multi-Region Client Setup
```python
class MultiRegionS3Client:
    """Wrapper for managing S3 clients across multiple regions."""

    def __init__(self, regions, credentials):
        """Build one S3 client per region, all sharing *credentials*."""
        self.clients = {}
        self.credentials = credentials

        for region in regions:
            self.clients[region] = S3Client(
                region_name=region,
                **credentials,
            )

    def get_client_for_bucket(self, bucket_name):
        """Get appropriate client for bucket based on region.

        This would require boto3 to determine the real bucket region;
        this simplified example assumes the region name appears in the
        bucket name and falls back to the first configured region.
        """
        for region, client in self.clients.items():
            if region in bucket_name:
                return client

        # Return default (first-configured) region client
        return next(iter(self.clients.values()))

    def CloudPath(self, path_str):
        """Create CloudPath with region-appropriate client."""
        # Extract bucket name from "s3://bucket/key"
        bucket_name = path_str.split('/')[2]
        client = self.get_client_for_bucket(bucket_name)
        return client.CloudPath(path_str)
# Usage
multi_region_client = MultiRegionS3Client(
    regions=["us-east-1", "us-west-2", "eu-west-1"],
    credentials={
        "aws_access_key_id": "your-key",
        "aws_secret_access_key": "your-secret",
    },
)

# Automatically uses appropriate regional client
us_path = multi_region_client.CloudPath("s3://us-east-1-bucket/data.txt")
eu_path = multi_region_client.CloudPath("s3://eu-west-1-bucket/data.txt")
```
### Client Monitoring and Metrics
```python
470
import time
471
from collections import defaultdict
472
473
class MonitoringClient:
    """Client wrapper that tracks usage metrics."""

    def __init__(self, base_client):
        self.base_client = base_client
        self.metrics = defaultdict(int)           # counters by name
        self.operation_times = defaultdict(list)  # durations by operation

    def CloudPath(self, *args, **kwargs):
        """Create monitored CloudPath."""
        self.metrics["paths_created"] += 1
        return MonitoredCloudPath(
            self.base_client.CloudPath(*args, **kwargs),
            self,
        )

    def record_operation(self, operation, duration):
        """Record one occurrence and its duration for *operation*."""
        self.metrics[f"{operation}_count"] += 1
        self.operation_times[operation].append(duration)

    def get_metrics(self):
        """Return collected counters plus per-operation avg/total times."""
        summary = dict(self.metrics)

        for operation, times in self.operation_times.items():
            if times:
                summary[f"{operation}_avg_time"] = sum(times) / len(times)
                summary[f"{operation}_total_time"] = sum(times)

        return summary
505
class MonitoredCloudPath:
    """CloudPath wrapper that tracks operations."""

    def __init__(self, path, monitor):
        self.path = path
        self.monitor = monitor

    def read_text(self):
        """Timed wrapper around the underlying read_text."""
        start_time = time.time()
        try:
            result = self.path.read_text()
            duration = time.time() - start_time
            self.monitor.record_operation("read_text", duration)
            return result
        except Exception:
            # Failures are counted under a separate key with zero duration
            self.monitor.record_operation("read_text_error", 0)
            raise

    def write_text(self, data):
        """Timed wrapper around the underlying write_text."""
        start_time = time.time()
        try:
            result = self.path.write_text(data)
            duration = time.time() - start_time
            self.monitor.record_operation("write_text", duration)
            return result
        except Exception:
            self.monitor.record_operation("write_text_error", 0)
            raise

    # Delegate other methods to wrapped path
    def __getattr__(self, name):
        return getattr(self.path, name)
# Usage
base_client = S3Client()
monitoring_client = MonitoringClient(base_client)

# All operations are monitored
path = monitoring_client.CloudPath("s3://my-bucket/file.txt")
path.write_text("Hello, world!")
content = path.read_text()

# Check metrics
metrics = monitoring_client.get_metrics()
print(f"Operations performed: {metrics}")
```