S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Utility functions for process management, data serialization, and custom exception handling. Includes multiprocessing support, Airbyte message handling, and S3-specific error management.
Functions for running operations in external processes with timeout management and multiprocessing support.
def run_in_external_process(fn: Callable, timeout: int, max_timeout: int, logger, args: List[Any]) -> Mapping[str, Any]:
"""
Runs a function in an external process with timeout management.
Args:
fn: Function to execute in external process
timeout: Initial timeout in seconds
max_timeout: Maximum timeout in seconds
logger: Logger instance for operation logging
args: List of arguments to pass to the function
Returns:
Dictionary containing the function result and execution metadata
Raises:
TimeoutError: If function execution exceeds timeout limits
ProcessError: If external process fails or crashes
"""
def multiprocess_queuer(func: Callable, queue: mp.Queue, *args, **kwargs) -> None:
"""
Multiprocessor helper function for queue-based operations.
Args:
func: Function to execute
queue: Multiprocessing queue for result communication
*args: Positional arguments for the function
**kwargs: Keyword arguments for the function
"""

Utility functions for data handling and processing operations.
def get_value_or_json_if_empty_string(options: str = None) -> str:
"""
Returns the provided options string or empty JSON if string is empty.
Args:
options: Options string to process, defaults to None
Returns:
Original options string or "{}" if empty/None
"""

Helper functions for S3 client configuration and compatibility.
def _get_s3_compatible_client_args(config: Config) -> dict:
"""
Returns configuration for S3-compatible client creation.
Args:
config: Configuration object with S3 settings
Returns:
Dictionary with client configuration parameters for S3-compatible services
Raises:
ValueError: If configuration is invalid for S3-compatible services
"""

Functions for converting between Airbyte message objects and JSON representations.
def airbyte_message_to_json(message: AirbyteMessage, *, newline: bool = False) -> str:
"""
Converts an AirbyteMessage object to JSON string representation.
Args:
message: AirbyteMessage object to serialize
newline: Whether to append newline character to JSON string
Returns:
JSON string representation of the Airbyte message
Raises:
SerializationError: If message cannot be serialized to JSON
"""
def airbyte_message_from_json(message_json: str) -> AirbyteMessage:
"""
Creates an AirbyteMessage object from JSON string.
Args:
message_json: JSON string representation of Airbyte message
Returns:
AirbyteMessage object created from JSON
Raises:
DeserializationError: If JSON cannot be parsed into AirbyteMessage
ValidationError: If JSON structure is invalid for AirbyteMessage
"""

Custom exception class for S3-specific error handling with file context information.
class S3Exception(AirbyteTracedException):
"""
S3-specific exception handling with file context.
Inherits from AirbyteTracedException to provide structured error reporting
within the Airbyte framework with specific context about S3 operations.
"""
def __init__(
self,
file_info: Union[List[FileInfo], FileInfo],
internal_message: Optional[str] = None,
message: Optional[str] = None,
failure_type: FailureType = FailureType.system_error,
exception: BaseException = None
):
"""
Initialize S3Exception with file context and error details.
Args:
file_info: File information providing context for the error.
Can be a single FileInfo object or list of FileInfo objects.
internal_message: Internal error message for debugging and logging.
Not displayed to end users.
message: User-facing error message. If not provided, a default
message will be generated based on the error context.
failure_type: Type of failure (system_error, config_error, etc.).
Defaults to system_error for S3 operations.
exception: Original exception that caused this S3Exception.
Used for error chaining and detailed debugging.
Example:
try:
# S3 operation
pass
except ClientError as e:
raise S3Exception(
file_info=current_file,
internal_message=f"S3 client error: {e}",
message="Failed to access S3 file",
exception=e
)
"""

from source_s3.utils import run_in_external_process
import logging
logger = logging.getLogger(__name__)
def expensive_operation(data):
# Some CPU-intensive operation
return {"processed": len(data), "result": "success"}
# Run in external process with timeout
try:
result = run_in_external_process(
fn=expensive_operation,
timeout=30,
max_timeout=120,
logger=logger,
args=[large_dataset]
)
print(f"Operation completed: {result}")
except TimeoutError:
print("Operation timed out")

from source_s3.utils import multiprocess_queuer
import multiprocessing as mp
def worker_function(data, multiplier=2):
return data * multiplier
# Set up multiprocessing
queue = mp.Queue()
process = mp.Process(
target=multiprocess_queuer,
args=(worker_function, queue, 10),
kwargs={"multiplier": 3}
)
process.start()
result = queue.get()
process.join()

from source_s3.utils import get_value_or_json_if_empty_string
# Handle optional configuration
options = get_value_or_json_if_empty_string("") # Returns "{}"
options = get_value_or_json_if_empty_string(None) # Returns "{}"
options = get_value_or_json_if_empty_string('{"key": "value"}') # Returns '{"key": "value"}'

from source_s3.v4.stream_reader import _get_s3_compatible_client_args
from source_s3.v4 import Config
# Configure for S3-compatible service
config = Config(
bucket="minio-bucket",
endpoint="https://minio.example.com",
aws_access_key_id="minioadmin",
aws_secret_access_key="minioadmin"
)
# Get S3-compatible client arguments
client_args = _get_s3_compatible_client_args(config)
print(client_args) # Returns dict with endpoint_url, use_ssl, etc.
# Use with boto3 client
import boto3
s3_client = boto3.client(
"s3",
aws_access_key_id=config.aws_access_key_id,
aws_secret_access_key=config.aws_secret_access_key,
**client_args
)

from source_s3.utils import airbyte_message_to_json, airbyte_message_from_json
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage
# Create Airbyte message
record = AirbyteRecordMessage(
stream="users",
data={"id": 1, "name": "John"},
emitted_at=1234567890
)
message = AirbyteMessage(type="RECORD", record=record)
# Serialize to JSON
json_str = airbyte_message_to_json(message, newline=True)
print(json_str)
# Deserialize from JSON
reconstructed_message = airbyte_message_from_json(json_str)
assert reconstructed_message.record.data["name"] == "John"

from source_s3.exceptions import S3Exception
from source_s3.utils import FileInfo
from botocore.exceptions import ClientError
def process_s3_file(file_info: FileInfo):
try:
# S3 operation that might fail
s3_client.get_object(Bucket="bucket", Key="key")
except ClientError as e:
error_code = e.response["Error"]["Code"]
if error_code == "NoSuchKey":
raise S3Exception(
file_info=file_info,
internal_message=f"S3 key not found: {e}",
message="The requested file was not found in S3",
failure_type=FailureType.config_error,
exception=e
)
elif error_code == "AccessDenied":
raise S3Exception(
file_info=file_info,
internal_message=f"S3 access denied: {e}",
message="Access denied to S3 resource. Check your credentials and permissions",
failure_type=FailureType.config_error,
exception=e
)
else:
raise S3Exception(
file_info=file_info,
internal_message=f"Unexpected S3 error: {e}",
message="An unexpected error occurred while accessing S3",
exception=e
)
# Usage with multiple files
try:
for file_info in file_list:
process_s3_file(file_info)
except S3Exception as e:
logger.error(f"S3 error processing files: {e}")
# Error includes file context for debugging

from source_s3.exceptions import S3Exception
def batch_process_files(file_list: List[FileInfo]):
try:
# Batch operation that might fail
process_multiple_s3_files(file_list)
except Exception as e:
# Create exception with context for all affected files
raise S3Exception(
file_info=file_list, # Pass entire list for context
internal_message=f"Batch processing failed: {e}",
message=f"Failed to process {len(file_list)} files from S3",
exception=e
)

# Process management types
class ProcessError(Exception):
    """Raised when an external worker process fails or crashes."""
class TimeoutError(Exception):
    """Exception raised when process execution times out"""
    # NOTE(review): this shadows the builtin TimeoutError and does NOT
    # inherit from it, so `except TimeoutError` in caller code catches
    # whichever name is in scope at the call site — confirm intentional.
    pass
# Message serialization types
class SerializationError(Exception):
    """Raised when an Airbyte message cannot be serialized to JSON."""
class DeserializationError(Exception):
    """Raised when a JSON payload cannot be deserialized into a message."""
class ValidationError(Exception):
    """Raised when a message fails structural validation."""
# Error handling types
class FailureType:
    """Enumeration of failure types for error classification.

    Mirrors the Airbyte protocol failure categories. Values are assigned
    (not merely annotated): the original annotation-only form creates no
    class attributes at runtime, so expressions used elsewhere in this
    file such as ``FailureType.system_error`` (default argument of
    ``S3Exception.__init__``) would raise AttributeError.
    """

    system_error: str = "system_error"        # unexpected internal/platform failure
    config_error: str = "config_error"        # user-fixable configuration problem
    transient_error: str = "transient_error"  # temporary, retryable failure
class FileInfo:
    """File information for error context"""
    # NOTE(review): annotation-only stub — these attributes are declared but
    # never assigned here, so instances must be populated elsewhere. The
    # annotations require Optional and datetime to be importable at class
    # definition time — confirm the imports exist at file top.
    path: str  # file path (presumably the S3 object key — verify against callers)
    size: Optional[int]  # object size in bytes, when known
    last_modified: Optional[datetime]  # last-modified timestamp, when known
class AirbyteMessage:
    """Airbyte message object for data synchronization"""
    # NOTE(review): type stub — presumably exactly one payload field below is
    # populated depending on `type`; confirm against airbyte_cdk.models.
    type: str  # message kind, e.g. "RECORD" (see usage example above)
    record: Optional[AirbyteRecordMessage]  # payload when type == "RECORD"
    state: Optional[AirbyteStateMessage]  # payload when type == "STATE"
    log: Optional[AirbyteLogMessage]  # payload when type == "LOG"
class AirbyteTracedException(Exception):
"""Base exception class for Airbyte connectors"""
pass

Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-s3