CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-toil

Pipeline management software for clusters.

Overall
score

67%

Overview
Eval results
Files

file-management.mddocs/

File Management

Overview

Toil's file management system provides comprehensive file handling capabilities for workflows, including temporary file management, shared file storage, caching, and I/O operations. The system abstracts file operations across different storage backends while providing efficient caching, automatic cleanup, and seamless integration with job execution. File stores manage both local temporary files and globally accessible shared files that can be passed between jobs in a workflow.

Capabilities

Abstract File Store Interface

{ .api }

The AbstractFileStore provides the core interface for all file operations within job execution.

import logging
import os
import shutil
import time
from typing import IO, Optional

from toil.fileStores.abstractFileStore import AbstractFileStore

class CustomFileStore(AbstractFileStore):
    """Custom file store implementation for specialized storage needs.

    Illustrates the AbstractFileStore contract: global files visible to
    every job in the workflow, job-local files, temporary paths, and log
    forwarding to the workflow leader.  The low-level storage primitives
    (store_global_file, retrieve_global_file, is_cached, ...) are assumed
    to be supplied by the concrete backend.
    """

    def writeGlobalFile(self, localFileName: str, cleanup: bool = True) -> str:
        """Write local file to globally accessible storage.

        :param localFileName: path of the local file to publish
        :param cleanup: if True, register the global copy for deletion
            when the workflow finishes
        :return: globally unique file ID usable by any job
        """
        # Generate unique global file ID
        global_file_id = self.generate_global_file_id()

        # Copy file to shared storage
        with open(localFileName, 'rb') as local_file:
            file_data = local_file.read()
            self.store_global_file(global_file_id, file_data)

        # Register for cleanup if requested
        if cleanup:
            self.register_for_cleanup(global_file_id)

        return global_file_id

    def readGlobalFile(self, fileStoreID: str, userPath: Optional[str] = None,
                      cache: bool = True, mutable: bool = False) -> str:
        """Read globally stored file to local path.

        :param fileStoreID: ID previously returned by writeGlobalFile
        :param userPath: destination path; a fresh temp file is used when None
        :param cache: consult (and populate) the local cache
        :param mutable: return a private copy the caller may modify;
            read-only cache hits are returned in place
        :return: local path holding the file contents
        """
        # Determine output path
        if userPath is None:
            userPath = self.getLocalTempFile(
                prefix=f"global_{fileStoreID}_",
                suffix=".tmp"
            )

        # Check cache first if enabled
        if cache and self.is_cached(fileStoreID):
            cached_path = self.get_cached_path(fileStoreID)
            if not mutable:
                # Read-only callers may share the cached copy directly.
                return cached_path
            else:
                # Make a private copy so the cache stays pristine.
                shutil.copy2(cached_path, userPath)
                return userPath

        # Retrieve from global storage
        file_data = self.retrieve_global_file(fileStoreID)

        with open(userPath, 'wb') as output_file:
            output_file.write(file_data)

        # Add to cache if enabled
        if cache:
            self.add_to_cache(fileStoreID, userPath)

        return userPath

    def deleteGlobalFile(self, fileStoreID: str) -> None:
        """Delete globally stored file (no-op if it does not exist)."""
        if not self.global_file_exists(fileStoreID):
            return

        # Remove from storage
        self.remove_global_file(fileStoreID)

        # Remove from cache
        self.remove_from_cache(fileStoreID)

        # Unregister from cleanup
        self.unregister_cleanup(fileStoreID)

    def writeLocalFile(self, localFileName: str, cleanup: bool = True) -> str:
        """Write file to job-local storage.

        :param localFileName: path of the file to copy into the local area
        :param cleanup: register the copy for deletion when the job ends
        :return: job-local file ID for readLocalFile
        """
        local_file_id = self.generate_local_file_id()

        # Copy to local storage area
        local_path = self.get_local_storage_path(local_file_id)
        shutil.copy2(localFileName, local_path)

        if cleanup:
            self.register_local_cleanup(local_file_id)

        return local_file_id

    def readLocalFile(self, localFileStoreID: str) -> str:
        """Get path to locally stored file.

        :raises FileNotFoundError: if the ID is unknown
        """
        if not self.local_file_exists(localFileStoreID):
            raise FileNotFoundError(f"Local file not found: {localFileStoreID}")

        return self.get_local_storage_path(localFileStoreID)

    def getLocalTempDir(self) -> str:
        """Get temporary directory for this job (created lazily, once)."""
        if not hasattr(self, '_temp_dir'):
            self._temp_dir = self.create_job_temp_dir()

        return self._temp_dir

    def getLocalTempFile(self, suffix: Optional[str] = None,
                        prefix: Optional[str] = 'tmp') -> str:
        """Create a temporary file inside the job temp dir and return its path."""
        import tempfile

        temp_dir = self.getLocalTempDir()
        fd, temp_path = tempfile.mkstemp(
            suffix=suffix,
            prefix=prefix,
            dir=temp_dir
        )
        os.close(fd)  # Close file descriptor, keep file on disk

        return temp_path

    def logToMaster(self, text: str, level: int = logging.INFO) -> None:
        """Send log message to workflow leader.

        :param text: message body
        :param level: standard ``logging`` level constant
        """
        log_message = {
            'job_id': self.job_id,
            'timestamp': time.time(),
            'level': level,
            'message': text
        }

        self.send_log_to_master(log_message)

Global File Operations

{ .api }

Global files are accessible across all jobs in a workflow and persist in the job store.

from toil.fileStores.abstractFileStore import AbstractFileStore

def demonstrate_global_files(fileStore: AbstractFileStore):
    """Demonstrate global file operations."""

    # Build a small input file in job-local temp space.
    source_path = fileStore.getLocalTempFile(suffix=".txt")
    with open(source_path, 'w') as handle:
        for text in ("Sample data for processing\n",
                     "Line 2 of data\n",
                     "Line 3 of data\n"):
            handle.write(text)

    # Publish it globally so any job in the workflow can read it.
    global_file_id = fileStore.writeGlobalFile(source_path, cleanup=True)
    fileStore.logToMaster(f"Created global file: {global_file_id}")

    # Cached, read-only access (works from this or any other job).
    fileStore.readGlobalFile(
        global_file_id,
        cache=True,          # Enable caching
        mutable=False        # Read-only access
    )

    # Uncached, writable copy at an explicit destination.
    output_path = fileStore.getLocalTempFile(suffix=".processed")
    fileStore.readGlobalFile(
        global_file_id,
        userPath=output_path,
        cache=False,         # Skip cache
        mutable=True         # Allow modifications
    )

    # Upper-case every line of the writable copy, in place.
    with open(output_path, 'r') as handle:
        upper_lines = [entry.upper() for entry in handle.readlines()]
    with open(output_path, 'w') as handle:
        handle.writelines(upper_lines)

    # Publish the processed result for downstream jobs and hand back its ID.
    return fileStore.writeGlobalFile(output_path)

def chain_file_processing(fileStore: AbstractFileStore, input_file_id: str):
    """Chain multiple file processing operations."""
    import datetime

    # Stage 0: pull the input down as a writable copy and read it.
    source_path = fileStore.readGlobalFile(input_file_id, mutable=True)
    with open(source_path, 'r') as handle:
        original = handle.read()

    # Stage 1: prefix a timestamp header and publish the intermediate.
    stamped = f"Processed at {datetime.datetime.now()}\n{original}"
    stage1_path = fileStore.getLocalTempFile(suffix=".stage1")
    with open(stage1_path, 'w') as handle:
        handle.write(stamped)
    stage1_id = fileStore.writeGlobalFile(stage1_path)

    # Stage 2: re-read the intermediate and number every line.
    stage1_copy = fileStore.readGlobalFile(stage1_id, mutable=True)
    with open(stage1_copy, 'r') as handle:
        numbered = [f"{index + 1}: {text}"
                    for index, text in enumerate(handle.readlines())]

    stage2_path = fileStore.getLocalTempFile(suffix=".stage2")
    with open(stage2_path, 'w') as handle:
        handle.writelines(numbered)
    final_id = fileStore.writeGlobalFile(stage2_path)

    # Drop the stage-1 artifact now that the final result is stored.
    fileStore.deleteGlobalFile(stage1_id)

    return final_id

Local File Operations

{ .api }

Local files are job-specific and automatically cleaned up when the job completes.

def demonstrate_local_files(fileStore: AbstractFileStore):
    """Demonstrate local file operations."""

    # Every job gets its own scratch directory.
    scratch_dir = fileStore.getLocalTempDir()
    fileStore.logToMaster(f"Working in directory: {scratch_dir}")

    # Write three small data files into scratch space.
    scratch_paths = []
    for index in range(3):
        path = fileStore.getLocalTempFile(
            prefix=f"work_{index}_",
            suffix=".dat"
        )
        with open(path, 'w') as handle:
            handle.write(f"Temporary data for file {index}\n")
            handle.write(f"Created in job: {fileStore.jobID}\n")
        scratch_paths.append(path)

    # Register each one as a job-local file (cleaned up when the job ends).
    stored_ids = []
    for path in scratch_paths:
        stored = fileStore.writeLocalFile(path, cleanup=True)
        stored_ids.append(stored)
        fileStore.logToMaster(f"Stored local file: {stored}")

    # Read each stored file back and upper-case its contents.
    transformed = []
    for stored in stored_ids:
        local_path = fileStore.readLocalFile(stored)
        with open(local_path, 'r') as handle:
            transformed.append(handle.read().upper())

    # Merge everything into a single report file.
    report_path = fileStore.getLocalTempFile(suffix=".result")
    with open(report_path, 'w') as handle:
        handle.write("Consolidated Results:\n")
        handle.write("=" * 50 + "\n")
        for index, chunk in enumerate(transformed):
            handle.write(f"\nFile {index}:\n{chunk}\n")

    # Publish the report globally so downstream jobs can consume it.
    return fileStore.writeGlobalFile(report_path)

def advanced_local_file_patterns(fileStore: AbstractFileStore):
    """Advanced patterns for local file management.

    Builds a structured input/work/output directory layout under the job
    temp dir and pipes data through it using context-managed temporary
    files, so intermediates are removed automatically.

    :param fileStore: file store of the currently running job
    :return: list of global file IDs holding the final results
    """
    # Stdlib helpers used below.  NOTE: the original example omitted
    # 'import os', which made the os.path.join/os.makedirs calls fail.
    import contextlib
    import os
    import tempfile

    # Create structured temporary directory
    base_temp = fileStore.getLocalTempDir()

    # Create subdirectories for organization
    input_dir = os.path.join(base_temp, "input")
    output_dir = os.path.join(base_temp, "output")
    work_dir = os.path.join(base_temp, "work")

    os.makedirs(input_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(work_dir, exist_ok=True)

    @contextlib.contextmanager
    def temp_file_context(directory, suffix=".tmp"):
        """Yield a fresh temp file path; unlink it on exit."""
        fd, temp_path = tempfile.mkstemp(suffix=suffix, dir=directory)
        try:
            os.close(fd)
            yield temp_path
        finally:
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    # Process files with automatic cleanup
    results = []

    with temp_file_context(input_dir, ".input") as input_file:
        # Create input data
        with open(input_file, 'w') as f:
            f.write("Input data for processing")

        with temp_file_context(work_dir, ".work") as work_file:
            # Transform: mark the data as processed
            with open(input_file, 'r') as inf, open(work_file, 'w') as outf:
                data = inf.read()
                processed = data.replace("Input", "Processed")
                outf.write(processed)

            with temp_file_context(output_dir, ".output") as output_file:
                # Finalize results
                with open(work_file, 'r') as inf, open(output_file, 'w') as outf:
                    final_data = inf.read() + "\nProcessing complete."
                    outf.write(final_data)

                # Store the result globally before the temp file is unlinked.
                final_id = fileStore.writeGlobalFile(output_file)
                results.append(final_id)

    return results

Streaming File Operations

{ .api }

Stream-based file operations for large files and real-time processing.

from typing import IO

def demonstrate_streaming_operations(fileStore: AbstractFileStore):
    """Demonstrate streaming file operations."""

    # Open a shared-file stream and write 1000 log lines into it,
    # flushing every 100 entries.
    stream_id, write_stream = fileStore.writeSharedFileStream(
        "streaming_data.log",
        cleanup=True
    )
    try:
        for index in range(1000):
            entry = f"Log entry {index}: Processing item {index}\n"
            write_stream.write(entry.encode('utf-8'))
            if index % 100 == 0:
                write_stream.flush()  # Periodic flush
    finally:
        write_stream.close()

    fileStore.logToMaster(f"Created streaming file: {stream_id}")

    # Re-read the stream in fixed-size chunks, counting newlines.
    reader = fileStore.readSharedFileStream(stream_id)
    line_total = 0
    try:
        while True:
            chunk = reader.read(1024)
            if not chunk:
                break
            line_total += chunk.decode('utf-8').count('\n')
    finally:
        reader.close()

    fileStore.logToMaster(f"Processed {line_total} lines from stream")

    return stream_id

def large_file_processing(fileStore: AbstractFileStore, large_file_id: str):
    """Process large files efficiently using streaming.

    Streams the input through a bounded buffer so memory use stays
    constant regardless of file size, transforming one line at a time
    with process_line() and writing results to a new shared stream.

    :param fileStore: file store of the currently running job
    :param large_file_id: ID of the shared input stream to read
    :return: ID of the shared stream holding the processed output
    """
    # Stream large file for processing
    input_stream = fileStore.readSharedFileStream(large_file_id)

    # Create output stream
    output_file_name = "processed_large_file.out"
    output_id, output_stream = fileStore.writeSharedFileStream(output_file_name)

    try:
        # Process file line by line to manage memory
        buffer = b""
        chunk_size = 8192

        while True:
            chunk = input_stream.read(chunk_size)
            if not chunk:
                # EOF: flush whatever is still buffered.
                # NOTE(review): empty lines are skipped here (`if line:`)
                # but preserved by the mid-stream loop below — confirm
                # whether trailing blank lines are meant to be dropped.
                if buffer:
                    lines = buffer.split(b'\n')
                    for line in lines:
                        if line:
                            processed_line = process_line(line)
                            output_stream.write(processed_line + b'\n')
                break

            buffer += chunk

            # Process complete lines
            while b'\n' in buffer:
                line, buffer = buffer.split(b'\n', 1)
                processed_line = process_line(line)
                output_stream.write(processed_line + b'\n')

    finally:
        # NOTE(review): if input_stream.close() raises, output_stream is
        # never closed — consider nested try/finally or an ExitStack.
        input_stream.close()
        output_stream.close()

    return output_id

def process_line(line: bytes) -> bytes:
    """Process individual line (example transformation).

    Prefixes the upper-cased line with the current Unix timestamp,
    e.g. b"1700000000: HELLO".
    """
    import time

    stamp = str(int(time.time())).encode()
    return b"".join((stamp, b": ", line.upper()))

Caching File Store

{ .api }

The CachingFileStore provides automatic caching for improved performance with frequently accessed files.

from toil.fileStores.cachingFileStore import CachingFileStore

def demonstrate_caching_filestore():
    """Demonstrate caching file store capabilities."""

    # The caching file store transparently caches global files, so
    # repeated reads of the same ID are served from local disk.

    # NOTE: `Job` must be in scope (the Toil job base class) — the
    # surrounding example does not import it.
    class OptimizedJob(Job):
        def __init__(self, input_file_ids):
            super().__init__(memory="2G", cores=1, disk="1G")
            self.input_file_ids = input_file_ids

        def run(self, fileStore: CachingFileStore):
            # First read of each ID downloads and caches the file.
            local_paths = []
            for file_id in self.input_file_ids:
                path = fileStore.readGlobalFile(
                    file_id,
                    cache=True  # Enable caching (default)
                )
                local_paths.append(path)

                # A second cached read must hand back the same path.
                repeat = fileStore.readGlobalFile(file_id, cache=True)
                assert path == repeat

            # Simple processing: total byte length of each cached file.
            sizes = []
            for path in local_paths:
                with open(path, 'r') as handle:
                    sizes.append(len(handle.read()))

            # Cache statistics (implementation-specific helpers).
            cache_hits = fileStore.get_cache_hits()
            cache_misses = fileStore.get_cache_misses()
            fileStore.logToMaster(f"Cache hits: {cache_hits}, misses: {cache_misses}")

            return sum(sizes)

def cache_management_strategies(fileStore: CachingFileStore):
    """Advanced caching strategies."""

    large_file_id = "large_dataset_file"

    # Very large files: bypass the cache entirely.
    fileStore.readGlobalFile(
        large_file_id,
        cache=False  # Skip cache for large files
    )

    # Files that must be modified: request a mutable private copy
    # (does not use the shared cache entry).
    fileStore.readGlobalFile(
        large_file_id,
        mutable=True  # Need to modify file
    )

    # Warm the cache with files every job will touch.
    frequently_used_files = ["reference_genome.fa", "annotation.gtf", "config.json"]
    for file_id in frequently_used_files:
        fileStore.readGlobalFile(file_id, cache=True)
        fileStore.logToMaster(f"Preloaded file: {file_id}")

    # Later reads of the warmed files are served straight from the cache.
    for file_id in frequently_used_files:
        fileStore.readGlobalFile(file_id, cache=True)
        # Process file...

File Store Logging and Monitoring

{ .api }

Comprehensive logging and monitoring for file operations and job progress.

import logging
from toil.fileStores.abstractFileStore import AbstractFileStore

def advanced_logging_patterns(fileStore: AbstractFileStore):
    """Advanced logging patterns for file operations.

    Logs progress at INFO, per-file success at DEBUG, and failures at
    ERROR before re-raising so the leader sees the full context.

    :param fileStore: file store of the currently running job
    """
    # Log file operations with different levels
    fileStore.logToMaster("Starting file processing job", logging.INFO)

    try:
        # Log progress for long operations
        input_files = ["file1.dat", "file2.dat", "file3.dat", "file4.dat"]

        for i, filename in enumerate(input_files):
            # Include the current file name in the progress message
            # (the published text had lost the {filename} placeholder).
            fileStore.logToMaster(
                f"Processing file {i+1}/{len(input_files)}: {filename}",
                logging.INFO
            )

            # Simulate file processing
            temp_file = fileStore.getLocalTempFile(suffix=f"_{i}.tmp")

            with open(temp_file, 'w') as f:
                f.write(f"Processed content for {filename}")

            file_id = fileStore.writeGlobalFile(temp_file)

            # Log successful completion
            fileStore.logToMaster(
                f"Successfully processed {filename} -> {file_id}",
                logging.DEBUG
            )

        fileStore.logToMaster("All files processed successfully", logging.INFO)

    except Exception as e:
        # Log errors with full context before propagating to the leader.
        fileStore.logToMaster(
            f"Error in file processing: {str(e)}",
            logging.ERROR
        )
        raise

def file_operation_metrics(fileStore: AbstractFileStore):
    """Collect metrics on file operations.

    Writes ten ~10KB test files, round-trips each through the global
    store, and reports counts, byte totals, and timing to the leader.

    :param fileStore: file store of the currently running job
    :return: dict of collected metrics
    :raises Exception: re-raises any failure after logging it at ERROR
    """
    # NOTE: 'import os' was missing in the original example, which made
    # the os.path.getsize call below fail.
    import os
    import time

    metrics = {
        'files_processed': 0,
        'bytes_written': 0,
        'bytes_read': 0,
        'processing_time': 0,
        'cache_hits': 0
    }

    start_time = time.time()

    try:
        # Simulate file processing with metrics
        for i in range(10):
            # Create test file
            test_file = fileStore.getLocalTempFile()
            test_data = f"Test data {i} " * 1000  # ~10KB per file

            with open(test_file, 'w') as f:
                f.write(test_data)

            file_size = os.path.getsize(test_file)
            metrics['bytes_written'] += file_size

            # Store globally
            file_id = fileStore.writeGlobalFile(test_file)

            # Read back (may hit cache)
            read_path = fileStore.readGlobalFile(file_id, cache=True)

            with open(read_path, 'r') as f:
                read_data = f.read()
                metrics['bytes_read'] += len(read_data.encode())

            metrics['files_processed'] += 1

            # Log progress every few files
            if (i + 1) % 5 == 0:
                elapsed = time.time() - start_time
                fileStore.logToMaster(
                    f"Processed {i+1} files in {elapsed:.2f}s",
                    logging.INFO
                )

        metrics['processing_time'] = time.time() - start_time

        # Log final metrics
        fileStore.logToMaster(
            f"Metrics - Files: {metrics['files_processed']}, "
            f"Written: {metrics['bytes_written']} bytes, "
            f"Read: {metrics['bytes_read']} bytes, "
            f"Time: {metrics['processing_time']:.2f}s",
            logging.INFO
        )

    except Exception as e:
        fileStore.logToMaster(f"Metrics collection failed: {e}", logging.ERROR)
        raise

    return metrics

This file management system provides comprehensive, efficient file handling capabilities with caching, streaming, and robust error handling for complex workflow data management needs.

Install with Tessl CLI

npx tessl i tessl/pypi-toil

docs

batch-systems.md

core-workflow.md

file-management.md

index.md

job-stores.md

provisioning.md

utilities.md

workflow-languages.md

tile.json