# Cloud File Storage

Cloud-agnostic file storage abstraction that uses LibCloud to provide a consistent API across the local filesystem, AWS S3, and other cloud storage providers. The FileStoreFactory enables switching between storage backends by configuration name, handling provider detection and configuration management for enterprise data workflows.
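
For example, code written against the returned LibCloud `StorageDriver` does not change when the backing provider does; only the configuration name passed to the factory differs. A minimal sketch (the configuration names below are hypothetical):

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory

# Hypothetical configuration names; each resolves to a different backend
dev_store = FileStoreFactory.create_file_store("local-dev")
prod_store = FileStoreFactory.create_file_store("prod-s3")

# Downstream code is identical regardless of the backend
for container in prod_store.list_containers():
    print(container.name)
```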

## Capabilities

### FileStore Factory

Factory pattern implementation for creating configured cloud storage instances with support for multiple providers including local filesystem, AWS S3, and other LibCloud-supported storage services.

```python { .api }
class FileStoreFactory:
    """
    FileStore abstraction to integrate with cloud storage providers.
    Creates configured instances of libcloud StorageDriver.

    Class Attributes:
    - logger - LogManager instance for FileStoreFactory
    """

    def __init__(self) -> None:
        """Constructor"""
        ...

    @staticmethod
    def create_file_store(name: str) -> StorageDriver:
        """
        Create and return configured file store instance.

        Parameters:
        - name: str - Name of the file store configuration

        Returns:
        StorageDriver - Configured LibCloud storage driver instance
        """
        ...

    @staticmethod
    def create_local_file_store(name: str, filtered, cls) -> StorageDriver:
        """
        Static method for local file store creation.

        Parameters:
        - name: str - Configuration name
        - filtered - Filtered configuration parameters
        - cls - Storage driver class

        Returns:
        StorageDriver - Local filesystem storage driver
        """
        ...

    @staticmethod
    def create_s3_file_store(name: str, filtered, provider) -> StorageDriver:
        """
        Static method for S3 file store creation.

        Parameters:
        - name: str - Configuration name
        - filtered - Filtered configuration parameters
        - provider - S3 provider configuration

        Returns:
        StorageDriver - S3 storage driver instance
        """
        ...
```

## Usage Examples

### Basic File Store Creation

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
from libcloud.storage.types import Provider

# Create file store using configuration name
file_store = FileStoreFactory.create_file_store("my-s3-config")

# Use LibCloud StorageDriver interface
containers = file_store.list_containers()
print(f"Available containers: {[c.name for c in containers]}")

# Get or create container
try:
    container = file_store.get_container("ml-data-bucket")
except Exception:
    container = file_store.create_container("ml-data-bucket")

# List objects in container
objects = file_store.list_container_objects(container)
print(f"Objects in container: {[obj.name for obj in objects]}")
```

### File Upload and Download Operations

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
from datetime import datetime

# Initialize file store
file_store = FileStoreFactory.create_file_store("data-lake-storage")

# Get container for data storage
container = file_store.get_container("ml-datasets")

# Upload local file to cloud storage
local_file_path = "/tmp/training_data.csv"
remote_object_name = f"datasets/{datetime.now().strftime('%Y/%m/%d')}/training_data.csv"

with open(local_file_path, 'rb') as file_handle:
    uploaded_object = file_store.upload_object_via_stream(
        iterator=file_handle,
        container=container,
        object_name=remote_object_name
    )

print(f"Uploaded file: {uploaded_object.name}")
print(f"File size: {uploaded_object.size} bytes")

# Download file from cloud storage
download_path = "/tmp/downloaded_training_data.csv"
downloaded_object = file_store.get_object(container.name, remote_object_name)

with open(download_path, 'wb') as file_handle:
    for chunk in file_store.download_object_as_stream(downloaded_object):
        file_handle.write(chunk)

print(f"Downloaded file to: {download_path}")

# Delete object after processing
file_store.delete_object(downloaded_object)
print(f"Deleted remote object: {remote_object_name}")
```

### Multi-Environment File Store Manager

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
from datetime import datetime
from typing import Any, Dict

class MultiEnvironmentFileStoreManager:
    """Utility class for managing file stores across environments"""

    def __init__(self):
        self.file_stores: Dict[str, Any] = {}
        self.load_configurations()

    def load_configurations(self):
        """Load file store configurations for different environments"""
        environments = ["development", "staging", "production"]

        for env in environments:
            try:
                store = FileStoreFactory.create_file_store(f"{env}-storage")
                self.file_stores[env] = store
                print(f"Loaded {env} file store configuration")
            except Exception as e:
                print(f"Could not load {env} configuration: {e}")

    def get_store(self, environment: str):
        """Get file store for specific environment"""
        if environment not in self.file_stores:
            raise ValueError(f"Environment {environment} not configured")
        return self.file_stores[environment]

    def sync_files(self, source_env: str, target_env: str, container_name: str, prefix: str = ""):
        """Sync files between environments"""
        source_store = self.get_store(source_env)
        target_store = self.get_store(target_env)

        # Get containers
        source_container = source_store.get_container(container_name)
        try:
            target_container = target_store.get_container(container_name)
        except Exception:
            target_container = target_store.create_container(container_name)

        # List objects with prefix filter
        objects = source_store.list_container_objects(source_container)
        if prefix:
            objects = [obj for obj in objects if obj.name.startswith(prefix)]

        # Sync each object
        for obj in objects:
            print(f"Syncing {obj.name}...")

            # Download from source
            content = b""
            for chunk in source_store.download_object_as_stream(obj):
                content += chunk

            # Upload to target
            target_store.upload_object_via_stream(
                iterator=iter([content]),
                container=target_container,
                object_name=obj.name
            )

        print(f"Synced {len(objects)} objects from {source_env} to {target_env}")

    def backup_container(self, environment: str, container_name: str, backup_prefix: str):
        """Create backup of container with timestamp prefix"""
        store = self.get_store(environment)
        container = store.get_container(container_name)

        # Create backup container
        backup_container_name = f"{container_name}-backup"
        try:
            backup_container = store.get_container(backup_container_name)
        except Exception:
            backup_container = store.create_container(backup_container_name)

        # Get timestamp for backup
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Copy all objects with backup prefix
        objects = store.list_container_objects(container)
        for obj in objects:
            backup_name = f"{backup_prefix}_{timestamp}/{obj.name}"

            # Download original
            content = b""
            for chunk in store.download_object_as_stream(obj):
                content += chunk

            # Upload as backup
            store.upload_object_via_stream(
                iterator=iter([content]),
                container=backup_container,
                object_name=backup_name
            )

        print(f"Backed up {len(objects)} objects with prefix {backup_prefix}_{timestamp}")

# Usage example
manager = MultiEnvironmentFileStoreManager()

# Sync development data to staging
manager.sync_files("development", "staging", "ml-datasets", "experiment_001/")

# Create backup before deployment
manager.backup_container("production", "ml-models", "pre_deployment")
```

### Data Pipeline File Processing

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
import io
import os
import pandas as pd

class DataPipelineFileProcessor:
    """File processor for data pipeline workflows"""

    def __init__(self, config_name: str):
        self.file_store = FileStoreFactory.create_file_store(config_name)
        self.processing_stats = {
            "files_processed": 0,
            "total_size": 0,
            "errors": []
        }

    def process_csv_files(self, container_name: str, input_prefix: str, output_prefix: str) -> dict:
        """Process CSV files with data transformations"""
        container = self.file_store.get_container(container_name)
        objects = self.file_store.list_container_objects(container)

        # Filter for CSV files with input prefix
        csv_objects = [obj for obj in objects
                       if obj.name.startswith(input_prefix) and obj.name.endswith('.csv')]

        for obj in csv_objects:
            try:
                print(f"Processing {obj.name}...")

                # Download CSV data
                csv_content = b""
                for chunk in self.file_store.download_object_as_stream(obj):
                    csv_content += chunk

                # Process with pandas
                df = pd.read_csv(io.BytesIO(csv_content))

                # Apply transformations (example)
                processed_df = self._apply_transformations(df)

                # Convert back to CSV
                output_buffer = io.StringIO()
                processed_df.to_csv(output_buffer, index=False)
                processed_csv = output_buffer.getvalue().encode('utf-8')

                # Upload processed file
                output_name = obj.name.replace(input_prefix, output_prefix)
                self.file_store.upload_object_via_stream(
                    iterator=iter([processed_csv]),
                    container=container,
                    object_name=output_name
                )

                # Update stats
                self.processing_stats["files_processed"] += 1
                self.processing_stats["total_size"] += len(processed_csv)

                print(f"Processed {obj.name} -> {output_name}")

            except Exception as e:
                error_msg = f"Error processing {obj.name}: {str(e)}"
                print(error_msg)
                self.processing_stats["errors"].append(error_msg)

        return self.processing_stats

    def _apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply data transformations"""
        # Example transformations
        # Remove duplicates
        df = df.drop_duplicates()

        # Handle missing values with a forward fill
        df = df.ffill()

        # Add processing timestamp
        df['processed_at'] = pd.Timestamp.now()

        return df

    def batch_upload_directory(self, local_dir: str, container_name: str, remote_prefix: str):
        """Upload entire local directory to cloud storage"""
        container = self.file_store.get_container(container_name)

        for root, dirs, files in os.walk(local_dir):
            for file in files:
                local_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_path, local_dir)
                remote_path = f"{remote_prefix}/{relative_path}".replace("\\", "/")

                with open(local_path, 'rb') as file_handle:
                    self.file_store.upload_object_via_stream(
                        iterator=file_handle,
                        container=container,
                        object_name=remote_path
                    )

                print(f"Uploaded {local_path} -> {remote_path}")

    def stream_large_file_download(self, container_name: str, object_name: str,
                                   local_path: str, chunk_size: int = 8192):
        """Download large file in chunks to manage memory"""
        container = self.file_store.get_container(container_name)
        obj = self.file_store.get_object(container_name, object_name)

        with open(local_path, 'wb') as file_handle:
            total_downloaded = 0

            for chunk in self.file_store.download_object_as_stream(obj, chunk_size=chunk_size):
                file_handle.write(chunk)
                total_downloaded += len(chunk)

                # Progress indicator for large files
                if total_downloaded % (chunk_size * 100) == 0:
                    print(f"Downloaded {total_downloaded} bytes...")

        print(f"Completed download: {local_path} ({total_downloaded} bytes)")

# Usage example
processor = DataPipelineFileProcessor("ml-data-lake")

# Process raw CSV files into cleaned datasets
stats = processor.process_csv_files(
    container_name="raw-data",
    input_prefix="incoming/2024/09/",
    output_prefix="processed/2024/09/"
)

print(f"Processing complete: {stats}")

# Batch upload model artifacts
processor.batch_upload_directory(
    local_dir="/tmp/model_artifacts",
    container_name="ml-models",
    remote_prefix="models/fraud_detection_v2.1"
)

# Download large dataset for local processing
processor.stream_large_file_download(
    container_name="datasets",
    object_name="large_training_set.parquet",
    local_path="/tmp/training_data.parquet"
)
```

### Storage Configuration Manager

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
from libcloud.storage.types import Provider
from typing import Dict, Any

class StorageConfigurationManager:
    """Manages storage configurations and provider-specific optimizations"""

    def __init__(self):
        self.configurations = {}
        self.provider_settings = {
            Provider.S3: {
                "multipart_threshold": 64 * 1024 * 1024,  # 64MB
                "max_concurrency": 10,
                "region_optimization": True
            },
            Provider.LOCAL: {
                "create_dirs": True,
                "file_permissions": 0o644
            }
        }

    def register_configuration(self, name: str, provider: str, **kwargs):
        """Register new storage configuration"""
        config = {
            "provider": provider,
            "settings": kwargs,
            "optimizations": self.provider_settings.get(provider, {})
        }
        self.configurations[name] = config
        print(f"Registered configuration: {name} ({provider})")

    def create_optimized_store(self, config_name: str):
        """Create file store with provider-specific optimizations"""
        if config_name not in self.configurations:
            # Fallback to factory default
            return FileStoreFactory.create_file_store(config_name)

        config = self.configurations[config_name]

        # Apply provider-specific optimizations
        if config["provider"] == Provider.S3:
            return self._create_s3_optimized_store(config_name, config)
        elif config["provider"] == Provider.LOCAL:
            return self._create_local_optimized_store(config_name, config)
        else:
            return FileStoreFactory.create_file_store(config_name)

    def _create_s3_optimized_store(self, name: str, config: Dict[str, Any]):
        """Create S3 store with optimizations"""
        # Apply S3-specific settings
        optimizations = config["optimizations"]

        # Create store with S3 optimizations
        store = FileStoreFactory.create_s3_file_store(
            name,
            config["settings"],
            config["provider"]
        )

        # Apply runtime optimizations
        if hasattr(store, 'connection'):
            store.connection.timeout = 30  # Connection timeout

        return store

    def _create_local_optimized_store(self, name: str, config: Dict[str, Any]):
        """Create local store with optimizations"""
        optimizations = config["optimizations"]

        store = FileStoreFactory.create_local_file_store(
            name,
            config["settings"],
            config["provider"]
        )

        return store

    def validate_configuration(self, config_name: str) -> bool:
        """Validate storage configuration by testing connectivity"""
        try:
            store = self.create_optimized_store(config_name)

            # Test basic operations
            containers = store.list_containers()
            print(f"Configuration {config_name} validated: {len(containers)} containers accessible")
            return True

        except Exception as e:
            print(f"Configuration {config_name} validation failed: {e}")
            return False

    def get_configuration_info(self, config_name: str) -> Dict[str, Any]:
        """Get detailed configuration information"""
        if config_name in self.configurations:
            return self.configurations[config_name]
        else:
            return {"status": "Using factory default configuration"}

# Usage example
config_manager = StorageConfigurationManager()

# Register custom configurations
config_manager.register_configuration(
    name="high-performance-s3",
    provider=Provider.S3,
    region="us-west-2",
    bucket="ml-data-lake",
    access_key_id="AKIA...",
    secret_access_key="SECRET..."
)

config_manager.register_configuration(
    name="local-dev",
    provider=Provider.LOCAL,
    path="/tmp/local_storage"
)

# Create optimized stores
s3_store = config_manager.create_optimized_store("high-performance-s3")
local_store = config_manager.create_optimized_store("local-dev")

# Validate configurations
config_manager.validate_configuration("high-performance-s3")
config_manager.validate_configuration("local-dev")

# Get configuration details
s3_info = config_manager.get_configuration_info("high-performance-s3")
print(f"S3 Configuration: {s3_info}")
```

## Best Practices

### Provider Selection

- Use local filesystem for development and testing
- Choose S3 for production cloud deployments
- Consider provider-specific features and pricing
- Implement fallback storage options for reliability (see the sketch below)
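
A minimal sketch of the fallback idea, assuming primary and secondary configuration names (both hypothetical) are registered with the factory:

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory

def create_store_with_fallback(primary_name: str, fallback_name: str):
    """Try the primary configuration first; fall back to the secondary on failure."""
    try:
        store = FileStoreFactory.create_file_store(primary_name)
        store.list_containers()  # inexpensive connectivity check
        return store
    except Exception as error:
        print(f"Primary store '{primary_name}' unavailable ({error}); using fallback")
        return FileStoreFactory.create_file_store(fallback_name)

# Hypothetical configuration names
store = create_store_with_fallback("prod-s3", "local-dev")
```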

### Performance Optimization

- Use streaming operations for large files
- Implement parallel uploads/downloads for batch operations (see the sketch below)
- Configure appropriate chunk sizes for memory efficiency
- Monitor storage costs and optimize access patterns
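
A sketch of parallel batch uploads using a thread pool; the configuration name, container, and file list are illustrative, and `max_workers` should be tuned to the provider's rate limits:

```python
from concurrent.futures import ThreadPoolExecutor

from aissemble_core_filestore.file_store_factory import FileStoreFactory

# Hypothetical configuration and container names
file_store = FileStoreFactory.create_file_store("data-lake-storage")
container = file_store.get_container("ml-datasets")

def upload_one(local_path: str, object_name: str):
    # Stream from disk so large files are not read fully into memory.
    # Note: depending on the LibCloud driver, a separate driver instance
    # per thread may be required for safe concurrent use.
    with open(local_path, "rb") as handle:
        return file_store.upload_object_via_stream(
            iterator=handle, container=container, object_name=object_name
        )

files = {"/tmp/a.csv": "batch/a.csv", "/tmp/b.csv": "batch/b.csv"}  # example inputs
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(upload_one, src, dst) for src, dst in files.items()]
    for future in futures:
        print(f"Uploaded {future.result().name}")
```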

### Security Considerations

- Use IAM roles and policies for cloud storage access
- Implement encryption for sensitive data (see the sketch below)
- Rotate credentials and audit access regularly
- Restrict network access to storage endpoints
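
One option for client-side encryption before upload is the `cryptography` package's Fernet recipe; a sketch, with key handling and names purely illustrative (in practice the key would come from a secrets manager):

```python
import io

from cryptography.fernet import Fernet

from aissemble_core_filestore.file_store_factory import FileStoreFactory

# Hypothetical configuration and container names
file_store = FileStoreFactory.create_file_store("prod-s3")
container = file_store.get_container("sensitive-data")

key = Fernet.generate_key()  # illustrative only; load from a secrets manager in practice
cipher = Fernet(key)

with open("/tmp/customer_records.csv", "rb") as handle:
    encrypted = cipher.encrypt(handle.read())

# Upload the ciphertext instead of the plaintext
file_store.upload_object_via_stream(
    iterator=io.BytesIO(encrypted),
    container=container,
    object_name="encrypted/customer_records.csv.enc",
)
```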

### Error Handling and Reliability

- Implement retry logic for transient failures (see the sketch below)
- Use exponential backoff for rate-limited operations
- Monitor storage health and availability
- Implement proper logging for troubleshooting
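
A minimal retry helper with exponential backoff, shown wrapping a container lookup; attempt counts, delays, and names are illustrative:

```python
import time

from aissemble_core_filestore.file_store_factory import FileStoreFactory

def with_retries(operation, max_attempts: int = 5, base_delay: float = 1.0):
    """Run operation(), retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as error:
            if attempt == max_attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1))
            print(f"Attempt {attempt} failed ({error}); retrying in {delay:.1f}s")
            time.sleep(delay)

# Hypothetical configuration and container names
file_store = FileStoreFactory.create_file_store("data-lake-storage")
container = with_retries(lambda: file_store.get_container("ml-datasets"))
```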