CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aissemble-foundation-core-python

Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.

Pending
Overview
Eval results
Files

filestore.mddocs/

Cloud File Storage

Cloud-agnostic file storage abstraction using LibCloud to provide consistent API across local filesystem, AWS S3, and other cloud storage providers. The FileStoreFactory enables seamless switching between storage backends with automatic provider detection and configuration management for enterprise data workflows.

Capabilities

FileStore Factory

Factory pattern implementation for creating configured cloud storage instances with support for multiple providers including local filesystem, AWS S3, and other LibCloud-supported storage services.

class FileStoreFactory:
    """
    FileStore abstraction to integrate with cloud storage providers. 
    Creates configured instances of libcloud StorageDriver.
    
    Class Attributes:
    - logger - LogManager instance for FileStoreFactory
    """
    
    def __init__(self) -> None:
        """Constructor"""
        ...
    
    @staticmethod
    def create_file_store(name: str) -> StorageDriver:
        """
        Create and return configured file store instance.
        
        Parameters:
        - name: str - Name of the file store configuration
        
        Returns:
        StorageDriver - Configured LibCloud storage driver instance
        """
        ...
    
    @staticmethod
    def create_local_file_store(name: str, filtered, cls) -> StorageDriver:
        """
        Static method for local file store creation.
        
        Parameters:
        - name: str - Configuration name
        - filtered - Filtered configuration parameters
        - cls - Storage driver class
        
        Returns:
        StorageDriver - Local filesystem storage driver
        """
        ...
    
    @staticmethod
    def create_s3_file_store(name: str, filtered, provider) -> StorageDriver:
        """
        Static method for S3 file store creation.
        
        Parameters:
        - name: str - Configuration name
        - filtered - Filtered configuration parameters  
        - provider - S3 provider configuration
        
        Returns:
        StorageDriver - S3 storage driver instance
        """
        ...

Usage Examples

Basic File Store Creation

from aissemble_core_filestore.file_store_factory import FileStoreFactory
from libcloud.storage.types import Provider

# Create file store using configuration name
file_store = FileStoreFactory.create_file_store("my-s3-config")

# Use LibCloud StorageDriver interface
containers = file_store.list_containers()
print(f"Available containers: {[c.name for c in containers]}")

# Get or create container
try:
    container = file_store.get_container("ml-data-bucket")
except Exception:
    container = file_store.create_container("ml-data-bucket")

# List objects in container
objects = file_store.list_container_objects(container)
print(f"Objects in container: {[obj.name for obj in objects]}")

File Upload and Download Operations

from aissemble_core_filestore.file_store_factory import FileStoreFactory
import os
from datetime import datetime

# Initialize file store
file_store = FileStoreFactory.create_file_store("data-lake-storage")

# Get container for data storage
container = file_store.get_container("ml-datasets")

# Upload local file to cloud storage
local_file_path = "/tmp/training_data.csv"
remote_object_name = f"datasets/{datetime.now().strftime('%Y/%m/%d')}/training_data.csv"

with open(local_file_path, 'rb') as file_handle:
    uploaded_object = file_store.upload_object_via_stream(
        iterator=file_handle,
        container=container,
        object_name=remote_object_name
    )

print(f"Uploaded file: {uploaded_object.name}")
print(f"File size: {uploaded_object.size} bytes")

# Download file from cloud storage
download_path = "/tmp/downloaded_training_data.csv"
downloaded_object = file_store.get_object(container.name, remote_object_name)

with open(download_path, 'wb') as file_handle:
    for chunk in file_store.download_object_as_stream(downloaded_object):
        file_handle.write(chunk)

print(f"Downloaded file to: {download_path}")

# Delete object after processing
file_store.delete_object(downloaded_object)
print(f"Deleted remote object: {remote_object_name}")

Multi-Environment File Store Manager

from aissemble_core_filestore.file_store_factory import FileStoreFactory
from typing import Dict, List
import json
import os

class MultiEnvironmentFileStoreManager:
    """Utility class for managing file stores across environments"""
    
    def __init__(self):
        self.file_stores: Dict[str, any] = {}
        self.load_configurations()
    
    def load_configurations(self):
        """Load file store configurations for different environments"""
        environments = ["development", "staging", "production"]
        
        for env in environments:
            try:
                store = FileStoreFactory.create_file_store(f"{env}-storage")
                self.file_stores[env] = store
                print(f"Loaded {env} file store configuration")
            except Exception as e:
                print(f"Could not load {env} configuration: {e}")
    
    def get_store(self, environment: str):
        """Get file store for specific environment"""
        if environment not in self.file_stores:
            raise ValueError(f"Environment {environment} not configured")
        return self.file_stores[environment]
    
    def sync_files(self, source_env: str, target_env: str, container_name: str, prefix: str = ""):
        """Sync files between environments"""
        source_store = self.get_store(source_env)
        target_store = self.get_store(target_env)
        
        # Get containers
        source_container = source_store.get_container(container_name)
        try:
            target_container = target_store.get_container(container_name)
        except:
            target_container = target_store.create_container(container_name)
        
        # List objects with prefix filter
        objects = source_store.list_container_objects(source_container)
        if prefix:
            objects = [obj for obj in objects if obj.name.startswith(prefix)]
        
        # Sync each object
        for obj in objects:
            print(f"Syncing {obj.name}...")
            
            # Download from source
            content = b""
            for chunk in source_store.download_object_as_stream(obj):
                content += chunk
            
            # Upload to target
            target_store.upload_object_via_stream(
                iterator=iter([content]),
                container=target_container,
                object_name=obj.name
            )
        
        print(f"Synced {len(objects)} objects from {source_env} to {target_env}")
    
    def backup_container(self, environment: str, container_name: str, backup_prefix: str):
        """Create backup of container with timestamp prefix"""
        store = self.get_store(environment)
        container = store.get_container(container_name)
        
        # Create backup container
        backup_container_name = f"{container_name}-backup"
        try:
            backup_container = store.get_container(backup_container_name)
        except:
            backup_container = store.create_container(backup_container_name)
        
        # Get timestamp for backup
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # Copy all objects with backup prefix
        objects = store.list_container_objects(container)
        for obj in objects:
            backup_name = f"{backup_prefix}_{timestamp}/{obj.name}"
            
            # Download original
            content = b""
            for chunk in store.download_object_as_stream(obj):
                content += chunk
            
            # Upload as backup
            store.upload_object_via_stream(
                iterator=iter([content]),
                container=backup_container,
                object_name=backup_name
            )
        
        print(f"Backed up {len(objects)} objects with prefix {backup_prefix}_{timestamp}")

# Usage example
manager = MultiEnvironmentFileStoreManager()

# Sync development data to staging
manager.sync_files("development", "staging", "ml-datasets", "experiment_001/")

# Create backup before deployment
manager.backup_container("production", "ml-models", "pre_deployment")

Data Pipeline File Processing

from aissemble_core_filestore.file_store_factory import FileStoreFactory
import pandas as pd
import io
from typing import Generator, List

class DataPipelineFileProcessor:
    """File processor for data pipeline workflows"""
    
    def __init__(self, config_name: str):
        self.file_store = FileStoreFactory.create_file_store(config_name)
        self.processing_stats = {
            "files_processed": 0,
            "total_size": 0,
            "errors": []
        }
    
    def process_csv_files(self, container_name: str, input_prefix: str, output_prefix: str) -> dict:
        """Process CSV files with data transformations"""
        container = self.file_store.get_container(container_name)
        objects = self.file_store.list_container_objects(container)
        
        # Filter for CSV files with input prefix
        csv_objects = [obj for obj in objects 
                      if obj.name.startswith(input_prefix) and obj.name.endswith('.csv')]
        
        for obj in csv_objects:
            try:
                print(f"Processing {obj.name}...")
                
                # Download CSV data
                csv_content = b""
                for chunk in self.file_store.download_object_as_stream(obj):
                    csv_content += chunk
                
                # Process with pandas
                df = pd.read_csv(io.BytesIO(csv_content))
                
                # Apply transformations (example)
                processed_df = self._apply_transformations(df)
                
                # Convert back to CSV
                output_buffer = io.StringIO()
                processed_df.to_csv(output_buffer, index=False)
                processed_csv = output_buffer.getvalue().encode('utf-8')
                
                # Upload processed file
                output_name = obj.name.replace(input_prefix, output_prefix)
                self.file_store.upload_object_via_stream(
                    iterator=iter([processed_csv]),
                    container=container,
                    object_name=output_name
                )
                
                # Update stats
                self.processing_stats["files_processed"] += 1
                self.processing_stats["total_size"] += len(processed_csv)
                
                print(f"Processed {obj.name} -> {output_name}")
                
            except Exception as e:
                error_msg = f"Error processing {obj.name}: {str(e)}"
                print(error_msg)
                self.processing_stats["errors"].append(error_msg)
        
        return self.processing_stats
    
    def _apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply data transformations"""
        # Example transformations
        # Remove duplicates
        df = df.drop_duplicates()
        
        # Handle missing values
        df = df.fillna(method='forward')
        
        # Add processing timestamp
        df['processed_at'] = pd.Timestamp.now()
        
        return df
    
    def batch_upload_directory(self, local_dir: str, container_name: str, remote_prefix: str):
        """Upload entire local directory to cloud storage"""
        container = self.file_store.get_container(container_name)
        
        for root, dirs, files in os.walk(local_dir):
            for file in files:
                local_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_path, local_dir)
                remote_path = f"{remote_prefix}/{relative_path}".replace("\\", "/")
                
                with open(local_path, 'rb') as file_handle:
                    self.file_store.upload_object_via_stream(
                        iterator=file_handle,
                        container=container,
                        object_name=remote_path
                    )
                
                print(f"Uploaded {local_path} -> {remote_path}")
    
    def stream_large_file_download(self, container_name: str, object_name: str, 
                                 local_path: str, chunk_size: int = 8192):
        """Download large file in chunks to manage memory"""
        container = self.file_store.get_container(container_name)
        obj = self.file_store.get_object(container_name, object_name)
        
        with open(local_path, 'wb') as file_handle:
            total_downloaded = 0
            
            for chunk in self.file_store.download_object_as_stream(obj, chunk_size=chunk_size):
                file_handle.write(chunk)
                total_downloaded += len(chunk)
                
                # Progress indicator for large files
                if total_downloaded % (chunk_size * 100) == 0:
                    print(f"Downloaded {total_downloaded} bytes...")
        
        print(f"Completed download: {local_path} ({total_downloaded} bytes)")

# Usage example
processor = DataPipelineFileProcessor("ml-data-lake")

# Process raw CSV files into cleaned datasets
stats = processor.process_csv_files(
    container_name="raw-data",
    input_prefix="incoming/2024/09/",
    output_prefix="processed/2024/09/"
)

print(f"Processing complete: {stats}")

# Batch upload model artifacts
processor.batch_upload_directory(
    local_dir="/tmp/model_artifacts",
    container_name="ml-models",
    remote_prefix="models/fraud_detection_v2.1"
)

# Download large dataset for local processing
processor.stream_large_file_download(
    container_name="datasets",
    object_name="large_training_set.parquet",
    local_path="/tmp/training_data.parquet"
)

Storage Configuration Manager

from aissemble_core_filestore.file_store_factory import FileStoreFactory
from libcloud.storage.types import Provider
import json
from typing import Dict, Any

class StorageConfigurationManager:
    """Manages storage configurations and provider-specific optimizations"""
    
    def __init__(self):
        self.configurations = {}
        self.provider_settings = {
            Provider.S3: {
                "multipart_threshold": 64 * 1024 * 1024,  # 64MB
                "max_concurrency": 10,
                "region_optimization": True
            },
            Provider.LOCAL: {
                "create_dirs": True,
                "file_permissions": 0o644
            }
        }
    
    def register_configuration(self, name: str, provider: str, **kwargs):
        """Register new storage configuration"""
        config = {
            "provider": provider,
            "settings": kwargs,
            "optimizations": self.provider_settings.get(provider, {})
        }
        self.configurations[name] = config
        print(f"Registered configuration: {name} ({provider})")
    
    def create_optimized_store(self, config_name: str):
        """Create file store with provider-specific optimizations"""
        if config_name not in self.configurations:
            # Fallback to factory default
            return FileStoreFactory.create_file_store(config_name)
        
        config = self.configurations[config_name]
        
        # Apply provider-specific optimizations
        if config["provider"] == Provider.S3:
            return self._create_s3_optimized_store(config_name, config)
        elif config["provider"] == Provider.LOCAL:
            return self._create_local_optimized_store(config_name, config)
        else:
            return FileStoreFactory.create_file_store(config_name)
    
    def _create_s3_optimized_store(self, name: str, config: Dict[str, Any]):
        """Create S3 store with optimizations"""
        # Apply S3-specific settings
        optimizations = config["optimizations"]
        
        # Create store with S3 optimizations
        store = FileStoreFactory.create_s3_file_store(
            name, 
            config["settings"], 
            config["provider"]
        )
        
        # Apply runtime optimizations
        if hasattr(store, 'connection'):
            store.connection.timeout = 30  # Connection timeout
            
        return store
    
    def _create_local_optimized_store(self, name: str, config: Dict[str, Any]):
        """Create local store with optimizations"""
        optimizations = config["optimizations"]
        
        store = FileStoreFactory.create_local_file_store(
            name,
            config["settings"],
            config["provider"]
        )
        
        return store
    
    def validate_configuration(self, config_name: str) -> bool:
        """Validate storage configuration by testing connectivity"""
        try:
            store = self.create_optimized_store(config_name)
            
            # Test basic operations
            containers = store.list_containers()
            print(f"Configuration {config_name} validated: {len(containers)} containers accessible")
            return True
            
        except Exception as e:
            print(f"Configuration {config_name} validation failed: {e}")
            return False
    
    def get_configuration_info(self, config_name: str) -> Dict[str, Any]:
        """Get detailed configuration information"""
        if config_name in self.configurations:
            return self.configurations[config_name]
        else:
            return {"status": "Using factory default configuration"}

# Usage example
config_manager = StorageConfigurationManager()

# Register custom configurations
config_manager.register_configuration(
    name="high-performance-s3",
    provider=Provider.S3,
    region="us-west-2",
    bucket="ml-data-lake",
    access_key_id="AKIA...",
    secret_access_key="SECRET..."
)

config_manager.register_configuration(
    name="local-dev",
    provider=Provider.LOCAL,
    path="/tmp/local_storage"
)

# Create optimized stores
s3_store = config_manager.create_optimized_store("high-performance-s3")
local_store = config_manager.create_optimized_store("local-dev")

# Validate configurations
config_manager.validate_configuration("high-performance-s3")
config_manager.validate_configuration("local-dev")

# Get configuration details
s3_info = config_manager.get_configuration_info("high-performance-s3")
print(f"S3 Configuration: {s3_info}")

Best Practices

Provider Selection

  • Use local filesystem for development and testing
  • Choose S3 for production cloud deployments
  • Consider provider-specific features and pricing
  • Implement fallback storage options for reliability

Performance Optimization

  • Use streaming operations for large files
  • Implement parallel uploads/downloads for batch operations
  • Configure appropriate chunk sizes for memory efficiency
  • Monitor storage costs and optimize access patterns

Security Considerations

  • Use IAM roles and policies for cloud storage access
  • Implement encryption for sensitive data
  • Regular access audits and credential rotation
  • Network security for storage endpoints

Error Handling and Reliability

  • Implement retry logic for transient failures
  • Use exponential backoff for rate-limited operations
  • Monitor storage health and availability
  • Implement proper logging for troubleshooting

Install with Tessl CLI

npx tessl i tessl/pypi-aissemble-foundation-core-python

docs

auth.md

bom.md

config.md

filestore.md

index.md

inference.md

metadata.md

policy.md

tile.json