Or run:

    npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

anypath.md azure-integration.md client-management.md cloud-operations.md configuration.md core-operations.md directory-operations.md exceptions.md file-io.md gcs-integration.md http-support.md index.md patching.md s3-integration.md

docs/client-management.md

0

# Client Management

1

2

Base client functionality for authentication, caching configuration, and cloud service connection management. The client system provides a unified interface for managing connections to different cloud providers while handling authentication, caching, and service-specific configurations.

3

4

## Capabilities

5

6

### Base Client Class

7

8

Abstract base class that defines the common interface for all cloud clients.

9

10

```python { .api }

11

class Client:

12

"""Base class for all cloud storage clients."""

13

14

def __init__(

15

self,

16

file_cache_mode: FileCacheMode = None,

17

local_cache_dir: str = None,

18

content_type_method = None

19

):

20

"""

21

Initialize base client.

22

23

Args:

24

file_cache_mode: Cache management strategy

25

local_cache_dir: Local directory for file caching

26

content_type_method: Function to determine MIME types

27

"""

28

29

@classmethod

30

def get_default_client(cls):

31

"""

32

Get the default client instance for this client type.

33

34

Returns:

35

Default client instance or None if not set

36

"""

37

38

def set_as_default_client(self) -> None:

39

"""

40

Set this client as the default for its type.

41

42

All paths created without explicit client will use this client.

43

"""

44

45

def CloudPath(

46

self,

47

cloud_path: str,

48

*parts: str

49

) -> "CloudPath":

50

"""

51

Create CloudPath associated with this client.

52

53

Args:

54

cloud_path: Cloud storage URI

55

*parts: Additional path segments

56

57

Returns:

58

CloudPath instance using this client

59

"""

60

61

def clear_cache(self) -> None:

62

"""

63

Clear all cached files for this client.

64

"""

65

66

@property

67

def file_cache_mode(self) -> FileCacheMode:

68

"""Cache management mode for this client."""

69

70

@property

71

def content_type_method(self):

72

"""Function used to determine MIME types."""

73

```

74

75

## Usage Examples

76

77

### Default Client Management

78

79

```python

80

from cloudpathlib import S3Client, GSClient, AzureBlobClient, CloudPath

81

82

# Configure default clients for each provider

83

s3_client = S3Client(

84

aws_access_key_id="your-key",

85

aws_secret_access_key="your-secret"

86

)

87

s3_client.set_as_default_client()

88

89

gs_client = GSClient(

90

application_credentials="path/to/service-account.json"

91

)

92

gs_client.set_as_default_client()

93

94

azure_client = AzureBlobClient(

95

connection_string="your-connection-string"

96

)

97

azure_client.set_as_default_client()

98

99

# Now all paths use the configured default clients

100

s3_path = CloudPath("s3://my-bucket/file.txt") # Uses s3_client

101

gs_path = CloudPath("gs://my-bucket/file.txt") # Uses gs_client

102

azure_path = CloudPath("az://my-container/file.txt") # Uses azure_client

103

104

# Check which client is being used

105

print(f"S3 client: {s3_path.client}")

106

print(f"GS client: {gs_path.client}")

107

```

108

109

### Multiple Client Configurations

110

111

```python

112

# Configure different clients for different environments

113

prod_s3_client = S3Client(

114

profile_name="production",

115

file_cache_mode=FileCacheMode.persistent

116

)

117

118

dev_s3_client = S3Client(

119

profile_name="development",

120

file_cache_mode=FileCacheMode.tmp_dir

121

)

122

123

# Use specific clients explicitly

124

prod_path = CloudPath("s3://prod-bucket/data.txt", client=prod_s3_client)

125

dev_path = CloudPath("s3://dev-bucket/data.txt", client=dev_s3_client)

126

127

# Or create paths using client method

128

prod_path = prod_s3_client.CloudPath("s3://prod-bucket/data.txt")

129

dev_path = dev_s3_client.CloudPath("s3://dev-bucket/data.txt")

130

```

131

132

### Cache Management

133

134

```python

135

from cloudpathlib import FileCacheMode

136

import tempfile

137

138

# Configure client with persistent cache

139

cache_dir = "/tmp/cloudpathlib-cache"

140

client = S3Client(

141

file_cache_mode=FileCacheMode.persistent,

142

local_cache_dir=cache_dir

143

)

144

145

# Create paths with configured caching

146

path = CloudPath("s3://my-bucket/large-file.dat", client=client)

147

148

# File is cached locally on first access

149

content = path.read_bytes() # Downloads and caches

150

content = path.read_bytes() # Uses cached version

151

152

# Clear cache for specific client

153

client.clear_cache()

154

155

# Clear cache for specific path

156

path.clear_cache()

157

```

158

159

### Content Type Detection

160

161

```python

162

import mimetypes

163

164

def custom_content_type(path):

165

"""Custom MIME type detection."""

166

mime_type, _ = mimetypes.guess_type(str(path))

167

168

# Custom mappings

169

if str(path).endswith('.parquet'):

170

return 'application/octet-stream'

171

elif str(path).endswith('.jsonl'):

172

return 'application/x-jsonlines'

173

174

return mime_type or 'application/octet-stream'

175

176

# Configure client with custom content type detection

177

client = S3Client(content_type_method=custom_content_type)

178

179

# Uploads will use custom MIME type detection

180

path = CloudPath("s3://my-bucket/data.parquet", client=client)

181

path.upload_from("local_data.parquet") # Uses custom content type

182

```

183

184

### Client Factory Pattern

185

186

```python

187

class CloudClientFactory:

188

"""Factory for creating configured cloud clients."""

189

190

@staticmethod

191

def create_s3_client(environment="production"):

192

"""Create S3 client for specific environment."""

193

if environment == "production":

194

return S3Client(

195

profile_name="prod",

196

file_cache_mode=FileCacheMode.persistent,

197

local_cache_dir="/var/cache/cloudpathlib"

198

)

199

elif environment == "development":

200

return S3Client(

201

profile_name="dev",

202

file_cache_mode=FileCacheMode.tmp_dir

203

)

204

elif environment == "testing":

205

return S3Client(

206

no_sign_request=True, # For public buckets

207

file_cache_mode=FileCacheMode.close_file

208

)

209

else:

210

raise ValueError(f"Unknown environment: {environment}")

211

212

@staticmethod

213

def create_gs_client(environment="production"):

214

"""Create GCS client for specific environment."""

215

if environment == "production":

216

return GSClient(

217

project="my-prod-project",

218

file_cache_mode=FileCacheMode.persistent

219

)

220

elif environment == "development":

221

return GSClient(

222

application_credentials="dev-service-account.json",

223

file_cache_mode=FileCacheMode.tmp_dir

224

)

225

else:

226

raise ValueError(f"Unknown environment: {environment}")

227

228

# Usage

229

import os

230

env = os.getenv("ENVIRONMENT", "development")

231

232

s3_client = CloudClientFactory.create_s3_client(env)

233

s3_client.set_as_default_client()

234

235

gs_client = CloudClientFactory.create_gs_client(env)

236

gs_client.set_as_default_client()

237

```

238

239

### Configuration from Environment

240

241

```python

242

import os

243

from cloudpathlib import S3Client, GSClient, FileCacheMode

244

245

def configure_clients_from_env():

246

"""Configure clients from environment variables."""

247

248

# S3 client configuration

249

s3_config = {}

250

if os.getenv("AWS_ACCESS_KEY_ID"):

251

s3_config["aws_access_key_id"] = os.getenv("AWS_ACCESS_KEY_ID")

252

if os.getenv("AWS_SECRET_ACCESS_KEY"):

253

s3_config["aws_secret_access_key"] = os.getenv("AWS_SECRET_ACCESS_KEY")

254

if os.getenv("AWS_PROFILE"):

255

s3_config["profile_name"] = os.getenv("AWS_PROFILE")

256

if os.getenv("S3_ENDPOINT_URL"):

257

s3_config["endpoint_url"] = os.getenv("S3_ENDPOINT_URL")

258

259

# Cache configuration

260

cache_mode = os.getenv("CLOUDPATHLIB_CACHE_MODE", "tmp_dir")

261

cache_dir = os.getenv("CLOUDPATHLIB_CACHE_DIR")

262

263

s3_config["file_cache_mode"] = FileCacheMode(cache_mode)

264

if cache_dir:

265

s3_config["local_cache_dir"] = cache_dir

266

267

s3_client = S3Client(**s3_config)

268

s3_client.set_as_default_client()

269

270

# GCS client configuration

271

gs_config = {}

272

if os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):

273

gs_config["application_credentials"] = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

274

if os.getenv("GCP_PROJECT"):

275

gs_config["project"] = os.getenv("GCP_PROJECT")

276

277

gs_config["file_cache_mode"] = FileCacheMode(cache_mode)

278

if cache_dir:

279

gs_config["local_cache_dir"] = cache_dir

280

281

gs_client = GSClient(**gs_config)

282

gs_client.set_as_default_client()

283

284

return s3_client, gs_client

285

286

# Configure from environment

287

s3_client, gs_client = configure_clients_from_env()

288

```

289

290

### Client Context Managers

291

292

```python

293

class TemporaryClient:

294

"""Context manager for temporary client configuration."""

295

296

def __init__(self, client):

297

self.client = client

298

self.original_default = None

299

300

def __enter__(self):

301

# Save current default

302

self.original_default = self.client.__class__.get_default_client()

303

# Set temporary default

304

self.client.set_as_default_client()

305

return self.client

306

307

def __exit__(self, exc_type, exc_val, exc_tb):

308

# Restore original default

309

if self.original_default:

310

self.original_default.set_as_default_client()

311

312

# Usage

313

temp_client = S3Client(profile_name="temporary-profile")

314

315

with TemporaryClient(temp_client):

316

# Inside context, paths use temporary client

317

path = CloudPath("s3://temp-bucket/file.txt")

318

content = path.read_text()

319

320

# Outside context, original default is restored

321

```

322

323

### Client Health Checks

324

325

```python

326

def check_client_connectivity(client):

327

"""Check if client can connect to cloud service."""

328

try:

329

# Try to list a path (this tests authentication and connectivity)

330

test_path = client.CloudPath("s3://test-bucket/")

331

list(test_path.iterdir())

332

return True, "Connection successful"

333

except Exception as e:

334

return False, str(e)

335

336

# Check all configured clients

337

clients = {

338

"S3": S3Client.get_default_client(),

339

"GCS": GSClient.get_default_client(),

340

"Azure": AzureBlobClient.get_default_client()

341

}

342

343

for name, client in clients.items():

344

if client:

345

is_healthy, message = check_client_connectivity(client)

346

print(f"{name} client: {'✓' if is_healthy else '✗'} {message}")

347

else:

348

print(f"{name} client: Not configured")

349

```

350

351

### Advanced Cache Configuration

352

353

```python

354

import tempfile

355

import shutil

356

from pathlib import Path

357

358

class ManagedCacheClient:

359

"""Client wrapper with advanced cache management."""

360

361

def __init__(self, base_client, max_cache_size_mb=1000):

362

self.base_client = base_client

363

self.max_cache_size_mb = max_cache_size_mb

364

self.cache_dir = Path(tempfile.mkdtemp(prefix="cloudpath_"))

365

366

# Configure client with managed cache directory

367

self.base_client.local_cache_dir = str(self.cache_dir)

368

self.base_client.file_cache_mode = FileCacheMode.persistent

369

370

def get_cache_size_mb(self):

371

"""Get current cache size in MB."""

372

total_size = sum(

373

f.stat().st_size for f in self.cache_dir.rglob('*') if f.is_file()

374

)

375

return total_size / (1024 * 1024)

376

377

def cleanup_old_files(self):

378

"""Remove old cached files if cache is too large."""

379

current_size = self.get_cache_size_mb()

380

381

if current_size <= self.max_cache_size_mb:

382

return

383

384

# Get all cached files with modification times

385

cached_files = [

386

(f, f.stat().st_mtime) for f in self.cache_dir.rglob('*')

387

if f.is_file()

388

]

389

390

# Sort by modification time (oldest first)

391

cached_files.sort(key=lambda x: x[1])

392

393

# Remove files until under size limit

394

for file_path, _ in cached_files:

395

file_path.unlink()

396

current_size = self.get_cache_size_mb()

397

if current_size <= self.max_cache_size_mb:

398

break

399

400

def CloudPath(self, *args, **kwargs):

401

"""Create CloudPath and manage cache."""

402

self.cleanup_old_files()

403

return self.base_client.CloudPath(*args, **kwargs)

404

405

def __del__(self):

406

"""Clean up temporary cache directory."""

407

if self.cache_dir.exists():

408

shutil.rmtree(self.cache_dir)

409

410

# Usage

411

base_s3_client = S3Client(profile_name="default")

412

managed_client = ManagedCacheClient(base_s3_client, max_cache_size_mb=500)

413

414

# Paths automatically benefit from managed caching

415

path = managed_client.CloudPath("s3://large-data-bucket/dataset.csv")

416

data = path.read_text() # Cached with size management

417

```

418

419

### Multi-Region Client Setup

420

421

```python

422

class MultiRegionS3Client:

423

"""Wrapper for managing S3 clients across multiple regions."""

424

425

def __init__(self, regions, credentials):

426

self.clients = {}

427

self.credentials = credentials

428

429

for region in regions:

430

self.clients[region] = S3Client(

431

region_name=region,

432

**credentials

433

)

434

435

def get_client_for_bucket(self, bucket_name):

436

"""Get appropriate client for bucket based on region."""

437

# This would require boto3 to determine bucket region

438

# Simplified example assumes bucket naming convention

439

for region, client in self.clients.items():

440

if region in bucket_name:

441

return client

442

443

# Return default region client

444

return next(iter(self.clients.values()))

445

446

def CloudPath(self, path_str):

447

"""Create CloudPath with region-appropriate client."""

448

# Extract bucket name from path

449

bucket_name = path_str.split('/')[2] # s3://bucket/key

450

client = self.get_client_for_bucket(bucket_name)

451

return client.CloudPath(path_str)

452

453

# Usage

454

multi_region_client = MultiRegionS3Client(

455

regions=["us-east-1", "us-west-2", "eu-west-1"],

456

credentials={

457

"aws_access_key_id": "your-key",

458

"aws_secret_access_key": "your-secret"

459

}

460

)

461

462

# Automatically uses appropriate regional client

463

us_path = multi_region_client.CloudPath("s3://us-east-1-bucket/data.txt")

464

eu_path = multi_region_client.CloudPath("s3://eu-west-1-bucket/data.txt")

465

```

466

467

### Client Monitoring and Metrics

468

469

```python

470

import time

471

from collections import defaultdict

472

473

class MonitoringClient:

474

"""Client wrapper that tracks usage metrics."""

475

476

def __init__(self, base_client):

477

self.base_client = base_client

478

self.metrics = defaultdict(int)

479

self.operation_times = defaultdict(list)

480

481

def CloudPath(self, *args, **kwargs):

482

"""Create monitored CloudPath."""

483

self.metrics["paths_created"] += 1

484

return MonitoredCloudPath(

485

self.base_client.CloudPath(*args, **kwargs),

486

self

487

)

488

489

def record_operation(self, operation, duration):

490

"""Record operation metrics."""

491

self.metrics[f"{operation}_count"] += 1

492

self.operation_times[operation].append(duration)

493

494

def get_metrics(self):

495

"""Get collected metrics."""

496

summary = dict(self.metrics)

497

498

for operation, times in self.operation_times.items():

499

if times:

500

summary[f"{operation}_avg_time"] = sum(times) / len(times)

501

summary[f"{operation}_total_time"] = sum(times)

502

503

return summary

504

505

class MonitoredCloudPath:

506

"""CloudPath wrapper that tracks operations."""

507

508

def __init__(self, path, monitor):

509

self.path = path

510

self.monitor = monitor

511

512

def read_text(self):

513

start_time = time.time()

514

try:

515

result = self.path.read_text()

516

duration = time.time() - start_time

517

self.monitor.record_operation("read_text", duration)

518

return result

519

except Exception:

520

self.monitor.record_operation("read_text_error", 0)

521

raise

522

523

def write_text(self, data):

524

start_time = time.time()

525

try:

526

result = self.path.write_text(data)

527

duration = time.time() - start_time

528

self.monitor.record_operation("write_text", duration)

529

return result

530

except Exception:

531

self.monitor.record_operation("write_text_error", 0)

532

raise

533

534

# Delegate other methods to wrapped path

535

def __getattr__(self, name):

536

return getattr(self.path, name)

537

538

# Usage

539

base_client = S3Client()

540

monitoring_client = MonitoringClient(base_client)

541

542

# All operations are monitored

543

path = monitoring_client.CloudPath("s3://my-bucket/file.txt")

544

path.write_text("Hello, world!")

545

content = path.read_text()

546

547

# Check metrics

548

metrics = monitoring_client.get_metrics()

549

print(f"Operations performed: {metrics}")

550

```