S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services
The main source module provides the primary entry points and core operations for the Airbyte S3 connector: launching the connector, managing configuration, and generating the connector specification.
The main entry function launches the S3 connector and handles command-line execution.
def run() -> None:
    """
    Main entry function that launches the SourceS3 connector.
    Called when running the connector as a standalone application.
    This function calls SourceS3.launch() to start the connector.
    """

Main source implementation inheriting from FileBasedSource, providing the core connector functionality.
class SourceS3(FileBasedSource):
    """
    Main S3 source implementation using the file-based CDK.

    Attributes:
        _concurrency_level: Set to DEFAULT_CONCURRENCY (10)
    """

    @classmethod
    def read_config(cls, config_path: str) -> Mapping[str, Any]:
        """
        Reads and transforms legacy V3 configurations to V4 format.

        Args:
            config_path: Path to the configuration file

        Returns:
            Configuration dictionary with V4 format
        """

    @classmethod
    def spec(cls, *args, **kwargs) -> ConnectorSpecification:
        """
        Generates the connector specification schema.

        Returns:
            ConnectorSpecification object defining the connector's configuration schema
        """

    @classmethod
    def launch(cls, args: list[str] | None = None) -> None:
        """
        Launches the source connector with optional command-line arguments.

        Args:
            args: Optional list of command-line arguments
        """

    @classmethod
    def create(cls, *, configured_catalog_path: Path | str | None = None) -> SourceS3:
        """
        Creates a new SourceS3 instance.

        Args:
            configured_catalog_path: Optional path to the configured catalog

        Returns:
            New SourceS3 instance
        """

Support for backward compatibility with V3 configurations through the SourceS3Spec class.
class SourceS3Spec(SourceFilesAbstractSpec, BaseModel):
    """
    Legacy V3 configuration specification for backward compatibility.
    Contains nested S3Provider class for provider-specific settings.
    """

    class S3Provider:
        """Provider-specific configuration fields for S3 access"""
        bucket: str
        aws_access_key_id: Optional[str]
        aws_secret_access_key: Optional[str]
        role_arn: Optional[str]
        path_prefix: str
        endpoint: str
        region_name: Optional[str]
        start_date: Optional[str]

from source_s3.run import run
# Launch the connector with default configuration
run()

from source_s3.v4 import SourceS3
from source_s3.source import SourceS3Spec
from pathlib import Path
# Create source instance
source = SourceS3.create(configured_catalog_path="path/to/catalog.json")
# Launch with custom arguments: the Airbyte command (here "read") comes first, then its flags
SourceS3.launch(args=["read", "--config", "config.json", "--catalog", "catalog.json"])

from source_s3.v4 import SourceS3
# Read and transform configuration
config = SourceS3.read_config("config.json")
print(config)  # V4 format configuration

Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-s3
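
The `launch` method documented above takes a raw list of command-line arguments. Airbyte connectors are conventionally invoked with a command (`spec`, `check`, `discover`, or `read`) followed by flags such as `--config`, `--catalog`, and `--state`. The following is a minimal sketch of a helper that assembles such a list. `build_launch_args` and `VALID_COMMANDS` are hypothetical names introduced for illustration and are not part of `source_s3`; the validation rules are assumptions based on the usual Airbyte command-line conventions.

```python
from __future__ import annotations

# Hypothetical helper (not part of source_s3): builds an Airbyte-style
# argument list that could be passed to SourceS3.launch(args=...).
VALID_COMMANDS = ("spec", "check", "discover", "read")


def build_launch_args(
    command: str,
    config_path: str | None = None,
    catalog_path: str | None = None,
    state_path: str | None = None,
) -> list[str]:
    """Return an argument list such as ["read", "--config", "config.json", ...]."""
    if command not in VALID_COMMANDS:
        raise ValueError(f"Unknown command {command!r}; expected one of {VALID_COMMANDS}")

    args = [command]

    # Assumption: every command except `spec` needs a configuration file.
    if command != "spec":
        if config_path is None:
            raise ValueError(f"The {command!r} command requires a config path")
        args += ["--config", config_path]

    # Assumption: catalog and state flags only apply to `read`.
    if command == "read":
        if catalog_path is None:
            raise ValueError("The 'read' command requires a catalog path")
        args += ["--catalog", catalog_path]
        if state_path is not None:
            args += ["--state", state_path]

    return args


read_args = build_launch_args("read", "config.json", "catalog.json")
print(read_args)
```

With the connector installed, the resulting list could then be handed to `SourceS3.launch(args=read_args)`. Keeping the argument assembly in a small pure function makes it possible to check the command line without starting the connector.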