
tessl/pypi-airbyte-source-s3

S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services


docs/zip-support.md

ZIP File Support

Comprehensive ZIP file extraction and streaming support for processing compressed S3 files. The S3 connector provides efficient handling of ZIP archives, including streaming decompression and individual file access without downloading entire archives.

Capabilities

ZIP File Handler

Main class for discovering and accessing files within ZIP archives stored in S3, supporting both standard and ZIP64 formats.

class ZipFileHandler:
    """
    Handles ZIP file discovery and metadata extraction from S3.
    Supports both standard ZIP and ZIP64 formats with efficient partial file reading.
    """
    
    # ZIP format constants
    EOCD_SIGNATURE: bytes = b"\x50\x4b\x05\x06"
    """End of Central Directory signature"""
    
    ZIP64_LOCATOR_SIGNATURE: bytes = b"\x50\x4b\x06\x07"
    """ZIP64 End of Central Directory Locator signature"""
    
    EOCD_CENTRAL_DIR_START_OFFSET: int = 16
    """Offset to central directory start in EOCD record"""
    
    ZIP64_EOCD_OFFSET: int = 8
    """Offset to ZIP64 EOCD in locator record"""
    
    ZIP64_EOCD_SIZE: int = 56
    """Size of ZIP64 End of Central Directory record"""
    
    ZIP64_CENTRAL_DIR_START_OFFSET: int = 48
    """Offset to central directory start in ZIP64 EOCD"""
    
    def __init__(self, s3_client: BaseClient, config: Config):
        """
        Initialize ZIP file handler with S3 client and configuration.
        
        Args:
            s3_client: Configured S3 client for file access
            config: S3 connector configuration
        """
    
    def get_zip_files(self, filename: str) -> Tuple[List[zipfile.ZipInfo], int]:
        """
        Extracts ZIP file metadata and returns list of contained files.
        
        Args:
            filename: S3 key of the ZIP file
            
        Returns:
            Tuple of (list of ZipInfo objects, central directory start offset)
            
        Raises:
            ValueError: If file is not a valid ZIP archive
            ClientError: If S3 access fails
        """
    
    def _fetch_data_from_s3(self, filename: str, start: int, size: Optional[int] = None) -> bytes:
        """
        Fetches specific byte range from S3 object.
        
        Args:
            filename: S3 key of the file
            start: Starting byte position
            size: Number of bytes to fetch (None for rest of file)
            
        Returns:
            Bytes data from the specified range
        """
    
    def _find_signature(
        self, 
        filename: str, 
        signature: bytes, 
        initial_buffer_size: int = BUFFER_SIZE_DEFAULT, 
        max_buffer_size: int = MAX_BUFFER_SIZE_DEFAULT
    ) -> Optional[bytes]:
        """
        Locates ZIP signature by reading backwards from end of file.
        
        Args:
            filename: S3 key of the ZIP file
            signature: Byte signature to search for
            initial_buffer_size: Initial buffer size for searching
            max_buffer_size: Maximum buffer size to prevent excessive memory usage
            
        Returns:
            Buffer containing the signature, or None if not found
        """
    
    def _fetch_zip64_data(self, filename: str) -> bytes:
        """
        Fetches ZIP64 Extended Information Extra Field data.
        
        Args:
            filename: S3 key of the ZIP file
            
        Returns:
            ZIP64 extra field data
        """
    
    def _get_central_directory_start(self, filename: str) -> int:
        """
        Determines the start offset of the central directory.
        
        Args:
            filename: S3 key of the ZIP file
            
        Returns:
            Byte offset where central directory begins
        """

Remote File Inside Archive

Extended RemoteFile class representing a file contained within a ZIP archive, including compression metadata.

class RemoteFileInsideArchive(RemoteFile):
    """
    Represents a file inside a ZIP archive with compression metadata.
    Extends RemoteFile with ZIP-specific information.
    """
    
    start_offset: int
    """Byte offset where compressed data begins in the ZIP file"""
    
    compressed_size: int
    """Size of the compressed data in bytes"""
    
    uncompressed_size: int
    """Size of the uncompressed data in bytes"""
    
    compression_method: int
    """ZIP compression method (0=stored, 8=deflated, etc.)"""

Decompressed Stream

Streaming decompression interface for reading compressed files from ZIP archives without loading entire files into memory.

class DecompressedStream(io.IOBase):
    """
    Provides streaming decompression of files within ZIP archives.
    Supports seek operations and efficient memory usage for large compressed files.
    """
    
    LOCAL_FILE_HEADER_SIZE: int = 30
    """Size of ZIP local file header"""
    
    NAME_LENGTH_OFFSET: int = 26
    """Offset to filename length in local file header"""
    
    def __init__(
        self, 
        file_obj: IO[bytes], 
        file_info: RemoteFileInsideArchive, 
        buffer_size: int = BUFFER_SIZE_DEFAULT
    ):
        """
        Initialize decompressed stream for a file inside ZIP archive.
        
        Args:
            file_obj: File-like object for the ZIP archive
            file_info: Metadata about the file inside the archive
            buffer_size: Buffer size for decompression operations
        """
    
    def read(self, size: int = -1) -> bytes:
        """
        Read decompressed data from the stream.
        
        Args:
            size: Number of bytes to read (-1 for all remaining data)
            
        Returns:
            Decompressed bytes data
        """
    
    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """
        Seek to a specific position in the decompressed stream.
        
        Args:
            offset: Byte offset to seek to
            whence: Seek mode (SEEK_SET, SEEK_CUR, SEEK_END)
            
        Returns:
            New absolute position in the stream
        """
    
    def tell(self) -> int:
        """
        Get current position in the decompressed stream.
        
        Returns:
            Current byte position
        """
    
    def readable(self) -> bool:
        """
        Check if stream is readable.
        
        Returns:
            True if stream can be read from
        """
    
    def seekable(self) -> bool:
        """
        Check if stream supports seeking.
        
        Returns:
            True if stream supports seek operations
        """
    
    def close(self):
        """Close the decompressed stream and release resources."""
    
    def _calculate_actual_start(self, file_start: int) -> int:
        """
        Calculate actual start position accounting for local file header.
        
        Args:
            file_start: Start position from central directory
            
        Returns:
            Actual start position of compressed data
        """
    
    def _reset_decompressor(self):
        """Reset the decompression state for seeking operations."""
    
    def _decompress_chunk(self, chunk: bytes) -> bytes:
        """
        Decompress a chunk of data using the appropriate algorithm.
        
        Args:
            chunk: Compressed data chunk
            
        Returns:
            Decompressed data chunk
        """

ZIP Content Reader

High-level interface for reading content from files within ZIP archives, providing both text and binary reading capabilities.

class ZipContentReader:
    """
    High-level interface for reading content from ZIP archive files.
    Provides text and binary reading modes with encoding support.
    """
    
    def __init__(
        self, 
        decompressed_stream: DecompressedStream, 
        encoding: Optional[str] = None, 
        buffer_size: int = BUFFER_SIZE_DEFAULT
    ):
        """
        Initialize ZIP content reader.
        
        Args:
            decompressed_stream: DecompressedStream for the file
            encoding: Text encoding for string operations (None for binary mode)
            buffer_size: Buffer size for reading operations
        """
    
    def __iter__(self):
        """
        Iterator interface for reading lines from the file.
        
        Yields:
            Lines from the file (str if encoding specified, bytes otherwise)
        """
    
    def __next__(self) -> Union[str, bytes]:
        """
        Get next line from the file.
        
        Returns:
            Next line from file
            
        Raises:
            StopIteration: When end of file is reached
        """
    
    def __enter__(self) -> "ZipContentReader":
        """Context manager entry."""
    
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit."""
    
    def readline(self, limit: int = -1) -> Union[str, bytes]:
        """
        Read a single line from the file.
        
        Args:
            limit: Maximum number of characters/bytes to read
            
        Returns:
            Single line from file
        """
    
    def read(self, size: int = -1) -> Union[str, bytes]:
        """
        Read data from the file.
        
        Args:
            size: Number of characters/bytes to read (-1 for all)
            
        Returns:
            File content as string or bytes
        """
    
    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """
        Seek to position in the file.
        
        Args:
            offset: Position to seek to
            whence: Seek mode
            
        Returns:
            New position in file
        """
    
    def close(self):
        """Close the content reader and release resources."""
    
    def tell(self) -> int:
        """
        Get current position in the file.
        
        Returns:
            Current position
        """
    
    @property
    def closed(self) -> bool:
        """
        Check if reader is closed.
        
        Returns:
            True if reader is closed
        """

Usage Examples

Basic ZIP File Processing

from source_s3.v4 import SourceS3StreamReader, Config
from source_s3.v4.zip_reader import ZipFileHandler, ZipContentReader, DecompressedStream

# Configure S3 connection
config = Config(
    bucket="my-data-bucket",
    aws_access_key_id="your-access-key",
    aws_secret_access_key="your-secret-key",
    region_name="us-east-1"
)

# Create stream reader and get S3 client
reader = SourceS3StreamReader()
reader.config = config
s3_client = reader.s3_client

# Initialize ZIP handler
zip_handler = ZipFileHandler(s3_client, config)

# Discover files in ZIP archive
zip_files, central_dir_offset = zip_handler.get_zip_files("data/archive.zip")

print(f"Found {len(zip_files)} files in archive:")
for zip_info in zip_files:
    print(f"  - {zip_info.filename} ({zip_info.file_size} bytes)")

Reading Individual Files from ZIP

import io
from source_s3.v4.zip_reader import RemoteFileInsideArchive, DecompressedStream, ZipContentReader

# Select a specific file from the ZIP
target_file = zip_files[0]  # First file in archive

# Create RemoteFileInsideArchive object
archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{target_file.filename}",
    start_offset=target_file.header_offset,
    compressed_size=target_file.compress_size,
    uncompressed_size=target_file.file_size,
    compression_method=target_file.compress_type,
    last_modified=None
)

# Open S3 object
s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
s3_stream = s3_response['Body']

# Create decompressed stream
decompressed = DecompressedStream(s3_stream, archive_file)

# Read content with encoding (for text files)
with ZipContentReader(decompressed, encoding="utf-8") as reader:
    content = reader.read()
    print(f"File content ({len(content)} characters):")
    print(content[:500])  # First 500 characters

Processing CSV Files from ZIP

import csv
from io import StringIO

# Assuming we have a CSV file in the ZIP
csv_file = next(f for f in zip_files if f.filename.endswith('.csv'))

# Create archive file representation
csv_archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{csv_file.filename}",
    start_offset=csv_file.header_offset,
    compressed_size=csv_file.compress_size,
    uncompressed_size=csv_file.file_size,
    compression_method=csv_file.compress_type,
    last_modified=None
)

# Process CSV data
s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
decompressed = DecompressedStream(s3_response['Body'], csv_archive_file)

with ZipContentReader(decompressed, encoding="utf-8") as reader:
    csv_content = reader.read()
    csv_reader = csv.DictReader(StringIO(csv_content))
    
    for row_num, row in enumerate(csv_reader, start=1):
        print(f"Row {row_num}: {row}")
        if row_num >= 5:  # Show first 5 rows
            break

Streaming Large Files from ZIP

# For large files, use streaming approach
large_file = max(zip_files, key=lambda f: f.file_size)

large_archive_file = RemoteFileInsideArchive(
    uri=f"s3://my-data-bucket/data/archive.zip/{large_file.filename}",
    start_offset=large_file.header_offset,
    compressed_size=large_file.compress_size,
    uncompressed_size=large_file.file_size,
    compression_method=large_file.compress_type,
    last_modified=None
)

# Stream content in chunks
s3_response = s3_client.get_object(Bucket="my-data-bucket", Key="data/archive.zip")
decompressed = DecompressedStream(s3_response['Body'], large_archive_file, buffer_size=64*1024)

with ZipContentReader(decompressed, encoding="utf-8") as reader:
    chunk_size = 1024 * 1024  # 1MB chunks
    total_size = 0
    
    while True:
        chunk = reader.read(chunk_size)
        if not chunk:
            break
            
        total_size += len(chunk)
        print(f"Processed {total_size} characters...")
        
        # Process chunk here
        # process_data_chunk(chunk)

Error Handling with ZIP Files

from botocore.exceptions import ClientError

try:
    # Attempt to process ZIP file
    zip_files, _ = zip_handler.get_zip_files("data/potentially-corrupt.zip")
    
    for zip_info in zip_files:
        try:
            archive_file = RemoteFileInsideArchive(
                uri=f"s3://my-data-bucket/data/potentially-corrupt.zip/{zip_info.filename}",
                start_offset=zip_info.header_offset,
                compressed_size=zip_info.compress_size,
                uncompressed_size=zip_info.file_size,
                compression_method=zip_info.compress_type,
                last_modified=None
            )
            
            s3_response = s3_client.get_object(
                Bucket="my-data-bucket", 
                Key="data/potentially-corrupt.zip"
            )
            decompressed = DecompressedStream(s3_response['Body'], archive_file)
            
            with ZipContentReader(decompressed, encoding="utf-8") as reader:
                content = reader.read()
                print(f"Successfully processed {zip_info.filename}")
                
        except Exception as e:
            print(f"Failed to process {zip_info.filename}: {e}")
            continue
            
except ValueError as e:
    print(f"Invalid ZIP file: {e}")
except ClientError as e:
    print(f"S3 access error: {e}")

Performance Considerations

Buffer Size Optimization

  • Small files: Use default buffer size (1MB) for optimal memory usage
  • Large files: Increase buffer size (4-16MB) for better I/O performance
  • Memory constraints: Decrease buffer size if memory is limited

Streaming vs. Full Read

  • Large files: Always use streaming approach to avoid memory issues
  • Small files: Full read can be more efficient for files under 10MB
  • Mixed sizes: Implement size-based strategy selection
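A size-based selection can be as simple as a cutoff check. The 10 MB threshold below follows the guidance above; the strategy names are placeholders:

```python
FULL_READ_LIMIT = 10 * 1024 * 1024  # 10 MB, per the guidance above

def choose_read_strategy(uncompressed_size: int) -> str:
    """Pick a full read for small members, streaming for everything else."""
    return "full_read" if uncompressed_size < FULL_READ_LIMIT else "stream"
```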

S3 Transfer Optimization

  • Range requests: ZIP handler uses efficient byte-range requests
  • Connection reuse: S3 client connection pooling improves performance
  • Regional proximity: Use S3 buckets in same region as processing
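A ranged fetch like `_fetch_data_from_s3` comes down to building an HTTP `Range` header and passing it to `get_object`. The helper below is a hypothetical illustration of that header format, mirroring the documented semantics (`size=None` reads to end of file):

```python
from typing import Optional

def byte_range_header(start: int, size: Optional[int] = None) -> str:
    """Build the Range header value for a partial S3 GET."""
    if size is None:
        return f"bytes={start}-"  # from start to end of object
    return f"bytes={start}-{start + size - 1}"  # Range end is inclusive

# With a boto3 client, usage would look like:
# s3_client.get_object(Bucket=bucket, Key=key, Range=byte_range_header(0, 1024))
```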

Compression Method Support

  • Stored (method 0): No compression, fastest extraction
  • Deflated (method 8): Standard compression, good balance of speed/size
  • Other methods: Limited support, may require additional libraries
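Dispatching on the compression method might look like the sketch below, which handles only the stored and deflated cases noted above. This is an illustration, not the connector's actual `_decompress_chunk`:

```python
import zlib

STORED, DEFLATED = 0, 8  # ZIP compression method codes

def decompress_member(data: bytes, method: int) -> bytes:
    if method == STORED:
        return data  # method 0: bytes are stored verbatim
    if method == DEFLATED:
        # method 8: raw deflate stream (negative wbits = no zlib header)
        d = zlib.decompressobj(-zlib.MAX_WBITS)
        return d.decompress(data) + d.flush()
    raise NotImplementedError(f"compression method {method} is not supported")
```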

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-s3
