S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services
```
npx @tessl/cli install tessl/pypi-airbyte-source-s3@4.14.0
```

A production-ready Airbyte connector that syncs data from Amazon S3 and S3-compatible storage services. Built on the Airbyte file-based connector framework, it supports multiple file formats, authentication methods, and incremental synchronization with comprehensive error handling and backward compatibility.
Install from PyPI:

```
pip install airbyte-source-s3
```

Key imports:

```python
from source_s3.run import run
from source_s3.v4 import SourceS3, Config, SourceS3StreamReader, Cursor
```

Entry point usage:

```python
from source_s3.run import run
# Launch the connector
run()
```

Programmatic usage:

```python
from source_s3.v4 import SourceS3, Config

# Create configuration
config = Config(
bucket="my-bucket",
aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
region_name="us-east-1"
)
# Create and launch source
source = SourceS3.create()
source.launch()
```

The airbyte-source-s3 connector follows a modular architecture built on Airbyte's file-based connector framework with dual-version support:
- V4 components: Config, SourceS3StreamReader, and Cursor classes
- Legacy V3 components: SourceS3Spec and IncrementalFileStreamS3, still supported for backward compatibility
- ZIP archive handling: ZipFileHandler, DecompressedStream, and ZipContentReader

The connector supports multiple file formats with configurable options: CSV, JSON Lines, Parquet, and Avro.
The connector integrates seamlessly with AWS S3 and S3-compatible services, providing comprehensive file processing capabilities including compressed archives, incremental synchronization, and robust error handling.
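These pieces come together in a single source configuration. The sketch below is illustrative only: it assumes the field names of the Config, FileBasedStreamConfig, and CsvFormat classes documented later in this section, and the bucket, credentials, stream name, and globs are placeholders.

```python
# Illustrative source configuration as a plain dict; field names follow the
# Config / FileBasedStreamConfig / CsvFormat listings below. Values are placeholders.
source_config = {
    "bucket": "my-bucket",
    "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",        # optional when assuming a role
    "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "region_name": "us-east-1",
    "streams": [
        {
            "name": "orders",                            # stream name shown in Airbyte
            "globs": ["orders/*.csv"],                   # which objects to sync
            "format": {
                "filetype": "csv",
                "delimiter": ",",
                "quote_char": "\"",
            },
        }
    ],
}
```
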
Main source functionality including connector launching, configuration reading, and specification generation. These functions provide the primary entry points for running the S3 connector.
```python
def run() -> None: ...
class SourceS3(FileBasedSource):
@classmethod
def read_config(cls, config_path: str) -> Mapping[str, Any]: ...
@classmethod
def launch(cls, args: list[str] | None = None) -> None: ...
@classmethod
def create(cls, *, configured_catalog_path: Path | str | None = None) -> SourceS3: ...
```

Configuration classes and specifications for both V4 and legacy V3 formats. Handles S3 authentication, bucket configuration, and file format specifications with full validation and schema generation.
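As a quick check of the specification, the JSON Schema for this configuration can be generated directly from the class. A minimal sketch; the exact schema contents depend on the installed CDK version:

```python
from source_s3.v4 import Config

# Generate the connector's configuration schema (a JSON Schema dict) and list
# its top-level properties. The "properties" key is the standard pydantic
# schema layout; treat the exact output as version-dependent.
spec_schema = Config.schema()
print(sorted(spec_schema.get("properties", {})))
```
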
```python
class Config(AbstractFileBasedSpec):
bucket: str
aws_access_key_id: Optional[str]
aws_secret_access_key: Optional[str]
role_arn: Optional[str]
endpoint: Optional[str]
region_name: Optional[str]
@classmethod
def schema(cls, *args, **kwargs) -> Dict[str, Any]: ...
```

Stream reading, cursor management, and incremental synchronization functionality. Provides S3-specific file discovery, reading operations, and state management for efficient data synchronization.
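A minimal sketch of discovering and opening files with the stream reader. In the real connector this wiring is done by SourceS3 itself; the direct reader.config assignment, the placeholder config value, and the FileReadMode import path are assumptions here.

```python
import logging

from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode  # import path assumed
from source_s3.v4 import SourceS3StreamReader

logger = logging.getLogger("airbyte")

reader = SourceS3StreamReader()
reader.config = config  # placeholder: a validated Config instance (see the configuration sketch above)

# List objects matching the glob, then stream the first line of each file.
for remote_file in reader.get_matching_files(globs=["orders/*.csv"], prefix=None, logger=logger):
    with reader.open_file(remote_file, mode=FileReadMode.READ, encoding="utf-8", logger=logger) as fp:
        header = fp.readline()
```
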
```python
class SourceS3StreamReader(AbstractFileBasedStreamReader):
@property
def s3_client(self) -> BaseClient: ...
def get_matching_files(self, globs: List[str], prefix: Optional[str], logger) -> Iterable[RemoteFile]: ...
def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger) -> IOBase: ...
class Cursor(DefaultFileBasedCursor):
def set_initial_state(self, value: StreamState) -> None: ...
def get_state(self) -> StreamState: ...
```

Configuration classes for the supported file formats: CSV, JSON Lines, Parquet, and Avro. Each format provides specific configuration options for parsing and processing.
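For illustration, here is how two other format blocks could look inside a stream's format section, using the option names from the classes listed in this document (option availability can differ between the legacy and V4 specs):

```python
# JSON Lines: control how unexpected fields are handled during schema inference.
jsonl_format = {
    "filetype": "jsonl",
    "unexpected_field_behavior": "infer",   # one of: ignore / infer / error
}

# Parquet: optionally project a subset of columns.
parquet_format = {
    "filetype": "parquet",
    "columns": ["id", "updated_at"],
}
```
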
```python
class CsvFormat:
filetype: str = "csv"
delimiter: str
quote_char: str
encoding: Optional[str]
infer_datatypes: Optional[bool]
class JsonlFormat:
filetype: str = "jsonl"
newlines_in_values: bool
unexpected_field_behavior: UnexpectedFieldBehaviorEnum
block_size: int
class ParquetFormat:
filetype: str = "parquet"
columns: Optional[List[str]]
batch_size: int
buffer_size: int
class AvroFormat:
filetype: str = "avro"Comprehensive ZIP file extraction and streaming support for processing compressed S3 files. Handles large ZIP files efficiently with streaming decompression and individual file access.
```python
class ZipFileHandler:
def __init__(self, s3_client: BaseClient, config: Config): ...
def get_zip_files(self, filename: str) -> Tuple[List[zipfile.ZipInfo], int]: ...
class DecompressedStream(io.IOBase):
def __init__(self, file_obj: IO[bytes], file_info: RemoteFileInsideArchive, buffer_size: int = BUFFER_SIZE_DEFAULT): ...
def read(self, size: int = -1) -> bytes: ...
class ZipContentReader:
def __init__(self, decompressed_stream: DecompressedStream, encoding: Optional[str] = None, buffer_size: int = BUFFER_SIZE_DEFAULT): ...
def read(self, size: int = -1) -> Union[str, bytes]: ...
class RemoteFileInsideArchive(RemoteFile):
start_offset: int
compressed_size: int
uncompressed_size: int
compression_method: int
```

Utility functions for process management, data serialization, and custom exception handling. Includes multiprocessing support, Airbyte message handling, and S3-specific error management.
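For example, a parsing failure can be surfaced with file context through S3Exception. A sketch; the import paths and the FileInfo constructor arguments are assumptions based on the type listing later in this document:

```python
from datetime import datetime, timezone

from source_s3.exceptions import S3Exception                     # import path assumed
from source_s3.source_files_abstract.file_info import FileInfo   # import path assumed

file_info = FileInfo(
    key="orders/broken.csv",
    size=1_024,
    last_modified=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
raise S3Exception(file_info, internal_message="Unable to parse orders/broken.csv")
```
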
```python
def run_in_external_process(fn: Callable, timeout: int, max_timeout: int, logger, args: List[Any]) -> Mapping[str, Any]: ...
def _get_s3_compatible_client_args(config: Config) -> dict: ...
class S3Exception(AirbyteTracedException):
def __init__(self, file_info: Union[List[FileInfo], FileInfo], internal_message: Optional[str] = None, ...): ...
```

Supporting type definitions referenced by the API surface above:

```python
# Required imports
from typing import Optional, Union, List, Dict, Any, Callable, Tuple, Iterable, IO
from datetime import datetime, timedelta
from pathlib import Path
from io import IOBase
import zipfile
import multiprocessing as mp
from enum import Enum
# Configuration types
class DeliverRecords:
"""Configuration for delivering records as structured data"""
delivery_type: str = "use_records_transfer"
class DeliverRawFiles:
"""Configuration for delivering raw files"""
delivery_type: str = "use_raw_files"
# Stream types
class RemoteFile:
"""Represents a remote file with metadata"""
uri: str
last_modified: Optional[datetime]
size: Optional[int]
class RemoteFileInsideArchive(RemoteFile):
"""Represents a file inside a ZIP archive"""
start_offset: int
compressed_size: int
uncompressed_size: int
compression_method: int
class FileInfo:
"""File information for error context"""
key: str
size: Optional[int]
last_modified: Optional[datetime]
class StreamState:
"""Stream synchronization state"""
pass
class FileBasedStreamConfig:
"""Configuration for file-based streams"""
name: str
globs: List[str]
format: Union['CsvFormat', 'ParquetFormat', 'AvroFormat', 'JsonlFormat']
input_schema: Optional[str]
# File format types
class UnexpectedFieldBehaviorEnum(str, Enum):
ignore = "ignore"
infer = "infer"
error = "error"
class CsvFormat:
filetype: str = "csv"
delimiter: str
quote_char: str
encoding: Optional[str]
infer_datatypes: Optional[bool]
escape_char: Optional[str]
double_quote: bool
newlines_in_values: bool
additional_reader_options: Optional[str]
advanced_options: Optional[str]
block_size: int
class JsonlFormat:
filetype: str = "jsonl"
newlines_in_values: bool
unexpected_field_behavior: UnexpectedFieldBehaviorEnum
block_size: int
class ParquetFormat:
filetype: str = "parquet"
columns: Optional[List[str]]
batch_size: int
buffer_size: int
class AvroFormat:
filetype: str = "avro"
# Authentication types
class BaseClient:
"""S3 client interface from botocore"""
def get_object(self, **kwargs) -> Dict[str, Any]: ...
def list_objects_v2(self, **kwargs) -> Dict[str, Any]: ...
def head_object(self, **kwargs) -> Dict[str, Any]: ...
# Airbyte types
class AirbyteMessage:
"""Airbyte message object for data synchronization"""
type: str
record: Optional['AirbyteRecordMessage']
state: Optional['AirbyteStateMessage']
log: Optional['AirbyteLogMessage']
class AirbyteRecordMessage:
"""Airbyte record message"""
stream: str
data: Dict[str, Any]
emitted_at: int
class AirbyteRecordMessageFileReference:
"""Reference to a file in Airbyte record message"""
pass
class FileRecordData:
"""File record data container"""
pass
class ConnectorSpecification:
"""Airbyte connector specification"""
documentationUrl: str
connectionSpecification: Dict[str, Any]
# Error handling types
class FailureType:
"""Enumeration of failure types for error classification"""
system_error: str = "system_error"
config_error: str = "config_error"
transient_error: str = "transient_error"
class AirbyteTracedException(Exception):
"""Base exception class for Airbyte connectors"""
def __init__(self, internal_message: Optional[str] = None, message: Optional[str] = None,
failure_type: FailureType = FailureType.system_error, exception: Optional[BaseException] = None): ...
# File reading types
class FileReadMode:
"""File reading mode enumeration"""
pass
```

Module-level constants:

```python
from datetime import timedelta
from os import getenv
# File size limits
FILE_SIZE_LIMIT = 1_500_000_000 # 1.5GB maximum file size
# Default concurrency
DEFAULT_CONCURRENCY = 10
# Date formats
_DATE_FORMAT = "%Y-%m-%d"
_LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Migration settings
_V4_MIGRATION_BUFFER = timedelta(hours=1)
_V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"
# ZIP file handling constants
BUFFER_SIZE_DEFAULT = 1024 * 1024 # 1MB default buffer size
MAX_BUFFER_SIZE_DEFAULT = 16 * BUFFER_SIZE_DEFAULT # 16MB max buffer
EOCD_SIGNATURE = b"\x50\x4b\x05\x06" # ZIP End of Central Directory signature
ZIP64_LOCATOR_SIGNATURE = b"\x50\x4b\x06\x07" # ZIP64 locator signature
# Legacy config transformation
SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# AWS Environment
AWS_EXTERNAL_ID = getenv("AWS_ASSUME_ROLE_EXTERNAL_ID")
```
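As an illustration of the legacy timestamp handling, the two transformation formats above differ only in sub-second precision; a legacy microsecond value can be normalized like this (whether the connector converts in exactly this direction is an internal detail):

```python
from datetime import datetime

# Parse a legacy microsecond-precision cursor value and re-emit it at
# second precision, using the format constants defined above.
legacy_value = "2024-01-01T12:30:45.123456Z"
normalized = datetime.strptime(legacy_value, MICROS_FORMAT).strftime(SECONDS_FORMAT)
# normalized == "2024-01-01T12:30:45Z"
```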