or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-systems.md · core-workflow.md · file-management.md · index.md · job-stores.md · provisioning.md · utilities.md · workflow-languages.md

docs/file-management.md

0

# File Management

1

2

## Overview

3

4

Toil's file management system provides comprehensive file handling capabilities for workflows, including temporary file management, shared file storage, caching, and I/O operations. The system abstracts file operations across different storage backends while providing efficient caching, automatic cleanup, and seamless integration with job execution. File stores manage both local temporary files and globally accessible shared files that can be passed between jobs in a workflow.

5

6

## Capabilities

7

8

### Abstract File Store Interface

9

{ .api }

10

11

The `AbstractFileStore` provides the core interface for all file operations within job execution.

12

13

```python

14

from toil.fileStores.abstractFileStore import AbstractFileStore

15

from typing import IO, Optional

16

import logging

17

18

class CustomFileStore(AbstractFileStore):

19

"""Custom file store implementation for specialized storage needs."""

20

21

def writeGlobalFile(self, localFileName: str, cleanup: bool = True) -> str:

22

"""Write local file to globally accessible storage."""

23

# Generate unique global file ID

24

global_file_id = self.generate_global_file_id()

25

26

# Copy file to shared storage

27

with open(localFileName, 'rb') as local_file:

28

file_data = local_file.read()

29

self.store_global_file(global_file_id, file_data)

30

31

# Register for cleanup if requested

32

if cleanup:

33

self.register_for_cleanup(global_file_id)

34

35

return global_file_id

36

37

def readGlobalFile(self, fileStoreID: str, userPath: Optional[str] = None,

38

cache: bool = True, mutable: bool = False) -> str:

39

"""Read globally stored file to local path."""

40

41

# Determine output path

42

if userPath is None:

43

userPath = self.getLocalTempFile(

44

prefix=f"global_{fileStoreID}_",

45

suffix=".tmp"

46

)

47

48

# Check cache first if enabled

49

if cache and self.is_cached(fileStoreID):

50

cached_path = self.get_cached_path(fileStoreID)

51

if not mutable:

52

return cached_path

53

else:

54

# Make mutable copy

55

shutil.copy2(cached_path, userPath)

56

return userPath

57

58

# Retrieve from global storage

59

file_data = self.retrieve_global_file(fileStoreID)

60

61

with open(userPath, 'wb') as output_file:

62

output_file.write(file_data)

63

64

# Add to cache if enabled

65

if cache:

66

self.add_to_cache(fileStoreID, userPath)

67

68

return userPath

69

70

def deleteGlobalFile(self, fileStoreID: str) -> None:

71

"""Delete globally stored file."""

72

if not self.global_file_exists(fileStoreID):

73

return

74

75

# Remove from storage

76

self.remove_global_file(fileStoreID)

77

78

# Remove from cache

79

self.remove_from_cache(fileStoreID)

80

81

# Unregister from cleanup

82

self.unregister_cleanup(fileStoreID)

83

84

def writeLocalFile(self, localFileName: str, cleanup: bool = True) -> str:

85

"""Write file to job-local storage."""

86

local_file_id = self.generate_local_file_id()

87

88

# Copy to local storage area

89

local_path = self.get_local_storage_path(local_file_id)

90

shutil.copy2(localFileName, local_path)

91

92

if cleanup:

93

self.register_local_cleanup(local_file_id)

94

95

return local_file_id

96

97

def readLocalFile(self, localFileStoreID: str) -> str:

98

"""Get path to locally stored file."""

99

if not self.local_file_exists(localFileStoreID):

100

raise FileNotFoundError(f"Local file not found: {localFileStoreID}")

101

102

return self.get_local_storage_path(localFileStoreID)

103

104

def getLocalTempDir(self) -> str:

105

"""Get temporary directory for this job."""

106

if not hasattr(self, '_temp_dir'):

107

self._temp_dir = self.create_job_temp_dir()

108

109

return self._temp_dir

110

111

def getLocalTempFile(self, suffix: Optional[str] = None,

112

prefix: Optional[str] = 'tmp') -> str:

113

"""Create temporary file and return path."""

114

import tempfile

115

116

temp_dir = self.getLocalTempDir()

117

fd, temp_path = tempfile.mkstemp(

118

suffix=suffix,

119

prefix=prefix,

120

dir=temp_dir

121

)

122

os.close(fd) # Close file descriptor, keep file

123

124

return temp_path

125

126

def logToMaster(self, text: str, level: int = logging.INFO) -> None:

127

"""Send log message to workflow leader."""

128

log_message = {

129

'job_id': self.job_id,

130

'timestamp': time.time(),

131

'level': level,

132

'message': text

133

}

134

135

self.send_log_to_master(log_message)

136

```

137

138

### Global File Operations

139

{ .api }

140

141

Global files are accessible across all jobs in a workflow and persist in the job store.

142

143

```python

144

from toil.fileStores.abstractFileStore import AbstractFileStore

145

146

def demonstrate_global_files(fileStore: AbstractFileStore):

147

"""Demonstrate global file operations."""

148

149

# Create input data file

150

input_file = fileStore.getLocalTempFile(suffix=".txt")

151

with open(input_file, 'w') as f:

152

f.write("Sample data for processing\n")

153

f.write("Line 2 of data\n")

154

f.write("Line 3 of data\n")

155

156

# Write to global storage - accessible by other jobs

157

global_file_id = fileStore.writeGlobalFile(input_file, cleanup=True)

158

fileStore.logToMaster(f"Created global file: {global_file_id}")

159

160

# Read global file in same or different job

161

# Cache enabled by default for performance

162

cached_path = fileStore.readGlobalFile(

163

global_file_id,

164

cache=True, # Enable caching

165

mutable=False # Read-only access

166

)

167

168

# Read global file to specific location

169

output_path = fileStore.getLocalTempFile(suffix=".processed")

170

fileStore.readGlobalFile(

171

global_file_id,

172

userPath=output_path,

173

cache=False, # Skip cache

174

mutable=True # Allow modifications

175

)

176

177

# Process the file

178

with open(output_path, 'r') as f:

179

lines = f.readlines()

180

181

processed_lines = [line.upper() for line in lines]

182

183

with open(output_path, 'w') as f:

184

f.writelines(processed_lines)

185

186

# Store processed result as new global file

187

processed_file_id = fileStore.writeGlobalFile(output_path)

188

189

# Return file ID for use by downstream jobs

190

return processed_file_id

191

192

def chain_file_processing(fileStore: AbstractFileStore, input_file_id: str):

193

"""Chain multiple file processing operations."""

194

195

# Step 1: Read input file

196

input_path = fileStore.readGlobalFile(input_file_id, mutable=True)

197

198

# Step 2: First processing stage

199

with open(input_path, 'r') as f:

200

data = f.read()

201

202

# Add timestamp

203

import datetime

204

processed_data = f"Processed at {datetime.datetime.now()}\n{data}"

205

206

stage1_file = fileStore.getLocalTempFile(suffix=".stage1")

207

with open(stage1_file, 'w') as f:

208

f.write(processed_data)

209

210

stage1_id = fileStore.writeGlobalFile(stage1_file)

211

212

# Step 3: Second processing stage

213

stage1_path = fileStore.readGlobalFile(stage1_id, mutable=True)

214

215

with open(stage1_path, 'r') as f:

216

lines = f.readlines()

217

218

# Add line numbers

219

numbered_lines = [f"{i+1}: {line}" for i, line in enumerate(lines)]

220

221

stage2_file = fileStore.getLocalTempFile(suffix=".stage2")

222

with open(stage2_file, 'w') as f:

223

f.writelines(numbered_lines)

224

225

final_id = fileStore.writeGlobalFile(stage2_file)

226

227

# Cleanup intermediate files

228

fileStore.deleteGlobalFile(stage1_id)

229

230

return final_id

231

```

232

233

### Local File Operations

234

{ .api }

235

236

Local files are job-specific and automatically cleaned up when the job completes.

237

238

```python

239

def demonstrate_local_files(fileStore: AbstractFileStore):

240

"""Demonstrate local file operations."""

241

242

# Create temporary working directory

243

temp_dir = fileStore.getLocalTempDir()

244

fileStore.logToMaster(f"Working in directory: {temp_dir}")

245

246

# Create multiple temporary files

247

temp_files = []

248

for i in range(3):

249

temp_file = fileStore.getLocalTempFile(

250

prefix=f"work_{i}_",

251

suffix=".dat"

252

)

253

temp_files.append(temp_file)

254

255

# Write some data

256

with open(temp_file, 'w') as f:

257

f.write(f"Temporary data for file {i}\n")

258

f.write(f"Created in job: {fileStore.jobID}\n")

259

260

# Store local files for job-specific access

261

local_file_ids = []

262

for temp_file in temp_files:

263

local_id = fileStore.writeLocalFile(temp_file, cleanup=True)

264

local_file_ids.append(local_id)

265

fileStore.logToMaster(f"Stored local file: {local_id}")

266

267

# Access local files

268

processed_results = []

269

for local_id in local_file_ids:

270

local_path = fileStore.readLocalFile(local_id)

271

272

with open(local_path, 'r') as f:

273

content = f.read()

274

processed_results.append(content.upper())

275

276

# Local files automatically cleaned up

277

278

# Create consolidated result

279

result_file = fileStore.getLocalTempFile(suffix=".result")

280

with open(result_file, 'w') as f:

281

f.write("Consolidated Results:\n")

282

f.write("=" * 50 + "\n")

283

for i, result in enumerate(processed_results):

284

f.write(f"\nFile {i}:\n{result}\n")

285

286

# Store final result as global file for other jobs

287

return fileStore.writeGlobalFile(result_file)

288

289

def advanced_local_file_patterns(fileStore: AbstractFileStore):

290

"""Advanced patterns for local file management."""

291

292

# Create structured temporary directory

293

base_temp = fileStore.getLocalTempDir()

294

295

# Create subdirectories for organization

296

input_dir = os.path.join(base_temp, "input")

297

output_dir = os.path.join(base_temp, "output")

298

work_dir = os.path.join(base_temp, "work")

299

300

os.makedirs(input_dir, exist_ok=True)

301

os.makedirs(output_dir, exist_ok=True)

302

os.makedirs(work_dir, exist_ok=True)

303

304

# Use context manager for temporary files

305

import tempfile

306

import contextlib

307

308

@contextlib.contextmanager

309

def temp_file_context(directory, suffix=".tmp"):

310

"""Context manager for temporary files."""

311

fd, temp_path = tempfile.mkstemp(suffix=suffix, dir=directory)

312

try:

313

os.close(fd)

314

yield temp_path

315

finally:

316

if os.path.exists(temp_path):

317

os.unlink(temp_path)

318

319

# Process files with automatic cleanup

320

results = []

321

322

with temp_file_context(input_dir, ".input") as input_file:

323

# Create input data

324

with open(input_file, 'w') as f:

325

f.write("Input data for processing")

326

327

with temp_file_context(work_dir, ".work") as work_file:

328

# Process data

329

with open(input_file, 'r') as inf, open(work_file, 'w') as outf:

330

data = inf.read()

331

processed = data.replace("Input", "Processed")

332

outf.write(processed)

333

334

with temp_file_context(output_dir, ".output") as output_file:

335

# Finalize results

336

with open(work_file, 'r') as inf, open(output_file, 'w') as outf:

337

final_data = inf.read() + "\nProcessing complete."

338

outf.write(final_data)

339

340

# Store final result

341

final_id = fileStore.writeGlobalFile(output_file)

342

results.append(final_id)

343

344

return results

345

```

346

347

### Streaming File Operations

348

{ .api }

349

350

Stream-based file operations for large files and real-time processing.

351

352

```python

353

from typing import IO

354

355

def demonstrate_streaming_operations(fileStore: AbstractFileStore):

356

"""Demonstrate streaming file operations."""

357

358

# Write streaming data to shared file

359

shared_file_name = "streaming_data.log"

360

361

stream_id, write_stream = fileStore.writeSharedFileStream(

362

shared_file_name,

363

cleanup=True

364

)

365

366

try:

367

# Write streaming data

368

for i in range(1000):

369

line = f"Log entry {i}: Processing item {i}\n"

370

write_stream.write(line.encode('utf-8'))

371

372

if i % 100 == 0:

373

write_stream.flush() # Periodic flush

374

375

finally:

376

write_stream.close()

377

378

fileStore.logToMaster(f"Created streaming file: {stream_id}")

379

380

# Read streaming data

381

read_stream = fileStore.readSharedFileStream(stream_id)

382

383

try:

384

# Process stream in chunks

385

chunk_size = 1024

386

processed_lines = 0

387

388

while True:

389

chunk = read_stream.read(chunk_size)

390

if not chunk:

391

break

392

393

# Process chunk (count lines)

394

lines_in_chunk = chunk.decode('utf-8').count('\n')

395

processed_lines += lines_in_chunk

396

397

finally:

398

read_stream.close()

399

400

fileStore.logToMaster(f"Processed {processed_lines} lines from stream")

401

402

return stream_id

403

404

def large_file_processing(fileStore: AbstractFileStore, large_file_id: str):

405

"""Process large files efficiently using streaming."""

406

407

# Stream large file for processing

408

input_stream = fileStore.readSharedFileStream(large_file_id)

409

410

# Create output stream

411

output_file_name = "processed_large_file.out"

412

output_id, output_stream = fileStore.writeSharedFileStream(output_file_name)

413

414

try:

415

# Process file line by line to manage memory

416

buffer = b""

417

chunk_size = 8192

418

419

while True:

420

chunk = input_stream.read(chunk_size)

421

if not chunk:

422

# Process remaining buffer

423

if buffer:

424

lines = buffer.split(b'\n')

425

for line in lines:

426

if line:

427

processed_line = process_line(line)

428

output_stream.write(processed_line + b'\n')

429

break

430

431

buffer += chunk

432

433

# Process complete lines

434

while b'\n' in buffer:

435

line, buffer = buffer.split(b'\n', 1)

436

processed_line = process_line(line)

437

output_stream.write(processed_line + b'\n')

438

439

finally:

440

input_stream.close()

441

output_stream.close()

442

443

return output_id

444

445

def process_line(line: bytes) -> bytes:

446

"""Process individual line (example transformation)."""

447

# Convert to uppercase and add timestamp

448

import time

449

timestamp = str(int(time.time())).encode()

450

return timestamp + b": " + line.upper()

451

```

452

453

### Caching File Store

454

{ .api }

455

456

The `CachingFileStore` provides automatic caching for improved performance with frequently accessed files.

457

458

```python

459

from toil.fileStores.cachingFileStore import CachingFileStore

460

461

def demonstrate_caching_filestore():

462

"""Demonstrate caching file store capabilities."""

463

464

# Caching file store automatically caches global files

465

# for improved performance on repeated access

466

467

class OptimizedJob(Job):

468

def __init__(self, input_file_ids):

469

super().__init__(memory="2G", cores=1, disk="1G")

470

self.input_file_ids = input_file_ids

471

472

def run(self, fileStore: CachingFileStore):

473

# First access - files downloaded and cached

474

cached_files = []

475

476

for file_id in self.input_file_ids:

477

# File cached on first read

478

cached_path = fileStore.readGlobalFile(

479

file_id,

480

cache=True # Enable caching (default)

481

)

482

cached_files.append(cached_path)

483

484

# Subsequent reads use cache (much faster)

485

same_cached_path = fileStore.readGlobalFile(file_id, cache=True)

486

assert cached_path == same_cached_path

487

488

# Process cached files

489

results = []

490

for cached_path in cached_files:

491

with open(cached_path, 'r') as f:

492

data = f.read()

493

results.append(len(data)) # Simple processing

494

495

# Cache statistics

496

cache_hits = fileStore.get_cache_hits() # Implementation specific

497

cache_misses = fileStore.get_cache_misses()

498

499

fileStore.logToMaster(f"Cache hits: {cache_hits}, misses: {cache_misses}")

500

501

return sum(results)

502

503

def cache_management_strategies(fileStore: CachingFileStore):

504

"""Advanced caching strategies."""

505

506

# Control cache behavior

507

large_file_id = "large_dataset_file"

508

509

# Read without caching (for very large files)

510

temp_path = fileStore.readGlobalFile(

511

large_file_id,

512

cache=False # Skip cache for large files

513

)

514

515

# Read with mutable access (creates copy, doesn't use cache)

516

mutable_path = fileStore.readGlobalFile(

517

large_file_id,

518

mutable=True # Need to modify file

519

)

520

521

# Preload frequently used files into cache

522

frequently_used_files = ["reference_genome.fa", "annotation.gtf", "config.json"]

523

524

for file_id in frequently_used_files:

525

# Preload into cache

526

fileStore.readGlobalFile(file_id, cache=True)

527

fileStore.logToMaster(f"Preloaded file: {file_id}")

528

529

# Use cached files efficiently

530

for file_id in frequently_used_files:

531

# Fast access from cache

532

cached_path = fileStore.readGlobalFile(file_id, cache=True)

533

# Process file...

534

```

535

536

### File Store Logging and Monitoring

537

{ .api }

538

539

Comprehensive logging and monitoring for file operations and job progress.

540

541

```python

542

import logging

543

from toil.fileStores.abstractFileStore import AbstractFileStore

544

545

def advanced_logging_patterns(fileStore: AbstractFileStore):

546

"""Advanced logging patterns for file operations."""

547

548

# Log file operations with different levels

549

fileStore.logToMaster("Starting file processing job", logging.INFO)

550

551

try:

552

# Log progress for long operations

553

input_files = ["file1.dat", "file2.dat", "file3.dat", "file4.dat"]

554

555

for i, filename in enumerate(input_files):

556

fileStore.logToMaster(

557

f"Processing file {i+1}/{len(input_files)}: {filename}",

558

logging.INFO

559

)

560

561

# Simulate file processing

562

temp_file = fileStore.getLocalTempFile(suffix=f"_{i}.tmp")

563

564

with open(temp_file, 'w') as f:

565

f.write(f"Processed content for {filename}")

566

567

file_id = fileStore.writeGlobalFile(temp_file)

568

569

# Log successful completion

570

fileStore.logToMaster(

571

f"Successfully processed {filename} -> {file_id}",

572

logging.DEBUG

573

)

574

575

fileStore.logToMaster("All files processed successfully", logging.INFO)

576

577

except Exception as e:

578

# Log errors with full context

579

fileStore.logToMaster(

580

f"Error in file processing: {str(e)}",

581

logging.ERROR

582

)

583

raise

584

585

def file_operation_metrics(fileStore: AbstractFileStore):

586

"""Collect metrics on file operations."""

587

588

import time

589

590

metrics = {

591

'files_processed': 0,

592

'bytes_written': 0,

593

'bytes_read': 0,

594

'processing_time': 0,

595

'cache_hits': 0

596

}

597

598

start_time = time.time()

599

600

try:

601

# Simulate file processing with metrics

602

for i in range(10):

603

# Create test file

604

test_file = fileStore.getLocalTempFile()

605

test_data = f"Test data {i} " * 1000 # ~10KB per file

606

607

with open(test_file, 'w') as f:

608

f.write(test_data)

609

610

file_size = os.path.getsize(test_file)

611

metrics['bytes_written'] += file_size

612

613

# Store globally

614

file_id = fileStore.writeGlobalFile(test_file)

615

616

# Read back (may hit cache)

617

read_path = fileStore.readGlobalFile(file_id, cache=True)

618

619

with open(read_path, 'r') as f:

620

read_data = f.read()

621

metrics['bytes_read'] += len(read_data.encode())

622

623

metrics['files_processed'] += 1

624

625

# Log progress every few files

626

if (i + 1) % 5 == 0:

627

elapsed = time.time() - start_time

628

fileStore.logToMaster(

629

f"Processed {i+1} files in {elapsed:.2f}s",

630

logging.INFO

631

)

632

633

metrics['processing_time'] = time.time() - start_time

634

635

# Log final metrics

636

fileStore.logToMaster(

637

f"Metrics - Files: {metrics['files_processed']}, "

638

f"Written: {metrics['bytes_written']} bytes, "

639

f"Read: {metrics['bytes_read']} bytes, "

640

f"Time: {metrics['processing_time']:.2f}s",

641

logging.INFO

642

)

643

644

except Exception as e:

645

fileStore.logToMaster(f"Metrics collection failed: {e}", logging.ERROR)

646

raise

647

648

return metrics

649

```

650

651

This file management system provides comprehensive, efficient file handling capabilities with caching, streaming, and robust error handling for complex workflow data management needs.