S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Configuration classes and specifications for both V4 and legacy V3 formats. Handles S3 authentication, bucket configuration, file format specifications with full validation and schema generation.
Primary configuration class for the V4 S3 connector with comprehensive validation and schema generation.
class Config(AbstractFileBasedSpec):
    """
    Configuration specification for S3 connector V4.
    Inherits from AbstractFileBasedSpec for file-based connector compatibility.
    """

    bucket: str
    """S3 bucket name to sync data from"""

    aws_access_key_id: Optional[str]
    """AWS access key ID for authentication (marked as secret)"""

    aws_secret_access_key: Optional[str]
    """AWS secret access key for authentication (marked as secret)"""

    role_arn: Optional[str]
    """AWS IAM role ARN for assume role authentication"""

    endpoint: Optional[str]
    """S3-compatible endpoint URL for non-AWS services"""

    region_name: Optional[str]
    """AWS region name where the bucket is located"""

    delivery_method: Union[DeliverRecords, DeliverRawFiles]
    """Delivery method configuration for how data should be processed"""

    @classmethod
    def documentation_url(cls) -> AnyUrl:
        """
        Returns the documentation URL for the connector.

        Returns:
            URL pointing to the connector documentation
        """

    @root_validator
    def validate_optional_args(cls, values):
        """
        Validates configuration fields and their relationships.
        Uses Pydantic root_validator decorator for comprehensive validation.

        Args:
            values: Configuration values to validate

        Returns:
            Validated configuration values
        """

    @classmethod
    def schema(cls, *args, **kwargs) -> Dict[str, Any]:
        """
        Generates the configuration schema for the connector.

        Returns:
            Dictionary representing the JSON schema for configuration
        """

Handles transformation from legacy V3 configurations to V4 format for backward compatibility.
class LegacyConfigTransformer:
    """
    Transforms legacy V3 configurations to V4 format.
    Ensures backward compatibility for existing connector deployments.
    """

    @classmethod
    def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
        """
        Converts legacy V3 configuration to V4 format.

        Args:
            legacy_config: V3 configuration specification

        Returns:
            V4 format configuration dictionary
        """

    @classmethod
    def _create_globs(cls, path_pattern: str) -> List[str]:
        """
        Creates glob patterns from V3 path patterns.

        Args:
            path_pattern: V3 path pattern string

        Returns:
            List of glob patterns for V4 format
        """

    @classmethod
    def _transform_seconds_to_micros(cls, datetime_str: str) -> str:
        """
        Transforms datetime formats from seconds to microseconds precision.

        Args:
            datetime_str: Datetime string in V3 format

        Returns:
            Datetime string in V4 format
        """

    @classmethod
    def _transform_file_format(cls, format_options) -> Mapping[str, Any]:
        """
        Transforms file format configurations from V3 to V4.

        Args:
            format_options: V3 format options

        Returns:
            V4 format configuration
        """

    @classmethod
    def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Dict[str, Any]:
        """
        Parses JSON configuration strings from V3 format.

        Args:
            options_field: Name of the options field
            options_value: JSON string value or None

        Returns:
            Parsed configuration dictionary
        """

    @staticmethod
    def _filter_legacy_noops(advanced_options: Dict[str, Any]):
        """
        Filters out legacy no-operation options that are no longer needed.

        Args:
            advanced_options: Dictionary of advanced configuration options
        """

Legacy V3 configuration specification for backward compatibility, still actively supported by the connector.
class SourceS3Spec(SourceFilesAbstractSpec, BaseModel):
    """
    Legacy V3 configuration specification for backward compatibility.
    Contains nested S3Provider class for provider-specific settings.
    """

    class S3Provider(BaseModel):
        """Provider-specific configuration fields for S3 access"""

        bucket: str
        """S3 bucket name"""

        aws_access_key_id: Optional[str]
        """AWS access key ID for authentication (marked as secret)"""

        aws_secret_access_key: Optional[str]
        """AWS secret access key for authentication (marked as secret)"""

        role_arn: Optional[str]
        """AWS IAM role ARN for assume role authentication"""

        path_prefix: str
        """S3 key prefix to filter files"""

        endpoint: str
        """S3-compatible endpoint URL for non-AWS services"""

        region_name: Optional[str]
        """AWS region name where the bucket is located"""

        start_date: Optional[str]
        """Start date for incremental sync (ISO format)"""

    provider: S3Provider
    """S3 provider configuration"""
class SourceFilesAbstractSpec(BaseModel):
    """
    Abstract specification for file-based sources.
    Provides common configuration fields and schema processing methods.
    """

    dataset: str
    """Output stream name (pattern: ^([A-Za-z0-9-_]+)$)"""

    path_pattern: str
    """File pattern regex for replication"""

    format: Union[CsvFormat, ParquetFormat, AvroFormat, JsonlFormat]
    """File format specification"""

    user_schema: str
    """Manual schema enforcement (alias: "schema")"""

    @staticmethod
    def change_format_to_oneOf(schema: dict) -> dict:
        """
        Transforms schema format specifications to oneOf structure.

        Args:
            schema: JSON schema dictionary

        Returns:
            Transformed schema with oneOf format specifications
        """

    @staticmethod
    def remove_enum_allOf(schema: dict) -> dict:
        """
        Removes unsupported allOf structures from enum definitions.

        Args:
            schema: JSON schema dictionary

        Returns:
            Schema with allOf structures removed
        """

    @staticmethod
    def check_provider_added(schema: dict) -> None:
        """
        Validates that provider property is properly added to schema.

        Args:
            schema: JSON schema dictionary

        Raises:
            ValidationError: If provider property is missing or invalid
        """

    @staticmethod
    def resolve_refs(schema: dict) -> dict:
        """
        Resolves JSON schema references within the schema.

        Args:
            schema: JSON schema dictionary with references

        Returns:
            Schema with resolved references
        """

    @classmethod
    def schema(cls, *args, **kwargs) -> Dict[str, Any]:
        """
        Generates schema with post-processing transformations.

        Returns:
            Processed JSON schema dictionary
        """

from source_s3.v4 import Config
# Create configuration with AWS credentials
config = Config(
bucket="my-data-bucket",
aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
region_name="us-east-1",
delivery_method=DeliverRecords()
)

from source_s3.v4 import Config
# Configure with IAM role assumption
config = Config(
bucket="secure-bucket",
role_arn="arn:aws:iam::123456789012:role/S3AccessRole",
region_name="us-west-2",
delivery_method=DeliverRawFiles()
)

from source_s3.v4 import Config
# Configure for MinIO or other S3-compatible service
config = Config(
bucket="minio-bucket",
endpoint="https://minio.example.com",
aws_access_key_id="minioadmin",
aws_secret_access_key="minioadmin",
delivery_method=DeliverRecords()
)

from source_s3.v4 import LegacyConfigTransformer
from source_s3.source import SourceS3Spec
# Transform V3 config to V4
legacy_spec = SourceS3Spec(...) # V3 configuration
v4_config = LegacyConfigTransformer.convert(legacy_spec)

from source_s3.v4 import Config
# Generate configuration schema
schema = Config.schema()
print(schema)  # JSON schema for the configuration

Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-s3