tessl/pypi-airbyte-source-s3

S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services

Describes: pkg:pypi/airbyte-source-s3@4.14.x

To install, run

npx @tessl/cli install tessl/pypi-airbyte-source-s3@4.14.0

Airbyte Source S3

A production-ready Airbyte connector that syncs data from Amazon S3 and S3-compatible storage services. Built on the Airbyte file-based connector framework, it supports multiple file formats, authentication methods, and incremental synchronization with comprehensive error handling and backward compatibility.

Package Information

  • Package Name: airbyte-source-s3
  • Package Type: pypi
  • Language: Python
  • Installation: pip install airbyte-source-s3
  • Version: 4.14.2

Core Imports

from source_s3.run import run
from source_s3.v4 import SourceS3, Config, SourceS3StreamReader, Cursor

Entry point usage:

from source_s3.run import run

# Launch the connector
run()

Basic Usage

from source_s3.v4 import SourceS3, Config

# Build a configuration object; this illustrates the expected fields. At runtime
# the connector reads an equivalent JSON config from the path passed on the
# command line (standard Airbyte protocol).
config = Config(
    bucket="my-bucket",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region_name="us-east-1"
)

# Create the source and launch it (launch parses the command-line arguments)
source = SourceS3.create()
source.launch()

Architecture

The airbyte-source-s3 connector follows a modular architecture built on Airbyte's file-based connector framework with dual-version support:

Version Support

  • V4 (Current): File-based CDK implementation with Config, SourceS3StreamReader, and Cursor classes
  • V3 (Legacy/Deprecated): Legacy implementation with SourceS3Spec and IncrementalFileStreamS3, still supported for backward compatibility

Core Components

  • SourceS3: Main source class inheriting from FileBasedSource with V3-to-V4 config transformation
  • SourceS3StreamReader: Handles S3-specific file discovery, streaming, and ZIP file extraction
  • Config: V4 configuration specification with comprehensive S3 authentication support
  • Cursor: Manages incremental sync state with legacy state migration capabilities
  • LegacyConfigTransformer: Provides seamless V3-to-V4 configuration transformation
  • ZIP Support: Full ZIP file extraction with ZipFileHandler, DecompressedStream, and ZipContentReader

File Format Support

The connector supports multiple file formats with configurable options (a configuration sketch follows the list below):

  • CSV: Delimiter, encoding, quote characters, escape characters
  • JSON Lines: Newline handling, unexpected field behavior, block size configuration
  • Parquet: Column selection, batch size, buffer size optimization
  • Avro: Native Avro format support
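
For the formats listed above, each stream selects a format by its filetype discriminator. The sketch below shows how such a selection might look in a raw connector config; the streams key comes from the inherited file-based spec, and the exact field set should be checked against the generated schema rather than taken verbatim.

# Hedged sketch of a raw connector config selecting a CSV stream.
# Field names follow the Config and CsvFormat classes documented below;
# verify against Config.schema() before relying on them.
source_config = {
    "bucket": "my-bucket",
    "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
    "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "region_name": "us-east-1",
    "streams": [
        {
            "name": "orders",
            "globs": ["exports/orders/*.csv"],
            "format": {
                "filetype": "csv",
                "delimiter": ",",
                "quote_char": "\"",
                "encoding": "utf-8"
            }
        }
    ]
}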

Authentication Methods

  • AWS Access Keys: Traditional access key ID and secret access key authentication
  • IAM Role Assumption: Role-based authentication with external ID support
  • Anonymous Access: Public bucket access without credentials
  • S3-Compatible Services: Custom endpoint support for MinIO, DigitalOcean Spaces, etc.
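
A hedged sketch of how the authentication options above map onto the Config fields documented later. Only the credential-related fields are shown; the full spec also includes stream definitions inherited from the file-based spec. The ARN, endpoint, and bucket names are placeholders.

from source_s3.v4 import Config

# IAM role assumption: omit static keys and supply a role ARN. The external ID
# is taken from the AWS_ASSUME_ROLE_EXTERNAL_ID environment variable (see
# Constants); pairing it with role_arn here is an assumption.
role_config = Config(
    bucket="my-bucket",
    role_arn="arn:aws:iam::123456789012:role/airbyte-s3-reader",
    region_name="us-east-1"
)

# S3-compatible service (e.g. MinIO): point the client at a custom endpoint.
minio_config = Config(
    bucket="my-bucket",
    endpoint="https://minio.example.com:9000",
    aws_access_key_id="minio-access-key",
    aws_secret_access_key="minio-secret-key"
)

# Anonymous access to a public bucket: no credentials at all.
anonymous_config = Config(bucket="public-dataset-bucket")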

The connector integrates seamlessly with AWS S3 and S3-compatible services, providing comprehensive file processing capabilities including compressed archives, incremental synchronization, and robust error handling.

Capabilities

Core Source Operations

Main source functionality including connector launching, configuration reading, and specification generation. These functions provide the primary entry points for running the S3 connector.

def run() -> None: ...

class SourceS3(FileBasedSource):
    @classmethod
    def read_config(cls, config_path: str) -> Mapping[str, Any]: ...
    @classmethod
    def launch(cls, args: list[str] | None = None) -> None: ...
    @classmethod
    def create(cls, *, configured_catalog_path: Path | str | None = None) -> SourceS3: ...
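
run() and SourceS3.launch() follow the standard Airbyte protocol commands (spec, check, discover, read). Below is a minimal sketch of driving the entry point programmatically, assuming run() reads its arguments from sys.argv as Airbyte entry points conventionally do; the program name and config path are placeholders.

import sys
from source_s3.run import run

# Airbyte protocol: `spec` prints the connector specification, `check` validates
# connectivity with a config, `discover` lists streams, `read` syncs records.
# Here, `check` is invoked against a local config file.
sys.argv = ["source-s3", "check", "--config", "secrets/config.json"]
run()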

Configuration Management

Configuration classes and specifications for both V4 and legacy V3 formats. Handles S3 authentication, bucket configuration, and file format specifications with full validation and schema generation.

class Config(AbstractFileBasedSpec):
    bucket: str
    aws_access_key_id: Optional[str]
    aws_secret_access_key: Optional[str]
    role_arn: Optional[str]
    endpoint: Optional[str]
    region_name: Optional[str]
    
    @classmethod
    def schema(cls, *args, **kwargs) -> Dict[str, Any]: ...
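
Since Config drives both validation and schema generation, it can be exercised directly. A minimal sketch, assuming the spec behaves like a pydantic model (parse_obj and the streams field are assumptions inherited from the CDK's file-based spec):

import json
from source_s3.v4 import Config

# Generate the JSON schema that backs the connector specification.
schema = Config.schema()
print(json.dumps(schema["properties"]["bucket"], indent=2))

# Validate a raw config dict against the spec (pydantic-style parsing assumed).
config = Config.parse_obj({
    "bucket": "my-bucket",
    "region_name": "us-east-1",
    "streams": []
})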

Stream Operations

Stream reading, cursor management, and incremental synchronization functionality. Provides S3-specific file discovery, reading operations, and state management for efficient data synchronization.

class SourceS3StreamReader(AbstractFileBasedStreamReader):
    @property
    def s3_client(self) -> BaseClient: ...
    
    def get_matching_files(self, globs: List[str], prefix: Optional[str], logger) -> Iterable[RemoteFile]: ...
    def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger) -> IOBase: ...

class Cursor(DefaultFileBasedCursor):
    def set_initial_state(self, value: StreamState) -> None: ...
    def get_state(self) -> StreamState: ...
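
A hedged sketch of driving the stream reader directly for ad-hoc file discovery. Assigning reader.config and the FileReadMode import path follow the Airbyte CDK's stream-reader conventions and are assumptions here; the bucket, globs, and prefix are placeholders.

import logging
from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode  # assumed CDK path
from source_s3.v4 import Config, SourceS3StreamReader

logger = logging.getLogger("airbyte")

reader = SourceS3StreamReader()
# The CDK's stream readers receive their parsed config by assignment; treating
# SourceS3StreamReader the same way is an assumption.
reader.config = Config(bucket="my-bucket", region_name="us-east-1")

# Discover CSV objects under a prefix and peek at the first match.
for remote_file in reader.get_matching_files(globs=["data/*.csv"], prefix="data/", logger=logger):
    with reader.open_file(remote_file, mode=FileReadMode.READ, encoding="utf-8", logger=logger) as fp:
        print(remote_file.uri, fp.readline())
    break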

File Format Specifications

Configuration classes for the supported file formats: CSV, JSON Lines, Parquet, and Avro. Each format exposes its own options for parsing and processing.

class CsvFormat:
    filetype: str = "csv"
    delimiter: str
    quote_char: str
    encoding: Optional[str]
    infer_datatypes: Optional[bool]

class JsonlFormat:
    filetype: str = "jsonl"
    newlines_in_values: bool
    unexpected_field_behavior: UnexpectedFieldBehaviorEnum
    block_size: int

class ParquetFormat:
    filetype: str = "parquet"
    columns: Optional[List[str]]
    batch_size: int
    buffer_size: int

class AvroFormat:
    filetype: str = "avro"

ZIP File Support

Comprehensive ZIP file extraction and streaming support for processing compressed S3 files. Handles large ZIP files efficiently with streaming decompression and individual file access.

class ZipFileHandler:
    def __init__(self, s3_client: BaseClient, config: Config): ...
    def get_zip_files(self, filename: str) -> Tuple[List[zipfile.ZipInfo], int]: ...

class DecompressedStream(io.IOBase):
    def __init__(self, file_obj: IO[bytes], file_info: RemoteFileInsideArchive, buffer_size: int = BUFFER_SIZE_DEFAULT): ...
    def read(self, size: int = -1) -> bytes: ...

class ZipContentReader:
    def __init__(self, decompressed_stream: DecompressedStream, encoding: Optional[str] = None, buffer_size: int = BUFFER_SIZE_DEFAULT): ...
    def read(self, size: int = -1) -> Union[str, bytes]: ...

class RemoteFileInsideArchive(RemoteFile):
    start_offset: int
    compressed_size: int
    uncompressed_size: int
    compression_method: int
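
A hedged sketch of listing the members of a ZIP archive stored in S3. In normal operation SourceS3StreamReader.open_file wires DecompressedStream and ZipContentReader together internally, so only the handler is exercised here; the zip_reader import path, boto3 client setup, and object key are assumptions.

import boto3
from source_s3.v4 import Config
from source_s3.v4.zip_reader import ZipFileHandler  # assumed module path

config = Config(bucket="my-bucket", region_name="us-east-1")
s3_client = boto3.client("s3", region_name="us-east-1")

handler = ZipFileHandler(s3_client, config)

# Inspect the archive's central directory without downloading the whole object;
# returns the contained ZipInfo entries plus an integer offset (per the
# signature documented above).
zip_infos, offset = handler.get_zip_files("exports/archive.zip")
for info in zip_infos:
    print(info.filename, info.compress_size, info.file_size)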

Utilities and Error Handling

Utility functions for process management, data serialization, and custom exception handling. Includes multiprocessing support, Airbyte message handling, and S3-specific error management.

def run_in_external_process(fn: Callable, timeout: int, max_timeout: int, logger, args: List[Any]) -> Mapping[str, Any]: ...

def _get_s3_compatible_client_args(config: Config) -> dict: ...

class S3Exception(AirbyteTracedException):
    def __init__(self, file_info: Union[List[FileInfo], FileInfo], internal_message: Optional[str] = None, ...): ...
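
A hedged sketch of the two utilities: offloading a potentially slow callable to a separate process with a timeout, and attaching file context to an error. The import paths and the sample callable are assumptions.

import logging
from source_s3.utils import run_in_external_process  # assumed module path
from source_s3.exceptions import S3Exception          # assumed module path

logger = logging.getLogger("airbyte")

def infer_sample_schema(rows):
    # Placeholder for a CPU-heavy or potentially hanging operation.
    return {"id": "integer", "name": "string"}

# Parameters follow the documented signature: 4-second initial timeout with a
# 60-second upper bound; retry semantics are internal to the helper.
result = run_in_external_process(
    fn=infer_sample_schema,
    timeout=4,
    max_timeout=60,
    logger=logger,
    args=[[{"id": 1, "name": "a"}]]
)

# S3Exception carries the FileInfo of the object that failed so Airbyte can
# surface it; raising is shown commented out since no real file is in scope.
# raise S3Exception(file_info, internal_message="Access denied while reading object")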

Types

# Required imports
from typing import Optional, Union, List, Dict, Any, Callable, Tuple, Iterable, IO
from datetime import datetime, timedelta
from pathlib import Path
from io import IOBase
import zipfile
import multiprocessing as mp
from enum import Enum

# Configuration types
class DeliverRecords:
    """Configuration for delivering records as structured data"""
    delivery_type: str = "use_records_transfer"

class DeliverRawFiles:
    """Configuration for delivering raw files"""
    delivery_type: str = "use_raw_files"

# Stream types
class RemoteFile:
    """Represents a remote file with metadata"""
    uri: str
    last_modified: Optional[datetime]
    size: Optional[int]

class RemoteFileInsideArchive(RemoteFile):
    """Represents a file inside a ZIP archive"""
    start_offset: int
    compressed_size: int
    uncompressed_size: int
    compression_method: int

class FileInfo:
    """File information for error context"""
    key: str
    size: Optional[int]
    last_modified: Optional[datetime]

class StreamState:
    """Stream synchronization state"""
    pass

class FileBasedStreamConfig:
    """Configuration for file-based streams"""
    name: str
    globs: List[str]
    format: Union['CsvFormat', 'ParquetFormat', 'AvroFormat', 'JsonlFormat']
    input_schema: Optional[str]

# File format types
class UnexpectedFieldBehaviorEnum(str, Enum):
    ignore = "ignore"
    infer = "infer"
    error = "error"

class CsvFormat:
    filetype: str = "csv"
    delimiter: str
    quote_char: str
    encoding: Optional[str]
    infer_datatypes: Optional[bool]
    escape_char: Optional[str]
    double_quote: bool
    newlines_in_values: bool
    additional_reader_options: Optional[str]
    advanced_options: Optional[str]
    block_size: int

class JsonlFormat:
    filetype: str = "jsonl"
    newlines_in_values: bool
    unexpected_field_behavior: UnexpectedFieldBehaviorEnum
    block_size: int

class ParquetFormat:
    filetype: str = "parquet"
    columns: Optional[List[str]]
    batch_size: int
    buffer_size: int

class AvroFormat:
    filetype: str = "avro"

# Authentication types
class BaseClient:
    """S3 client interface from botocore"""
    def get_object(self, **kwargs) -> Dict[str, Any]: ...
    def list_objects_v2(self, **kwargs) -> Dict[str, Any]: ...
    def head_object(self, **kwargs) -> Dict[str, Any]: ...

# Airbyte types
class AirbyteMessage:
    """Airbyte message object for data synchronization"""
    type: str
    record: Optional['AirbyteRecordMessage']
    state: Optional['AirbyteStateMessage']
    log: Optional['AirbyteLogMessage']

class AirbyteRecordMessage:
    """Airbyte record message"""
    stream: str
    data: Dict[str, Any]
    emitted_at: int

class AirbyteRecordMessageFileReference:
    """Reference to a file in Airbyte record message"""
    pass

class FileRecordData:
    """File record data container"""
    pass

class ConnectorSpecification:
    """Airbyte connector specification"""
    documentationUrl: str
    connectionSpecification: Dict[str, Any]

# Error handling types
class FailureType:
    """Enumeration of failure types for error classification"""
    system_error: str = "system_error"
    config_error: str = "config_error"
    transient_error: str = "transient_error"

class AirbyteTracedException(Exception):
    """Base exception class for Airbyte connectors"""
    def __init__(self, internal_message: Optional[str] = None, message: Optional[str] = None,
                 failure_type: FailureType = FailureType.system_error, exception: Optional[BaseException] = None): ...

# File reading types
class FileReadMode:
    """File reading mode enumeration"""
    pass

Constants

from datetime import timedelta
from os import getenv

# File size limits
FILE_SIZE_LIMIT = 1_500_000_000  # 1.5GB maximum file size

# Default concurrency
DEFAULT_CONCURRENCY = 10

# Date formats
_DATE_FORMAT = "%Y-%m-%d"
_LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Migration settings
_V4_MIGRATION_BUFFER = timedelta(hours=1)
_V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"

# ZIP file handling constants
BUFFER_SIZE_DEFAULT = 1024 * 1024  # 1MB default buffer size
MAX_BUFFER_SIZE_DEFAULT = 16 * BUFFER_SIZE_DEFAULT  # 16MB max buffer
EOCD_SIGNATURE = b"\x50\x4b\x05\x06"  # ZIP End of Central Directory signature
ZIP64_LOCATOR_SIGNATURE = b"\x50\x4b\x06\x07"  # ZIP64 locator signature

# Legacy config transformation
SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

# AWS Environment
AWS_EXTERNAL_ID = getenv("AWS_ASSUME_ROLE_EXTERNAL_ID")
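
For orientation, the ZIP signatures above are the markers a reader scans for when locating an archive's central directory. The sketch below is a generic illustration of that technique, not the connector's internal code: it fetches the tail of an S3 object with a ranged GET and searches backwards for the End of Central Directory record.

import boto3

EOCD_SIGNATURE = b"\x50\x4b\x05\x06"
TAIL_BYTES = 65_536 + 22  # EOCD record (22 bytes) plus the maximum comment length

def find_eocd_offset(s3_client, bucket: str, key: str) -> int:
    """Return the absolute offset of the EOCD record, or -1 if not found."""
    size = s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
    start = max(0, size - TAIL_BYTES)
    tail = s3_client.get_object(
        Bucket=bucket, Key=key, Range=f"bytes={start}-{size - 1}"
    )["Body"].read()
    index = tail.rfind(EOCD_SIGNATURE)
    return start + index if index >= 0 else -1

# Illustrative usage (bucket and key are placeholders):
# offset = find_eocd_offset(boto3.client("s3"), "my-bucket", "exports/archive.zip")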