
# Utilities and Error Handling

Utility functions for process management, data serialization, and custom exception handling. Includes multiprocessing support, Airbyte message handling, and S3-specific error management.

## Capabilities

### Process Management

Functions for running operations in external processes with timeout management and multiprocessing support.

```python { .api }
def run_in_external_process(fn: Callable, timeout: int, max_timeout: int, logger, args: List[Any]) -> Mapping[str, Any]:
    """
    Runs a function in an external process with timeout management.

    Args:
        fn: Function to execute in the external process
        timeout: Initial timeout in seconds
        max_timeout: Maximum timeout in seconds
        logger: Logger instance for operation logging
        args: List of arguments to pass to the function

    Returns:
        Dictionary containing the function result and execution metadata

    Raises:
        TimeoutError: If function execution exceeds the timeout limits
        ProcessError: If the external process fails or crashes
    """

def multiprocess_queuer(func: Callable, queue: mp.Queue, *args, **kwargs) -> None:
    """
    Multiprocessing helper that executes a function and reports its result through a queue.

    Args:
        func: Function to execute
        queue: Multiprocessing queue for result communication
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function
    """
```
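
To make the contract concrete, here is a minimal sketch of what a queue-based helper like `multiprocess_queuer` typically does: run the function in the child process and push either its result or the raised exception onto the queue. The body below is an illustrative assumption, not the connector's actual implementation.

```python
import multiprocessing as mp
from typing import Callable

def multiprocess_queuer_sketch(func: Callable, queue: mp.Queue, *args, **kwargs) -> None:
    # Push the result (or the exception) onto the queue so the parent
    # process can distinguish success from failure after join().
    try:
        queue.put(func(*args, **kwargs))
    except Exception as e:
        queue.put(e)
```

Returning the exception through the queue lets the parent process surface child failures instead of losing them when the child exits.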

### Data Processing

Utility functions for data handling and processing operations.

```python { .api }
def get_value_or_json_if_empty_string(options: Optional[str] = None) -> str:
    """
    Returns the provided options string, or an empty JSON object if the string is empty.

    Args:
        options: Options string to process, defaults to None

    Returns:
        The original options string, or "{}" if empty/None
    """
```
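
The contract is simple enough to sketch in full. Assuming "empty" means falsy (None or the empty string), an equivalent helper would be:

```python
from typing import Optional

def get_value_or_json_if_empty_string_sketch(options: Optional[str] = None) -> str:
    # Fall back to an empty JSON object when no options were provided.
    return options if options else "{}"
```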

### S3 Client Configuration

Helper functions for S3 client configuration and compatibility.

```python { .api }
def _get_s3_compatible_client_args(config: Config) -> dict:
    """
    Returns configuration for S3-compatible client creation.

    Args:
        config: Configuration object with S3 settings

    Returns:
        Dictionary with client configuration parameters for S3-compatible services

    Raises:
        ValueError: If the configuration is invalid for S3-compatible services
    """
```
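
For orientation, the returned mapping contains the kind of keyword arguments `boto3.client("s3", ...)` accepts for non-AWS endpoints. The dict below is a hypothetical example using standard boto3 client parameters; the function's actual output may differ.

```python
# Hypothetical shape of the returned dict for an S3-compatible endpoint
# such as MinIO; all keys shown are standard boto3 client parameters.
client_args = {
    "endpoint_url": "https://minio.example.com",  # non-AWS endpoint to target
    "use_ssl": True,                              # match the endpoint scheme
    "verify": True,                               # TLS certificate verification
}
```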

### Message Serialization

Functions for converting between Airbyte message objects and JSON representations.

```python { .api }
def airbyte_message_to_json(message: AirbyteMessage, *, newline: bool = False) -> str:
    """
    Converts an AirbyteMessage object to its JSON string representation.

    Args:
        message: AirbyteMessage object to serialize
        newline: Whether to append a newline character to the JSON string

    Returns:
        JSON string representation of the Airbyte message

    Raises:
        SerializationError: If the message cannot be serialized to JSON
    """

def airbyte_message_from_json(message_json: str) -> AirbyteMessage:
    """
    Creates an AirbyteMessage object from a JSON string.

    Args:
        message_json: JSON string representation of an Airbyte message

    Returns:
        AirbyteMessage object created from the JSON

    Raises:
        DeserializationError: If the JSON cannot be parsed into an AirbyteMessage
        ValidationError: If the JSON structure is invalid for an AirbyteMessage
    """
```
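
As a rough sketch of how such helpers can be built, assuming the CDK's message models are pydantic v1 models (an assumption for illustration; the actual implementation may differ):

```python
from airbyte_cdk.models import AirbyteMessage

def to_json_sketch(message: AirbyteMessage, *, newline: bool = False) -> str:
    # pydantic v1 models serialize via .json(); exclude_unset keeps output compact.
    json_str = message.json(exclude_unset=True)
    return json_str + "\n" if newline else json_str

def from_json_sketch(message_json: str) -> AirbyteMessage:
    # pydantic v1 models parse and validate via .parse_raw().
    return AirbyteMessage.parse_raw(message_json)
```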

### Error Handling

Custom exception class for S3-specific error handling with file context information.

```python { .api }
class S3Exception(AirbyteTracedException):
    """
    S3-specific exception handling with file context.

    Inherits from AirbyteTracedException to provide structured error reporting
    within the Airbyte framework, with specific context about S3 operations.
    """

    def __init__(
        self,
        file_info: Union[List[FileInfo], FileInfo],
        internal_message: Optional[str] = None,
        message: Optional[str] = None,
        failure_type: FailureType = FailureType.system_error,
        exception: Optional[BaseException] = None,
    ):
        """
        Initialize S3Exception with file context and error details.

        Args:
            file_info: File information providing context for the error.
                Can be a single FileInfo object or a list of FileInfo objects.
            internal_message: Internal error message for debugging and logging.
                Not displayed to end users.
            message: User-facing error message. If not provided, a default
                message will be generated based on the error context.
            failure_type: Type of failure (system_error, config_error, etc.).
                Defaults to system_error for S3 operations.
            exception: Original exception that caused this S3Exception.
                Used for error chaining and detailed debugging.

        Example:
            try:
                # S3 operation
                pass
            except ClientError as e:
                raise S3Exception(
                    file_info=current_file,
                    internal_message=f"S3 client error: {e}",
                    message="Failed to access S3 file",
                    exception=e,
                )
        """
```

## Usage Examples

### External Process Execution

```python
from source_s3.utils import run_in_external_process
import logging

logger = logging.getLogger(__name__)

def expensive_operation(data):
    # Some CPU-intensive operation
    return {"processed": len(data), "result": "success"}

# Example payload to process
large_dataset = list(range(1_000_000))

# Run in an external process with a timeout
try:
    result = run_in_external_process(
        fn=expensive_operation,
        timeout=30,
        max_timeout=120,
        logger=logger,
        args=[large_dataset],
    )
    print(f"Operation completed: {result}")
except TimeoutError:
    print("Operation timed out")
```

### Multiprocessing Queue Operations

```python
from source_s3.utils import multiprocess_queuer
import multiprocessing as mp

def worker_function(data, multiplier=2):
    return data * multiplier

# Guard is required on spawn-based platforms (Windows, macOS)
if __name__ == "__main__":
    # Set up multiprocessing
    queue = mp.Queue()
    process = mp.Process(
        target=multiprocess_queuer,
        args=(worker_function, queue, 10),
        kwargs={"multiplier": 3},
    )

    process.start()
    result = queue.get()
    process.join()
```

### Data Processing Utilities

```python
from source_s3.utils import get_value_or_json_if_empty_string

# Handle optional configuration
options = get_value_or_json_if_empty_string("")  # Returns "{}"
options = get_value_or_json_if_empty_string(None)  # Returns "{}"
options = get_value_or_json_if_empty_string('{"key": "value"}')  # Returns '{"key": "value"}'
```

### S3 Client Configuration Utilities

```python
from source_s3.v4.stream_reader import _get_s3_compatible_client_args
from source_s3.v4 import Config

# Configure for an S3-compatible service
config = Config(
    bucket="minio-bucket",
    endpoint="https://minio.example.com",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

# Get S3-compatible client arguments
client_args = _get_s3_compatible_client_args(config)
print(client_args)  # Dict with endpoint_url, use_ssl, etc.

# Use with a boto3 client
import boto3

s3_client = boto3.client(
    "s3",
    aws_access_key_id=config.aws_access_key_id,
    aws_secret_access_key=config.aws_secret_access_key,
    **client_args,
)
```

### Message Serialization

```python
from source_s3.utils import airbyte_message_to_json, airbyte_message_from_json
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type

# Create an Airbyte message
record = AirbyteRecordMessage(
    stream="users",
    data={"id": 1, "name": "John"},
    emitted_at=1234567890,
)
message = AirbyteMessage(type=Type.RECORD, record=record)

# Serialize to JSON
json_str = airbyte_message_to_json(message, newline=True)
print(json_str)

# Deserialize from JSON
reconstructed_message = airbyte_message_from_json(json_str)
assert reconstructed_message.record.data["name"] == "John"
```

### S3 Exception Handling

```python
import logging

import boto3
from airbyte_cdk.models import FailureType
from botocore.exceptions import ClientError
from source_s3.exceptions import S3Exception
from source_s3.utils import FileInfo

logger = logging.getLogger(__name__)
s3_client = boto3.client("s3")

def process_s3_file(file_info: FileInfo):
    try:
        # S3 operation that might fail
        s3_client.get_object(Bucket="bucket", Key="key")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]

        if error_code == "NoSuchKey":
            raise S3Exception(
                file_info=file_info,
                internal_message=f"S3 key not found: {e}",
                message="The requested file was not found in S3",
                failure_type=FailureType.config_error,
                exception=e,
            )
        elif error_code == "AccessDenied":
            raise S3Exception(
                file_info=file_info,
                internal_message=f"S3 access denied: {e}",
                message="Access denied to S3 resource. Check your credentials and permissions",
                failure_type=FailureType.config_error,
                exception=e,
            )
        else:
            raise S3Exception(
                file_info=file_info,
                internal_message=f"Unexpected S3 error: {e}",
                message="An unexpected error occurred while accessing S3",
                exception=e,
            )

# Usage with multiple files
# (file_list: an iterable of FileInfo objects gathered elsewhere)
try:
    for file_info in file_list:
        process_s3_file(file_info)
except S3Exception as e:
    logger.error(f"S3 error processing files: {e}")
    # The error includes file context for debugging
```

### Error Context with Multiple Files

```python
from typing import List

from source_s3.exceptions import S3Exception
from source_s3.utils import FileInfo

def batch_process_files(file_list: List[FileInfo]):
    try:
        # Batch operation that might fail (illustrative helper)
        process_multiple_s3_files(file_list)
    except Exception as e:
        # Create an exception with context for all affected files
        raise S3Exception(
            file_info=file_list,  # Pass the entire list for context
            internal_message=f"Batch processing failed: {e}",
            message=f"Failed to process {len(file_list)} files from S3",
            exception=e,
        )
```

## Types

```python { .api }
# Process management types
class ProcessError(Exception):
    """Exception raised when external process fails"""

class TimeoutError(Exception):
    """Exception raised when process execution times out"""

# Message serialization types
class SerializationError(Exception):
    """Exception raised when message serialization fails"""

class DeserializationError(Exception):
    """Exception raised when JSON deserialization fails"""

class ValidationError(Exception):
    """Exception raised when message validation fails"""

# Error handling types
class FailureType:
    """Enumeration of failure types for error classification"""
    system_error: str
    config_error: str
    transient_error: str

class FileInfo:
    """File information for error context"""
    path: str
    size: Optional[int]
    last_modified: Optional[datetime]

class AirbyteMessage:
    """Airbyte message object for data synchronization"""
    type: str
    record: Optional[AirbyteRecordMessage]
    state: Optional[AirbyteStateMessage]
    log: Optional[AirbyteLogMessage]

class AirbyteTracedException(Exception):
    """Base exception class for Airbyte connectors"""
```