tessl/pypi-airbyte-source-s3

S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services

Configuration Management

Configuration classes and specifications for both the V4 and legacy V3 formats. They cover S3 authentication, bucket configuration, and file format specifications, with validation and JSON schema generation.

Capabilities

V4 Configuration Class

Primary configuration class for the V4 S3 connector. It validates related fields together and generates the connector's JSON schema.

class Config(AbstractFileBasedSpec):
    """
    Configuration specification for S3 connector V4.
    Inherits from AbstractFileBasedSpec for file-based connector compatibility.
    """
    
    bucket: str
    """S3 bucket name to sync data from"""
    
    aws_access_key_id: Optional[str]
    """AWS access key ID for authentication (marked as secret)"""
    
    aws_secret_access_key: Optional[str]
    """AWS secret access key for authentication (marked as secret)"""
    
    role_arn: Optional[str]
    """AWS IAM role ARN for assume role authentication"""
    
    endpoint: Optional[str]
    """S3-compatible endpoint URL for non-AWS services"""
    
    region_name: Optional[str]
    """AWS region name where the bucket is located"""
    
    delivery_method: Union[DeliverRecords, DeliverRawFiles]
    """Delivery method configuration for how data should be processed"""
    
    @classmethod
    def documentation_url(cls) -> AnyUrl:
        """
        Returns the documentation URL for the connector.
        
        Returns:
            URL pointing to the connector documentation
        """
    
    @root_validator
    def validate_optional_args(cls, values):
        """
        Validates configuration fields and their relationships.
        Uses Pydantic root_validator decorator for comprehensive validation.
        
        Args:
            values: Configuration values to validate
            
        Returns:
            Validated configuration values
        """
    
    @classmethod
    def schema(cls, *args, **kwargs) -> Dict[str, Any]:
        """
        Generates the configuration schema for the connector.
        
        Returns:
            Dictionary representing the JSON schema for configuration
        """
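
The cross-field validation described above can be illustrated with a minimal standalone sketch. It is not the connector's actual implementation. The two rules it enforces (both keys supplied together, no plain-HTTP endpoint) and the helper name `check_optional_args` are assumptions for illustration, and it uses a plain function instead of Pydantic so that it runs on its own.

```python
from typing import Any, Dict


def check_optional_args(values: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical cross-field check, written without Pydantic.

    Assumed rules (not confirmed by the source):
      * the access key ID and secret access key are supplied together or not at all
      * a custom endpoint, if given, does not use plain HTTP
    """
    key_id = values.get("aws_access_key_id")
    secret = values.get("aws_secret_access_key")
    endpoint = values.get("endpoint")

    # Having exactly one of the two credentials is an inconsistent state.
    if bool(key_id) != bool(secret):
        raise ValueError(
            "aws_access_key_id and aws_secret_access_key must be provided together"
        )

    # Reject unencrypted endpoints (assumed rule).
    if endpoint and endpoint.startswith("http://"):
        raise ValueError("endpoint must use HTTPS")

    return values
```

A root validator works on the whole set of field values at once, which is what makes checks like "these two fields must appear together" possible.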

Legacy Configuration Transformer

Handles transformation from legacy V3 configurations to V4 format for backward compatibility.

class LegacyConfigTransformer:
    """
    Transforms legacy V3 configurations to V4 format.
    Ensures backward compatibility for existing connector deployments.
    """
    
    @classmethod
    def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
        """
        Converts legacy V3 configuration to V4 format.
        
        Args:
            legacy_config: V3 configuration specification
            
        Returns:
            V4 format configuration dictionary
        """
    
    @classmethod
    def _create_globs(cls, path_pattern: str) -> List[str]:
        """
        Creates glob patterns from V3 path patterns.
        
        Args:
            path_pattern: V3 path pattern string
            
        Returns:
            List of glob patterns for V4 format
        """
    
    @classmethod
    def _transform_seconds_to_micros(cls, datetime_str: str) -> str:
        """
        Transforms datetime formats from seconds to microseconds precision.
        
        Args:
            datetime_str: Datetime string in V3 format
            
        Returns:
            Datetime string in V4 format
        """
    
    @classmethod
    def _transform_file_format(cls, format_options) -> Mapping[str, Any]:
        """
        Transforms file format configurations from V3 to V4.
        
        Args:
            format_options: V3 format options
            
        Returns:
            V4 format configuration
        """
    
    @classmethod
    def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Dict[str, Any]:
        """
        Parses JSON configuration strings from V3 format.
        
        Args:
            options_field: Name of the options field
            options_value: JSON string value or None
            
        Returns:
            Parsed configuration dictionary
        """
    
    @staticmethod
    def _filter_legacy_noops(advanced_options: Dict[str, Any]):
        """
        Filters out legacy no-operation options that are no longer needed.
        
        Args:
            advanced_options: Dictionary of advanced configuration options
        """
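
The helper methods above can be sketched as standalone functions. This is an illustration under stated assumptions, not the transformer's real code: it assumes that V3 separates multiple path patterns with `|`, that V3 timestamps look like `2021-01-01T00:00:00Z`, and that option strings are JSON objects. The function names are hypothetical.

```python
import json
from datetime import datetime
from typing import Any, Dict, List, Optional

# Assumed formats: second precision in V3, microsecond precision in V4.
SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def create_globs(path_pattern: str) -> List[str]:
    """Split a V3 pattern string into a list of globs (assumes '|' separates patterns)."""
    return path_pattern.split("|")


def seconds_to_micros(datetime_str: str) -> str:
    """Re-render a second-precision timestamp with microsecond precision."""
    parsed = datetime.strptime(datetime_str, SECONDS_FORMAT)
    return parsed.strftime(MICROS_FORMAT)


def parse_options(options_field: str, options_value: Optional[str]) -> Dict[str, Any]:
    """Parse a JSON options string; a missing or empty value yields an empty dict."""
    if not options_value:
        return {}
    try:
        parsed = json.loads(options_value)
    except json.JSONDecodeError as error:
        raise ValueError(f"Malformed JSON in {options_field}: {error}") from error
    if not isinstance(parsed, dict):
        raise ValueError(f"{options_field} must be a JSON object")
    return parsed


print(create_globs("data/*.csv|archive/**"))
print(seconds_to_micros("2021-01-01T00:00:00Z"))
print(parse_options("advanced_options", '{"quote_char": "\\""}'))
```

Naming the field in the error message tells the user which of several JSON-valued options was malformed.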

Legacy V3 Specification (SourceS3Spec)

Legacy V3 configuration specification for backward compatibility, still actively supported by the connector.

class SourceS3Spec(SourceFilesAbstractSpec, BaseModel):
    """
    Legacy V3 configuration specification for backward compatibility.
    Contains nested S3Provider class for provider-specific settings.
    """
    
    class S3Provider(BaseModel):
        """Provider-specific configuration fields for S3 access"""
        bucket: str
        """S3 bucket name"""
        
        aws_access_key_id: Optional[str]
        """AWS access key ID for authentication (marked as secret)"""
        
        aws_secret_access_key: Optional[str]
        """AWS secret access key for authentication (marked as secret)"""
        
        role_arn: Optional[str]
        """AWS IAM role ARN for assume role authentication"""
        
        path_prefix: str
        """S3 key prefix to filter files"""
        
        endpoint: str
        """S3-compatible endpoint URL for non-AWS services"""
        
        region_name: Optional[str]
        """AWS region name where the bucket is located"""
        
        start_date: Optional[str]
        """Start date for incremental sync (ISO format)"""
    
    provider: S3Provider
    """S3 provider configuration"""

class SourceFilesAbstractSpec(BaseModel):
    """
    Abstract specification for file-based sources.
    Provides common configuration fields and schema processing methods.
    """
    
    dataset: str
    """Output stream name (pattern: ^([A-Za-z0-9-_]+)$)"""
    
    path_pattern: str
    """File pattern regex for replication"""
    
    format: Union[CsvFormat, ParquetFormat, AvroFormat, JsonlFormat]  
    """File format specification"""
    
    user_schema: str
    """Manual schema enforcement (alias: "schema")"""
    
    @staticmethod
    def change_format_to_oneOf(schema: dict) -> dict:
        """
        Transforms schema format specifications to oneOf structure.
        
        Args:
            schema: JSON schema dictionary
            
        Returns:
            Transformed schema with oneOf format specifications
        """
    
    @staticmethod
    def remove_enum_allOf(schema: dict) -> dict:
        """
        Removes unsupported allOf structures from enum definitions.
        
        Args:
            schema: JSON schema dictionary
            
        Returns:
            Schema with allOf structures removed
        """
    
    @staticmethod
    def check_provider_added(schema: dict) -> None:
        """
        Validates that provider property is properly added to schema.
        
        Args:
            schema: JSON schema dictionary
            
        Raises:
            ValidationError: If provider property is missing or invalid
        """
    
    @staticmethod
    def resolve_refs(schema: dict) -> dict:
        """
        Resolves JSON schema references within the schema.
        
        Args:
            schema: JSON schema dictionary with references
            
        Returns:
            Schema with resolved references
        """
    
    @classmethod  
    def schema(cls, *args, **kwargs) -> Dict[str, Any]:
        """
        Generates schema with post-processing transformations.
        
        Returns:
            Processed JSON schema dictionary
        """
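
Reference resolution can be sketched in a few lines. This is a simplified illustration, not the method's actual body: it assumes that definitions live under a top-level `definitions` key and are referenced as `#/definitions/<Name>`, and it does not guard against circular references.

```python
from typing import Any, Dict


def resolve_refs(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Inline every "$ref" using the schema's top-level "definitions" (assumed layout)."""
    definitions = schema.get("definitions", {})

    def expand(node: Any) -> Any:
        if isinstance(node, dict):
            ref = node.get("$ref")
            if isinstance(ref, str):
                # "#/definitions/S3Provider" -> "S3Provider"
                name = ref.rsplit("/", 1)[-1]
                return expand(definitions[name])
            return {key: expand(value) for key, value in node.items()}
        if isinstance(node, list):
            return [expand(item) for item in node]
        return node

    # Drop the definitions block itself; its contents are inlined where referenced.
    top_level = {key: value for key, value in schema.items() if key != "definitions"}
    return expand(top_level)


# Hand-written sample schema, not the connector's real output.
sample = {
    "type": "object",
    "properties": {"provider": {"$ref": "#/definitions/S3Provider"}},
    "definitions": {
        "S3Provider": {"type": "object", "properties": {"bucket": {"type": "string"}}}
    },
}
print(resolve_refs(sample))
```

The result is a self-contained schema with no `$ref` entries, which suits consumers that do not resolve references themselves.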

Usage Examples

Basic V4 Configuration

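# Assumed import path for the delivery method classes; verify against your airbyte-cdk version
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import DeliverRecords
from source_s3.v4 import Config

# Create configuration with AWS credentials.
# Note: the AbstractFileBasedSpec base class may also require a `streams` list,
# which is omitted here for brevity.
config = Config(
    bucket="my-data-bucket",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region_name="us-east-1",
    delivery_method=DeliverRecords()
)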

IAM Role Configuration

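# Assumed import path for the delivery method classes; verify against your airbyte-cdk version
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import DeliverRawFiles
from source_s3.v4 import Config

# Configure with IAM role assumption
config = Config(
    bucket="secure-bucket",
    role_arn="arn:aws:iam::123456789012:role/S3AccessRole",
    region_name="us-west-2",
    delivery_method=DeliverRawFiles()
)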

S3-Compatible Service Configuration

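# Assumed import path for the delivery method classes; verify against your airbyte-cdk version
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import DeliverRecords
from source_s3.v4 import Config

# Configure for MinIO or other S3-compatible service
config = Config(
    bucket="minio-bucket",
    endpoint="https://minio.example.com",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    delivery_method=DeliverRecords()
)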

Legacy Configuration Transformation

from source_s3.v4 import LegacyConfigTransformer
from source_s3.source import SourceS3Spec

# Transform V3 config to V4
legacy_spec = SourceS3Spec(...)  # V3 configuration
v4_config = LegacyConfigTransformer.convert(legacy_spec)

Schema Generation

from source_s3.v4 import Config

# Generate configuration schema
schema = Config.schema()
print(schema)  # JSON schema for the configuration
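
One practical use of the generated schema is checking which fields are treated as secrets. The sketch below assumes the schema follows Airbyte's convention of flagging such properties with `airbyte_secret: true`. The sample schema is hand-written for illustration and is not the real output of `Config.schema()`, and `secret_fields` is a hypothetical helper.

```python
from typing import Any, Dict, List


def secret_fields(schema: Dict[str, Any]) -> List[str]:
    """Return the names of top-level properties flagged with `airbyte_secret`."""
    properties = schema.get("properties", {})
    return sorted(
        name for name, spec in properties.items() if spec.get("airbyte_secret") is True
    )


# Hand-written stand-in for a generated schema; the real one has more fields.
sample_schema = {
    "type": "object",
    "properties": {
        "bucket": {"type": "string"},
        "aws_access_key_id": {"type": "string", "airbyte_secret": True},
        "aws_secret_access_key": {"type": "string", "airbyte_secret": True},
        "region_name": {"type": "string"},
    },
}

print(secret_fields(sample_schema))  # ['aws_access_key_id', 'aws_secret_access_key']
```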

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-s3
