
# File Utilities and Progress Tracking

File handling utilities including chunk readers, progress streams, OS operations, and callback support for monitoring transfer progress and managing file operations efficiently.

## Capabilities

### ReadFileChunk

Enhanced file chunk reader that provides progress callbacks, transfer state management, and efficient reading of file segments.

```python { .api }
class ReadFileChunk:
    """
    File-like object for reading chunks of files with progress callbacks and transfer state management.

    Args:
        fileobj: File object to read from
        start_byte (int): Starting position in file
        chunk_size (int): Maximum chunk size to read
        full_file_size (int): Total file size
        callback (callable, optional): Progress callback function(bytes_read)
        enable_callback (bool): Whether to enable callbacks initially
    """
    def __init__(
        self,
        fileobj,
        start_byte: int,
        chunk_size: int,
        full_file_size: int,
        callback=None,
        enable_callback: bool = True
    ): ...

    @classmethod
    def from_filename(
        cls,
        filename: str,
        start_byte: int,
        chunk_size: int,
        callback=None,
        enable_callback: bool = True
    ):
        """
        Create ReadFileChunk from filename.

        Args:
            filename (str): Path to file
            start_byte (int): Starting position in file
            chunk_size (int): Maximum chunk size to read
            callback (callable, optional): Progress callback function
            enable_callback (bool): Whether to enable callbacks initially

        Returns:
            ReadFileChunk: New instance
        """

    def read(self, amount=None) -> bytes:
        """
        Read data from chunk.

        Args:
            amount (int, optional): Number of bytes to read (default: all remaining)

        Returns:
            bytes: Data read from chunk
        """

    def seek(self, where: int):
        """
        Seek to position within chunk.

        Args:
            where (int): Position to seek to (relative to chunk start)
        """

    def tell(self) -> int:
        """
        Get current position within chunk.

        Returns:
            int: Current position relative to chunk start
        """

    def close(self):
        """Close the underlying file object."""

    def signal_transferring(self):
        """Signal that transfer is currently active."""

    def signal_not_transferring(self):
        """Signal that transfer is not currently active."""

    def enable_callback(self):
        """Enable progress callbacks."""

    def disable_callback(self):
        """Disable progress callbacks."""

    def __len__(self) -> int:
        """Return the size of this chunk."""

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, *args, **kwargs):
        """Context manager exit."""
        self.close()
```

### StreamReaderProgress

Wrapper for read-only streams that adds progress callback functionality for monitoring data consumption.

```python { .api }
class StreamReaderProgress:
    """
    Wrapper for read-only streams that adds progress callbacks.

    Args:
        stream: Stream to wrap (must support read())
        callback (callable, optional): Progress callback function(bytes_read)
    """
    def __init__(self, stream, callback=None): ...

    def read(self, *args, **kwargs) -> bytes:
        """
        Read from stream with progress tracking.

        Args:
            *args: Arguments passed to underlying stream.read()
            **kwargs: Keyword arguments passed to underlying stream.read()

        Returns:
            bytes: Data read from stream
        """
```

### DeferredOpenFile

File-like object that defers opening the actual file until first access, useful for preparing file operations without immediate resource consumption.

```python { .api }
class DeferredOpenFile:
    """
    File-like object that defers opening until first access.

    Args:
        filename (str): Path to file
        mode (str): File open mode
        open_func (callable, optional): Function to use for opening file
    """
    def __init__(self, filename: str, mode: str, open_func=None): ...

    def read(self, amount=None) -> bytes:
        """
        Read from file, opening if necessary.

        Args:
            amount (int, optional): Number of bytes to read

        Returns:
            bytes: Data read from file
        """

    def write(self, data: bytes) -> int:
        """
        Write to file, opening if necessary.

        Args:
            data (bytes): Data to write

        Returns:
            int: Number of bytes written
        """

    def seek(self, where: int, whence: int = 0):
        """
        Seek to position in file, opening if necessary.

        Args:
            where (int): Position to seek to
            whence (int): Reference point for position
        """

    def tell(self) -> int:
        """
        Get current position in file, opening if necessary.

        Returns:
            int: Current file position
        """

    def close(self):
        """Close file if open."""

    @property
    def name(self) -> str:
        """
        Get filename.

        Returns:
            str: Filename
        """
```

### OSUtils

Enhanced OS utility functions providing file operations, size queries, and file chunk reader creation with comprehensive error handling.

```python { .api }
class OSUtils:
    """
    Enhanced OS utility functions for file operations.
    """
    def get_file_size(self, filename: str) -> int:
        """
        Get file size in bytes.

        Args:
            filename (str): Path to file

        Returns:
            int: File size in bytes

        Raises:
            OSError: If file cannot be accessed
        """

    def open_file_chunk_reader(self, filename: str, start_byte: int, size: int, callbacks):
        """
        Open a file chunk reader with progress callback.

        Args:
            filename (str): Path to file
            start_byte (int): Starting position in file
            size (int): Size of chunk to read
            callbacks: Progress callback functions (list or single callback)

        Returns:
            ReadFileChunk: File chunk reader instance
        """

    def open_file_chunk_reader_from_fileobj(self, fileobj, chunk_size, full_file_size, callbacks, close_callbacks=None):
        """
        Open a file chunk reader from an existing file object.

        Args:
            fileobj: File object to read from
            chunk_size: Size of chunk to read
            full_file_size: Full size of the file
            callbacks: Progress callback functions (list or single callback)
            close_callbacks: Callbacks to execute when closing (optional)

        Returns:
            ReadFileChunk: File chunk reader instance
        """

    def open(self, filename: str, mode: str):
        """
        Open a file.

        Args:
            filename (str): Path to file
            mode (str): File open mode

        Returns:
            File object
        """

    def remove_file(self, filename: str):
        """
        Remove a file (no-op if it doesn't exist).

        Args:
            filename (str): Path to file to remove
        """

    def rename_file(self, current_filename: str, new_filename: str):
        """
        Rename a file.

        Args:
            current_filename (str): Current filename
            new_filename (str): New filename
        """

    def is_special_file(self, filename: str) -> bool:
        """
        Check if file is a special file (device, pipe, etc.).

        Args:
            filename (str): Path to file

        Returns:
            bool: True if file is special, False otherwise
        """

    def get_temp_filename(self, filename: str) -> str:
        """
        Get a temporary filename based on the given filename.

        Args:
            filename (str): Base filename

        Returns:
            str: Temporary filename
        """

    def allocate(self, filename: str, size: int):
        """
        Allocate space for a file.

        Args:
            filename (str): Path to file
            size (int): Size in bytes to allocate
        """
```

### Utility Classes

Additional utility classes for managing callbacks, function containers, and semaphores.

```python { .api }
class CallArgs:
    """
    Records and stores call arguments as attributes.

    Args:
        **kwargs: Keyword arguments to store as attributes
    """
    def __init__(self, **kwargs): ...

class FunctionContainer:
    """
    Container for storing a function with its args and kwargs.

    Args:
        function: Function to store
        *args: Positional arguments for function
        **kwargs: Keyword arguments for function
    """
    def __init__(self, function, *args, **kwargs): ...

class CountCallbackInvoker:
    """
    Invokes callback when internal count reaches zero.

    Args:
        callback (callable): Function to call when count reaches zero
    """
    def __init__(self, callback): ...

    def increment(self):
        """Increment the counter."""

    def decrement(self):
        """Decrement the counter, invoking the callback if the count reaches zero after finalize() has been called."""

    def finalize(self):
        """Signal that no more increments will occur; the callback fires once the count reaches zero."""

    @property
    def current_count(self) -> int:
        """
        Current count value.

        Returns:
            int: Current count
        """

class TaskSemaphore:
    """
    Semaphore for coordinating task execution with tagging support.

    Args:
        capacity (int): Maximum number of permits
    """
    def __init__(self, capacity: int): ...

    def acquire(self, task_tag, blocking: bool = True):
        """
        Acquire a permit.

        Args:
            task_tag: Tag identifying the task type
            blocking (bool): Whether to block if no permits available

        Returns:
            Token: Acquire token for later release
        """

    def release(self, task_tag, acquire_token):
        """
        Release a permit.

        Args:
            task_tag: Tag identifying the task type
            acquire_token: Token from acquire() call
        """

class ChunksizeAdjuster:
    """
    Adjusts chunk sizes to comply with S3 multipart upload limits.
    """
    def adjust_chunksize(self, current_chunksize: int, file_size: int, max_parts: int = 10000) -> int:
        """
        Adjust chunk size to ensure the number of parts doesn't exceed the limit.

        Args:
            current_chunksize (int): Current chunk size
            file_size (int): Total file size
            max_parts (int): Maximum number of parts allowed

        Returns:
            int: Adjusted chunk size
        """
```
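`CallArgs` and `FunctionContainer` are small building blocks: one records keyword arguments as attributes, the other freezes a callable together with its arguments for later invocation. The minimal plain-Python stand-ins below (illustrative only, not the s3transfer classes themselves) show the pattern:

```python
# Stand-ins mirroring the documented behavior of CallArgs and
# FunctionContainer; names with the "Sketch" suffix are hypothetical.
class CallArgsSketch:
    """Records keyword arguments as attributes."""
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

class FunctionContainerSketch:
    """Freezes a function with its arguments for deferred invocation."""
    def __init__(self, function, *args, **kwargs):
        self._function = function
        self._args = args
        self._kwargs = kwargs

    def __call__(self):
        return self._function(*self._args, **self._kwargs)

call_args = CallArgsSketch(bucket='my-bucket', key='data.bin')
print(call_args.bucket)  # attributes mirror the recorded kwargs

deferred = FunctionContainerSketch(print, 'runs later')
deferred()  # invoked when called, not when constructed
```

This pattern lets a piece of work be described up front and handed off (for example, to an executor or a close-callback list) to be invoked later with no arguments.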

## Usage Examples

### Basic File Chunk Reading

```python
from s3transfer.utils import ReadFileChunk

def progress_callback(bytes_read):
    print(f"Read {bytes_read} bytes")

# Read a specific chunk of a file
with ReadFileChunk.from_filename(
    '/tmp/large_file.dat',
    start_byte=1024,              # Start at byte 1024
    chunk_size=8 * 1024 * 1024,   # Read up to 8MB
    callback=progress_callback
) as chunk:
    data = chunk.read(1024)  # Read 1KB
    print(f"Current position: {chunk.tell()}")

    chunk.seek(2048)          # Seek to byte 2048 within chunk
    more_data = chunk.read()  # Read remaining data in chunk
```

### Progress Tracking for Stream Reading

```python
import boto3

from s3transfer.utils import StreamReaderProgress

def download_progress(bytes_read):
    print(f"Downloaded {bytes_read} bytes")

# Download with progress tracking
client = boto3.client('s3')
response = client.get_object(Bucket='my-bucket', Key='large-file.dat')

# Wrap the streaming body with progress tracking
progress_stream = StreamReaderProgress(response['Body'], download_progress)

# Read in chunks
with open('/tmp/downloaded.dat', 'wb') as f:
    while True:
        chunk = progress_stream.read(8192)  # 8KB chunks
        if not chunk:
            break
        f.write(chunk)
```

### Deferred File Operations

```python
from s3transfer.utils import DeferredOpenFile

# Create deferred file (doesn't open yet); binary mode to match the bytes written below
deferred_file = DeferredOpenFile('/tmp/output.txt', 'wb')

# File is opened only when first accessed
deferred_file.write(b'Hello, world!')  # File opened here
deferred_file.write(b'More data')      # File already open

print(f"Filename: {deferred_file.name}")
deferred_file.close()
```

### Advanced OS Utilities

```python
from s3transfer.utils import OSUtils

osutil = OSUtils()

# File size operations
filename = '/tmp/test_file.dat'
file_size = osutil.get_file_size(filename)
print(f"File size: {file_size} bytes")

# Check if file is special (device, pipe, etc.)
if osutil.is_special_file(filename):
    print("File is a special file")
else:
    print("File is a regular file")

# Get temporary filename
temp_filename = osutil.get_temp_filename(filename)
print(f"Temporary filename: {temp_filename}")

# Safe file operations
osutil.remove_file('/tmp/might_not_exist.txt')  # No error if it doesn't exist

# Allocate space for large file (on supported filesystems)
try:
    osutil.allocate('/tmp/large_file.dat', 1024 * 1024 * 1024)  # 1GB
    print("Space allocated successfully")
except OSError as e:
    print(f"Space allocation failed: {e}")
```

### Chunk Size Adjustment

```python
import math

from s3transfer.utils import ChunksizeAdjuster

adjuster = ChunksizeAdjuster()

# Adjust chunk size for a large file to stay within S3 limits
file_size = 5 * 1024 * 1024 * 1024    # 5GB
current_chunk_size = 8 * 1024 * 1024  # 8MB

adjusted_size = adjuster.adjust_chunksize(
    current_chunksize=current_chunk_size,
    file_size=file_size,
    max_parts=10000  # S3 limit
)

print(f"Original chunk size: {current_chunk_size}")
print(f"Adjusted chunk size: {adjusted_size}")
# Ceiling division: a final short part still counts as a part
print(f"Number of parts: {math.ceil(file_size / adjusted_size)}")
```

### Callback Management

```python
from s3transfer.utils import CountCallbackInvoker

def completion_callback():
    print("All operations completed!")

# Create callback invoker that triggers when count reaches zero
invoker = CountCallbackInvoker(completion_callback)

# Simulate multiple operations
operations = ['upload1', 'upload2', 'upload3']

# Increment for each operation
for op in operations:
    invoker.increment()
    print(f"Started operation: {op}")

print(f"Current count: {invoker.current_count}")

# Mark that no more operations will be added; without this,
# the callback is not invoked when the count reaches zero
invoker.finalize()

# Decrement as operations complete
for op in operations:
    invoker.decrement()
    print(f"Completed operation: {op}, remaining: {invoker.current_count}")
# Callback fires when the count reaches zero
```

### Task Coordination with Semaphores

```python
import threading
import time

from s3transfer.utils import TaskSemaphore

# Create semaphore for limiting concurrent operations
semaphore = TaskSemaphore(capacity=5)  # Max 5 concurrent operations

def perform_operation(task_id):
    # Acquire permit
    token = semaphore.acquire('upload_task')
    try:
        print(f"Performing operation {task_id}")
        # Simulate work
        time.sleep(1)
        print(f"Completed operation {task_id}")
    finally:
        # Always release permit
        semaphore.release('upload_task', token)

# Start multiple operations (only 5 will run concurrently)
threads = []
for i in range(10):
    thread = threading.Thread(target=perform_operation, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for all to complete
for thread in threads:
    thread.join()
```

### File Chunk Reading with Transfer State

```python
import os

from s3transfer.utils import ReadFileChunk

def transfer_progress(bytes_read):
    print(f"Transfer progress: {bytes_read} bytes")

filename = '/tmp/large_upload.dat'
chunk_size = 64 * 1024 * 1024  # 64MB chunks
file_size = os.path.getsize(filename)

# Read file in chunks for multipart upload
chunks_processed = 0
start_byte = 0

while start_byte < file_size:
    with ReadFileChunk.from_filename(
        filename,
        start_byte=start_byte,
        chunk_size=chunk_size,
        callback=transfer_progress
    ) as chunk:
        # Signal that transfer is active
        chunk.signal_transferring()

        try:
            # Process chunk (simulate upload); progress is reported
            # via the callback as the chunk is read
            data = chunk.read()
            print(f"Processing chunk {chunks_processed + 1}, size: {len(data)}")

            # Simulate upload process
            bytes_uploaded = 0
            while bytes_uploaded < len(data):
                # Upload in smaller increments
                increment = min(8192, len(data) - bytes_uploaded)
                bytes_uploaded += increment
        finally:
            # Signal transfer is no longer active
            chunk.signal_not_transferring()

    chunks_processed += 1
    start_byte += chunk_size

print(f"Processed {chunks_processed} chunks total")
```

## Utility Functions

### Progress and Callback Utilities

```python { .api }
def get_callbacks(subscribers, callback_type: str) -> List[callable]:
    """
    Extract callbacks of a specific type from subscriber objects.

    Args:
        subscribers: List of subscriber objects
        callback_type (str): Type of callback to extract

    Returns:
        list: List of callback functions
    """

def invoke_progress_callbacks(callbacks: List[callable], bytes_transferred: int):
    """
    Invoke progress callbacks with bytes transferred.

    Args:
        callbacks: List of callback functions
        bytes_transferred (int): Number of bytes transferred
    """

def calculate_num_parts(size: int, part_size: int) -> int:
    """
    Calculate number of parts needed for multipart upload.

    Args:
        size (int): Total size in bytes
        part_size (int): Size per part in bytes

    Returns:
        int: Number of parts needed
    """

def calculate_range_parameter(start_range: int, end_range: int) -> str:
    """
    Calculate HTTP Range parameter for partial downloads.

    Args:
        start_range (int): Start byte position
        end_range (int): End byte position

    Returns:
        str: Range parameter string (e.g., 'bytes=0-1023')
    """

def get_filtered_dict(original_dict: dict, allowed_keys: List[str]) -> dict:
    """
    Filter dictionary to only include allowed keys.

    Args:
        original_dict (dict): Original dictionary
        allowed_keys (list): List of allowed keys

    Returns:
        dict: Filtered dictionary
    """

def random_file_extension(num_digits: int = 8) -> str:
    """
    Generate random file extension.

    Args:
        num_digits (int): Number of digits in extension

    Returns:
        str: Random file extension
    """
```
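The part-count and Range-header arithmetic behind `calculate_num_parts` and `calculate_range_parameter` is simple enough to sketch directly. The functions below are plain-Python illustrations of the documented behavior, not the s3transfer implementations:

```python
import math

# Hypothetical helpers mirroring the documented semantics.
def num_parts(size: int, part_size: int) -> int:
    # Ceiling division: a final short part still counts as a part
    return math.ceil(size / part_size)

def range_header(start_byte: int, end_byte: int) -> str:
    # HTTP Range values are inclusive on both ends
    return f'bytes={start_byte}-{end_byte}'

print(num_parts(10 * 1024 * 1024, 8 * 1024 * 1024))  # 2: one full part, one short
print(range_header(0, 1023))                         # 'bytes=0-1023'
```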

## Best Practices

### File Chunk Reading

1. **Use context managers**: Always use `with` statements for ReadFileChunk
2. **Handle large files**: Use appropriate chunk sizes for memory management
3. **Monitor progress**: Implement progress callbacks for user feedback
4. **Signal transfer state**: Use `signal_transferring()` and `signal_not_transferring()`

### Progress Tracking

1. **Provide meaningful feedback**: Use progress callbacks to inform users
2. **Handle zero-byte transfers**: Check for empty files or streams
3. **Aggregate progress**: Combine progress from multiple sources when needed
4. **Debounce callbacks**: Avoid excessive callback frequency for performance
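Debouncing can be done with a small wrapper that accumulates bytes and forwards them to the real callback only once a threshold is crossed. A sketch, not part of the s3transfer API:

```python
class DebouncedProgress:
    """Forward progress to a callback only once enough bytes accumulate."""
    def __init__(self, callback, min_bytes=1024 * 1024):
        self._callback = callback
        self._min_bytes = min_bytes
        self._pending = 0

    def __call__(self, bytes_read):
        self._pending += bytes_read
        if self._pending >= self._min_bytes:
            self._callback(self._pending)
            self._pending = 0

    def flush(self):
        # Report any remainder at the end of the transfer
        if self._pending:
            self._callback(self._pending)
            self._pending = 0

reported = []
progress = DebouncedProgress(reported.append, min_bytes=8192)
for _ in range(5):
    progress(4096)  # 20 KiB total, but only two threshold-sized reports
progress.flush()    # remainder reported once at the end
print(reported)     # [8192, 8192, 4096]
```

A wrapper like this can be passed anywhere a plain progress callback is accepted, since it is itself callable.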

### Resource Management

1. **Close files properly**: Use context managers or explicit close() calls
2. **Handle exceptions**: Ensure cleanup even when errors occur
3. **Limit memory usage**: Don't read entire large files into memory
4. **Validate file operations**: Check file existence and permissions

747

748

### OS Utilities

749

750

1. **Handle cross-platform differences**: Use OSUtils for portable file operations

751

2. **Check special files**: Use `is_special_file()` before operations

752

3. **Safe file removal**: Use `remove_file()` which handles missing files

753

4. **Temporary files**: Use `get_temp_filename()` for atomic operations