# File Utilities and Progress Tracking

File handling utilities including chunk readers, progress streams, OS operations, and callback support for monitoring transfer progress and managing file operations efficiently.

## Capabilities

### ReadFileChunk

Enhanced file chunk reader that provides progress callbacks, transfer state management, and efficient reading of file segments.

```python { .api }
class ReadFileChunk:
    """
    File-like object for reading chunks of files with progress callbacks and transfer state management.

    Args:
        fileobj: File object to read from
        start_byte (int): Starting position in file
        chunk_size (int): Maximum chunk size to read
        full_file_size (int): Total file size
        callback (callable, optional): Progress callback function(bytes_read)
        enable_callback (bool): Whether to enable callbacks initially
    """
    def __init__(
        self,
        fileobj,
        start_byte: int,
        chunk_size: int,
        full_file_size: int,
        callback=None,
        enable_callback: bool = True
    ): ...

    @classmethod
    def from_filename(
        cls,
        filename: str,
        start_byte: int,
        chunk_size: int,
        callback=None,
        enable_callback: bool = True
    ):
        """
        Create ReadFileChunk from filename.

        Args:
            filename (str): Path to file
            start_byte (int): Starting position in file
            chunk_size (int): Maximum chunk size to read
            callback (callable, optional): Progress callback function
            enable_callback (bool): Whether to enable callbacks initially

        Returns:
            ReadFileChunk: New instance
        """

    def read(self, amount=None) -> bytes:
        """
        Read data from chunk.

        Args:
            amount (int, optional): Number of bytes to read (default: all remaining)

        Returns:
            bytes: Data read from chunk
        """

    def seek(self, where: int):
        """
        Seek to position within chunk.

        Args:
            where (int): Position to seek to (relative to chunk start)
        """

    def tell(self) -> int:
        """
        Get current position within chunk.

        Returns:
            int: Current position relative to chunk start
        """

    def close(self):
        """Close the underlying file object."""

    def signal_transferring(self):
        """Signal that transfer is currently active."""

    def signal_not_transferring(self):
        """Signal that transfer is not currently active."""

    def enable_callback(self):
        """Enable progress callbacks."""

    def disable_callback(self):
        """Disable progress callbacks."""

    def __len__(self) -> int:
        """Return the size of this chunk."""

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, *args, **kwargs):
        """Context manager exit."""
        self.close()
```
### StreamReaderProgress

Wrapper for read-only streams that adds progress callback functionality for monitoring data consumption.

```python { .api }
class StreamReaderProgress:
    """
    Wrapper for read-only streams that adds progress callbacks.

    Args:
        stream: Stream to wrap (must support read())
        callback (callable, optional): Progress callback function(bytes_read)
    """
    def __init__(self, stream, callback=None): ...

    def read(self, *args, **kwargs) -> bytes:
        """
        Read from stream with progress tracking.

        Args:
            *args: Arguments passed to underlying stream.read()
            **kwargs: Keyword arguments passed to underlying stream.read()

        Returns:
            bytes: Data read from stream
        """
```
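The wrapper pattern is simple enough to sketch in plain Python. The `CountingReader` below is a hypothetical stand-in written for illustration (not the library's implementation): it delegates `read()` to the wrapped stream and reports how many bytes each call actually consumed.

```python
import io

class CountingReader:
    """Minimal illustration of the StreamReaderProgress pattern:
    delegate read() and report the number of bytes consumed."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        data = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(data))  # report bytes actually read
        return data

# Track progress while draining an in-memory stream in 8-byte reads
progress = []
reader = CountingReader(io.BytesIO(b"x" * 20), callback=progress.append)
while reader.read(8):
    pass
print(progress)  # [8, 8, 4, 0]
```

Note that the callback receives the size of each individual read, not a running total; aggregating is left to the callback itself.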
### DeferredOpenFile

File-like object that defers opening the actual file until first access, useful for preparing file operations without immediate resource consumption.

```python { .api }
class DeferredOpenFile:
    """
    File-like object that defers opening until first access.

    Args:
        filename (str): Path to file
        mode (str): File open mode
        open_func (callable, optional): Function to use for opening file
    """
    def __init__(self, filename: str, mode: str, open_func=None): ...

    def read(self, amount=None) -> bytes:
        """
        Read from file, opening if necessary.

        Args:
            amount (int, optional): Number of bytes to read

        Returns:
            bytes: Data read from file
        """

    def write(self, data: bytes) -> int:
        """
        Write to file, opening if necessary.

        Args:
            data (bytes): Data to write

        Returns:
            int: Number of bytes written
        """

    def seek(self, where: int, whence: int = 0):
        """
        Seek to position in file, opening if necessary.

        Args:
            where (int): Position to seek to
            whence (int): Reference point for position
        """

    def tell(self) -> int:
        """
        Get current position in file, opening if necessary.

        Returns:
            int: Current file position
        """

    def close(self):
        """Close file if open."""

    @property
    def name(self) -> str:
        """
        Get filename.

        Returns:
            str: Filename
        """
```
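The deferral itself takes only a few lines. The `LazyFile` class below is an illustrative sketch of the pattern, not the library's implementation: no file handle exists until the first operation that needs one.

```python
import os
import tempfile

class LazyFile:
    """Illustration of deferred opening: the file handle is created
    only when a write actually needs it."""

    def __init__(self, filename, mode, open_func=open):
        self.name = filename
        self._mode = mode
        self._open_func = open_func
        self._fileobj = None

    def _ensure_open(self):
        if self._fileobj is None:
            self._fileobj = self._open_func(self.name, self._mode)

    def write(self, data):
        self._ensure_open()
        return self._fileobj.write(data)

    def close(self):
        if self._fileobj is not None:
            self._fileobj.close()

path = os.path.join(tempfile.mkdtemp(), "deferred.bin")
lazy = LazyFile(path, "wb")
opened_before_write = lazy._fileobj is not None  # False: no open() yet
lazy.write(b"hello")                             # file is opened here
lazy.close()
written_size = os.path.getsize(path)
print(opened_before_write, written_size)  # False 5
```

The design pays off when many transfer objects are prepared up front: file descriptors are consumed only by the transfers that actually run.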
### OSUtils

Enhanced OS utility functions providing file operations, size queries, and file chunk reader creation with comprehensive error handling.

```python { .api }
class OSUtils:
    """
    Enhanced OS utility functions for file operations.
    """
    def get_file_size(self, filename: str) -> int:
        """
        Get file size in bytes.

        Args:
            filename (str): Path to file

        Returns:
            int: File size in bytes

        Raises:
            OSError: If file cannot be accessed
        """

    def open_file_chunk_reader(self, filename: str, start_byte: int, size: int, callbacks):
        """
        Open a file chunk reader with progress callbacks.

        Args:
            filename (str): Path to file
            start_byte (int): Starting position in file
            size (int): Size of chunk to read
            callbacks: Progress callback functions (list or single callback)

        Returns:
            ReadFileChunk: File chunk reader instance
        """

    def open_file_chunk_reader_from_fileobj(self, fileobj, chunk_size, full_file_size, callbacks, close_callbacks=None):
        """
        Open a file chunk reader from an existing file object.

        Args:
            fileobj: File object to read from
            chunk_size: Size of chunk to read
            full_file_size: Full size of the file
            callbacks: Progress callback functions (list or single callback)
            close_callbacks: Callbacks to execute when closing (optional)

        Returns:
            ReadFileChunk: File chunk reader instance
        """

    def open(self, filename: str, mode: str):
        """
        Open a file.

        Args:
            filename (str): Path to file
            mode (str): File open mode

        Returns:
            File object
        """

    def remove_file(self, filename: str):
        """
        Remove a file (no-op if it doesn't exist).

        Args:
            filename (str): Path to file to remove
        """

    def rename_file(self, current_filename: str, new_filename: str):
        """
        Rename a file.

        Args:
            current_filename (str): Current filename
            new_filename (str): New filename
        """

    def is_special_file(self, filename: str) -> bool:
        """
        Check if file is a special file (device, pipe, etc.).

        Args:
            filename (str): Path to file

        Returns:
            bool: True if file is special, False otherwise
        """

    def get_temp_filename(self, filename: str) -> str:
        """
        Get a temporary filename based on the given filename.

        Args:
            filename (str): Base filename

        Returns:
            str: Temporary filename
        """

    def allocate(self, filename: str, size: int):
        """
        Allocate space for a file.

        Args:
            filename (str): Path to file
            size (int): Size in bytes to allocate
        """
```
318
319
### Utility Classes
320
321
Additional utility classes for managing callbacks, function containers, and semaphores.
322
323
```python { .api }
324
class CallArgs:
325
"""
326
Records and stores call arguments as attributes.
327
328
Args:
329
**kwargs: Keyword arguments to store as attributes
330
"""
331
def __init__(self, **kwargs): ...
332
333
class FunctionContainer:
334
"""
335
Container for storing function with args and kwargs.
336
337
Args:
338
function: Function to store
339
*args: Positional arguments for function
340
**kwargs: Keyword arguments for function
341
"""
342
def __init__(self, function, *args, **kwargs): ...
343
344
class CountCallbackInvoker:
345
"""
346
Invokes callback when internal count reaches zero.
347
348
Args:
349
callback (callable): Function to call when count reaches zero
350
"""
351
def __init__(self, callback): ...
352
353
def increment(self):
354
"""Increment the counter."""
355
356
def decrement(self):
357
"""Decrement the counter, calling callback if it reaches zero."""
358
359
def finalize(self):
360
"""Force callback invocation regardless of count."""
361
362
@property
363
def current_count(self) -> int:
364
"""
365
Current count value.
366
367
Returns:
368
int: Current count
369
"""
370
371
class TaskSemaphore:
372
"""
373
Semaphore for coordinating task execution with tagging support.
374
375
Args:
376
capacity (int): Maximum number of permits
377
"""
378
def __init__(self, capacity: int): ...
379
380
def acquire(self, task_tag, blocking: bool = True):
381
"""
382
Acquire a permit.
383
384
Args:
385
task_tag: Tag identifying the task type
386
blocking (bool): Whether to block if no permits available
387
388
Returns:
389
Token: Acquire token for later release
390
"""
391
392
def release(self, task_tag, acquire_token):
393
"""
394
Release a permit.
395
396
Args:
397
task_tag: Tag identifying the task type
398
acquire_token: Token from acquire() call
399
"""
400
401
class ChunksizeAdjuster:
402
"""
403
Adjusts chunk sizes to comply with S3 multipart upload limits.
404
"""
405
def adjust_chunksize(self, current_chunksize: int, file_size: int, max_parts: int = 10000) -> int:
406
"""
407
Adjust chunk size to ensure number of parts doesn't exceed limit.
408
409
Args:
410
current_chunksize (int): Current chunk size
411
file_size (int): Total file size
412
max_parts (int): Maximum number of parts allowed
413
414
Returns:
415
int: Adjusted chunk size
416
"""
417
```
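To see how these pieces compose, here is a simplified sketch of the two central ideas: bundling a call for later invocation, and firing a completion callback when an operation count returns to zero. `DeferredCall` and `ZeroTrigger` are illustrative stand-ins written for this document, not the library's own classes.

```python
class DeferredCall:
    """FunctionContainer-style: bundle a function with its arguments
    so it can be invoked later with no arguments."""
    def __init__(self, function, *args, **kwargs):
        self._function = function
        self._args = args
        self._kwargs = kwargs

    def __call__(self):
        return self._function(*self._args, **self._kwargs)

class ZeroTrigger:
    """CountCallbackInvoker-style: fire the callback when the count
    returns to zero after being incremented."""
    def __init__(self, callback):
        self._callback = callback
        self._count = 0

    def increment(self):
        self._count += 1

    def decrement(self):
        self._count -= 1
        if self._count == 0:
            self._callback()

events = []
done = DeferredCall(events.append, "all tasks finished")
trigger = ZeroTrigger(done)
for _ in range(3):   # three operations start
    trigger.increment()
for _ in range(3):   # three operations finish; callback fires on the last
    trigger.decrement()
print(events)  # ['all tasks finished']
```

This is the shape of a scatter-gather completion signal: each subtask increments on start, decrements on finish, and the bundled callback runs exactly once when the last one completes.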
## Usage Examples

### Basic File Chunk Reading

```python
from s3transfer.utils import ReadFileChunk

def progress_callback(bytes_read):
    print(f"Read {bytes_read} bytes")

# Read a specific chunk of a file
with ReadFileChunk.from_filename(
    '/tmp/large_file.dat',
    start_byte=1024,             # Start at byte 1024
    chunk_size=8 * 1024 * 1024,  # Read up to 8MB
    callback=progress_callback
) as chunk:
    data = chunk.read(1024)  # Read 1KB
    print(f"Current position: {chunk.tell()}")

    chunk.seek(2048)          # Seek to byte 2048 within chunk
    more_data = chunk.read()  # Read remaining data in chunk
```
### Progress Tracking for Stream Reading

```python
import boto3

from s3transfer.utils import StreamReaderProgress

def download_progress(bytes_read):
    print(f"Downloaded {bytes_read} bytes")

# Download with progress tracking
client = boto3.client('s3')
response = client.get_object(Bucket='my-bucket', Key='large-file.dat')

# Wrap the streaming body with progress tracking
progress_stream = StreamReaderProgress(response['Body'], download_progress)

# Read in chunks
with open('/tmp/downloaded.dat', 'wb') as f:
    while True:
        chunk = progress_stream.read(8192)  # 8KB chunks
        if not chunk:
            break
        f.write(chunk)
```
### Deferred File Operations

```python
from s3transfer.utils import DeferredOpenFile

# Create deferred file (doesn't open yet); binary mode, since write() takes bytes
deferred_file = DeferredOpenFile('/tmp/output.txt', 'wb')

# File is opened only when first accessed
deferred_file.write(b'Hello, world!')  # File opened here
deferred_file.write(b'More data')      # File already open

print(f"Filename: {deferred_file.name}")
deferred_file.close()
```
### Advanced OS Utilities

```python
from s3transfer.utils import OSUtils

osutil = OSUtils()

# File size operations
filename = '/tmp/test_file.dat'
file_size = osutil.get_file_size(filename)
print(f"File size: {file_size} bytes")

# Check if file is special (device, pipe, etc.)
if osutil.is_special_file(filename):
    print("File is a special file")
else:
    print("File is a regular file")

# Get temporary filename
temp_filename = osutil.get_temp_filename(filename)
print(f"Temporary filename: {temp_filename}")

# Safe file operations
osutil.remove_file('/tmp/might_not_exist.txt')  # No error if the file doesn't exist

# Allocate space for a large file (on supported filesystems)
try:
    osutil.allocate('/tmp/large_file.dat', 1024 * 1024 * 1024)  # 1GB
    print("Space allocated successfully")
except OSError as e:
    print(f"Space allocation failed: {e}")
```
### Chunk Size Adjustment

```python
from s3transfer.utils import ChunksizeAdjuster

adjuster = ChunksizeAdjuster()

# Adjust chunk size for a large file to stay within S3 limits
file_size = 5 * 1024 * 1024 * 1024    # 5GB
current_chunk_size = 8 * 1024 * 1024  # 8MB

adjusted_size = adjuster.adjust_chunksize(
    current_chunksize=current_chunk_size,
    file_size=file_size,
    max_parts=10000  # S3 limit
)

print(f"Original chunk size: {current_chunk_size}")
print(f"Adjusted chunk size: {adjusted_size}")
# Ceiling division: a final partial chunk still counts as a part
print(f"Number of parts: {-(-file_size // adjusted_size)}")
```
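The constraint being enforced is easy to verify by hand: the part count `ceil(file_size / chunksize)` must not exceed `max_parts`. The helper below is an illustrative sketch of that math (the real adjuster may also enforce minimum and maximum chunk sizes), not the library's algorithm:

```python
def fit_chunksize(chunksize, file_size, max_parts=10000):
    """Double the chunk size until ceil(file_size / chunksize)
    no longer exceeds max_parts."""
    while (file_size + chunksize - 1) // chunksize > max_parts:
        chunksize *= 2
    return chunksize

# 100 GiB in 8 MiB chunks would be 12800 parts -- over the 10000 limit --
# so the chunk size doubles once to 16 MiB (6400 parts).
print(fit_chunksize(8 * 1024 * 1024, 100 * 1024**3))  # 16777216
```

For files already within the limit the chunk size passes through unchanged.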
### Callback Management

```python
from s3transfer.utils import CountCallbackInvoker

def completion_callback():
    print("All operations completed!")

# Create a callback invoker that triggers when the count reaches zero
invoker = CountCallbackInvoker(completion_callback)

# Simulate multiple operations
operations = ['upload1', 'upload2', 'upload3']

# Increment for each operation
for op in operations:
    invoker.increment()
    print(f"Started operation: {op}")

print(f"Current count: {invoker.current_count}")

# Decrement as operations complete
for op in operations:
    invoker.decrement()
    print(f"Completed operation: {op}, remaining: {invoker.current_count}")
# Callback is called when the count reaches zero
```
### Task Coordination with Semaphores

```python
import threading
import time

from s3transfer.utils import TaskSemaphore

# Create a semaphore for limiting concurrent operations
semaphore = TaskSemaphore(capacity=5)  # Max 5 concurrent operations

def perform_operation(task_id):
    # Acquire a permit
    token = semaphore.acquire('upload_task')
    try:
        print(f"Performing operation {task_id}")
        # Simulate work
        time.sleep(1)
        print(f"Completed operation {task_id}")
    finally:
        # Always release the permit
        semaphore.release('upload_task', token)

# Start multiple operations (only 5 will run concurrently)
threads = []
for i in range(10):
    thread = threading.Thread(target=perform_operation, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for all to complete
for thread in threads:
    thread.join()
```
### File Chunk Reading with Transfer State

```python
import os

from s3transfer.utils import ReadFileChunk

def transfer_progress(bytes_read):
    print(f"Transfer progress: {bytes_read} bytes")

filename = '/tmp/large_upload.dat'
chunk_size = 64 * 1024 * 1024  # 64MB chunks
file_size = os.path.getsize(filename)

# Read the file in chunks for a multipart upload
chunks_processed = 0
start_byte = 0

while start_byte < file_size:
    with ReadFileChunk.from_filename(
        filename,
        start_byte=start_byte,
        chunk_size=chunk_size,
        callback=transfer_progress
    ) as chunk:
        # Signal that the transfer is active
        chunk.signal_transferring()

        try:
            # Process the chunk; the progress callback fires during read()
            data = chunk.read()
            print(f"Processing chunk {chunks_processed + 1}, size: {len(data)}")

            # Simulate uploading the chunk in smaller increments
            bytes_uploaded = 0
            while bytes_uploaded < len(data):
                increment = min(8192, len(data) - bytes_uploaded)
                bytes_uploaded += increment
        finally:
            # Signal the transfer is no longer active
            chunk.signal_not_transferring()

    chunks_processed += 1
    start_byte += chunk_size

print(f"Processed {chunks_processed} chunks total")
```
## Utility Functions

### Progress and Callback Utilities

```python { .api }
def get_callbacks(subscribers, callback_type: str) -> list:
    """
    Extract callbacks of a specific type from subscriber objects.

    Args:
        subscribers: List of subscriber objects
        callback_type (str): Type of callback to extract

    Returns:
        list: List of callback functions
    """

def invoke_progress_callbacks(callbacks, bytes_transferred: int):
    """
    Invoke progress callbacks with the number of bytes transferred.

    Args:
        callbacks: List of callback functions
        bytes_transferred (int): Number of bytes transferred
    """

def calculate_num_parts(size: int, part_size: int) -> int:
    """
    Calculate the number of parts needed for a multipart upload.

    Args:
        size (int): Total size in bytes
        part_size (int): Size per part in bytes

    Returns:
        int: Number of parts needed
    """

def calculate_range_parameter(start_range: int, end_range: int) -> str:
    """
    Calculate the HTTP Range parameter for partial downloads.

    Args:
        start_range (int): Start byte position
        end_range (int): End byte position

    Returns:
        str: Range parameter string (e.g., 'bytes=0-1023')
    """

def get_filtered_dict(original_dict: dict, allowed_keys) -> dict:
    """
    Filter a dictionary to only include allowed keys.

    Args:
        original_dict (dict): Original dictionary
        allowed_keys (list): List of allowed keys

    Returns:
        dict: Filtered dictionary
    """

def random_file_extension(num_digits: int = 8) -> str:
    """
    Generate a random file extension.

    Args:
        num_digits (int): Number of digits in the extension

    Returns:
        str: Random file extension
    """
```
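The arithmetic behind the part-count and range helpers is worth showing directly. The functions below are hand-rolled illustrations of that math (assuming the signatures documented above), not imports from the library:

```python
def num_parts(size, part_size):
    """Ceiling division: how many part_size pieces cover size bytes."""
    return (size + part_size - 1) // part_size

def range_header(start_byte, end_byte):
    """HTTP Range header value for an inclusive byte span."""
    return f"bytes={start_byte}-{end_byte}"

# A 100 MiB file in 8 MiB parts needs 13 parts (12 full + 1 partial)
print(num_parts(100 * 1024 * 1024, 8 * 1024 * 1024))  # 13
# First part of an 8 MiB-per-part download: inclusive end byte
print(range_header(0, 8 * 1024 * 1024 - 1))  # bytes=0-8388607
```

The inclusive end byte is the usual HTTP Range pitfall: part *n* of size *p* spans `bytes=n*p` through `(n+1)*p - 1`.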
## Best Practices

### File Chunk Reading

1. **Use context managers**: Always use `with` statements for ReadFileChunk
2. **Handle large files**: Use appropriate chunk sizes for memory management
3. **Monitor progress**: Implement progress callbacks for user feedback
4. **Signal transfer state**: Use `signal_transferring()` and `signal_not_transferring()`

### Progress Tracking

1. **Provide meaningful feedback**: Use progress callbacks to inform users
2. **Handle zero-byte transfers**: Check for empty files or streams
3. **Aggregate progress**: Combine progress from multiple sources when needed
4. **Debounce callbacks**: Avoid excessive callback frequency for performance
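One way to debounce, sketched for illustration (the `debounced` wrapper below is hypothetical, not library API): accumulate bytes and only forward to the real callback once a threshold is crossed.

```python
def debounced(callback, threshold=1024 * 1024):
    """Wrap a progress callback so it fires at most once per
    `threshold` bytes, flushing the accumulated total each time."""
    pending = [0]  # mutable cell so the closure can update it

    def wrapper(bytes_transferred):
        pending[0] += bytes_transferred
        if pending[0] >= threshold:
            callback(pending[0])
            pending[0] = 0

    return wrapper

reports = []
progress = debounced(reports.append, threshold=100)
for _ in range(25):
    progress(10)  # 250 bytes delivered in 10-byte increments
print(reports)  # [100, 100] -- 50 bytes still pending below the threshold
```

A production version would also flush the remainder when the transfer completes, so the final sub-threshold bytes are not lost.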
### Resource Management

1. **Close files properly**: Use context managers or explicit close() calls
2. **Handle exceptions**: Ensure cleanup even when errors occur
3. **Limit memory usage**: Don't read entire large files into memory
4. **Validate file operations**: Check file existence and permissions

### OS Utilities

1. **Handle cross-platform differences**: Use OSUtils for portable file operations
2. **Check special files**: Use `is_special_file()` before operations
3. **Safe file removal**: Use `remove_file()`, which handles missing files
4. **Temporary files**: Use `get_temp_filename()` for atomic operations