tessl/pypi-airbyte-source-s3

S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services

docs/utilities.md

Utilities and Error Handling

Utility functions for process management, data serialization, and custom exception handling. Includes multiprocessing support, Airbyte message handling, and S3-specific error management.

Capabilities

Process Management

Functions for running operations in external processes with timeout management and multiprocessing support.

def run_in_external_process(fn: Callable, timeout: int, max_timeout: int, logger, args: List[Any]) -> Mapping[str, Any]:
    """
    Runs a function in an external process with timeout management.
    
    Args:
        fn: Function to execute in external process
        timeout: Initial timeout in seconds
        max_timeout: Maximum timeout in seconds
        logger: Logger instance for operation logging
        args: List of arguments to pass to the function
        
    Returns:
        Dictionary containing the function result and execution metadata
        
    Raises:
        TimeoutError: If function execution exceeds timeout limits
        ProcessError: If external process fails or crashes
    """

def multiprocess_queuer(func: Callable, queue: mp.Queue, *args, **kwargs) -> None:
    """
    Multiprocessing helper function for queue-based operations.
    
    Args:
        func: Function to execute
        queue: Multiprocessing queue for result communication
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function
    """

Data Processing

Utility functions for data handling and processing operations.

def get_value_or_json_if_empty_string(options: Optional[str] = None) -> str:
    """
    Returns the provided options string, or an empty JSON object if the string is empty or None.
    
    Args:
        options: Options string to process; defaults to None
        
    Returns:
        The original options string, or "{}" if it is empty or None
    """

S3 Client Configuration

Helper functions for S3 client configuration and compatibility.

def _get_s3_compatible_client_args(config: Config) -> dict:
    """
    Returns configuration for S3-compatible client creation.
    
    Args:
        config: Configuration object with S3 settings
        
    Returns:
        Dictionary with client configuration parameters for S3-compatible services
        
    Raises:
        ValueError: If configuration is invalid for S3-compatible services
    """

Message Serialization

Functions for converting between Airbyte message objects and JSON representations.

def airbyte_message_to_json(message: AirbyteMessage, *, newline: bool = False) -> str:
    """
    Converts an AirbyteMessage object to JSON string representation.
    
    Args:
        message: AirbyteMessage object to serialize
        newline: Whether to append newline character to JSON string
        
    Returns:
        JSON string representation of the Airbyte message
        
    Raises:
        SerializationError: If message cannot be serialized to JSON
    """

def airbyte_message_from_json(message_json: str) -> AirbyteMessage:
    """
    Creates an AirbyteMessage object from JSON string.
    
    Args:
        message_json: JSON string representation of Airbyte message
        
    Returns:
        AirbyteMessage object created from JSON
        
    Raises:
        DeserializationError: If JSON cannot be parsed into AirbyteMessage
        ValidationError: If JSON structure is invalid for AirbyteMessage
    """

Error Handling

Custom exception class for S3-specific error handling with file context information.

class S3Exception(AirbyteTracedException):
    """
    S3-specific exception handling with file context.
    
    Inherits from AirbyteTracedException to provide structured error reporting
    within the Airbyte framework with specific context about S3 operations.
    """
    
    def __init__(
        self,
        file_info: Union[List[FileInfo], FileInfo],
        internal_message: Optional[str] = None,
        message: Optional[str] = None, 
        failure_type: FailureType = FailureType.system_error,
        exception: Optional[BaseException] = None
    ):
        """
        Initialize S3Exception with file context and error details.
        
        Args:
            file_info: File information providing context for the error.
                      Can be a single FileInfo object or list of FileInfo objects.
            internal_message: Internal error message for debugging and logging.
                             Not displayed to end users.
            message: User-facing error message. If not provided, a default
                    message will be generated based on the error context.
            failure_type: Type of failure (system_error, config_error, etc.).
                         Defaults to system_error for S3 operations.
            exception: Original exception that caused this S3Exception.
                      Used for error chaining and detailed debugging.
                      
        Example:
            try:
                # S3 operation
                pass
            except ClientError as e:
                raise S3Exception(
                    file_info=current_file,
                    internal_message=f"S3 client error: {e}",
                    message="Failed to access S3 file",
                    exception=e
                )
        """

Usage Examples

External Process Execution

from source_s3.utils import run_in_external_process
import logging

logger = logging.getLogger(__name__)

def expensive_operation(data):
    # Some CPU-intensive operation
    return {"processed": len(data), "result": "success"}

large_dataset = list(range(1_000_000))  # example payload

# Run in external process with timeout
try:
    result = run_in_external_process(
        fn=expensive_operation,
        timeout=30,
        max_timeout=120,
        logger=logger,
        args=[large_dataset]
    )
    print(f"Operation completed: {result}")
except TimeoutError:
    print("Operation timed out")

Multiprocessing Queue Operations

from source_s3.utils import multiprocess_queuer
import multiprocessing as mp

def worker_function(data, multiplier=2):
    return data * multiplier

# Set up multiprocessing
queue = mp.Queue()
process = mp.Process(
    target=multiprocess_queuer,
    args=(worker_function, queue, 10),
    kwargs={"multiplier": 3}
)

process.start()
result = queue.get(timeout=30)  # bounded wait, so a crashed worker cannot block forever
process.join()

Data Processing Utilities

from source_s3.utils import get_value_or_json_if_empty_string

# Handle optional configuration
options = get_value_or_json_if_empty_string("")  # Returns "{}"
options = get_value_or_json_if_empty_string(None)  # Returns "{}"
options = get_value_or_json_if_empty_string('{"key": "value"}')  # Returns '{"key": "value"}'

S3 Client Configuration Utilities

from source_s3.v4.stream_reader import _get_s3_compatible_client_args
from source_s3.v4 import Config

# Configure for S3-compatible service
config = Config(
    bucket="minio-bucket",
    endpoint="https://minio.example.com",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin"
)

# Get S3-compatible client arguments
client_args = _get_s3_compatible_client_args(config)
print(client_args)  # Returns dict with endpoint_url, use_ssl, etc.

# Use with boto3 client
import boto3
s3_client = boto3.client(
    "s3",
    aws_access_key_id=config.aws_access_key_id,
    aws_secret_access_key=config.aws_secret_access_key,
    **client_args
)

Message Serialization

from source_s3.utils import airbyte_message_to_json, airbyte_message_from_json
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage

# Create Airbyte message
record = AirbyteRecordMessage(
    stream="users",
    data={"id": 1, "name": "John"},
    emitted_at=1234567890
)
message = AirbyteMessage(type="RECORD", record=record)

# Serialize to JSON
json_str = airbyte_message_to_json(message, newline=True)
print(json_str)

# Deserialize from JSON
reconstructed_message = airbyte_message_from_json(json_str)
assert reconstructed_message.record.data["name"] == "John"

S3 Exception Handling

from source_s3.exceptions import S3Exception
from source_s3.utils import FileInfo
from airbyte_cdk.models import FailureType
from botocore.exceptions import ClientError

def process_s3_file(file_info: FileInfo):
    try:
        # S3 operation that might fail
        s3_client.get_object(Bucket="bucket", Key="key")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        
        if error_code == "NoSuchKey":
            raise S3Exception(
                file_info=file_info,
                internal_message=f"S3 key not found: {e}",
                message="The requested file was not found in S3",
                failure_type=FailureType.config_error,
                exception=e
            )
        elif error_code == "AccessDenied":
            raise S3Exception(
                file_info=file_info,
                internal_message=f"S3 access denied: {e}",
                message="Access denied to S3 resource. Check your credentials and permissions",
                failure_type=FailureType.config_error,
                exception=e
            )
        else:
            raise S3Exception(
                file_info=file_info,
                internal_message=f"Unexpected S3 error: {e}",
                message="An unexpected error occurred while accessing S3",
                exception=e
            )

# Usage with multiple files
try:
    for file_info in file_list:
        process_s3_file(file_info)
except S3Exception as e:
    logger.error(f"S3 error processing files: {e}")
    # Error includes file context for debugging

Error Context with Multiple Files

from typing import List

from source_s3.exceptions import S3Exception
from source_s3.utils import FileInfo

def batch_process_files(file_list: List[FileInfo]):
    try:
        # Batch operation that might fail
        # (process_multiple_s3_files is a placeholder for your own batch logic)
        process_multiple_s3_files(file_list)
    except Exception as e:
        # Create exception with context for all affected files
        raise S3Exception(
            file_info=file_list,  # Pass entire list for context
            internal_message=f"Batch processing failed: {e}",
            message=f"Failed to process {len(file_list)} files from S3",
            exception=e
        )

Types

# Process management types
class ProcessError(Exception):
    """Exception raised when external process fails"""
    pass

class TimeoutError(Exception):
    """Exception raised when process execution times out"""
    pass

# Message serialization types  
class SerializationError(Exception):
    """Exception raised when message serialization fails"""
    pass

class DeserializationError(Exception):
    """Exception raised when JSON deserialization fails"""
    pass

class ValidationError(Exception):
    """Exception raised when message validation fails"""
    pass

# Error handling types
class FailureType:
    """Enumeration of failure types for error classification"""
    system_error: str
    config_error: str
    transient_error: str

class FileInfo:
    """File information for error context"""
    path: str
    size: Optional[int]
    last_modified: Optional[datetime]

class AirbyteMessage:
    """Airbyte message object for data synchronization"""
    type: str
    record: Optional[AirbyteRecordMessage]
    state: Optional[AirbyteStateMessage]
    log: Optional[AirbyteLogMessage]

class AirbyteTracedException(Exception):  
    """Base exception class for Airbyte connectors"""
    pass
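The type listing above is enough to sketch the contract that `S3Exception` provides: file context normalized to a list, plus standard Python error chaining via `__cause__`. The classes below are hypothetical simplified stand-ins for illustration, not the real Airbyte implementations (which live in the Airbyte CDK), and the string `file_info` stands in for a real `FileInfo` object:

```python
from typing import Optional

class AirbyteTracedException(Exception):
    # Hypothetical simplified stand-in for the real Airbyte base class
    def __init__(self, internal_message: Optional[str] = None,
                 message: Optional[str] = None):
        super().__init__(message or internal_message)
        self.internal_message = internal_message
        self.message = message

class S3Exception(AirbyteTracedException):
    # Sketch: normalize file_info to a list and chain the original exception
    def __init__(self, file_info, internal_message=None, message=None,
                 exception: Optional[BaseException] = None):
        super().__init__(internal_message, message)
        self.file_info = file_info if isinstance(file_info, list) else [file_info]
        self.__cause__ = exception  # standard Python error chaining

# Chaining in action
try:
    raise KeyError("missing-object-key")
except KeyError as original:
    err = S3Exception(
        file_info="s3://bucket/data.csv",
        internal_message=f"S3 key not found: {original}",
        message="The requested file was not found in S3",
        exception=original,
    )
```

Because the original exception is attached as `__cause__`, a traceback printed for the `S3Exception` will also show the underlying `ClientError` (here, the `KeyError` stand-in), which is what makes the error chaining in the examples above useful for debugging.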

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-s3
