Pipeline management software for clusters.
Overall
score
67%
Toil's file management system provides comprehensive file handling capabilities for workflows, including temporary file management, shared file storage, caching, and I/O operations. The system abstracts file operations across different storage backends while providing efficient caching, automatic cleanup, and seamless integration with job execution. File stores manage both local temporary files and globally accessible shared files that can be passed between jobs in a workflow.
{ .api }
The AbstractFileStore provides the core interface for all file operations within job execution.
import logging
import os
import shutil
import time
from typing import IO, Optional

from toil.fileStores.abstractFileStore import AbstractFileStore
class CustomFileStore(AbstractFileStore):
    """Custom file store implementation for specialized storage needs.

    NOTE(review): backend primitives such as ``generate_global_file_id``,
    ``store_global_file``, ``retrieve_global_file`` and the cache helpers are
    assumed to be supplied by a concrete subclass or mixin -- confirm against
    the actual backend before use.
    """

    def writeGlobalFile(self, localFileName: str, cleanup: bool = True) -> str:
        """Write local file to globally accessible storage.

        :param localFileName: path of the local file to publish.
        :param cleanup: if True, register the global file for automatic
            deletion when the workflow ends.
        :return: the newly generated global file ID.
        """
        # Generate unique global file ID
        global_file_id = self.generate_global_file_id()
        # Copy file to shared storage
        with open(localFileName, 'rb') as local_file:
            file_data = local_file.read()
        self.store_global_file(global_file_id, file_data)
        # Register for cleanup if requested
        if cleanup:
            self.register_for_cleanup(global_file_id)
        return global_file_id

    def readGlobalFile(self, fileStoreID: str, userPath: Optional[str] = None,
                       cache: bool = True, mutable: bool = False) -> str:
        """Read globally stored file to local path.

        :param fileStoreID: ID returned by :meth:`writeGlobalFile`.
        :param userPath: destination path; a temp file is created when None.
        :param cache: consult/populate the local cache when True.
        :param mutable: when True the caller may modify the returned file, so
            a cached copy is never returned directly.
            (Fixed: default was ``None`` despite the ``bool`` annotation;
            ``False`` is behaviorally identical since both are falsy.)
        :return: local path holding the file contents.
        """
        # Determine output path
        if userPath is None:
            userPath = self.getLocalTempFile(
                prefix=f"global_{fileStoreID}_",
                suffix=".tmp"
            )
        # Check cache first if enabled
        if cache and self.is_cached(fileStoreID):
            cached_path = self.get_cached_path(fileStoreID)
            if not mutable:
                # Immutable access may share the cached copy directly.
                return cached_path
            else:
                # Make mutable copy so cache entry stays pristine.
                shutil.copy2(cached_path, userPath)
                return userPath
        # Retrieve from global storage
        file_data = self.retrieve_global_file(fileStoreID)
        with open(userPath, 'wb') as output_file:
            output_file.write(file_data)
        # Add to cache if enabled
        if cache:
            self.add_to_cache(fileStoreID, userPath)
        return userPath

    def deleteGlobalFile(self, fileStoreID: str) -> None:
        """Delete globally stored file; a no-op for unknown IDs."""
        if not self.global_file_exists(fileStoreID):
            return
        # Remove from storage
        self.remove_global_file(fileStoreID)
        # Remove from cache
        self.remove_from_cache(fileStoreID)
        # Unregister from cleanup
        self.unregister_cleanup(fileStoreID)

    def writeLocalFile(self, localFileName: str, cleanup: bool = True) -> str:
        """Write file to job-local storage and return its local file ID."""
        local_file_id = self.generate_local_file_id()
        # Copy to local storage area
        local_path = self.get_local_storage_path(local_file_id)
        shutil.copy2(localFileName, local_path)
        if cleanup:
            self.register_local_cleanup(local_file_id)
        return local_file_id

    def readLocalFile(self, localFileStoreID: str) -> str:
        """Get path to locally stored file.

        :raises FileNotFoundError: if the ID is unknown.
        """
        if not self.local_file_exists(localFileStoreID):
            raise FileNotFoundError(f"Local file not found: {localFileStoreID}")
        return self.get_local_storage_path(localFileStoreID)

    def getLocalTempDir(self) -> str:
        """Get temporary directory for this job (created lazily, then reused)."""
        if not hasattr(self, '_temp_dir'):
            self._temp_dir = self.create_job_temp_dir()
        return self._temp_dir

    def getLocalTempFile(self, suffix: Optional[str] = None,
                         prefix: Optional[str] = 'tmp') -> str:
        """Create temporary file inside the job temp dir and return its path."""
        import tempfile
        temp_dir = self.getLocalTempDir()
        fd, temp_path = tempfile.mkstemp(
            suffix=suffix,
            prefix=prefix,
            dir=temp_dir
        )
        os.close(fd)  # Close file descriptor, keep file
        return temp_path

    def logToMaster(self, text: str, level: int = logging.INFO) -> None:
        """Send log message to workflow leader."""
        log_message = {
            'job_id': self.job_id,
            'timestamp': time.time(),
            'level': level,
            'message': text
        }
        self.send_log_to_master(log_message)
Global files are accessible across all jobs in a workflow and persist in the job store.
from toil.fileStores.abstractFileStore import AbstractFileStore
def demonstrate_global_files(fileStore: AbstractFileStore):
    """Walk through the global-file API: write, cached read, mutable read.

    Returns the global file ID of the processed result for downstream jobs.
    """
    # Build a small input file in job-local temp space.
    src_path = fileStore.getLocalTempFile(suffix=".txt")
    with open(src_path, 'w') as handle:
        handle.write("Sample data for processing\n")
        handle.write("Line 2 of data\n")
        handle.write("Line 3 of data\n")

    # Publish to global storage so other jobs can read it.
    data_id = fileStore.writeGlobalFile(src_path, cleanup=True)
    fileStore.logToMaster(f"Created global file: {data_id}")

    # Cached, read-only access (cache is the default for performance).
    ro_path = fileStore.readGlobalFile(
        data_id,
        cache=True,    # Enable caching
        mutable=False  # Read-only access
    )

    # Uncached, mutable copy at a caller-chosen location.
    dest_path = fileStore.getLocalTempFile(suffix=".processed")
    fileStore.readGlobalFile(
        data_id,
        userPath=dest_path,
        cache=False,  # Skip cache
        mutable=True  # Allow modifications
    )

    # Transform the mutable copy in place (uppercase every line).
    with open(dest_path, 'r') as handle:
        upper_lines = [row.upper() for row in handle.readlines()]
    with open(dest_path, 'w') as handle:
        handle.writelines(upper_lines)

    # Publish the transformed file as a new global file and hand back its ID.
    result_id = fileStore.writeGlobalFile(dest_path)
    return result_id
def chain_file_processing(fileStore: AbstractFileStore, input_file_id: str):
    """Run a two-stage pipeline over a global file and return the final ID.

    Stage 1 prepends a timestamp header; stage 2 numbers every line. The
    intermediate global file is deleted before returning.
    """
    import datetime

    # Stage 0: pull the input down as a mutable local copy.
    local_input = fileStore.readGlobalFile(input_file_id, mutable=True)

    # Stage 1: prefix the content with a processing timestamp.
    with open(local_input, 'r') as handle:
        original = handle.read()
    stamped = f"Processed at {datetime.datetime.now()}\n{original}"
    stage1_path = fileStore.getLocalTempFile(suffix=".stage1")
    with open(stage1_path, 'w') as handle:
        handle.write(stamped)
    stage1_id = fileStore.writeGlobalFile(stage1_path)

    # Stage 2: re-read stage 1 and add line numbers.
    stage1_local = fileStore.readGlobalFile(stage1_id, mutable=True)
    with open(stage1_local, 'r') as handle:
        numbered = [f"{idx}: {row}" for idx, row in enumerate(handle.readlines(), start=1)]
    stage2_path = fileStore.getLocalTempFile(suffix=".stage2")
    with open(stage2_path, 'w') as handle:
        handle.writelines(numbered)
    final_id = fileStore.writeGlobalFile(stage2_path)

    # Drop the intermediate global file; only the final result survives.
    fileStore.deleteGlobalFile(stage1_id)
    return final_id
Local files are job-specific and automatically cleaned up when the job completes.
def demonstrate_local_files(fileStore: AbstractFileStore):
    """Exercise job-local temp files and local file-ID storage.

    Creates three temp files, round-trips them through the local store,
    uppercases their contents, and publishes a consolidated report as a
    global file whose ID is returned.
    """
    # Every job gets its own scratch directory.
    scratch = fileStore.getLocalTempDir()
    fileStore.logToMaster(f"Working in directory: {scratch}")

    # Create three temp files with a little content each.
    scratch_files = []
    for idx in range(3):
        path = fileStore.getLocalTempFile(
            prefix=f"work_{idx}_",
            suffix=".dat"
        )
        with open(path, 'w') as handle:
            handle.write(f"Temporary data for file {idx}\n")
            handle.write(f"Created in job: {fileStore.jobID}\n")
        scratch_files.append(path)

    # Register each file with the job-local store (auto-cleanup on job end).
    stored_ids = []
    for path in scratch_files:
        stored_id = fileStore.writeLocalFile(path, cleanup=True)
        fileStore.logToMaster(f"Stored local file: {stored_id}")
        stored_ids.append(stored_id)

    # Read each stored file back and uppercase it.
    transformed = []
    for stored_id in stored_ids:
        with open(fileStore.readLocalFile(stored_id), 'r') as handle:
            transformed.append(handle.read().upper())
    # Local files automatically cleaned up

    # Consolidate everything into a single report file.
    report_path = fileStore.getLocalTempFile(suffix=".result")
    with open(report_path, 'w') as handle:
        handle.write("Consolidated Results:\n")
        handle.write("=" * 50 + "\n")
        for idx, body in enumerate(transformed):
            handle.write(f"\nFile {idx}:\n{body}\n")

    # Publish the report globally so other jobs can consume it.
    return fileStore.writeGlobalFile(report_path)
def advanced_local_file_patterns(fileStore: AbstractFileStore):
    """Organize job-local work in subdirectories with self-cleaning temp files.

    Builds input/work/output subdirectories under the job temp dir, threads
    data through three temp files managed by a context manager, and returns
    a list containing the global file ID of the final result.
    """
    import contextlib
    import tempfile

    # Lay out a structured workspace under the job's temp dir.
    root = fileStore.getLocalTempDir()
    stage_dirs = {
        name: os.path.join(root, name)
        for name in ("input", "output", "work")
    }
    for path in stage_dirs.values():
        os.makedirs(path, exist_ok=True)

    @contextlib.contextmanager
    def temp_file_context(directory, suffix=".tmp"):
        """Yield a fresh temp file path; remove the file on exit."""
        fd, scratch = tempfile.mkstemp(suffix=suffix, dir=directory)
        try:
            os.close(fd)
            yield scratch
        finally:
            if os.path.exists(scratch):
                os.unlink(scratch)

    results = []
    # Each stage's temp file is deleted automatically when its block exits.
    with temp_file_context(stage_dirs["input"], ".input") as input_file:
        with open(input_file, 'w') as handle:
            handle.write("Input data for processing")

        with temp_file_context(stage_dirs["work"], ".work") as work_file:
            # Transform the input into the work file.
            with open(input_file, 'r') as src, open(work_file, 'w') as dst:
                dst.write(src.read().replace("Input", "Processed"))

            with temp_file_context(stage_dirs["output"], ".output") as output_file:
                # Append the completion marker into the output file.
                with open(work_file, 'r') as src, open(output_file, 'w') as dst:
                    dst.write(src.read() + "\nProcessing complete.")

                # Publish before the context manager unlinks the file.
                results.append(fileStore.writeGlobalFile(output_file))

    return results
Stream-based file operations for large files and real-time processing.
from typing import IO
def demonstrate_streaming_operations(fileStore: AbstractFileStore):
    """Write 1000 log lines through a shared-file stream, then read them back.

    Returns the stream ID of the shared file that was written.
    """
    # Open a write stream on a named shared file.
    shared_file_name = "streaming_data.log"
    stream_id, sink = fileStore.writeSharedFileStream(
        shared_file_name,
        cleanup=True
    )
    try:
        # Emit 1000 log entries, flushing every 100 lines.
        for n in range(1000):
            sink.write(f"Log entry {n}: Processing item {n}\n".encode('utf-8'))
            if n % 100 == 0:
                sink.flush()  # Periodic flush
    finally:
        sink.close()
    fileStore.logToMaster(f"Created streaming file: {stream_id}")

    # Read the data back in fixed-size chunks, counting newlines.
    source = fileStore.readSharedFileStream(stream_id)
    try:
        chunk_size = 1024
        processed_lines = 0
        while True:
            block = source.read(chunk_size)
            if not block:
                break
            processed_lines += block.decode('utf-8').count('\n')
    finally:
        source.close()

    fileStore.logToMaster(f"Processed {processed_lines} lines from stream")
    return stream_id
def large_file_processing(fileStore: AbstractFileStore, large_file_id: str):
    """Stream a large shared file line-by-line through ``process_line``.

    Reads in 8 KiB chunks so memory stays bounded regardless of file size,
    writes transformed lines to a new shared file, and returns its ID.
    """
    source = fileStore.readSharedFileStream(large_file_id)
    output_file_name = "processed_large_file.out"
    output_id, sink = fileStore.writeSharedFileStream(output_file_name)
    try:
        pending = b""
        chunk_size = 8192
        # Consume chunks, emitting every complete line as soon as it appears.
        while chunk := source.read(chunk_size):
            pending += chunk
            while b'\n' in pending:
                row, pending = pending.split(b'\n', 1)
                sink.write(process_line(row) + b'\n')
        # EOF: flush whatever is left in the buffer, skipping empty segments.
        if pending:
            for row in pending.split(b'\n'):
                if row:
                    sink.write(process_line(row) + b'\n')
    finally:
        source.close()
        sink.close()
    return output_id
def process_line(line: bytes) -> bytes:
    """Uppercase *line* and prefix it with the current Unix timestamp.

    Example transformation used by the streaming demos; output looks like
    ``b"1700000000: SOME LINE"``.
    """
    import time

    stamp = str(int(time.time())).encode()
    return b"".join((stamp, b": ", line.upper()))
The CachingFileStore provides automatic caching for improved performance with frequently accessed files.
from toil.fileStores.cachingFileStore import CachingFileStore
def demonstrate_caching_filestore():
    """Show how CachingFileStore caches global files across repeated reads."""
    # Caching file store automatically caches global files
    # for improved performance on repeated access

    class OptimizedJob(Job):
        """Job whose inputs are read through the cache for fast re-access."""

        def __init__(self, input_file_ids):
            super().__init__(memory="2G", cores=1, disk="1G")
            self.input_file_ids = input_file_ids

        def run(self, fileStore: CachingFileStore):
            # First read of each file downloads it and populates the cache.
            local_copies = []
            for file_id in self.input_file_ids:
                first_path = fileStore.readGlobalFile(
                    file_id,
                    cache=True  # Enable caching (default)
                )
                local_copies.append(first_path)
                # A second cached read returns the very same path (cache hit).
                second_path = fileStore.readGlobalFile(file_id, cache=True)
                assert first_path == second_path

            # Trivial processing over the cached copies: total content length.
            sizes = []
            for path in local_copies:
                with open(path, 'r') as handle:
                    sizes.append(len(handle.read()))

            # Cache statistics (implementation specific accessors).
            hits = fileStore.get_cache_hits()
            misses = fileStore.get_cache_misses()
            fileStore.logToMaster(f"Cache hits: {hits}, misses: {misses}")
            return sum(sizes)
def cache_management_strategies(fileStore: CachingFileStore):
    """Illustrate when to bypass, copy around, or preload the file cache."""
    large_file_id = "large_dataset_file"

    # Very large files: skip the cache entirely to avoid evicting hot entries.
    uncached_path = fileStore.readGlobalFile(
        large_file_id,
        cache=False  # Skip cache for large files
    )

    # Mutable access: a private copy is made so the cache stays read-only.
    writable_path = fileStore.readGlobalFile(
        large_file_id,
        mutable=True  # Need to modify file
    )

    # Warm the cache with files that many jobs will touch.
    frequently_used_files = ["reference_genome.fa", "annotation.gtf", "config.json"]
    for file_id in frequently_used_files:
        fileStore.readGlobalFile(file_id, cache=True)
        fileStore.logToMaster(f"Preloaded file: {file_id}")

    # Later reads of the preloaded files come straight from the cache.
    for file_id in frequently_used_files:
        hot_path = fileStore.readGlobalFile(file_id, cache=True)
        # Process file...
Comprehensive logging and monitoring for file operations and job progress.
import logging
from toil.fileStores.abstractFileStore import AbstractFileStore
def advanced_logging_patterns(fileStore: AbstractFileStore):
    """Advanced logging patterns for file operations.

    Demonstrates per-file progress logging at INFO, per-result detail at
    DEBUG, and error logging with re-raise at ERROR.

    Fix: three log/content f-strings contained the literal text "(unknown)"
    where the ``filename`` loop variable clearly belongs.
    """
    # Log file operations with different levels
    fileStore.logToMaster("Starting file processing job", logging.INFO)
    try:
        # Log progress for long operations
        input_files = ["file1.dat", "file2.dat", "file3.dat", "file4.dat"]
        for i, filename in enumerate(input_files):
            fileStore.logToMaster(
                f"Processing file {i+1}/{len(input_files)}: {filename}",
                logging.INFO
            )
            # Simulate file processing
            temp_file = fileStore.getLocalTempFile(suffix=f"_{i}.tmp")
            with open(temp_file, 'w') as f:
                f.write(f"Processed content for {filename}")
            file_id = fileStore.writeGlobalFile(temp_file)
            # Log successful completion at DEBUG to keep INFO output compact.
            fileStore.logToMaster(
                f"Successfully processed {filename} -> {file_id}",
                logging.DEBUG
            )
        fileStore.logToMaster("All files processed successfully", logging.INFO)
    except Exception as e:
        # Log errors with full context, then propagate to the caller.
        fileStore.logToMaster(
            f"Error in file processing: {str(e)}",
            logging.ERROR
        )
        raise
def file_operation_metrics(fileStore: AbstractFileStore):
"""Collect metrics on file operations."""
import time
metrics = {
'files_processed': 0,
'bytes_written': 0,
'bytes_read': 0,
'processing_time': 0,
'cache_hits': 0
}
start_time = time.time()
try:
# Simulate file processing with metrics
for i in range(10):
# Create test file
test_file = fileStore.getLocalTempFile()
test_data = f"Test data {i} " * 1000 # ~10KB per file
with open(test_file, 'w') as f:
f.write(test_data)
file_size = os.path.getsize(test_file)
metrics['bytes_written'] += file_size
# Store globally
file_id = fileStore.writeGlobalFile(test_file)
# Read back (may hit cache)
read_path = fileStore.readGlobalFile(file_id, cache=True)
with open(read_path, 'r') as f:
read_data = f.read()
metrics['bytes_read'] += len(read_data.encode())
metrics['files_processed'] += 1
# Log progress every few files
if (i + 1) % 5 == 0:
elapsed = time.time() - start_time
fileStore.logToMaster(
f"Processed {i+1} files in {elapsed:.2f}s",
logging.INFO
)
metrics['processing_time'] = time.time() - start_time
# Log final metrics
fileStore.logToMaster(
f"Metrics - Files: {metrics['files_processed']}, "
f"Written: {metrics['bytes_written']} bytes, "
f"Read: {metrics['bytes_read']} bytes, "
f"Time: {metrics['processing_time']:.2f}s",
logging.INFO
)
except Exception as e:
fileStore.logToMaster(f"Metrics collection failed: {e}", logging.ERROR)
raise
return metricsThis file management system provides comprehensive, efficient file handling capabilities with caching, streaming, and robust error handling for complex workflow data management needs.
Install with Tessl CLI
npx tessl i tessl/pypi-toildocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10