Search (Ctrl+K)
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-airbyte-source-s3

S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/stream-operations.md

Stream Operations

Stream reading, cursor management, and incremental synchronization functionality. Provides S3-specific file discovery, reading operations, and state management for efficient data synchronization.

Capabilities

Stream Reader

S3-specific stream reader that handles file discovery, reading, and upload operations with comprehensive S3 integration.

class SourceS3StreamReader(AbstractFileBasedStreamReader):
    """
    Handles S3 file reading and streaming operations.
    Inherits from AbstractFileBasedStreamReader for file-based connector compatibility.

    Responsibilities visible in this spec: file discovery via glob matching,
    opening files for reading, downloading files locally, size checks, and
    S3 client construction (including IAM role assumption).
    """

    # Hard cap on the size of a single file the connector will process.
    FILE_SIZE_LIMIT = 1_500_000_000
    """Maximum file size limit (1.5GB)"""

    @property
    def config(self) -> Config:
        """
        Configuration accessor for the stream reader.

        NOTE(review): described as a "getter/setter" but only the getter is
        specified here — confirm a matching setter exists in the implementation.

        Returns:
            Current Config instance
        """

    @property
    def s3_client(self) -> BaseClient:
        """
        S3 client property with lazy loading.
        Creates and caches S3 client based on configuration.

        Returns:
            Configured S3 client instance (botocore BaseClient)
        """

    def get_matching_files(self, globs: List[str], prefix: Optional[str], logger) -> Iterable[RemoteFile]:
        """
        Finds S3 files matching the specified glob patterns.

        Args:
            globs: List of glob patterns to match files
            prefix: Optional path prefix to filter files
            logger: Logger instance for operation logging

        Returns:
            Iterable of RemoteFile objects matching the patterns
            (presumably produced lazily via the _page helper — confirm)
        """

    def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger) -> IOBase:
        """
        Opens an S3 file for reading with specified mode and encoding.

        Args:
            file: RemoteFile object representing the S3 file
            mode: File reading mode (text, binary, etc.)
            encoding: Optional text encoding for file reading
            logger: Logger instance for operation logging

        Returns:
            File-like object (IOBase) for reading the file content
        """

    def upload(self, file: RemoteFile, local_directory: str, logger) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
        """
        Downloads S3 file to local directory for processing.

        NOTE(review): despite the name, this downloads the remote file to the
        local filesystem; "upload" presumably follows the
        AbstractFileBasedStreamReader interface naming — confirm against the
        CDK base class.

        Args:
            file: RemoteFile object to download
            local_directory: Local directory path for file storage
            logger: Logger instance for operation logging

        Returns:
            Tuple of file record data and message reference
        """

    def file_size(self, file: RemoteFile) -> int:
        """
        Gets the size of an S3 file in bytes.

        Args:
            file: RemoteFile object

        Returns:
            File size in bytes
        """

    def is_modified_after_start_date(self, last_modified_date: Optional[datetime]) -> bool:
        """
        Checks if file was modified after the configured start date.

        NOTE(review): behavior when last_modified_date is None is not
        specified here — confirm against the implementation.

        Args:
            last_modified_date: File's last modification timestamp

        Returns:
            True if file should be included based on modification date
        """

    def _get_iam_s3_client(self, client_kv_args: dict) -> BaseClient:
        """
        Creates S3 client with IAM role assumption.

        Args:
            client_kv_args: Client configuration arguments

        Returns:
            Configured S3 client with IAM authentication
        """

    def _construct_s3_uri(self, file: RemoteFile) -> str:
        """
        Constructs S3 URI for the given file.

        Args:
            file: RemoteFile object

        Returns:
            S3 URI string (s3://bucket/key)
        """

    def _page(self, s3, globs, bucket, prefix, seen, logger) -> Iterable[RemoteFile]:
        """
        Paginates through S3 objects matching the criteria.

        Args:
            s3: S3 client instance (botocore BaseClient)
            globs: Glob patterns for file matching (list of str)
            bucket: S3 bucket name (str)
            prefix: Optional key prefix (str or None)
            seen: Set of already processed files — presumably mutated to
                avoid yielding duplicates across pages; TODO confirm
            logger: Logger instance

        Returns:
            Iterable of matching RemoteFile objects
        """

    def _handle_file(self, file) -> None:
        """
        Handles file processing for both regular and ZIP files,
        presumably dispatching to _handle_zip_file or _handle_regular_file.

        Args:
            file: File object to process
        """

    def _handle_zip_file(self, file) -> None:
        """
        Handles ZIP file extraction and processing.

        Args:
            file: ZIP file object to process
        """

    def _handle_regular_file(self, file) -> None:
        """
        Handles regular file processing.

        Args:
            file: Regular file object to process
        """

    @staticmethod
    def create_progress_handler(file_size: int, local_file_path: str, logger) -> Callable:
        """
        Creates a progress handler for file download operations.

        Args:
            file_size: Total file size in bytes
            local_file_path: Local path where file is being saved
            logger: Logger instance for progress reporting

        Returns:
            Callable progress handler function
        """

    @staticmethod
    def _is_folder(file) -> bool:
        """
        Checks if S3 object represents a folder.

        Args:
            file: S3 object to check

        Returns:
            True if object is a folder, False otherwise
        """

Cursor Management

Manages incremental synchronization state and file tracking with support for legacy state migration.

class Cursor(DefaultFileBasedCursor):
    """
    Manages incremental sync state and file tracking.
    Inherits from DefaultFileBasedCursor with S3-specific enhancements,
    in particular migration of legacy (V3) state into the V4 format.
    """

    # Date-only format used for cursor timestamps in V4 state.
    _DATE_FORMAT = "%Y-%m-%d"
    """Date format string for cursor timestamps"""

    # Full datetime format used by legacy V3 state values.
    _LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
    """Legacy datetime format for V3 compatibility"""

    # Safety margin applied when migrating V3 state so files near the
    # cutoff are not skipped.
    _V4_MIGRATION_BUFFER = timedelta(hours=1)
    """Buffer time for V3 to V4 migration"""

    _V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"
    """Field name for V3 minimum sync date"""

    def __init__(self, stream_config: FileBasedStreamConfig, **_) -> None:
        """
        Initialize cursor with stream configuration.

        Args:
            stream_config: Configuration for the file-based stream
            **_: Extra keyword arguments are accepted and ignored
                (presumably for call-site compatibility with the CDK — confirm)
        """

    def set_initial_state(self, value: StreamState) -> None:
        """
        Sets the initial cursor state, handling legacy format conversion
        (see _is_legacy_state / _convert_legacy_state).

        Args:
            value: Stream state to set as initial state
        """

    def get_state(self) -> StreamState:
        """
        Gets the current cursor state for incremental synchronization.

        Returns:
            Current stream state for persistence
        """

    def _should_sync_file(self, file: RemoteFile, logger) -> bool:
        """
        Determines if a file should be synchronized based on cursor state.

        Args:
            file: RemoteFile to evaluate
            logger: Logger instance for decision logging

        Returns:
            True if file should be synced, False otherwise
        """

    @staticmethod
    def _is_legacy_state(value: StreamState) -> bool:
        """
        Checks if the provided state is in legacy V3 format.

        Args:
            value: Stream state to check

        Returns:
            True if state is legacy format, False otherwise
        """

    @staticmethod
    def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, Any]:
        """
        Converts legacy V3 state to V4 format.

        Args:
            legacy_state: V3 format stream state

        Returns:
            V4 format state dictionary
        """

    @staticmethod
    def _get_adjusted_date_timestamp(cursor_datetime: datetime, file_datetime: datetime) -> datetime:
        """
        Adjusts timestamps for proper cursor comparison.

        NOTE(review): the exact adjustment rule (rounding, buffering) is not
        specified here — confirm against the implementation.

        Args:
            cursor_datetime: Current cursor timestamp
            file_datetime: File modification timestamp

        Returns:
            Adjusted datetime for comparison
        """

Legacy Stream Implementation

S3-specific stream implementation for backward compatibility with V3 configurations.

class IncrementalFileStreamS3(IncrementalFileStream):
    """
    S3-specific incremental file stream implementation.
    Provides compatibility with legacy V3 stream operations.
    """

    @property
    def storagefile_class(self) -> type:
        """
        Returns the S3File class used for file handling.

        Returns:
            S3File class type
        """

    def filepath_iterator(self, stream_state=None) -> Iterator[FileInfo]:
        """
        Iterates over S3 file paths for stream processing.

        Args:
            stream_state: Optional stream state for incremental sync
                (mapping or None)

        Yields:
            FileInfo objects for each file to process
        """

    def _filter_by_last_modified_date(self, file=None, stream_state=None):
        """
        Filters files based on last modification date and stream state.

        NOTE(review): no return value is documented — confirm whether this
        returns a filtered result/bool or operates by side effect.

        Args:
            file: File object to filter
            stream_state: Current stream state for comparison
        """

    @staticmethod
    def is_not_folder(file) -> bool:
        """
        Checks if S3 object is not a folder
        (logical complement of SourceS3StreamReader._is_folder — confirm
        the two stay consistent).

        Args:
            file: S3 object to check

        Returns:
            True if object is not a folder, False otherwise
        """

Helper Functions

Utility functions for S3 client configuration and file handling.

def _get_s3_compatible_client_args(config: Config) -> dict:
    """
    Returns configuration for S3-compatible client creation.
    Module-private helper (leading underscore marks it internal).

    Args:
        config: Configuration object with S3 settings
            (presumably endpoint/region/credentials — TODO confirm fields)

    Returns:
        Dictionary with client configuration parameters
    """

Usage Examples

Basic Stream Reader Usage

from source_s3.v4 import SourceS3StreamReader, Config

# Create configuration
config = Config(
    bucket="my-bucket",
    region_name="us-east-1"
)

# Create stream reader
reader = SourceS3StreamReader(config=config)

# Find matching files
files = reader.get_matching_files(
    globs=["*.csv", "*.json"], 
    prefix="data/",
    logger=logger
)

# Process files
for file in files:
    with reader.open_file(file, mode="text", encoding="utf-8", logger=logger) as f:
        content = f.read()
        # Process file content

Cursor State Management

from source_s3.v4 import Cursor
from airbyte_cdk.sources.file_based.config import FileBasedStreamConfig

# Create stream configuration
stream_config = FileBasedStreamConfig(...)

# Initialize cursor
cursor = Cursor(stream_config)

# Set initial state (handles legacy format)
cursor.set_initial_state(previous_state)

# Check if file should be synced (note: _should_sync_file is an internal
# method; it is normally invoked by the CDK rather than by user code)
should_sync = cursor._should_sync_file(file, logger)

# Get current state for persistence
current_state = cursor.get_state()

File Upload and Processing

from source_s3.v4 import SourceS3StreamReader

# Upload (download) file locally
reader = SourceS3StreamReader(config=config)
file_data, file_ref = reader.upload(
    file=remote_file,
    local_directory="/tmp/airbyte_local",
    logger=logger
)

# Check file size
size = reader.file_size(remote_file)
if size > SourceS3StreamReader.FILE_SIZE_LIMIT:
    # Handle large file
    pass

Legacy Stream Operations

from source_s3.stream import IncrementalFileStreamS3

# Create legacy stream
stream = IncrementalFileStreamS3(...)

# Iterate over files
for file_info in stream.filepath_iterator(stream_state=state):
    if IncrementalFileStreamS3.is_not_folder(file_info.file):
        # Process file
        pass

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-s3

docs

configuration.md

core-source.md

file-formats.md

index.md

stream-operations.md

utilities.md

zip-support.md

tile.json