# ZIP File Support

Comprehensive ZIP file extraction and streaming support for processing compressed S3 files. The S3 connector provides efficient handling of ZIP archives, including streaming decompression and individual file access without downloading entire archives.

## Capabilities

### ZIP File Handler

Main class for discovering and accessing files within ZIP archives stored in S3, supporting both standard and ZIP64 formats.
```python { .api }
class ZipFileHandler:
    """
    Handles ZIP file discovery and metadata extraction from S3.
    Supports both standard ZIP and ZIP64 formats with efficient partial file reading.
    """

    # ZIP format constants
    EOCD_SIGNATURE: bytes = b"\x50\x4b\x05\x06"
    """End of Central Directory signature"""

    ZIP64_LOCATOR_SIGNATURE: bytes = b"\x50\x4b\x06\x07"
    """ZIP64 End of Central Directory Locator signature"""

    EOCD_CENTRAL_DIR_START_OFFSET: int = 16
    """Offset to central directory start in EOCD record"""

    ZIP64_EOCD_OFFSET: int = 8
    """Offset to ZIP64 EOCD in locator record"""

    ZIP64_EOCD_SIZE: int = 56
    """Size of ZIP64 End of Central Directory record"""

    ZIP64_CENTRAL_DIR_START_OFFSET: int = 48
    """Offset to central directory start in ZIP64 EOCD"""

    def __init__(self, s3_client: BaseClient, config: Config):
        """
        Initialize ZIP file handler with S3 client and configuration.

        Args:
            s3_client: Configured S3 client for file access
            config: S3 connector configuration
        """

    def get_zip_files(self, filename: str) -> Tuple[List[zipfile.ZipInfo], int]:
        """
        Extracts ZIP file metadata and returns list of contained files.

        Args:
            filename: S3 key of the ZIP file

        Returns:
            Tuple of (list of ZipInfo objects, central directory start offset)

        Raises:
            ValueError: If file is not a valid ZIP archive
            ClientError: If S3 access fails
        """

    def _fetch_data_from_s3(self, filename: str, start: int, size: Optional[int] = None) -> bytes:
        """
        Fetches specific byte range from S3 object.

        Args:
            filename: S3 key of the file
            start: Starting byte position
            size: Number of bytes to fetch (None for rest of file)

        Returns:
            Bytes data from the specified range
        """

    def _find_signature(
        self,
        filename: str,
        signature: bytes,
        initial_buffer_size: int = BUFFER_SIZE_DEFAULT,
        max_buffer_size: int = MAX_BUFFER_SIZE_DEFAULT
    ) -> Optional[bytes]:
        """
        Locates ZIP signature by reading backwards from end of file.

        Args:
            filename: S3 key of the ZIP file
            signature: Byte signature to search for
            initial_buffer_size: Initial buffer size for searching
            max_buffer_size: Maximum buffer size to prevent excessive memory usage

        Returns:
            Buffer containing the signature, or None if not found
        """

    def _fetch_zip64_data(self, filename: str) -> bytes:
        """
        Fetches ZIP64 Extended Information Extra Field data.

        Args:
            filename: S3 key of the ZIP file

        Returns:
            ZIP64 extra field data
        """

    def _get_central_directory_start(self, filename: str) -> int:
        """
        Determines the start offset of the central directory.

        Args:
            filename: S3 key of the ZIP file

        Returns:
            Byte offset where central directory begins
        """
```
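The handler never downloads the whole archive: it locates the End of Central Directory (EOCD) record with byte-range reads from the tail of the object, then uses the offset stored there to fetch only the central directory. The sketch below shows that lookup with plain boto3 calls. It is illustrative only — it ignores ZIP64, which the class handles via `ZIP64_LOCATOR_SIGNATURE`, and `find_central_directory_start` is a hypothetical standalone function, not part of the module.

```python
import struct

def find_central_directory_start(s3_client, bucket: str, key: str) -> int:
    """Locate the central directory offset via the EOCD record (non-ZIP64)."""
    # The EOCD record is 22 bytes plus an optional comment of up to 65535
    # bytes, so it always lies within the last 65557 bytes of the file.
    head = s3_client.head_object(Bucket=bucket, Key=key)
    start = max(0, head["ContentLength"] - (22 + 65535))
    tail = s3_client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-")["Body"].read()

    pos = tail.rfind(b"\x50\x4b\x05\x06")  # EOCD_SIGNATURE
    if pos == -1:
        raise ValueError("Not a valid ZIP archive: EOCD record not found")

    # The central directory offset is a little-endian uint32 located
    # EOCD_CENTRAL_DIR_START_OFFSET (16) bytes into the record.
    (central_dir_start,) = struct.unpack_from("<I", tail, pos + 16)
    return central_dir_start
```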
### Remote File Inside Archive

Extended RemoteFile class representing a file contained within a ZIP archive, including compression metadata.
```python { .api }
class RemoteFileInsideArchive(RemoteFile):
    """
    Represents a file inside a ZIP archive with compression metadata.
    Extends RemoteFile with ZIP-specific information.
    """

    start_offset: int
    """Byte offset where compressed data begins in the ZIP file"""

    compressed_size: int
    """Size of the compressed data in bytes"""

    uncompressed_size: int
    """Size of the uncompressed data in bytes"""

    compression_method: int
    """ZIP compression method (0=stored, 8=deflated, etc.)"""
```
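The usage examples below build these objects field by field from `zipfile.ZipInfo` entries. A small convenience helper (hypothetical, not part of the module) keeps that conversion in one place; the `datetime(*date_time)` conversion works because `ZipInfo.date_time` is a 6-tuple:

```python
import zipfile
from datetime import datetime

def archive_file_from_zipinfo(
    zip_info: zipfile.ZipInfo, bucket: str, zip_key: str
) -> RemoteFileInsideArchive:
    """Build a RemoteFileInsideArchive from a central-directory entry."""
    return RemoteFileInsideArchive(
        uri=f"s3://{bucket}/{zip_key}/{zip_info.filename}",
        start_offset=zip_info.header_offset,        # local file header offset
        compressed_size=zip_info.compress_size,
        uncompressed_size=zip_info.file_size,
        compression_method=zip_info.compress_type,  # 0=stored, 8=deflated
        last_modified=datetime(*zip_info.date_time),
    )
```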
### Decompressed Stream

Streaming decompression interface for reading compressed files from ZIP archives without loading entire files into memory.
```python { .api }
class DecompressedStream(io.IOBase):
    """
    Provides streaming decompression of files within ZIP archives.
    Supports seek operations and efficient memory usage for large compressed files.
    """

    LOCAL_FILE_HEADER_SIZE: int = 30
    """Size of ZIP local file header"""

    NAME_LENGTH_OFFSET: int = 26
    """Offset to filename length in local file header"""

    def __init__(
        self,
        file_obj: IO[bytes],
        file_info: RemoteFileInsideArchive,
        buffer_size: int = BUFFER_SIZE_DEFAULT
    ):
        """
        Initialize decompressed stream for a file inside ZIP archive.

        Args:
            file_obj: File-like object for the ZIP archive
            file_info: Metadata about the file inside the archive
            buffer_size: Buffer size for decompression operations
        """

    def read(self, size: int = -1) -> bytes:
        """
        Read decompressed data from the stream.

        Args:
            size: Number of bytes to read (-1 for all remaining data)

        Returns:
            Decompressed bytes data
        """

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """
        Seek to a specific position in the decompressed stream.

        Args:
            offset: Byte offset to seek to
            whence: Seek mode (SEEK_SET, SEEK_CUR, SEEK_END)

        Returns:
            New absolute position in the stream
        """

    def tell(self) -> int:
        """
        Get current position in the decompressed stream.

        Returns:
            Current byte position
        """

    def readable(self) -> bool:
        """
        Check if stream is readable.

        Returns:
            True if stream can be read from
        """

    def seekable(self) -> bool:
        """
        Check if stream supports seeking.

        Returns:
            True if stream supports seek operations
        """

    def close(self):
        """Close the decompressed stream and release resources."""

    def _calculate_actual_start(self, file_start: int) -> int:
        """
        Calculate actual start position accounting for local file header.

        Args:
            file_start: Start position from central directory

        Returns:
            Actual start position of compressed data
        """

    def _reset_decompressor(self):
        """Reset the decompression state for seeking operations."""

    def _decompress_chunk(self, chunk: bytes) -> bytes:
        """
        Decompress a chunk of data using the appropriate algorithm.

        Args:
            chunk: Compressed data chunk

        Returns:
            Decompressed data chunk
        """
```
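Note that `start_offset` (taken from the central directory) points at the entry's local file header, not at the compressed bytes themselves. `_calculate_actual_start` therefore skips the fixed 30-byte header plus the variable-length filename and extra field whose lengths are stored at offset 26. A sketch of that arithmetic, assuming a seekable file object (illustrative, not the module's exact code):

```python
import struct

def calculate_actual_start(zip_file_obj, file_start: int) -> int:
    """Return the offset where an entry's compressed data begins."""
    # Filename and extra-field lengths are two little-endian uint16s at
    # NAME_LENGTH_OFFSET (26) bytes into the local file header.
    zip_file_obj.seek(file_start + 26)
    name_len, extra_len = struct.unpack("<HH", zip_file_obj.read(4))
    # LOCAL_FILE_HEADER_SIZE (30) + variable-length fields.
    return file_start + 30 + name_len + extra_len
```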
### ZIP Content Reader

High-level interface for reading content from files within ZIP archives, providing both text and binary reading capabilities.
```python { .api }
class ZipContentReader:
    """
    High-level interface for reading content from ZIP archive files.
    Provides text and binary reading modes with encoding support.
    """

    def __init__(
        self,
        decompressed_stream: DecompressedStream,
        encoding: Optional[str] = None,
        buffer_size: int = BUFFER_SIZE_DEFAULT
    ):
        """
        Initialize ZIP content reader.

        Args:
            decompressed_stream: DecompressedStream for the file
            encoding: Text encoding for string operations (None for binary mode)
            buffer_size: Buffer size for reading operations
        """

    def __iter__(self):
        """
        Iterator interface for reading lines from the file.

        Yields:
            Lines from the file (str if encoding specified, bytes otherwise)
        """

    def __next__(self) -> Union[str, bytes]:
        """
        Get next line from the file.

        Returns:
            Next line from file

        Raises:
            StopIteration: When end of file is reached
        """

    def __enter__(self) -> "ZipContentReader":
        """Context manager entry."""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit."""

    def readline(self, limit: int = -1) -> Union[str, bytes]:
        """
        Read a single line from the file.

        Args:
            limit: Maximum number of characters/bytes to read

        Returns:
            Single line from file
        """

    def read(self, size: int = -1) -> Union[str, bytes]:
        """
        Read data from the file.

        Args:
            size: Number of characters/bytes to read (-1 for all)

        Returns:
            File content as string or bytes
        """

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """
        Seek to position in the file.

        Args:
            offset: Position to seek to
            whence: Seek mode

        Returns:
            New position in file
        """

    def close(self):
        """Close the content reader and release resources."""

    def tell(self) -> int:
        """
        Get current position in the file.

        Returns:
            Current position
        """

    @property
    def closed(self) -> bool:
        """
        Check if reader is closed.

        Returns:
            True if reader is closed
        """
```
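Because the reader is iterable, line-oriented formats can be consumed without materializing the whole file. A short sketch, assuming a `DecompressedStream` named `decompressed` constructed as in the examples below:

```python
# With an encoding set, iteration yields str lines; without one, bytes.
with ZipContentReader(decompressed, encoding="utf-8") as reader:
    for line_number, line in enumerate(reader, start=1):
        print(f"{line_number}: {line.rstrip()}")
        if line_number >= 10:  # stop after the first 10 lines
            break
```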
## Usage Examples

### Basic ZIP File Processing
```python
from source_s3.v4 import SourceS3StreamReader, Config
from source_s3.v4.zip_reader import ZipFileHandler, ZipContentReader, DecompressedStream

# Configure S3 connection
config = Config(
    bucket="my-data-bucket",
    aws_access_key_id="your-access-key",
    aws_secret_access_key="your-secret-key",
    region_name="us-east-1"
)

# Create stream reader and get S3 client
reader = SourceS3StreamReader()
reader.config = config
s3_client = reader.s3_client

# Initialize ZIP handler
zip_handler = ZipFileHandler(s3_client, config)

# Discover files in ZIP archive
zip_files, central_dir_offset = zip_handler.get_zip_files("data/archive.zip")

print(f"Found {len(zip_files)} files in archive:")
for zip_info in zip_files:
    print(f"  - {zip_info.filename} ({zip_info.file_size} bytes)")
```
### Reading Individual Files from ZIP

```python
from datetime import datetime
from source_s3.v4.zip_reader import RemoteFileInsideArchive, DecompressedStream, ZipContentReader

# Select a specific file from the ZIP
target_file = zip_files[0]  # First file in archive

# Create RemoteFileInsideArchive object
archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{target_file.filename}",
    start_offset=target_file.header_offset,
    compressed_size=target_file.compress_size,
    uncompressed_size=target_file.file_size,
    compression_method=target_file.compress_type,
    last_modified=datetime(*target_file.date_time)  # ZipInfo.date_time is a 6-tuple
)

# Open S3 object
s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
s3_stream = s3_response['Body']

# Create decompressed stream
decompressed = DecompressedStream(s3_stream, archive_file)

# Read content with encoding (for text files)
with ZipContentReader(decompressed, encoding="utf-8") as reader:
    content = reader.read()
    print(f"File content ({len(content)} characters):")
    print(content[:500])  # First 500 characters
```
### Processing CSV Files from ZIP

```python
import csv
from io import StringIO

# Assuming we have a CSV file in the ZIP
csv_file = next(f for f in zip_files if f.filename.endswith('.csv'))

# Create archive file representation
csv_archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{csv_file.filename}",
    start_offset=csv_file.header_offset,
    compressed_size=csv_file.compress_size,
    uncompressed_size=csv_file.file_size,
    compression_method=csv_file.compress_type,
    last_modified=datetime(*csv_file.date_time)
)

# Process CSV data
s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
decompressed = DecompressedStream(s3_response['Body'], csv_archive_file)

with ZipContentReader(decompressed, encoding="utf-8") as reader:
    csv_content = reader.read()
    csv_reader = csv.DictReader(StringIO(csv_content))

    for row_num, row in enumerate(csv_reader):
        print(f"Row {row_num}: {row}")
        if row_num >= 4:  # Show first 5 rows
            break
```
### Streaming Large Files from ZIP

```python
# For large files, use a streaming approach
large_file = max(zip_files, key=lambda f: f.file_size)

large_archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{large_file.filename}",
    start_offset=large_file.header_offset,
    compressed_size=large_file.compress_size,
    uncompressed_size=large_file.file_size,
    compression_method=large_file.compress_type,
    last_modified=datetime(*large_file.date_time)
)

# Stream content in chunks
s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
decompressed = DecompressedStream(s3_response['Body'], large_archive_file, buffer_size=64 * 1024)

with ZipContentReader(decompressed, encoding="utf-8") as reader:
    chunk_size = 1024 * 1024  # 1MB chunks
    total_size = 0

    while True:
        chunk = reader.read(chunk_size)
        if not chunk:
            break

        total_size += len(chunk)
        print(f"Processed {total_size} characters...")

        # Process chunk here
        # process_data_chunk(chunk)
```
### Error Handling with ZIP Files

```python
from botocore.exceptions import ClientError

try:
    # Attempt to process ZIP file
    zip_files, _ = zip_handler.get_zip_files("data/potentially-corrupt.zip")

    for zip_info in zip_files:
        try:
            archive_file = RemoteFileInsideArchive(
                uri=f"s3://my-data-bucket/data/potentially-corrupt.zip/{zip_info.filename}",
                start_offset=zip_info.header_offset,
                compressed_size=zip_info.compress_size,
                uncompressed_size=zip_info.file_size,
                compression_method=zip_info.compress_type,
                last_modified=datetime(*zip_info.date_time)
            )

            s3_response = s3_client.get_object(
                Bucket="my-data-bucket",
                Key="data/potentially-corrupt.zip"
            )
            decompressed = DecompressedStream(s3_response['Body'], archive_file)

            with ZipContentReader(decompressed, encoding="utf-8") as reader:
                content = reader.read()
                print(f"Successfully processed {zip_info.filename}")

        except Exception as e:
            print(f"Failed to process {zip_info.filename}: {e}")
            continue

except ValueError as e:
    print(f"Invalid ZIP file: {e}")
except ClientError as e:
    print(f"S3 access error: {e}")
```
## Performance Considerations

### Buffer Size Optimization
- **Small files**: Use the default buffer size (1MB) for optimal memory usage
- **Large files**: Increase the buffer size (4-16MB) for better I/O performance
- **Memory constraints**: Decrease the buffer size if memory is limited

### Streaming vs. Full Read
- **Large files**: Always use a streaming approach to avoid memory issues
- **Small files**: A full read can be more efficient for files under 10MB
- **Mixed sizes**: Select the strategy per file based on size, as sketched below
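A sketch of that size-based selection, assuming a hypothetical `handle()` callback for downstream processing:

```python
FULL_READ_THRESHOLD = 10 * 1024 * 1024  # 10MB, per the guideline above

def process_entry(reader: ZipContentReader, uncompressed_size: int) -> None:
    """Read small entries in one call; stream large ones in 1MB chunks."""
    if uncompressed_size < FULL_READ_THRESHOLD:
        handle(reader.read())  # small file: single full read
        return
    while True:  # large file: bounded-memory streaming
        chunk = reader.read(1024 * 1024)
        if not chunk:
            break
        handle(chunk)  # handle() is a placeholder for your processing
```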
### S3 Transfer Optimization
- **Range requests**: The ZIP handler uses efficient byte-range requests
- **Connection reuse**: S3 client connection pooling improves performance
- **Regional proximity**: Use S3 buckets in the same region as the processing

### Compression Method Support
- **Stored (method 0)**: No compression, fastest extraction
- **Deflated (method 8)**: Standard compression, good balance of speed and size
- **Other methods**: Limited support; may require additional libraries (see the dispatch sketch below)
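A minimal sketch of how a per-entry decompressor can be chosen from the compression method (illustrative; the module's `_decompress_chunk` is the authoritative implementation). Raw deflate streams inside ZIP entries need a negative window size so zlib skips the header check:

```python
import zlib

STORED, DEFLATED = 0, 8  # ZIP compression method codes

def make_decompressor(compression_method: int):
    """Return a callable mapping compressed chunks to decompressed bytes."""
    if compression_method == STORED:
        return lambda chunk: chunk  # stored entries are already raw bytes
    if compression_method == DEFLATED:
        return zlib.decompressobj(-zlib.MAX_WBITS).decompress  # raw deflate
    raise ValueError(f"Unsupported ZIP compression method: {compression_method}")
```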