# Utilities and Error Handling

Utility functions for process management, data serialization, and custom exception handling. This includes multiprocessing support, Airbyte message handling, and S3-specific error management.

## Capabilities

### Process Management

Functions for running operations in external processes, with timeout management and multiprocessing support.

```python { .api }
def run_in_external_process(fn: Callable, timeout: int, max_timeout: int, logger, args: List[Any]) -> Mapping[str, Any]:
    """
    Runs a function in an external process with timeout management.

    Args:
        fn: Function to execute in the external process
        timeout: Initial timeout in seconds
        max_timeout: Maximum timeout in seconds
        logger: Logger instance for operation logging
        args: List of arguments to pass to the function

    Returns:
        Dictionary containing the function result and execution metadata

    Raises:
        TimeoutError: If function execution exceeds the timeout limits
        ProcessError: If the external process fails or crashes
    """

def multiprocess_queuer(func: Callable, queue: mp.Queue, *args, **kwargs) -> None:
    """
    Multiprocessing helper that executes a function and reports its result through a queue.

    Args:
        func: Function to execute
        queue: Multiprocessing queue for result communication
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function
    """
```

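These two helpers are designed to work together: `multiprocess_queuer` runs the target function inside the child process and reports back through a queue, while `run_in_external_process` manages the child's lifetime and timeouts. The sketch below only illustrates that pattern; the helper names `_queuer` and `_run_with_escalating_timeout` and the doubling timeout strategy are assumptions, not the connector's actual code.

```python
# Illustrative sketch only -- the real source_s3.utils implementation may differ.
import multiprocessing as mp
from typing import Any, Callable, List, Mapping


def _queuer(func: Callable, queue: mp.Queue, *args: Any, **kwargs: Any) -> None:
    # Mirror of multiprocess_queuer: run the function and push the outcome
    # (result or exception) onto the queue so the parent process can read it.
    try:
        queue.put(func(*args, **kwargs))
    except Exception as exc:
        queue.put(exc)


def _run_with_escalating_timeout(fn: Callable, timeout: int, max_timeout: int, args: List[Any]) -> Mapping[str, Any]:
    # Hypothetical counterpart of run_in_external_process: retry with a doubled
    # timeout until max_timeout is exceeded, then give up.
    while timeout <= max_timeout:
        queue = mp.Queue()
        proc = mp.Process(target=_queuer, args=(fn, queue, *args))
        proc.start()
        try:
            outcome = queue.get(timeout=timeout)
        except Exception:  # queue.Empty: the call did not finish in time
            proc.terminate()
            timeout *= 2
            continue
        finally:
            proc.join()
        if isinstance(outcome, BaseException):
            raise outcome
        return outcome
    raise TimeoutError(f"Function did not finish within {max_timeout} seconds")
```
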
### Data Processing

Utility functions for data handling and processing operations.

```python { .api }
def get_value_or_json_if_empty_string(options: Optional[str] = None) -> str:
    """
    Returns the provided options string, or an empty JSON object if the string is empty.

    Args:
        options: Options string to process, defaults to None

    Returns:
        Original options string, or "{}" if empty/None
    """
```

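The documented behavior reduces to a single expression. The sketch below illustrates that contract and is not necessarily the shipped implementation:

```python
from typing import Optional

def get_value_or_json_if_empty_string(options: Optional[str] = None) -> str:
    # Sketch of the documented behavior: both None and "" fall back to "{}".
    return options or "{}"
```
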
### S3 Client Configuration

Helper functions for S3 client configuration and compatibility.

```python { .api }
def _get_s3_compatible_client_args(config: Config) -> dict:
    """
    Returns configuration for S3-compatible client creation.

    Args:
        config: Configuration object with S3 settings

    Returns:
        Dictionary with client configuration parameters for S3-compatible services

    Raises:
        ValueError: If configuration is invalid for S3-compatible services
    """
```

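Judging from the usage example later on this page, the returned dictionary holds boto3 client keyword arguments such as `endpoint_url` and `use_ssl`. A hypothetical return value, with the exact keys and their derivation assumed, might look like:

```python
# Hypothetical return value -- the exact keys depend on the connector's Config fields.
client_args = {
    "endpoint_url": "https://minio.example.com",  # taken from config.endpoint
    "use_ssl": True,                              # assumed: derived from the endpoint scheme
    "verify": True,                               # assumed: TLS certificate verification flag
}
```
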
### Message Serialization

Functions for converting between Airbyte message objects and JSON representations.

```python { .api }
def airbyte_message_to_json(message: AirbyteMessage, *, newline: bool = False) -> str:
    """
    Converts an AirbyteMessage object to JSON string representation.

    Args:
        message: AirbyteMessage object to serialize
        newline: Whether to append newline character to JSON string

    Returns:
        JSON string representation of the Airbyte message

    Raises:
        SerializationError: If message cannot be serialized to JSON
    """

def airbyte_message_from_json(message_json: str) -> AirbyteMessage:
    """
    Creates an AirbyteMessage object from JSON string.

    Args:
        message_json: JSON string representation of Airbyte message

    Returns:
        AirbyteMessage object created from JSON

    Raises:
        DeserializationError: If JSON cannot be parsed into AirbyteMessage
        ValidationError: If JSON structure is invalid for AirbyteMessage
    """
```

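For context, a minimal sketch of what such wrappers can look like on top of the CDK models is shown below. It assumes the `airbyte_cdk` message classes are Pydantic models (true for many CDK releases) and is not necessarily the connector's actual implementation.

```python
# Sketch only: assumes the airbyte_cdk message models are Pydantic models.
from airbyte_cdk.models import AirbyteMessage


def airbyte_message_to_json_sketch(message: AirbyteMessage, *, newline: bool = False) -> str:
    # exclude_unset keeps the output minimal, matching what Airbyte emits on stdout
    serialized = message.json(exclude_unset=True)
    return serialized + "\n" if newline else serialized


def airbyte_message_from_json_sketch(message_json: str) -> AirbyteMessage:
    # parse_raw validates the JSON against the AirbyteMessage schema
    return AirbyteMessage.parse_raw(message_json)
```
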
### Error Handling

Custom exception class for S3-specific error handling with file context information.

```python { .api }
class S3Exception(AirbyteTracedException):
    """
    S3-specific exception handling with file context.

    Inherits from AirbyteTracedException to provide structured error reporting
    within the Airbyte framework with specific context about S3 operations.
    """

    def __init__(
        self,
        file_info: Union[List[FileInfo], FileInfo],
        internal_message: Optional[str] = None,
        message: Optional[str] = None,
        failure_type: FailureType = FailureType.system_error,
        exception: Optional[BaseException] = None,
    ):
        """
        Initialize S3Exception with file context and error details.

        Args:
            file_info: File information providing context for the error.
                Can be a single FileInfo object or a list of FileInfo objects.
            internal_message: Internal error message for debugging and logging.
                Not displayed to end users.
            message: User-facing error message. If not provided, a default
                message will be generated based on the error context.
            failure_type: Type of failure (system_error, config_error, etc.).
                Defaults to system_error for S3 operations.
            exception: Original exception that caused this S3Exception.
                Used for error chaining and detailed debugging.

        Example:
            try:
                # S3 operation
                pass
            except ClientError as e:
                raise S3Exception(
                    file_info=current_file,
                    internal_message=f"S3 client error: {e}",
                    message="Failed to access S3 file",
                    exception=e,
                )
        """
```

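The docstring notes that a default user-facing message is generated from the file context when `message` is omitted. Purely as an illustration, with the helper name and formatting assumed rather than taken from the connector, that generation could look like:

```python
from typing import List, Union

def _default_user_message(file_info: Union[List["FileInfo"], "FileInfo"]) -> str:
    # Hypothetical helper: summarize the affected files for the user-facing message.
    files = file_info if isinstance(file_info, list) else [file_info]
    shown = ", ".join(f.path for f in files[:5])
    more = f" (and {len(files) - 5} more)" if len(files) > 5 else ""
    return f"The S3 operation failed for: {shown}{more}"
```
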
## Usage Examples

### External Process Execution

```python
import logging

from source_s3.utils import run_in_external_process

logger = logging.getLogger(__name__)

def expensive_operation(data):
    # Some CPU-intensive operation
    return {"processed": len(data), "result": "success"}

large_dataset = list(range(1_000_000))  # stand-in for the real payload

# Run in an external process with timeout handling
try:
    result = run_in_external_process(
        fn=expensive_operation,
        timeout=30,
        max_timeout=120,
        logger=logger,
        args=[large_dataset],
    )
    print(f"Operation completed: {result}")
except TimeoutError:
    print("Operation timed out")
```

### Multiprocessing Queue Operations

```python
import multiprocessing as mp

from source_s3.utils import multiprocess_queuer

def worker_function(data, multiplier=2):
    return data * multiplier

if __name__ == "__main__":  # required on spawn-based platforms (Windows, macOS)
    # Set up multiprocessing
    queue = mp.Queue()
    process = mp.Process(
        target=multiprocess_queuer,
        args=(worker_function, queue, 10),
        kwargs={"multiplier": 3},
    )

    process.start()
    result = queue.get()  # blocks until the worker puts its result on the queue
    process.join()
```

### Data Processing Utilities

```python
from source_s3.utils import get_value_or_json_if_empty_string

# Handle optional configuration
options = get_value_or_json_if_empty_string("")                  # Returns "{}"
options = get_value_or_json_if_empty_string(None)                # Returns "{}"
options = get_value_or_json_if_empty_string('{"key": "value"}')  # Returns '{"key": "value"}'
```

### S3 Client Configuration Utilities

```python
import boto3

from source_s3.v4 import Config
from source_s3.v4.stream_reader import _get_s3_compatible_client_args

# Configure for an S3-compatible service
config = Config(
    bucket="minio-bucket",
    endpoint="https://minio.example.com",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

# Get S3-compatible client arguments
client_args = _get_s3_compatible_client_args(config)
print(client_args)  # dict with endpoint_url, use_ssl, etc.

# Use with a boto3 client
s3_client = boto3.client(
    "s3",
    aws_access_key_id=config.aws_access_key_id,
    aws_secret_access_key=config.aws_secret_access_key,
    **client_args,
)
```

### Message Serialization

```python
from source_s3.utils import airbyte_message_to_json, airbyte_message_from_json
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage

# Create an Airbyte message
record = AirbyteRecordMessage(
    stream="users",
    data={"id": 1, "name": "John"},
    emitted_at=1234567890,
)
message = AirbyteMessage(type="RECORD", record=record)

# Serialize to JSON
json_str = airbyte_message_to_json(message, newline=True)
print(json_str)

# Deserialize from JSON
reconstructed_message = airbyte_message_from_json(json_str)
assert reconstructed_message.record.data["name"] == "John"
```

### S3 Exception Handling

```python
import logging

import boto3
from airbyte_cdk.models import FailureType
from botocore.exceptions import ClientError

from source_s3.exceptions import S3Exception
from source_s3.utils import FileInfo

logger = logging.getLogger(__name__)
s3_client = boto3.client("s3")

def process_s3_file(file_info: FileInfo):
    try:
        # S3 operation that might fail
        s3_client.get_object(Bucket="bucket", Key="key")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]

        if error_code == "NoSuchKey":
            raise S3Exception(
                file_info=file_info,
                internal_message=f"S3 key not found: {e}",
                message="The requested file was not found in S3",
                failure_type=FailureType.config_error,
                exception=e,
            )
        elif error_code == "AccessDenied":
            raise S3Exception(
                file_info=file_info,
                internal_message=f"S3 access denied: {e}",
                message="Access denied to S3 resource. Check your credentials and permissions",
                failure_type=FailureType.config_error,
                exception=e,
            )
        else:
            raise S3Exception(
                file_info=file_info,
                internal_message=f"Unexpected S3 error: {e}",
                message="An unexpected error occurred while accessing S3",
                exception=e,
            )

# Usage with multiple files
file_list = []  # e.g. the FileInfo objects discovered for the sync
try:
    for file_info in file_list:
        process_s3_file(file_info)
except S3Exception as e:
    logger.error(f"S3 error processing files: {e}")
    # The error includes file context for debugging
```

### Error Context with Multiple Files

```python
from typing import List

from source_s3.exceptions import S3Exception
from source_s3.utils import FileInfo

def batch_process_files(file_list: List[FileInfo]):
    try:
        # Batch operation that might fail (placeholder for your own routine)
        process_multiple_s3_files(file_list)
    except Exception as e:
        # Create an exception with context for all affected files
        raise S3Exception(
            file_info=file_list,  # Pass the entire list for context
            internal_message=f"Batch processing failed: {e}",
            message=f"Failed to process {len(file_list)} files from S3",
            exception=e,
        )
```

## Types

```python { .api }
# Process management types
class ProcessError(Exception):
    """Exception raised when external process fails"""
    pass

class TimeoutError(Exception):
    """Exception raised when process execution times out"""
    pass

# Message serialization types
class SerializationError(Exception):
    """Exception raised when message serialization fails"""
    pass

class DeserializationError(Exception):
    """Exception raised when JSON deserialization fails"""
    pass

class ValidationError(Exception):
    """Exception raised when message validation fails"""
    pass

# Error handling types
class FailureType:
    """Enumeration of failure types for error classification"""
    system_error: str
    config_error: str
    transient_error: str

class FileInfo:
    """File information for error context"""
    path: str
    size: Optional[int]
    last_modified: Optional[datetime]

class AirbyteMessage:
    """Airbyte message object for data synchronization"""
    type: str
    record: Optional[AirbyteRecordMessage]
    state: Optional[AirbyteStateMessage]
    log: Optional[AirbyteLogMessage]

class AirbyteTracedException(Exception):
    """Base exception class for Airbyte connectors"""
    pass
```