S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services
—
Quality: Pending — has not yet been assessed for best practices.
Impact: Pending — no eval scenarios have been run.
Stream reading, cursor management, and incremental synchronization functionality. Provides S3-specific file discovery, reading operations, and state management for efficient data synchronization.
S3-specific stream reader that handles file discovery, reading, and upload operations with comprehensive S3 integration.
class SourceS3StreamReader(AbstractFileBasedStreamReader):
"""
Handles S3 file reading and streaming operations.
Inherits from AbstractFileBasedStreamReader for file-based connector compatibility.
"""
FILE_SIZE_LIMIT = 1_500_000_000
"""Maximum file size limit (1.5GB)"""
@property
def config(self) -> Config:
"""
Configuration getter/setter for the stream reader.
Returns:
Current Config instance
"""
@property
def s3_client(self) -> BaseClient:
"""
S3 client property with lazy loading.
Creates and caches S3 client based on configuration.
Returns:
Configured S3 client instance
"""
def get_matching_files(self, globs: List[str], prefix: Optional[str], logger) -> Iterable[RemoteFile]:
"""
Finds S3 files matching the specified glob patterns.
Args:
globs: List of glob patterns to match files
prefix: Optional path prefix to filter files
logger: Logger instance for operation logging
Returns:
Iterable of RemoteFile objects matching the patterns
"""
def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger) -> IOBase:
"""
Opens an S3 file for reading with specified mode and encoding.
Args:
file: RemoteFile object representing the S3 file
mode: File reading mode (text, binary, etc.)
encoding: Optional text encoding for file reading
logger: Logger instance for operation logging
Returns:
File-like object for reading the file content
"""
def upload(self, file: RemoteFile, local_directory: str, logger) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
"""
Downloads S3 file to local directory for processing.
Args:
file: RemoteFile object to download
local_directory: Local directory path for file storage
logger: Logger instance for operation logging
Returns:
Tuple of file record data and message reference
"""
def file_size(self, file: RemoteFile) -> int:
"""
Gets the size of an S3 file in bytes.
Args:
file: RemoteFile object
Returns:
File size in bytes
"""
def is_modified_after_start_date(self, last_modified_date: Optional[datetime]) -> bool:
"""
Checks if file was modified after the configured start date.
Args:
last_modified_date: File's last modification timestamp
Returns:
True if file should be included based on modification date
"""
def _get_iam_s3_client(self, client_kv_args: dict) -> BaseClient:
"""
Creates S3 client with IAM role assumption.
Args:
client_kv_args: Client configuration arguments
Returns:
Configured S3 client with IAM authentication
"""
def _construct_s3_uri(self, file: RemoteFile) -> str:
"""
Constructs S3 URI for the given file.
Args:
file: RemoteFile object
Returns:
S3 URI string (s3://bucket/key)
"""
def _page(self, s3, globs, bucket, prefix, seen, logger) -> Iterable[RemoteFile]:
"""
Paginates through S3 objects matching the criteria.
Args:
s3: S3 client instance
globs: Glob patterns for file matching
bucket: S3 bucket name
prefix: Optional key prefix
seen: Set of already processed files
logger: Logger instance
Returns:
Iterable of matching RemoteFile objects
"""
def _handle_file(self, file):
"""
Handles file processing for both regular and ZIP files.
Args:
file: File object to process
"""
def _handle_zip_file(self, file):
"""
Handles ZIP file extraction and processing.
Args:
file: ZIP file object to process
"""
def _handle_regular_file(self, file):
"""
Handles regular file processing.
Args:
file: Regular file object to process
"""
@staticmethod
def create_progress_handler(file_size: int, local_file_path: str, logger) -> Callable:
"""
Creates a progress handler for file download operations.
Args:
file_size: Total file size in bytes
local_file_path: Local path where file is being saved
logger: Logger instance for progress reporting
Returns:
Callable progress handler function
"""
@staticmethod
def _is_folder(file) -> bool:
"""
Checks if S3 object represents a folder.
Args:
file: S3 object to check
Returns:
True if object is a folder, False otherwise
"""Manages incremental synchronization state and file tracking with support for legacy state migration.
class Cursor(DefaultFileBasedCursor):
"""
Manages incremental sync state and file tracking.
Inherits from DefaultFileBasedCursor with S3-specific enhancements.
"""
_DATE_FORMAT = "%Y-%m-%d"
"""Date format string for cursor timestamps"""
_LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
"""Legacy datetime format for V3 compatibility"""
_V4_MIGRATION_BUFFER = timedelta(hours=1)
"""Buffer time for V3 to V4 migration"""
_V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"
"""Field name for V3 minimum sync date"""
def __init__(self, stream_config: FileBasedStreamConfig, **_):
"""
Initialize cursor with stream configuration.
Args:
stream_config: Configuration for the file-based stream
"""
def set_initial_state(self, value: StreamState) -> None:
"""
Sets the initial cursor state, handling legacy format conversion.
Args:
value: Stream state to set as initial state
"""
def get_state(self) -> StreamState:
"""
Gets the current cursor state for incremental synchronization.
Returns:
Current stream state for persistence
"""
def _should_sync_file(self, file: RemoteFile, logger) -> bool:
"""
Determines if a file should be synchronized based on cursor state.
Args:
file: RemoteFile to evaluate
logger: Logger instance for decision logging
Returns:
True if file should be synced, False otherwise
"""
@staticmethod
def _is_legacy_state(value: StreamState) -> bool:
"""
Checks if the provided state is in legacy V3 format.
Args:
value: Stream state to check
Returns:
True if state is legacy format, False otherwise
"""
@staticmethod
def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, Any]:
"""
Converts legacy V3 state to V4 format.
Args:
legacy_state: V3 format stream state
Returns:
V4 format state dictionary
"""
@staticmethod
def _get_adjusted_date_timestamp(cursor_datetime: datetime, file_datetime: datetime) -> datetime:
"""
Adjusts timestamps for proper cursor comparison.
Args:
cursor_datetime: Current cursor timestamp
file_datetime: File modification timestamp
Returns:
Adjusted datetime for comparison
"""S3-specific stream implementation for backward compatibility with V3 configurations.
class IncrementalFileStreamS3(IncrementalFileStream):
"""
S3-specific incremental file stream implementation.
Provides compatibility with legacy V3 stream operations.
"""
@property
def storagefile_class(self) -> type:
"""
Returns the S3File class for file handling.
Returns:
S3File class type
"""
def filepath_iterator(self, stream_state=None) -> Iterator[FileInfo]:
"""
Iterates over S3 file paths for stream processing.
Args:
stream_state: Optional stream state for incremental sync
Yields:
FileInfo objects for each file to process
"""
def _filter_by_last_modified_date(self, file=None, stream_state=None):
"""
Filters files based on last modification date and stream state.
Args:
file: File object to filter
stream_state: Current stream state for comparison
"""
@staticmethod
def is_not_folder(file) -> bool:
"""
Checks if S3 object is not a folder.
Args:
file: S3 object to check
Returns:
True if object is not a folder, False otherwise
"""Utility functions for S3 client configuration and file handling.
def _get_s3_compatible_client_args(config: Config) -> dict:
    """
    Returns configuration for S3-compatible client creation.

    Args:
        config: Configuration object with S3 settings

    Returns:
        Dictionary with client configuration parameters
    """

from source_s3.v4 import SourceS3StreamReader, Config
# Create configuration
config = Config(
    bucket="my-bucket",
    region_name="us-east-1",
)

# Create stream reader
reader = SourceS3StreamReader(config=config)

# Find matching files
files = reader.get_matching_files(
    globs=["*.csv", "*.json"],
    prefix="data/",
    logger=logger,
)

# Process files
for file in files:
    with reader.open_file(file, mode="text", encoding="utf-8", logger=logger) as f:
        content = f.read()
        # Process file content

from source_s3.v4 import Cursor
from airbyte_cdk.sources.file_based.config import FileBasedStreamConfig

# Create stream configuration
stream_config = FileBasedStreamConfig(...)

# Initialize cursor
cursor = Cursor(stream_config)

# Set initial state (handles legacy format)
cursor.set_initial_state(previous_state)

# Check if a file should be synced
should_sync = cursor._should_sync_file(file, logger)

# Get current state for persistence
current_state = cursor.get_state()

from source_s3.v4 import SourceS3StreamReader
# Upload (download) file locally
reader = SourceS3StreamReader(config=config)
file_data, file_ref = reader.upload(
    file=remote_file,
    local_directory="/tmp/airbyte_local",
    logger=logger,
)

# Check file size
size = reader.file_size(remote_file)
if size > SourceS3StreamReader.FILE_SIZE_LIMIT:
    # Handle large file
    pass

from source_s3.stream import IncrementalFileStreamS3
# Create legacy stream
stream = IncrementalFileStreamS3(...)
# Iterate over files
for file_info in stream.filepath_iterator(stream_state=state):
if IncrementalFileStreamS3.is_not_folder(file_info.file):
# Process file
passInstall with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-s3