S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services
—
Quality: Pending — a best-practices review of this connector has not yet been performed.
Impact: Pending — no eval scenarios have been run against it.
Comprehensive ZIP file extraction and streaming support for processing compressed S3 files. The S3 connector provides efficient handling of ZIP archives, including streaming decompression and individual file access without downloading entire archives.
Main class for discovering and accessing files within ZIP archives stored in S3, supporting both standard and ZIP64 formats.
class ZipFileHandler:
    """
    Discovers the contents of ZIP archives stored on S3.

    Only the relevant trailing portions of the archive (EOCD, optional
    ZIP64 records, central directory) are fetched via ranged reads, so an
    archive never has to be downloaded in full. Both classic ZIP and
    ZIP64 layouts are supported.
    """

    # --- ZIP on-disk format constants ---
    EOCD_SIGNATURE: bytes = b"\x50\x4b\x05\x06"           # End of Central Directory signature
    ZIP64_LOCATOR_SIGNATURE: bytes = b"\x50\x4b\x06\x07"  # ZIP64 EOCD Locator signature
    EOCD_CENTRAL_DIR_START_OFFSET: int = 16               # central-directory start field within the EOCD record
    ZIP64_EOCD_OFFSET: int = 8                            # ZIP64 EOCD offset field within the locator record
    ZIP64_EOCD_SIZE: int = 56                             # size of the ZIP64 End of Central Directory record
    ZIP64_CENTRAL_DIR_START_OFFSET: int = 48              # central-directory start field within the ZIP64 EOCD

    def __init__(self, s3_client: BaseClient, config: Config):
        """
        Bind the handler to an S3 client and connector configuration.

        Args:
            s3_client: Configured S3 client used for ranged object reads.
            config: S3 connector configuration.
        """

    def get_zip_files(self, filename: str) -> Tuple[List[zipfile.ZipInfo], int]:
        """
        List the entries of a ZIP archive without downloading it whole.

        Args:
            filename: S3 key of the ZIP archive.

        Returns:
            A tuple of (ZipInfo objects for the contained files,
            byte offset at which the central directory starts).

        Raises:
            ValueError: The object is not a valid ZIP archive.
            ClientError: S3 access failed.
        """

    def _fetch_data_from_s3(self, filename: str, start: int, size: Optional[int] = None) -> bytes:
        """
        Read a specific byte range of an S3 object.

        Args:
            filename: S3 key of the object.
            start: First byte position to read.
            size: Number of bytes to fetch; None reads to end of object.

        Returns:
            Raw bytes of the requested range.
        """

    def _find_signature(
        self,
        filename: str,
        signature: bytes,
        initial_buffer_size: int = BUFFER_SIZE_DEFAULT,
        max_buffer_size: int = MAX_BUFFER_SIZE_DEFAULT
    ) -> Optional[bytes]:
        """
        Search backwards from the end of the object for a ZIP signature.

        The search window grows from initial_buffer_size up to
        max_buffer_size, bounding memory use on large archives.

        Args:
            filename: S3 key of the ZIP archive.
            signature: Magic byte sequence to locate.
            initial_buffer_size: Size of the first tail read.
            max_buffer_size: Upper bound on the search window.

        Returns:
            The buffer containing the signature, or None when not found.
        """

    def _fetch_zip64_data(self, filename: str) -> bytes:
        """
        Fetch the ZIP64 Extended Information Extra Field data.

        Args:
            filename: S3 key of the ZIP archive.

        Returns:
            The ZIP64 extra field bytes.
        """

    def _get_central_directory_start(self, filename: str) -> int:
        """
        Resolve the byte offset at which the central directory begins.

        Args:
            filename: S3 key of the ZIP archive.

        Returns:
            Offset (from the start of the object) of the central directory.
        """

# Extended RemoteFile class representing a file contained within a ZIP
# archive, including compression metadata.
class RemoteFileInsideArchive(RemoteFile):
    """
    A RemoteFile that lives inside a ZIP archive.

    Extends RemoteFile with the ZIP-level metadata needed to locate and
    decompress the member without scanning the whole archive.
    """

    start_offset: int        # byte offset within the ZIP at which this member's compressed data begins
    compressed_size: int     # size of the compressed payload, in bytes
    uncompressed_size: int   # size of the payload once decompressed, in bytes
    compression_method: int  # ZIP compression method id (0 = stored, 8 = deflated, etc.)

# Streaming decompression interface for reading compressed files from ZIP
# archives without loading entire files into memory.
class DecompressedStream(io.IOBase):
    """
    Read-only stream that decompresses a single ZIP member on the fly.

    Exposes the member's uncompressed content with seek/tell support
    while keeping memory usage bounded, so large compressed files can be
    processed incrementally.
    """

    LOCAL_FILE_HEADER_SIZE: int = 30  # size of a ZIP local file header's fixed portion
    NAME_LENGTH_OFFSET: int = 26      # position of the filename-length field in the local file header

    def __init__(
        self,
        file_obj: IO[bytes],
        file_info: RemoteFileInsideArchive,
        buffer_size: int = BUFFER_SIZE_DEFAULT
    ):
        """
        Bind the stream to one member of an open ZIP archive.

        Args:
            file_obj: File-like object over the whole ZIP archive.
            file_info: Location and compression metadata of the member.
            buffer_size: Chunk size used for decompression work.
        """

    def read(self, size: int = -1) -> bytes:
        """
        Return up to `size` decompressed bytes.

        Args:
            size: Number of bytes to read; -1 reads all remaining data.

        Returns:
            Decompressed bytes.
        """

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """
        Move to a position within the decompressed content.

        Args:
            offset: Target offset, interpreted according to `whence`.
            whence: io.SEEK_SET, io.SEEK_CUR or io.SEEK_END.

        Returns:
            The new absolute position.
        """

    def tell(self) -> int:
        """Return the current position within the decompressed content."""

    def readable(self) -> bool:
        """Return True when the stream can be read from."""

    def seekable(self) -> bool:
        """Return True when the stream supports seek operations."""

    def close(self):
        """Close the stream and release its resources."""

    def _calculate_actual_start(self, file_start: int) -> int:
        """
        Translate a central-directory start offset into the offset of the
        member's compressed data, accounting for the local file header.

        Args:
            file_start: Start position taken from the central directory.

        Returns:
            Actual start position of the compressed data.
        """

    def _reset_decompressor(self):
        """Discard decompression state so a seek can restart cleanly."""

    def _decompress_chunk(self, chunk: bytes) -> bytes:
        """
        Decompress one chunk using the member's compression method.

        Args:
            chunk: Raw compressed bytes.

        Returns:
            The corresponding decompressed bytes.
        """

# High-level interface for reading content from files within ZIP archives,
# providing both text and binary reading capabilities.
class ZipContentReader:
    """
    Convenience reader over a DecompressedStream.

    Provides bulk reads, line iteration and context-manager semantics.
    When an encoding is supplied the reader yields str; otherwise it
    operates in binary mode and yields bytes.
    """

    def __init__(
        self,
        decompressed_stream: DecompressedStream,
        encoding: Optional[str] = None,
        buffer_size: int = BUFFER_SIZE_DEFAULT
    ):
        """
        Wrap a decompressed stream for text or binary access.

        Args:
            decompressed_stream: Stream over the ZIP member's content.
            encoding: Text encoding for string output; None selects binary mode.
            buffer_size: Buffer size for read operations.
        """

    def __iter__(self):
        """
        Iterate over the file line by line.

        Yields:
            Lines as str when an encoding was given, bytes otherwise.
        """

    def __next__(self) -> Union[str, bytes]:
        """
        Return the next line of the file.

        Raises:
            StopIteration: End of file was reached.
        """

    def __enter__(self) -> "ZipContentReader":
        """Enter a context that closes the reader on exit."""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the reader when the context ends."""

    def readline(self, limit: int = -1) -> Union[str, bytes]:
        """
        Read a single line.

        Args:
            limit: Maximum number of characters/bytes to read.

        Returns:
            One line from the file.
        """

    def read(self, size: int = -1) -> Union[str, bytes]:
        """
        Read data from the file.

        Args:
            size: Number of characters/bytes to read; -1 reads everything.

        Returns:
            File content as str or bytes, per the configured mode.
        """

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """
        Reposition the reader.

        Args:
            offset: Target position.
            whence: Seek mode.

        Returns:
            The new position in the file.
        """

    def close(self):
        """Close the content reader and release resources."""

    def tell(self) -> int:
        """Return the current position in the file."""

    @property
    def closed(self) -> bool:
        """True when the reader has been closed."""

# Usage example: discovering the files contained in a ZIP archive on S3.
from source_s3.v4 import SourceS3StreamReader, Config
from source_s3.v4.zip_reader import ZipFileHandler, ZipContentReader, DecompressedStream

# Build the connector configuration for the target bucket.
config = Config(
    bucket="my-data-bucket",
    aws_access_key_id="your-access-key",
    aws_secret_access_key="your-secret-key",
    region_name="us-east-1",
)

# Obtain an S3 client through the stream reader.
reader = SourceS3StreamReader()
reader.config = config
s3_client = reader.s3_client

# Enumerate the archive's contents without downloading it.
zip_handler = ZipFileHandler(s3_client, config)
zip_files, central_dir_offset = zip_handler.get_zip_files("data/archive.zip")

print(f"Found {len(zip_files)} files in archive:")
for zip_info in zip_files:
    print(f" - {zip_info.filename} ({zip_info.file_size} bytes)")

import io
from source_s3.v4.zip_reader import RemoteFileInsideArchive, DecompressedStream, ZipContentReader

# Pick one entry out of the archive listing.
target_file = zip_files[0]  # first file in the archive

# Describe where that entry lives inside the ZIP.
archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{target_file.filename}",
    start_offset=target_file.header_offset,
    compressed_size=target_file.compress_size,
    uncompressed_size=target_file.file_size,
    compression_method=target_file.compress_type,
    last_modified=None,
)

# Open the archive object on S3 and wrap it in a decompressing stream.
s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
s3_stream = s3_response['Body']
decompressed = DecompressedStream(s3_stream, archive_file)

# Decode the member as UTF-8 text and print a preview.
with ZipContentReader(decompressed, encoding="utf-8") as reader:
    content = reader.read()
    print(f"File content ({len(content)} characters):")
    print(content[:500])  # first 500 characters

import csv
from io import StringIO

# Locate a CSV entry inside the archive.
csv_file = next(f for f in zip_files if f.filename.endswith('.csv'))

# Describe the CSV member's location and compression metadata.
csv_archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{csv_file.filename}",
    start_offset=csv_file.header_offset,
    compressed_size=csv_file.compress_size,
    uncompressed_size=csv_file.file_size,
    compression_method=csv_file.compress_type,
    last_modified=None,
)

# Decompress the member and feed its text to the csv module.
s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
decompressed = DecompressedStream(s3_response['Body'], csv_archive_file)
with ZipContentReader(decompressed, encoding="utf-8") as reader:
    csv_content = reader.read()
    csv_reader = csv.DictReader(StringIO(csv_content))
    for row_num, row in enumerate(csv_reader):
        print(f"Row {row_num}: {row}")
        if row_num >= 5:  # show first 5 rows
            break

# For large files, use streaming approach
# Stream the largest member in bounded chunks instead of one big read.
large_file = max(zip_files, key=lambda f: f.file_size)
large_archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{large_file.filename}",
    start_offset=large_file.header_offset,
    compressed_size=large_file.compress_size,
    uncompressed_size=large_file.file_size,
    compression_method=large_file.compress_type,
    last_modified=None,
)

s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
decompressed = DecompressedStream(s3_response['Body'], large_archive_file, buffer_size=64*1024)

with ZipContentReader(decompressed, encoding="utf-8") as reader:
    chunk_size = 1024 * 1024  # 1MB chunks
    total_size = 0
    # read() returns an empty value at end of file, ending the loop.
    while chunk := reader.read(chunk_size):
        total_size += len(chunk)
        print(f"Processed {total_size} characters...")
        # Process chunk here
        # process_data_chunk(chunk)

from botocore.exceptions import ClientError
try:
    # get_zip_files validates the archive format before anything is read.
    zip_files, _ = zip_handler.get_zip_files("data/potentially-corrupt.zip")
    for zip_info in zip_files:
        try:
            archive_file = RemoteFileInsideArchive(
                uri=f"s3://my-data-bucket/data/potentially-corrupt.zip/{zip_info.filename}",
                start_offset=zip_info.header_offset,
                compressed_size=zip_info.compress_size,
                uncompressed_size=zip_info.file_size,
                compression_method=zip_info.compress_type,
                last_modified=None,
            )
            s3_response = s3_client.get_object(
                Bucket="my-data-bucket",
                Key="data/potentially-corrupt.zip",
            )
            decompressed = DecompressedStream(s3_response['Body'], archive_file)
            with ZipContentReader(decompressed, encoding="utf-8") as reader:
                content = reader.read()
                print(f"Successfully processed {zip_info.filename}")
        except Exception as e:
            # Best-effort: one corrupt member should not abort the whole archive.
            print(f"Failed to process {zip_info.filename}: {e}")
            continue
except ValueError as e:
    print(f"Invalid ZIP file: {e}")
except ClientError as e:
    print(f"S3 access error: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-s3