Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.
---
Cloud-agnostic file storage abstraction using LibCloud to provide consistent API across local filesystem, AWS S3, and other cloud storage providers. The FileStoreFactory enables seamless switching between storage backends with automatic provider detection and configuration management for enterprise data workflows.
Factory pattern implementation for creating configured cloud storage instances with support for multiple providers including local filesystem, AWS S3, and other LibCloud-supported storage services.
class FileStoreFactory:
    """
    FileStore abstraction to integrate with cloud storage providers.

    Creates configured instances of libcloud StorageDriver; the concrete
    backend (local filesystem, AWS S3, ...) is selected from the named
    configuration at creation time.

    Class Attributes:
        - logger - LogManager instance for FileStoreFactory
    """

    def __init__(self) -> None:
        """Constructor."""
        ...

    @staticmethod
    def create_file_store(name: str) -> StorageDriver:
        """
        Create and return a configured file store instance.

        Parameters:
            - name: str - Name of the file store configuration

        Returns:
            StorageDriver - Configured LibCloud storage driver instance
        """
        ...

    @staticmethod
    def create_local_file_store(name: str, filtered, cls) -> StorageDriver:
        """
        Create a local filesystem file store.

        Parameters:
            - name: str - Configuration name
            - filtered - Filtered configuration parameters
            - cls - Storage driver class

        Returns:
            StorageDriver - Local filesystem storage driver
        """
        ...

    @staticmethod
    def create_s3_file_store(name: str, filtered, provider) -> StorageDriver:
        """
        Create an S3-backed file store.

        Parameters:
            - name: str - Configuration name
            - filtered - Filtered configuration parameters
            - provider - S3 provider configuration

        Returns:
            StorageDriver - S3 storage driver instance
        """
        ...


from aissemble_core_filestore.file_store_factory import FileStoreFactory
from libcloud.storage.types import Provider

# Create file store using configuration name
file_store = FileStoreFactory.create_file_store("my-s3-config")

# Use LibCloud StorageDriver interface
containers = file_store.list_containers()
print(f"Available containers: {[c.name for c in containers]}")

# Get or create container: LibCloud raises when the container is missing,
# so fall back to creating it (EAFP).
try:
    container = file_store.get_container("ml-data-bucket")
except Exception:
    container = file_store.create_container("ml-data-bucket")

# List objects in container
objects = file_store.list_container_objects(container)
print(f"Objects in container: {[obj.name for obj in objects]}")

from aissemble_core_filestore.file_store_factory import FileStoreFactory
import os
from datetime import datetime

# Initialize file store
file_store = FileStoreFactory.create_file_store("data-lake-storage")

# Get container for data storage
container = file_store.get_container("ml-datasets")

# Upload local file to cloud storage under a date-partitioned object key
local_file_path = "/tmp/training_data.csv"
remote_object_name = f"datasets/{datetime.now().strftime('%Y/%m/%d')}/training_data.csv"

with open(local_file_path, 'rb') as file_handle:
    uploaded_object = file_store.upload_object_via_stream(
        iterator=file_handle,
        container=container,
        object_name=remote_object_name
    )
print(f"Uploaded file: {uploaded_object.name}")
print(f"File size: {uploaded_object.size} bytes")

# Download file from cloud storage, streaming chunk by chunk
download_path = "/tmp/downloaded_training_data.csv"
downloaded_object = file_store.get_object(container.name, remote_object_name)
with open(download_path, 'wb') as file_handle:
    for chunk in file_store.download_object_as_stream(downloaded_object):
        file_handle.write(chunk)
print(f"Downloaded file to: {download_path}")

# Delete object after processing
file_store.delete_object(downloaded_object)
print(f"Deleted remote object: {remote_object_name}")

from aissemble_core_filestore.file_store_factory import FileStoreFactory
from typing import Dict, List
import json
import os
class MultiEnvironmentFileStoreManager:
    """Utility class for managing file stores across environments.

    Loads one FileStoreFactory-created store per known environment and
    offers cross-environment sync plus timestamped container backups.
    """

    def __init__(self):
        # environment name -> libcloud StorageDriver (treated as opaque here;
        # only the container/object streaming API is used)
        self.file_stores: Dict[str, object] = {}
        self.load_configurations()

    def load_configurations(self):
        """Load file store configurations for different environments.

        Best-effort: an environment whose configuration fails to load is
        reported and skipped instead of aborting the others.
        """
        environments = ["development", "staging", "production"]
        for env in environments:
            try:
                store = FileStoreFactory.create_file_store(f"{env}-storage")
                self.file_stores[env] = store
                print(f"Loaded {env} file store configuration")
            except Exception as e:
                print(f"Could not load {env} configuration: {e}")

    def get_store(self, environment: str):
        """Get the file store for a specific environment.

        Raises:
            ValueError: if no store was loaded for that environment.
        """
        if environment not in self.file_stores:
            raise ValueError(f"Environment {environment} not configured")
        return self.file_stores[environment]

    def sync_files(self, source_env: str, target_env: str, container_name: str, prefix: str = ""):
        """Copy objects (optionally filtered by name prefix) from one
        environment's container to the same-named container in another.

        NOTE(review): each object is fully buffered in memory before upload —
        fine for example-sized data, not for very large objects.
        """
        source_store = self.get_store(source_env)
        target_store = self.get_store(target_env)

        # Get containers; create the target container if it does not exist
        # (was a bare `except:` — narrowed to Exception).
        source_container = source_store.get_container(container_name)
        try:
            target_container = target_store.get_container(container_name)
        except Exception:
            target_container = target_store.create_container(container_name)

        # List objects with prefix filter
        objects = source_store.list_container_objects(source_container)
        if prefix:
            objects = [obj for obj in objects if obj.name.startswith(prefix)]

        # Sync each object
        for obj in objects:
            print(f"Syncing {obj.name}...")
            # Download from source (join chunks once instead of quadratic +=)
            content = b"".join(source_store.download_object_as_stream(obj))
            # Upload to target
            target_store.upload_object_via_stream(
                iterator=iter([content]),
                container=target_container,
                object_name=obj.name
            )
        print(f"Synced {len(objects)} objects from {source_env} to {target_env}")

    def backup_container(self, environment: str, container_name: str, backup_prefix: str):
        """Create a backup of a container's objects under a timestamped
        prefix in a sibling "<container>-backup" container."""
        # Local import: datetime was used below but never imported in this
        # script, which raised NameError at runtime.
        from datetime import datetime

        store = self.get_store(environment)
        container = store.get_container(container_name)

        # Create backup container if it does not already exist.
        backup_container_name = f"{container_name}-backup"
        try:
            backup_container = store.get_container(backup_container_name)
        except Exception:
            backup_container = store.create_container(backup_container_name)

        # Get timestamp for backup
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Copy all objects with backup prefix
        objects = store.list_container_objects(container)
        for obj in objects:
            backup_name = f"{backup_prefix}_{timestamp}/{obj.name}"
            # Download original (single join instead of quadratic +=)
            content = b"".join(store.download_object_as_stream(obj))
            # Upload as backup
            store.upload_object_via_stream(
                iterator=iter([content]),
                container=backup_container,
                object_name=backup_name
            )
        print(f"Backed up {len(objects)} objects with prefix {backup_prefix}_{timestamp}")
# Usage example
manager = MultiEnvironmentFileStoreManager()

# Sync development data to staging
manager.sync_files("development", "staging", "ml-datasets", "experiment_001/")

# Create backup before deployment
manager.backup_container("production", "ml-models", "pre_deployment")

from aissemble_core_filestore.file_store_factory import FileStoreFactory
import pandas as pd
import io
from typing import Generator, List
class DataPipelineFileProcessor:
    """File processor for data pipeline workflows.

    Wraps a FileStoreFactory-created store with CSV processing, batch
    directory upload, and chunked large-file download helpers.
    """

    def __init__(self, config_name: str):
        self.file_store = FileStoreFactory.create_file_store(config_name)
        # Running totals reported by process_csv_files().
        self.processing_stats = {
            "files_processed": 0,
            "total_size": 0,
            "errors": []
        }

    def process_csv_files(self, container_name: str, input_prefix: str, output_prefix: str) -> dict:
        """Process CSV files with data transformations.

        Downloads each '<input_prefix>...*.csv' object, applies
        _apply_transformations, and re-uploads the result under
        output_prefix. Per-file failures are recorded in
        processing_stats["errors"] rather than aborting the batch.
        """
        container = self.file_store.get_container(container_name)
        objects = self.file_store.list_container_objects(container)

        # Filter for CSV files with input prefix
        csv_objects = [obj for obj in objects
                       if obj.name.startswith(input_prefix) and obj.name.endswith('.csv')]

        for obj in csv_objects:
            try:
                print(f"Processing {obj.name}...")
                # Download CSV data (join chunks once instead of quadratic +=)
                csv_content = b"".join(self.file_store.download_object_as_stream(obj))
                # Process with pandas
                df = pd.read_csv(io.BytesIO(csv_content))
                processed_df = self._apply_transformations(df)
                # Convert back to CSV bytes
                output_buffer = io.StringIO()
                processed_df.to_csv(output_buffer, index=False)
                processed_csv = output_buffer.getvalue().encode('utf-8')
                # Upload processed file
                output_name = obj.name.replace(input_prefix, output_prefix)
                self.file_store.upload_object_via_stream(
                    iterator=iter([processed_csv]),
                    container=container,
                    object_name=output_name
                )
                # Update stats
                self.processing_stats["files_processed"] += 1
                self.processing_stats["total_size"] += len(processed_csv)
                print(f"Processed {obj.name} -> {output_name}")
            except Exception as e:
                error_msg = f"Error processing {obj.name}: {str(e)}"
                print(error_msg)
                self.processing_stats["errors"].append(error_msg)
        return self.processing_stats

    def _apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply data transformations: dedupe, forward-fill, timestamp."""
        # Remove duplicates
        df = df.drop_duplicates()
        # Forward-fill missing values. The original fillna(method='forward')
        # raises ValueError (valid names are 'ffill'/'pad') and the method=
        # keyword is deprecated in pandas 2.x, so call ffill() directly.
        df = df.ffill()
        # Add processing timestamp
        df['processed_at'] = pd.Timestamp.now()
        return df

    def batch_upload_directory(self, local_dir: str, container_name: str, remote_prefix: str):
        """Upload an entire local directory tree to cloud storage,
        preserving relative paths under remote_prefix ('/' separators)."""
        # Local import: os was used below but never imported in this script,
        # which raised NameError at runtime.
        import os

        container = self.file_store.get_container(container_name)
        for root, dirs, files in os.walk(local_dir):
            for file in files:
                local_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_path, local_dir)
                remote_path = f"{remote_prefix}/{relative_path}".replace("\\", "/")
                with open(local_path, 'rb') as file_handle:
                    self.file_store.upload_object_via_stream(
                        iterator=file_handle,
                        container=container,
                        object_name=remote_path
                    )
                print(f"Uploaded {local_path} -> {remote_path}")

    def stream_large_file_download(self, container_name: str, object_name: str,
                                   local_path: str, chunk_size: int = 8192):
        """Download a large file in chunks to manage memory."""
        container = self.file_store.get_container(container_name)
        obj = self.file_store.get_object(container_name, object_name)

        with open(local_path, 'wb') as file_handle:
            total_downloaded = 0
            for chunk in self.file_store.download_object_as_stream(obj, chunk_size=chunk_size):
                file_handle.write(chunk)
                total_downloaded += len(chunk)
                # Progress indicator; NOTE(review): only fires when the running
                # total is an exact multiple of chunk_size * 100.
                if total_downloaded % (chunk_size * 100) == 0:
                    print(f"Downloaded {total_downloaded} bytes...")
        print(f"Completed download: {local_path} ({total_downloaded} bytes)")
# Usage example
processor = DataPipelineFileProcessor("ml-data-lake")

# Process raw CSV files into cleaned datasets
stats = processor.process_csv_files(
    container_name="raw-data",
    input_prefix="incoming/2024/09/",
    output_prefix="processed/2024/09/"
)
print(f"Processing complete: {stats}")

# Batch upload model artifacts
processor.batch_upload_directory(
    local_dir="/tmp/model_artifacts",
    container_name="ml-models",
    remote_prefix="models/fraud_detection_v2.1"
)

# Download large dataset for local processing
processor.stream_large_file_download(
    container_name="datasets",
    object_name="large_training_set.parquet",
    local_path="/tmp/training_data.parquet"
)

from aissemble_core_filestore.file_store_factory import FileStoreFactory
from libcloud.storage.types import Provider
import json
from typing import Dict, Any
class StorageConfigurationManager:
    """Manages storage configurations and provider-specific optimizations."""

    def __init__(self):
        # name -> registered configuration dict (see register_configuration)
        self.configurations = {}
        # Per-provider tuning defaults attached to each registered config.
        self.provider_settings = {
            Provider.S3: {
                "multipart_threshold": 64 * 1024 * 1024,  # 64MB
                "max_concurrency": 10,
                "region_optimization": True
            },
            Provider.LOCAL: {
                "create_dirs": True,
                "file_permissions": 0o644
            }
        }

    def register_configuration(self, name: str, provider: str, **kwargs):
        """Register a new storage configuration.

        Parameters:
            - name: str - Configuration name
            - provider: str - libcloud Provider constant (a string value)
            - kwargs - Provider-specific settings stored verbatim
        """
        config = {
            "provider": provider,
            "settings": kwargs,
            # Attach known tuning defaults for this provider, if any.
            "optimizations": self.provider_settings.get(provider, {})
        }
        self.configurations[name] = config
        print(f"Registered configuration: {name} ({provider})")

    def create_optimized_store(self, config_name: str):
        """Create a file store with provider-specific optimizations,
        falling back to the factory default for unknown names/providers."""
        if config_name not in self.configurations:
            # Fallback to factory default
            return FileStoreFactory.create_file_store(config_name)

        config = self.configurations[config_name]
        # Dispatch on provider for provider-specific construction.
        if config["provider"] == Provider.S3:
            return self._create_s3_optimized_store(config_name, config)
        elif config["provider"] == Provider.LOCAL:
            return self._create_local_optimized_store(config_name, config)
        else:
            return FileStoreFactory.create_file_store(config_name)

    def _create_s3_optimized_store(self, name: str, config: Dict[str, Any]):
        """Create an S3 store with runtime optimizations applied."""
        # NOTE(review): config["optimizations"] is recorded on the config but
        # not applied at construction time (was an unused local here).
        store = FileStoreFactory.create_s3_file_store(
            name,
            config["settings"],
            config["provider"]
        )
        # Apply runtime optimizations when the driver exposes a connection.
        if hasattr(store, 'connection'):
            store.connection.timeout = 30  # Connection timeout
        return store

    def _create_local_optimized_store(self, name: str, config: Dict[str, Any]):
        """Create a local store."""
        # NOTE(review): config["optimizations"] is not applied here either
        # (was an unused local).
        return FileStoreFactory.create_local_file_store(
            name,
            config["settings"],
            config["provider"]
        )

    def validate_configuration(self, config_name: str) -> bool:
        """Validate a storage configuration by testing connectivity.

        Returns:
            bool - True when a store could be built and containers listed.
        """
        try:
            store = self.create_optimized_store(config_name)
            # Test basic operations
            containers = store.list_containers()
            print(f"Configuration {config_name} validated: {len(containers)} containers accessible")
            return True
        except Exception as e:
            print(f"Configuration {config_name} validation failed: {e}")
            return False

    def get_configuration_info(self, config_name: str) -> Dict[str, Any]:
        """Get detailed configuration information, or a status marker for a
        name that was never registered."""
        if config_name in self.configurations:
            return self.configurations[config_name]
        return {"status": "Using factory default configuration"}
# Usage example
config_manager = StorageConfigurationManager()

# Register custom configurations (credential values are placeholders)
config_manager.register_configuration(
    name="high-performance-s3",
    provider=Provider.S3,
    region="us-west-2",
    bucket="ml-data-lake",
    access_key_id="AKIA...",
    secret_access_key="SECRET..."
)
config_manager.register_configuration(
    name="local-dev",
    provider=Provider.LOCAL,
    path="/tmp/local_storage"
)

# Create optimized stores
s3_store = config_manager.create_optimized_store("high-performance-s3")
local_store = config_manager.create_optimized_store("local-dev")

# Validate configurations
config_manager.validate_configuration("high-performance-s3")
config_manager.validate_configuration("local-dev")

# Get configuration details
s3_info = config_manager.get_configuration_info("high-performance-s3")
print(f"S3 Configuration: {s3_info}")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-aissemble-foundation-core-python