# File Management

## Overview

Toil's file management system provides comprehensive file handling capabilities for workflows, including temporary file management, shared file storage, caching, and I/O operations. The system abstracts file operations across different storage backends while providing efficient caching, automatic cleanup, and seamless integration with job execution. File stores manage both local temporary files and globally accessible shared files that can be passed between jobs in a workflow.

## Capabilities

### Abstract File Store Interface
{ .api }

The `AbstractFileStore` provides the core interface for all file operations within job execution.
```python
from toil.fileStores.abstractFileStore import AbstractFileStore
from typing import IO, Optional
import logging
import os
import shutil
import tempfile
import time


class CustomFileStore(AbstractFileStore):
    """Custom file store implementation for specialized storage needs."""

    def writeGlobalFile(self, localFileName: str, cleanup: bool = True) -> str:
        """Write local file to globally accessible storage.

        :param localFileName: path of the local file to publish globally
        :param cleanup: if True, register the global copy for automatic
            deletion when the workflow finishes
        :return: unique global file ID for the stored file
        """
        # Generate unique global file ID
        global_file_id = self.generate_global_file_id()

        # Copy file to shared storage
        with open(localFileName, 'rb') as local_file:
            file_data = local_file.read()
        self.store_global_file(global_file_id, file_data)

        # Register for cleanup if requested
        if cleanup:
            self.register_for_cleanup(global_file_id)

        return global_file_id

    def readGlobalFile(self, fileStoreID: str, userPath: Optional[str] = None,
                       cache: bool = True, mutable: bool = False) -> str:
        """Read globally stored file to local path.

        :param fileStoreID: ID previously returned by writeGlobalFile
        :param userPath: destination path; a temp file is created if omitted
        :param cache: consult (and populate) the local cache when possible
        :param mutable: if True, always hand back a private, writable copy
        :return: local path containing the file's contents
        """
        # Determine output path
        if userPath is None:
            userPath = self.getLocalTempFile(
                prefix=f"global_{fileStoreID}_",
                suffix=".tmp"
            )

        # Check cache first if enabled
        if cache and self.is_cached(fileStoreID):
            cached_path = self.get_cached_path(fileStoreID)
            if not mutable:
                # Immutable readers may share the cached copy directly.
                return cached_path
            # Make a private, mutable copy for the caller.
            shutil.copy2(cached_path, userPath)
            return userPath

        # Retrieve from global storage
        file_data = self.retrieve_global_file(fileStoreID)

        with open(userPath, 'wb') as output_file:
            output_file.write(file_data)

        # Add to cache if enabled
        if cache:
            self.add_to_cache(fileStoreID, userPath)

        return userPath

    def deleteGlobalFile(self, fileStoreID: str) -> None:
        """Delete globally stored file and all local bookkeeping for it."""
        if not self.global_file_exists(fileStoreID):
            return  # Already gone; deletion is idempotent.

        # Remove from storage
        self.remove_global_file(fileStoreID)

        # Remove from cache
        self.remove_from_cache(fileStoreID)

        # Unregister from cleanup
        self.unregister_cleanup(fileStoreID)

    def writeLocalFile(self, localFileName: str, cleanup: bool = True) -> str:
        """Write file to job-local storage and return its local file ID."""
        local_file_id = self.generate_local_file_id()

        # Copy to local storage area
        local_path = self.get_local_storage_path(local_file_id)
        shutil.copy2(localFileName, local_path)

        if cleanup:
            self.register_local_cleanup(local_file_id)

        return local_file_id

    def readLocalFile(self, localFileStoreID: str) -> str:
        """Get path to locally stored file.

        :raises FileNotFoundError: if the ID does not name a stored file
        """
        if not self.local_file_exists(localFileStoreID):
            raise FileNotFoundError(f"Local file not found: {localFileStoreID}")

        return self.get_local_storage_path(localFileStoreID)

    def getLocalTempDir(self) -> str:
        """Get (lazily creating) the temporary directory for this job."""
        if not hasattr(self, '_temp_dir'):
            self._temp_dir = self.create_job_temp_dir()

        return self._temp_dir

    def getLocalTempFile(self, suffix: Optional[str] = None,
                         prefix: Optional[str] = 'tmp') -> str:
        """Create an empty temporary file in the job temp dir and return its path."""
        temp_dir = self.getLocalTempDir()
        fd, temp_path = tempfile.mkstemp(
            suffix=suffix,
            prefix=prefix,
            dir=temp_dir
        )
        os.close(fd)  # Close file descriptor, keep file on disk

        return temp_path

    def logToMaster(self, text: str, level: int = logging.INFO) -> None:
        """Send log message to workflow leader."""
        log_message = {
            'job_id': self.job_id,
            'timestamp': time.time(),
            'level': level,
            'message': text
        }

        self.send_log_to_master(log_message)
```
### Global File Operations
{ .api }

Global files are accessible across all jobs in a workflow and persist in the job store.
```python
from toil.fileStores.abstractFileStore import AbstractFileStore


def demonstrate_global_files(fileStore: AbstractFileStore):
    """Demonstrate global file operations.

    :return: global file ID of the processed result, for downstream jobs
    """
    # Create input data file
    input_file = fileStore.getLocalTempFile(suffix=".txt")
    with open(input_file, 'w') as f:
        f.write("Sample data for processing\n")
        f.write("Line 2 of data\n")
        f.write("Line 3 of data\n")

    # Write to global storage - accessible by other jobs
    global_file_id = fileStore.writeGlobalFile(input_file, cleanup=True)
    fileStore.logToMaster(f"Created global file: {global_file_id}")

    # Read global file in same or different job
    # Cache enabled by default for performance
    cached_path = fileStore.readGlobalFile(
        global_file_id,
        cache=True,     # Enable caching
        mutable=False   # Read-only access
    )

    # Read global file to specific location
    output_path = fileStore.getLocalTempFile(suffix=".processed")
    fileStore.readGlobalFile(
        global_file_id,
        userPath=output_path,
        cache=False,   # Skip cache
        mutable=True   # Allow modifications
    )

    # Process the file
    with open(output_path, 'r') as f:
        lines = f.readlines()

    processed_lines = [line.upper() for line in lines]

    with open(output_path, 'w') as f:
        f.writelines(processed_lines)

    # Store processed result as new global file
    processed_file_id = fileStore.writeGlobalFile(output_path)

    # Return file ID for use by downstream jobs
    return processed_file_id


def chain_file_processing(fileStore: AbstractFileStore, input_file_id: str):
    """Chain multiple file processing operations.

    :param input_file_id: global file ID of the input data
    :return: global file ID of the final (stage-2) result
    """
    import datetime

    # Step 1: Read input file
    input_path = fileStore.readGlobalFile(input_file_id, mutable=True)

    # Step 2: First processing stage
    with open(input_path, 'r') as f:
        data = f.read()

    # Add timestamp
    processed_data = f"Processed at {datetime.datetime.now()}\n{data}"

    stage1_file = fileStore.getLocalTempFile(suffix=".stage1")
    with open(stage1_file, 'w') as f:
        f.write(processed_data)

    stage1_id = fileStore.writeGlobalFile(stage1_file)

    # Step 3: Second processing stage
    stage1_path = fileStore.readGlobalFile(stage1_id, mutable=True)

    with open(stage1_path, 'r') as f:
        lines = f.readlines()

    # Add line numbers
    numbered_lines = [f"{i+1}: {line}" for i, line in enumerate(lines)]

    stage2_file = fileStore.getLocalTempFile(suffix=".stage2")
    with open(stage2_file, 'w') as f:
        f.writelines(numbered_lines)

    final_id = fileStore.writeGlobalFile(stage2_file)

    # Cleanup intermediate files
    fileStore.deleteGlobalFile(stage1_id)

    return final_id
```
### Local File Operations
{ .api }

Local files are job-specific and automatically cleaned up when the job completes.
```python
import os

from toil.fileStores.abstractFileStore import AbstractFileStore


def demonstrate_local_files(fileStore: AbstractFileStore):
    """Demonstrate local file operations.

    :return: global file ID of the consolidated result file
    """
    # Create temporary working directory
    temp_dir = fileStore.getLocalTempDir()
    fileStore.logToMaster(f"Working in directory: {temp_dir}")

    # Create multiple temporary files
    temp_files = []
    for i in range(3):
        temp_file = fileStore.getLocalTempFile(
            prefix=f"work_{i}_",
            suffix=".dat"
        )
        temp_files.append(temp_file)

        # Write some data
        with open(temp_file, 'w') as f:
            f.write(f"Temporary data for file {i}\n")
            f.write(f"Created in job: {fileStore.jobID}\n")

    # Store local files for job-specific access
    local_file_ids = []
    for temp_file in temp_files:
        local_id = fileStore.writeLocalFile(temp_file, cleanup=True)
        local_file_ids.append(local_id)
        fileStore.logToMaster(f"Stored local file: {local_id}")

    # Access local files
    processed_results = []
    for local_id in local_file_ids:
        local_path = fileStore.readLocalFile(local_id)

        with open(local_path, 'r') as f:
            content = f.read()
            processed_results.append(content.upper())

    # Local files automatically cleaned up

    # Create consolidated result
    result_file = fileStore.getLocalTempFile(suffix=".result")
    with open(result_file, 'w') as f:
        f.write("Consolidated Results:\n")
        f.write("=" * 50 + "\n")
        for i, result in enumerate(processed_results):
            f.write(f"\nFile {i}:\n{result}\n")

    # Store final result as global file for other jobs
    return fileStore.writeGlobalFile(result_file)


def advanced_local_file_patterns(fileStore: AbstractFileStore):
    """Advanced patterns for local file management.

    :return: list of global file IDs produced by the pipeline
    """
    import contextlib
    import tempfile

    # Create structured temporary directory
    base_temp = fileStore.getLocalTempDir()

    # Create subdirectories for organization
    input_dir = os.path.join(base_temp, "input")
    output_dir = os.path.join(base_temp, "output")
    work_dir = os.path.join(base_temp, "work")

    os.makedirs(input_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(work_dir, exist_ok=True)

    @contextlib.contextmanager
    def temp_file_context(directory, suffix=".tmp"):
        """Yield a fresh temp file path in *directory*; remove it on exit."""
        fd, temp_path = tempfile.mkstemp(suffix=suffix, dir=directory)
        try:
            os.close(fd)
            yield temp_path
        finally:
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    # Process files with automatic cleanup
    results = []

    with temp_file_context(input_dir, ".input") as input_file:
        # Create input data
        with open(input_file, 'w') as f:
            f.write("Input data for processing")

        with temp_file_context(work_dir, ".work") as work_file:
            # Process data
            with open(input_file, 'r') as inf, open(work_file, 'w') as outf:
                data = inf.read()
                processed = data.replace("Input", "Processed")
                outf.write(processed)

            with temp_file_context(output_dir, ".output") as output_file:
                # Finalize results
                with open(work_file, 'r') as inf, open(output_file, 'w') as outf:
                    final_data = inf.read() + "\nProcessing complete."
                    outf.write(final_data)

                # Store final result (must happen before the context
                # manager deletes the file on exit)
                final_id = fileStore.writeGlobalFile(output_file)
                results.append(final_id)

    return results
```
### Streaming File Operations
{ .api }

Stream-based file operations for large files and real-time processing.
```python
from typing import IO

from toil.fileStores.abstractFileStore import AbstractFileStore


def demonstrate_streaming_operations(fileStore: AbstractFileStore):
    """Demonstrate streaming file operations.

    :return: shared-file stream ID of the written log data
    """
    # Write streaming data to shared file
    shared_file_name = "streaming_data.log"

    stream_id, write_stream = fileStore.writeSharedFileStream(
        shared_file_name,
        cleanup=True
    )

    try:
        # Write streaming data
        for i in range(1000):
            line = f"Log entry {i}: Processing item {i}\n"
            write_stream.write(line.encode('utf-8'))

            if i % 100 == 0:
                write_stream.flush()  # Periodic flush
    finally:
        write_stream.close()

    fileStore.logToMaster(f"Created streaming file: {stream_id}")

    # Read streaming data
    read_stream = fileStore.readSharedFileStream(stream_id)

    try:
        # Process stream in chunks
        chunk_size = 1024
        processed_lines = 0

        while True:
            chunk = read_stream.read(chunk_size)
            if not chunk:
                break

            # Process chunk (count lines). NOTE(review): decoding a raw
            # chunk is only safe here because the data written above is
            # ASCII; multi-byte UTF-8 could be split across chunks.
            lines_in_chunk = chunk.decode('utf-8').count('\n')
            processed_lines += lines_in_chunk
    finally:
        read_stream.close()

    fileStore.logToMaster(f"Processed {processed_lines} lines from stream")

    return stream_id


def large_file_processing(fileStore: AbstractFileStore, large_file_id: str):
    """Process large files efficiently using streaming.

    :param large_file_id: shared-file stream ID of the input
    :return: shared-file stream ID of the processed output
    """
    # Stream large file for processing
    input_stream = fileStore.readSharedFileStream(large_file_id)

    # Create output stream
    output_file_name = "processed_large_file.out"
    output_id, output_stream = fileStore.writeSharedFileStream(output_file_name)

    try:
        # Process file line by line to manage memory
        buffer = b""
        chunk_size = 8192

        while True:
            chunk = input_stream.read(chunk_size)
            if not chunk:
                # Process remaining buffer
                if buffer:
                    lines = buffer.split(b'\n')
                    for line in lines:
                        if line:
                            processed_line = process_line(line)
                            output_stream.write(processed_line + b'\n')
                break

            buffer += chunk

            # Process complete lines
            while b'\n' in buffer:
                line, buffer = buffer.split(b'\n', 1)
                processed_line = process_line(line)
                output_stream.write(processed_line + b'\n')
    finally:
        input_stream.close()
        output_stream.close()

    return output_id
def process_line(line: bytes) -> bytes:
    """Process individual line (example transformation).

    Upper-cases the line and prefixes it with the current UNIX
    timestamp, e.g. b"1700000000: SOME LINE".
    """
    import time

    timestamp = str(int(time.time())).encode()
    return timestamp + b": " + line.upper()
```
### Caching File Store
{ .api }

The `CachingFileStore` provides automatic caching for improved performance with frequently accessed files.
```python
from toil.fileStores.cachingFileStore import CachingFileStore
from toil.job import Job


def demonstrate_caching_filestore():
    """Demonstrate caching file store capabilities."""
    # Caching file store automatically caches global files
    # for improved performance on repeated access

    class OptimizedJob(Job):
        def __init__(self, input_file_ids):
            super().__init__(memory="2G", cores=1, disk="1G")
            self.input_file_ids = input_file_ids

        def run(self, fileStore: CachingFileStore):
            # First access - files downloaded and cached
            cached_files = []

            for file_id in self.input_file_ids:
                # File cached on first read
                cached_path = fileStore.readGlobalFile(
                    file_id,
                    cache=True  # Enable caching (default)
                )
                cached_files.append(cached_path)

                # Subsequent reads use cache (much faster)
                same_cached_path = fileStore.readGlobalFile(file_id, cache=True)
                assert cached_path == same_cached_path

            # Process cached files
            results = []
            for cached_path in cached_files:
                with open(cached_path, 'r') as f:
                    data = f.read()
                    results.append(len(data))  # Simple processing

            # Cache statistics
            cache_hits = fileStore.get_cache_hits()  # Implementation specific
            cache_misses = fileStore.get_cache_misses()

            fileStore.logToMaster(f"Cache hits: {cache_hits}, misses: {cache_misses}")

            return sum(results)


def cache_management_strategies(fileStore: CachingFileStore):
    """Advanced caching strategies."""
    # Control cache behavior
    large_file_id = "large_dataset_file"

    # Read without caching (for very large files)
    temp_path = fileStore.readGlobalFile(
        large_file_id,
        cache=False  # Skip cache for large files
    )

    # Read with mutable access (creates copy, doesn't use cache)
    mutable_path = fileStore.readGlobalFile(
        large_file_id,
        mutable=True  # Need to modify file
    )

    # Preload frequently used files into cache
    frequently_used_files = ["reference_genome.fa", "annotation.gtf", "config.json"]

    for file_id in frequently_used_files:
        # Preload into cache
        fileStore.readGlobalFile(file_id, cache=True)
        fileStore.logToMaster(f"Preloaded file: {file_id}")

    # Use cached files efficiently
    for file_id in frequently_used_files:
        # Fast access from cache
        cached_path = fileStore.readGlobalFile(file_id, cache=True)
        # Process file...
```
### File Store Logging and Monitoring
{ .api }

Comprehensive logging and monitoring for file operations and job progress.
```python
import logging
import os
import time

from toil.fileStores.abstractFileStore import AbstractFileStore


def advanced_logging_patterns(fileStore: AbstractFileStore):
    """Advanced logging patterns for file operations."""
    # Log file operations with different levels
    fileStore.logToMaster("Starting file processing job", logging.INFO)

    try:
        # Log progress for long operations
        input_files = ["file1.dat", "file2.dat", "file3.dat", "file4.dat"]

        for i, filename in enumerate(input_files):
            fileStore.logToMaster(
                f"Processing file {i+1}/{len(input_files)}: {filename}",
                logging.INFO
            )

            # Simulate file processing
            temp_file = fileStore.getLocalTempFile(suffix=f"_{i}.tmp")

            with open(temp_file, 'w') as f:
                f.write(f"Processed content for {filename}")

            file_id = fileStore.writeGlobalFile(temp_file)

            # Log successful completion
            fileStore.logToMaster(
                f"Successfully processed {filename} -> {file_id}",
                logging.DEBUG
            )

        fileStore.logToMaster("All files processed successfully", logging.INFO)

    except Exception as e:
        # Log errors with full context
        fileStore.logToMaster(
            f"Error in file processing: {str(e)}",
            logging.ERROR
        )
        raise


def file_operation_metrics(fileStore: AbstractFileStore):
    """Collect metrics on file operations.

    :return: dict with files_processed, bytes_written, bytes_read,
        processing_time and cache_hits counters
    """
    metrics = {
        'files_processed': 0,
        'bytes_written': 0,
        'bytes_read': 0,
        'processing_time': 0,
        'cache_hits': 0
    }

    start_time = time.time()

    try:
        # Simulate file processing with metrics
        for i in range(10):
            # Create test file
            test_file = fileStore.getLocalTempFile()
            test_data = f"Test data {i} " * 1000  # ~10KB per file

            with open(test_file, 'w') as f:
                f.write(test_data)

            file_size = os.path.getsize(test_file)
            metrics['bytes_written'] += file_size

            # Store globally
            file_id = fileStore.writeGlobalFile(test_file)

            # Read back (may hit cache)
            read_path = fileStore.readGlobalFile(file_id, cache=True)

            with open(read_path, 'r') as f:
                read_data = f.read()
                metrics['bytes_read'] += len(read_data.encode())

            metrics['files_processed'] += 1

            # Log progress every few files
            if (i + 1) % 5 == 0:
                elapsed = time.time() - start_time
                fileStore.logToMaster(
                    f"Processed {i+1} files in {elapsed:.2f}s",
                    logging.INFO
                )

        metrics['processing_time'] = time.time() - start_time

        # Log final metrics
        fileStore.logToMaster(
            f"Metrics - Files: {metrics['files_processed']}, "
            f"Written: {metrics['bytes_written']} bytes, "
            f"Read: {metrics['bytes_read']} bytes, "
            f"Time: {metrics['processing_time']:.2f}s",
            logging.INFO
        )

    except Exception as e:
        fileStore.logToMaster(f"Metrics collection failed: {e}", logging.ERROR)
        raise

    return metrics
```

This file management system provides comprehensive, efficient file handling capabilities with caching, streaming, and robust error handling for complex workflow data management needs.