# Cloud File Storage

Cloud-agnostic file storage abstraction that uses Apache LibCloud to provide a consistent API across the local filesystem, AWS S3, and other cloud storage providers. The FileStoreFactory enables seamless switching between storage backends, with automatic provider detection and configuration management for enterprise data workflows.

## Capabilities

### FileStore Factory

Factory pattern implementation for creating configured cloud storage instances with support for multiple providers, including the local filesystem, AWS S3, and other LibCloud-supported storage services.

```python { .api }
class FileStoreFactory:
    """
    FileStore abstraction to integrate with cloud storage providers.
    Creates configured instances of libcloud StorageDriver.

    Class Attributes:
    - logger - LogManager instance for FileStoreFactory
    """

    def __init__(self) -> None:
        """Constructor"""
        ...

    @staticmethod
    def create_file_store(name: str) -> StorageDriver:
        """
        Create and return configured file store instance.

        Parameters:
        - name: str - Name of the file store configuration

        Returns:
        StorageDriver - Configured LibCloud storage driver instance
        """
        ...

    @staticmethod
    def create_local_file_store(name: str, filtered, cls) -> StorageDriver:
        """
        Static method for local file store creation.

        Parameters:
        - name: str - Configuration name
        - filtered - Filtered configuration parameters
        - cls - Storage driver class

        Returns:
        StorageDriver - Local filesystem storage driver
        """
        ...

    @staticmethod
    def create_s3_file_store(name: str, filtered, provider) -> StorageDriver:
        """
        Static method for S3 file store creation.

        Parameters:
        - name: str - Configuration name
        - filtered - Filtered configuration parameters
        - provider - S3 provider configuration

        Returns:
        StorageDriver - S3 storage driver instance
        """
        ...
```
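
Because the factory returns a standard LibCloud `StorageDriver`, anything it produces can be used interchangeably with a driver constructed directly through LibCloud. The sketch below shows roughly what the factory abstracts away, using only plain LibCloud calls; the credential values and paths are placeholders, not values read by the factory itself.

```python
# Rough sketch of what FileStoreFactory wraps: constructing LibCloud drivers directly.
from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider

# Local filesystem driver: the "key" argument is the base directory for containers
local_cls = get_driver(Provider.LOCAL)
local_driver = local_cls("/tmp/local_storage")

# S3 driver: key/secret/region would normally come from your configuration source
s3_cls = get_driver(Provider.S3)
s3_driver = s3_cls("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", region="us-east-1")
```

`FileStoreFactory.create_file_store` resolves this provider choice from the named configuration, so calling code only deals with the `StorageDriver` interface.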

## Usage Examples

### Basic File Store Creation

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory

# Create file store using configuration name
file_store = FileStoreFactory.create_file_store("my-s3-config")

# Use LibCloud StorageDriver interface
containers = file_store.list_containers()
print(f"Available containers: {[c.name for c in containers]}")

# Get or create container
try:
    container = file_store.get_container("ml-data-bucket")
except Exception:
    container = file_store.create_container("ml-data-bucket")

# List objects in container
objects = file_store.list_container_objects(container)
print(f"Objects in container: {[obj.name for obj in objects]}")
```
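
Catching the bare `Exception` above works, but LibCloud raises a specific `ContainerDoesNotExistError` for a missing container, so a narrower handler avoids masking unrelated failures such as authentication or network errors. A minimal variant, assuming the same `my-s3-config` configuration name:

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
from libcloud.storage.types import ContainerDoesNotExistError

file_store = FileStoreFactory.create_file_store("my-s3-config")

# Only fall back to create_container when the container is genuinely missing;
# other errors (credentials, connectivity) still propagate to the caller.
try:
    container = file_store.get_container("ml-data-bucket")
except ContainerDoesNotExistError:
    container = file_store.create_container("ml-data-bucket")
```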

### File Upload and Download Operations

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
from datetime import datetime

# Initialize file store
file_store = FileStoreFactory.create_file_store("data-lake-storage")

# Get container for data storage
container = file_store.get_container("ml-datasets")

# Upload local file to cloud storage
local_file_path = "/tmp/training_data.csv"
remote_object_name = f"datasets/{datetime.now().strftime('%Y/%m/%d')}/training_data.csv"

# The open file handle serves as the byte iterator for the streaming upload
with open(local_file_path, 'rb') as file_handle:
    uploaded_object = file_store.upload_object_via_stream(
        iterator=file_handle,
        container=container,
        object_name=remote_object_name
    )

print(f"Uploaded file: {uploaded_object.name}")
print(f"File size: {uploaded_object.size} bytes")

# Download file from cloud storage
download_path = "/tmp/downloaded_training_data.csv"
downloaded_object = file_store.get_object(container.name, remote_object_name)

with open(download_path, 'wb') as file_handle:
    for chunk in file_store.download_object_as_stream(downloaded_object):
        file_handle.write(chunk)

print(f"Downloaded file to: {download_path}")

# Delete object after processing
file_store.delete_object(downloaded_object)
print(f"Deleted remote object: {remote_object_name}")
```
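
When the data already lives on disk, LibCloud's path-based helpers cover the same round trip without manual streaming. A short sketch, reusing the `data-lake-storage` configuration and container from above; the object names here are illustrative:

```python
# Path-based equivalents of the streaming calls above (standard LibCloud StorageDriver API)
container = file_store.get_container("ml-datasets")

# upload_object reads the file from disk and verifies the hash where the provider supports it
uploaded = file_store.upload_object(
    "/tmp/training_data.csv",
    container=container,
    object_name="datasets/adhoc/training_data.csv"
)

# download_object writes straight to the destination path
obj = file_store.get_object(container.name, "datasets/adhoc/training_data.csv")
file_store.download_object(obj, "/tmp/training_data_copy.csv", overwrite_existing=True)
```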

### Multi-Environment File Store Manager

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
from datetime import datetime
from typing import Any, Dict

class MultiEnvironmentFileStoreManager:
    """Utility class for managing file stores across environments"""

    def __init__(self):
        self.file_stores: Dict[str, Any] = {}
        self.load_configurations()

    def load_configurations(self):
        """Load file store configurations for different environments"""
        environments = ["development", "staging", "production"]

        for env in environments:
            try:
                store = FileStoreFactory.create_file_store(f"{env}-storage")
                self.file_stores[env] = store
                print(f"Loaded {env} file store configuration")
            except Exception as e:
                print(f"Could not load {env} configuration: {e}")

    def get_store(self, environment: str):
        """Get file store for specific environment"""
        if environment not in self.file_stores:
            raise ValueError(f"Environment {environment} not configured")
        return self.file_stores[environment]

    def sync_files(self, source_env: str, target_env: str, container_name: str, prefix: str = ""):
        """Sync files between environments"""
        source_store = self.get_store(source_env)
        target_store = self.get_store(target_env)

        # Get containers
        source_container = source_store.get_container(container_name)
        try:
            target_container = target_store.get_container(container_name)
        except Exception:
            target_container = target_store.create_container(container_name)

        # List objects with prefix filter
        objects = source_store.list_container_objects(source_container)
        if prefix:
            objects = [obj for obj in objects if obj.name.startswith(prefix)]

        # Sync each object
        for obj in objects:
            print(f"Syncing {obj.name}...")

            # Download from source
            content = b""
            for chunk in source_store.download_object_as_stream(obj):
                content += chunk

            # Upload to target
            target_store.upload_object_via_stream(
                iterator=iter([content]),
                container=target_container,
                object_name=obj.name
            )

        print(f"Synced {len(objects)} objects from {source_env} to {target_env}")

    def backup_container(self, environment: str, container_name: str, backup_prefix: str):
        """Create backup of container with timestamp prefix"""
        store = self.get_store(environment)
        container = store.get_container(container_name)

        # Create backup container
        backup_container_name = f"{container_name}-backup"
        try:
            backup_container = store.get_container(backup_container_name)
        except Exception:
            backup_container = store.create_container(backup_container_name)

        # Get timestamp for backup
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Copy all objects with backup prefix
        objects = store.list_container_objects(container)
        for obj in objects:
            backup_name = f"{backup_prefix}_{timestamp}/{obj.name}"

            # Download original
            content = b""
            for chunk in store.download_object_as_stream(obj):
                content += chunk

            # Upload as backup
            store.upload_object_via_stream(
                iterator=iter([content]),
                container=backup_container,
                object_name=backup_name
            )

        print(f"Backed up {len(objects)} objects with prefix {backup_prefix}_{timestamp}")

# Usage example
manager = MultiEnvironmentFileStoreManager()

# Sync development data to staging
manager.sync_files("development", "staging", "ml-datasets", "experiment_001/")

# Create backup before deployment
manager.backup_container("production", "ml-models", "pre_deployment")
```
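
`sync_files` above re-copies every object on every run. If repeated syncs are expected, skipping objects that already exist in the target keeps the copy roughly incremental. A minimal sketch of that check, using only standard LibCloud calls; the helper itself is hypothetical and not part of the class above:

```python
def sync_missing_files(manager, source_env, target_env, container_name):
    """Copy only the objects missing from the target container."""
    source_store = manager.get_store(source_env)
    target_store = manager.get_store(target_env)

    source_container = source_store.get_container(container_name)
    target_container = target_store.get_container(container_name)

    # Object names already present in the target
    existing = {obj.name for obj in target_store.list_container_objects(target_container)}

    copied = 0
    for obj in source_store.list_container_objects(source_container):
        if obj.name in existing:
            continue
        content = b"".join(source_store.download_object_as_stream(obj))
        target_store.upload_object_via_stream(
            iterator=iter([content]),
            container=target_container,
            object_name=obj.name
        )
        copied += 1

    print(f"Copied {copied} new objects from {source_env} to {target_env}")
```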

### Data Pipeline File Processing

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
import io
import os

import pandas as pd

class DataPipelineFileProcessor:
    """File processor for data pipeline workflows"""

    def __init__(self, config_name: str):
        self.file_store = FileStoreFactory.create_file_store(config_name)
        self.processing_stats = {
            "files_processed": 0,
            "total_size": 0,
            "errors": []
        }

    def process_csv_files(self, container_name: str, input_prefix: str, output_prefix: str) -> dict:
        """Process CSV files with data transformations"""
        container = self.file_store.get_container(container_name)
        objects = self.file_store.list_container_objects(container)

        # Filter for CSV files with input prefix
        csv_objects = [obj for obj in objects
                       if obj.name.startswith(input_prefix) and obj.name.endswith('.csv')]

        for obj in csv_objects:
            try:
                print(f"Processing {obj.name}...")

                # Download CSV data
                csv_content = b""
                for chunk in self.file_store.download_object_as_stream(obj):
                    csv_content += chunk

                # Process with pandas
                df = pd.read_csv(io.BytesIO(csv_content))

                # Apply transformations (example)
                processed_df = self._apply_transformations(df)

                # Convert back to CSV
                output_buffer = io.StringIO()
                processed_df.to_csv(output_buffer, index=False)
                processed_csv = output_buffer.getvalue().encode('utf-8')

                # Upload processed file
                output_name = obj.name.replace(input_prefix, output_prefix)
                self.file_store.upload_object_via_stream(
                    iterator=iter([processed_csv]),
                    container=container,
                    object_name=output_name
                )

                # Update stats
                self.processing_stats["files_processed"] += 1
                self.processing_stats["total_size"] += len(processed_csv)

                print(f"Processed {obj.name} -> {output_name}")

            except Exception as e:
                error_msg = f"Error processing {obj.name}: {str(e)}"
                print(error_msg)
                self.processing_stats["errors"].append(error_msg)

        return self.processing_stats

    def _apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply data transformations"""
        # Example transformations
        # Remove duplicates
        df = df.drop_duplicates()

        # Forward-fill missing values
        df = df.ffill()

        # Add processing timestamp
        df['processed_at'] = pd.Timestamp.now()

        return df

    def batch_upload_directory(self, local_dir: str, container_name: str, remote_prefix: str):
        """Upload entire local directory to cloud storage"""
        container = self.file_store.get_container(container_name)

        for root, dirs, files in os.walk(local_dir):
            for file in files:
                local_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_path, local_dir)
                remote_path = f"{remote_prefix}/{relative_path}".replace("\\", "/")

                with open(local_path, 'rb') as file_handle:
                    self.file_store.upload_object_via_stream(
                        iterator=file_handle,
                        container=container,
                        object_name=remote_path
                    )

                print(f"Uploaded {local_path} -> {remote_path}")

    def stream_large_file_download(self, container_name: str, object_name: str,
                                   local_path: str, chunk_size: int = 8192):
        """Download large file in chunks to manage memory"""
        obj = self.file_store.get_object(container_name, object_name)

        with open(local_path, 'wb') as file_handle:
            total_downloaded = 0

            for chunk in self.file_store.download_object_as_stream(obj, chunk_size=chunk_size):
                file_handle.write(chunk)
                total_downloaded += len(chunk)

                # Progress indicator for large files
                if total_downloaded % (chunk_size * 100) == 0:
                    print(f"Downloaded {total_downloaded} bytes...")

        print(f"Completed download: {local_path} ({total_downloaded} bytes)")

# Usage example
processor = DataPipelineFileProcessor("ml-data-lake")

# Process raw CSV files into cleaned datasets
stats = processor.process_csv_files(
    container_name="raw-data",
    input_prefix="incoming/2024/09/",
    output_prefix="processed/2024/09/"
)

print(f"Processing complete: {stats}")

# Batch upload model artifacts
processor.batch_upload_directory(
    local_dir="/tmp/model_artifacts",
    container_name="ml-models",
    remote_prefix="models/fraud_detection_v2.1"
)

# Download large dataset for local processing
processor.stream_large_file_download(
    container_name="datasets",
    object_name="large_training_set.parquet",
    local_path="/tmp/training_data.parquet"
)
```
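
`batch_upload_directory` uploads files one at a time. Since object-store uploads are I/O bound, a thread pool usually improves throughput for large directories. The sketch below is an illustrative variant, not part of the `DataPipelineFileProcessor` API; it creates one driver per worker because LibCloud drivers are not guaranteed to be thread-safe, and the worker count is an assumed default.

```python
from concurrent.futures import ThreadPoolExecutor
import os

from aissemble_core_filestore.file_store_factory import FileStoreFactory

def parallel_upload_directory(config_name: str, local_dir: str,
                              container_name: str, remote_prefix: str,
                              max_workers: int = 8):
    """Illustrative parallel variant of batch_upload_directory."""
    # Collect (local_path, remote_path) pairs up front
    uploads = []
    for root, _dirs, files in os.walk(local_dir):
        for name in files:
            local_path = os.path.join(root, name)
            relative_path = os.path.relpath(local_path, local_dir)
            uploads.append((local_path, f"{remote_prefix}/{relative_path}".replace("\\", "/")))

    def upload_one(task):
        local_path, remote_path = task
        # One driver per worker, since drivers may not tolerate concurrent use
        store = FileStoreFactory.create_file_store(config_name)
        container = store.get_container(container_name)
        with open(local_path, "rb") as handle:
            store.upload_object_via_stream(
                iterator=handle,
                container=container,
                object_name=remote_path
            )
        return remote_path

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for remote_path in pool.map(upload_one, uploads):
            print(f"Uploaded {remote_path}")
```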

### Storage Configuration Manager

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory
from libcloud.storage.types import Provider
from typing import Dict, Any

class StorageConfigurationManager:
    """Manages storage configurations and provider-specific optimizations"""

    def __init__(self):
        self.configurations = {}
        self.provider_settings = {
            Provider.S3: {
                "multipart_threshold": 64 * 1024 * 1024,  # 64MB
                "max_concurrency": 10,
                "region_optimization": True
            },
            Provider.LOCAL: {
                "create_dirs": True,
                "file_permissions": 0o644
            }
        }

    def register_configuration(self, name: str, provider: str, **kwargs):
        """Register new storage configuration"""
        config = {
            "provider": provider,
            "settings": kwargs,
            "optimizations": self.provider_settings.get(provider, {})
        }
        self.configurations[name] = config
        print(f"Registered configuration: {name} ({provider})")

    def create_optimized_store(self, config_name: str):
        """Create file store with provider-specific optimizations"""
        if config_name not in self.configurations:
            # Fall back to the factory default
            return FileStoreFactory.create_file_store(config_name)

        config = self.configurations[config_name]

        # Apply provider-specific optimizations
        if config["provider"] == Provider.S3:
            return self._create_s3_optimized_store(config_name, config)
        elif config["provider"] == Provider.LOCAL:
            return self._create_local_optimized_store(config_name, config)
        else:
            return FileStoreFactory.create_file_store(config_name)

    def _create_s3_optimized_store(self, name: str, config: Dict[str, Any]):
        """Create S3 store with optimizations"""
        # Apply S3-specific settings
        optimizations = config["optimizations"]

        # Create store with S3 optimizations
        store = FileStoreFactory.create_s3_file_store(
            name,
            config["settings"],
            config["provider"]
        )

        # Apply runtime optimizations
        if hasattr(store, 'connection'):
            store.connection.timeout = 30  # Connection timeout in seconds

        return store

    def _create_local_optimized_store(self, name: str, config: Dict[str, Any]):
        """Create local store with optimizations"""
        optimizations = config["optimizations"]

        store = FileStoreFactory.create_local_file_store(
            name,
            config["settings"],
            config["provider"]
        )

        return store

    def validate_configuration(self, config_name: str) -> bool:
        """Validate storage configuration by testing connectivity"""
        try:
            store = self.create_optimized_store(config_name)

            # Test basic operations
            containers = store.list_containers()
            print(f"Configuration {config_name} validated: {len(containers)} containers accessible")
            return True

        except Exception as e:
            print(f"Configuration {config_name} validation failed: {e}")
            return False

    def get_configuration_info(self, config_name: str) -> Dict[str, Any]:
        """Get detailed configuration information"""
        if config_name in self.configurations:
            return self.configurations[config_name]
        else:
            return {"status": "Using factory default configuration"}

# Usage example
config_manager = StorageConfigurationManager()

# Register custom configurations
config_manager.register_configuration(
    name="high-performance-s3",
    provider=Provider.S3,
    region="us-west-2",
    bucket="ml-data-lake",
    access_key_id="AKIA...",
    secret_access_key="SECRET..."
)

config_manager.register_configuration(
    name="local-dev",
    provider=Provider.LOCAL,
    path="/tmp/local_storage"
)

# Create optimized stores
s3_store = config_manager.create_optimized_store("high-performance-s3")
local_store = config_manager.create_optimized_store("local-dev")

# Validate configurations
config_manager.validate_configuration("high-performance-s3")
config_manager.validate_configuration("local-dev")

# Get configuration details
s3_info = config_manager.get_configuration_info("high-performance-s3")
print(f"S3 Configuration: {s3_info}")
```

## Best Practices

### Provider Selection
- Use the local filesystem for development and testing
- Choose S3 for production cloud deployments
- Consider provider-specific features and pricing
- Implement fallback storage options for reliability (see the sketch after this list)
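
A minimal sketch of the fallback idea, assuming a primary cloud configuration and a local configuration that both exist; the configuration names are illustrative, not part of the documented API:

```python
from aissemble_core_filestore.file_store_factory import FileStoreFactory

def create_store_with_fallback(primary_config: str = "production-s3",
                               fallback_config: str = "local-dev"):
    """Try the primary (cloud) configuration first; fall back to local storage."""
    try:
        store = FileStoreFactory.create_file_store(primary_config)
        store.list_containers()  # cheap connectivity check
        return store
    except Exception as error:
        print(f"Primary store '{primary_config}' unavailable ({error}); using fallback")
        return FileStoreFactory.create_file_store(fallback_config)
```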

### Performance Optimization
- Use streaming operations for large files
- Implement parallel uploads/downloads for batch operations
- Configure appropriate chunk sizes for memory efficiency
- Monitor storage costs and optimize access patterns

### Security Considerations
- Use IAM roles and policies for cloud storage access
- Implement encryption for sensitive data
- Rotate credentials and audit access regularly; keep secrets out of source code (see the sketch after this list)
- Secure network paths to storage endpoints
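
The configuration-manager example above hard-codes placeholder credentials for brevity. In practice, reading them from the environment (or using an IAM role where the provider supports it) keeps secrets out of source control. A small sketch that reuses the `StorageConfigurationManager` class defined earlier; the `AWS_*` variable names are conventional, not something FileStoreFactory requires:

```python
import os

from libcloud.storage.types import Provider

# StorageConfigurationManager is the class from the Storage Configuration Manager example
config_manager = StorageConfigurationManager()

# Credentials come from the environment rather than being hard-coded
config_manager.register_configuration(
    name="secure-s3",
    provider=Provider.S3,
    region=os.environ.get("AWS_REGION", "us-west-2"),
    bucket="ml-data-lake",
    access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)
```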

### Error Handling and Reliability
- Implement retry logic for transient failures (see the sketch after this list)
- Use exponential backoff for rate-limited operations
- Monitor storage health and availability
- Implement proper logging for troubleshooting
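
A minimal retry helper with exponential backoff, written against the generic `StorageDriver` interface; the attempt count and delays are illustrative defaults, not values mandated by the library:

```python
import time

from aissemble_core_filestore.file_store_factory import FileStoreFactory

def with_retries(operation, max_attempts: int = 4, base_delay: float = 1.0):
    """Run a storage operation, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as error:
            if attempt == max_attempts:
                raise  # give up and surface the original error
            delay = base_delay * (2 ** (attempt - 1))  # 1s, 2s, 4s, ...
            print(f"Attempt {attempt} failed ({error}); retrying in {delay:.0f}s")
            time.sleep(delay)

# Example: wrap a container listing so transient network errors are retried
file_store = FileStoreFactory.create_file_store("data-lake-storage")
containers = with_retries(lambda: file_store.list_containers())
```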